In [None]:
from modules.engine import *
from modules.log_reader import read_qsym_log
import logging
import pandas as pd
from dataclasses import asdict
from tqdm import tqdm

In [None]:
# Configure the root logger so INFO-level (and more severe) records are emitted
# for the rest of the notebook; later cells change the level temporarily.
logging.basicConfig(level=logging.INFO)

In [None]:
# Parse the exported log into query objects, report the largest number of
# constraints carried by a single query, and show how many queries were read.
queries = read_qsym_log("./quicksort_medium_export.log")
print("Largest query length:", max(map(len, (query.constraints for query in queries))))
len(queries)

In [None]:
# Optional (quadratic) sanity check, disabled by default: for every pair of
# queries with equal dependencies where one path is a prefix of the other,
# the constraints of the shorter one must be a prefix of the longer one's.
perform_assumption_assertion = False
if perform_assumption_assertion:
    for i, first in enumerate(tqdm(queries)):
        for j in range(i + 1, len(queries)):
            second = queries[j]
            if first.dependencies != second.dependencies:
                continue

            # Order the pair by path length; on a tie the later query counts
            # as the shorter one.
            if len(first.path) < len(second.path):
                shorter, longer = first, second
            else:
                shorter, longer = second, first

            # Only pairs whose paths are in a prefix relation are relevant.
            shared_steps = len(shorter.path)
            if shorter.path != longer.path[:shared_steps]:
                continue

            shared_constraints = len(shorter.constraints)
            assert shorter.constraints == longer.constraints[:shared_constraints], f"{shorter}, {longer}"

    print("Assumption of same prefices generate same constraints holds.")

### Strategies
The solver selection strategy is the component that determines how solvers are used or generated for queries.

#### Basic
Generates a new solver for each dependency set and never upgrades it, so the solver always stays empty.

##### Rationale
Works as a base case (with the benefit of dependency sets).

#### Negation
Generates a new solver for each branch negation. By negation we mean the negation of the last constraint that symbolic/concolic execution engines check to make a diverging test case.

##### Rationale
The longer queries of the current program execution, as well as those generated by the new diverging one, will share a common prefix.

#### Exact Path
Generates a new solver for almost every query.

##### Rationale
This is only useful when there are lots of exactly repeated queries. It can also be used to measure the repeated query count and to check the sanity of concolic executions.


In [None]:
class NegationSolverSelectionStrategy(SolverSelectionStrategy):
    """Select a solver loaded with a query's path minus its final step.

    A found solver is reused when its ``stack_path_len`` already covers
    ``len(query.path) - 1`` steps; otherwise a new solver is created and
    upgraded with the path/constraints up to (but excluding) the last entry.
    Queries whose last path step carries ``BranchAction.OPTIMISTIC`` are
    delegated to an internal ``ExactPathSolverSelectionStrategy``.
    """

    def __init__(self, use_copy=False, log_level=logging.DEBUG) -> None:
        """Store the copy preference and build the delegate for optimistic queries.

        Args:
            use_copy: When true, a new solver for a longer path starts from a
                copy of the found solver and only receives the missing tail.
            log_level: Forwarded to the base class and to the delegate.
        """
        super().__init__(log_level=log_level)
        self.use_copy = use_copy
        self.exact_strategy = ExactPathSolverSelectionStrategy(use_copy, log_level)

    def get_solver(self, found_solver: MySolver | None, query: Query) -> MySolver:
        """Return the solver to use for ``query``.

        Args:
            found_solver: Candidate solver handed in by the caller, or ``None``.
            query: Query to serve; its last path step decides the handling.

        Returns:
            ``found_solver`` itself when it is long enough, otherwise a newly
            created solver upgraded to the query's prefix.
        """
        self._log("Found solver for query with id = %d: %s", query.id, found_solver)

        last_step = query.path[-1]
        if last_step[1] == BranchAction.OPTIMISTIC:
            return self.exact_strategy.get_solver(found_solver, query)

        has_candidate = found_solver is not None
        if has_candidate and not (found_solver.stack_path_len < len(query.path) - 1):
            self._log("Solver is appropriate")
            self._log(found_solver)
            return found_solver

        if has_candidate:
            self._log("Creating a new solver for a longer path")
        else:
            self._log("No solver available, Creating a new one")

        fresh = self.create_empty_solver()
        if has_candidate and self.use_copy:
            # Start from the candidate's state and add only what it lacks,
            # still leaving out the query's final step.
            fresh.copy_from(found_solver)
            fresh.upgrade(query.path[fresh.stack_path_len:-1],
                          query.constraints[fresh.constraint_count:-1])
        else:
            fresh.upgrade(query.path[:-1], query.constraints[:-1])

        self._log(fresh)
        return fresh

    def __setattr__(self, __name: str, __value):
        """Set an attribute, mirroring ``create_empty_solver`` onto the delegate.

        The delegate is updated first so both strategies share the same
        solver factory whenever it is (re)assigned on this object.
        """
        if __name == "create_empty_solver":
            setattr(self.exact_strategy, __name, __value)
        return super().__setattr__(__name, __value)

class ExactPathSolverSelectionStrategy(SolverSelectionStrategy):
    """Select a solver loaded with a query's complete path.

    A found solver is reused when its ``stack_path_len`` is at least
    ``len(query.path)``; otherwise a new solver is created and upgraded with
    the whole path and constraint list of the query.
    """

    def __init__(self, use_copy=False, log_level=logging.DEBUG) -> None:
        """Store the copy preference.

        Args:
            use_copy: When true, a new solver for a longer path starts from a
                copy of the found solver and only receives the missing tail.
            log_level: Forwarded to the base class.
        """
        super().__init__(log_level=log_level)
        self.use_copy = use_copy

    def get_solver(self, found_solver: MySolver | None, query: Query) -> MySolver:
        """Return the solver to use for ``query``.

        Args:
            found_solver: Candidate solver handed in by the caller, or ``None``.
            query: Query whose full path the returned solver must cover.

        Returns:
            ``found_solver`` itself when it is long enough, otherwise a newly
            created solver upgraded to the query's full path.
        """
        has_candidate = found_solver is not None
        if has_candidate and not (found_solver.stack_path_len < len(query.path)):
            self._log("Solver is appropriate")
            return found_solver

        if has_candidate:
            self._log("Creating a new solver for a longer path")
        else:
            self._log("No solver available, Creating a new one")

        fresh = self.create_empty_solver()
        if has_candidate and self.use_copy:
            # Start from the candidate's state and add only what it lacks.
            fresh.copy_from(found_solver)
            fresh.upgrade(query.path[fresh.stack_path_len:],
                          query.constraints[fresh.constraint_count:])
        else:
            fresh.upgrade(query.path, query.constraints)

        return fresh


In [None]:
# Strategies correctness: optionally (disabled by default) solve the first 300
# queries with every strategy and require the same result per query as the
# basic strategy produces.
perform_strategy_assertions = False

logging.getLogger().setLevel(level=logging.INFO)

if perform_strategy_assertions:
    target_queries = queries[:300]

    # Reference results from the basic strategy.
    pool = SolverPool()
    pool.selection_strategy = BasicSolverSelectionStrategy()
    base_results = tuple(map(pool.solve, tqdm(target_queries)))

    strategies = [
        NegationSolverSelectionStrategy(False),
        NegationSolverSelectionStrategy(True),
        ExactPathSolverSelectionStrategy(),
    ]
    for strategy in strategies:
        name = type(strategy).__name__
        logging.info("Asserting %s", name)

        # Every strategy gets its own pool, with prefix assertions enabled.
        pool = SolverPool()
        pool.selection_strategy = strategy
        pool.enable_solve_prefix_assertions()

        for i, q in enumerate(tqdm(target_queries)):
            result = pool.solve(q)
            assert result == base_results[i], f"Different result found for the {i}th query, in {name} strategy.\nDetails:\nQuery: {q.to_json_serializable()},\nStrategy Result: {result}, Base Result: {base_results[i]}"


In [None]:
# Build the pool that the statistics cells below inspect. The strategy in use
# is the negation one; BasicSolverSelectionStrategy() and
# ExactPathSolverSelectionStrategy() are the alternatives to swap in here.
pool = SolverPool(max_solvers=1000)
pool.selection_strategy = NegationSolverSelectionStrategy()

# Only warnings and above while solving, then back to INFO afterwards.
logging.getLogger().setLevel(logging.WARN)

for q in queries[:1000]:
    pool.solve(q)

logging.getLogger().setLevel(logging.INFO)

In [None]:
def expand_times_column(df: pd.DataFrame) -> pd.DataFrame:
    """Replace the dict-valued ``"times"`` column with one column per dict key.

    Args:
        df: Frame whose ``"times"`` cells are mappings (name -> value).
            The frame itself is not modified.

    Returns:
        A new frame holding every other column of ``df`` followed by one
        column per key found in the ``"times"`` mappings. For an empty
        frame, the frame without its ``"times"`` column (if it had one).
    """
    if df.empty:
        # Nothing to expand. The row-wise apply below cannot cope with an
        # empty frame: without a "times" column ``drop`` raises, and with
        # zero rows ``apply`` hands back the original columns, which makes
        # ``join`` fail on overlapping names.
        return df.drop(columns=["times"], errors="ignore")

    expanded = df.apply(
        lambda row: pd.Series(row["times"].values(), index=row["times"].keys()),
        axis=1,
    )
    return df.drop(columns=["times"], inplace=False).join(expanded)

def describe(df: pd.DataFrame) -> pd.DataFrame:
    """Return ``df.describe()`` with an additional ``"sum"`` row appended.

    Args:
        df: Frame to summarize.

    Returns:
        The descriptive-statistics table of ``df`` followed by one row,
        labelled ``"sum"``, holding the per-column totals.
    """
    summary = df.describe()
    totals_row = df.sum().to_frame("sum").T
    return pd.concat([summary, totals_row])

In [None]:
# Report the state of the pool built above and summarize per-solver statistics.
print("Solver tree count:", len(pool._solver_trees))
print("Total alive solver count:", len(pool._solvers.items))
# Takes the last entry of the cache's items mapping.
print("Most recently used solver:", list(pool._solvers.items.items())[-1][1]._solver)
print("Solver cache statistics:", pool._solvers.statistics)

solver_data = pd.DataFrame([asdict(stats) for stats in pool.statistics.solvers.values()])
solver_data = expand_times_column(solver_data)

# Summarize once and reuse the table; a bare describe(...) in the middle of a
# cell is evaluated but never displayed, so it was pure wasted work.
solver_summary = describe(solver_data)

# Stacking two identical summaries and averaging them per row label uses the
# same concat/groupby/mean combination that evaluate() applies across runs.
pd.concat([solver_summary, solver_summary]).groupby(level=0).mean()

In [None]:
# Per-tree statistics of the pool, with the "times" mapping spread into
# columns, followed by their summary table (displayed as the cell output).
tree_data = expand_times_column(
    pd.DataFrame([asdict(stats) for stats in pool.statistics.trees.values()])
)
describe(tree_data)

In [None]:
def evaluate(queries, strategy, max_solvers=200, repeat_count=5):
    """Solve ``queries`` repeatedly with ``strategy`` and collect statistics.

    Each repetition uses a new ``SolverPool`` limited to ``max_solvers``; the
    same ``strategy`` object is assigned to every pool.

    Args:
        queries: Queries to solve, in order, in every repetition.
        strategy: Selection strategy assigned to each pool.
        max_solvers: Passed to ``SolverPool``.
        repeat_count: Number of repetitions.

    Returns:
        A dict with:
        ``"cache"``: ``statistics.cache`` of the last pool;
        ``"solvers_counts"``: summary of the last pool's per-solver stats
        without their ``"times"`` field;
        ``"solvers_times"`` / ``"trees_times"``: per-run summaries of the
        ``times`` of solvers / trees, averaged over the runs, transposed and
        without the ``"count"`` column.
    """

    def summarize_times(stats_by_key):
        # One row per stats object, one column per entry of its `times`.
        return describe(pd.DataFrame([stats.times for stats in stats_by_key.values()]))

    def average(frames):
        # Rows with the same label (mean, std, ...) are averaged across runs.
        stacked = pd.concat(frames)
        return stacked.groupby(level=0).mean().transpose().drop(columns=["count"], inplace=False)

    run_summaries = []
    with tqdm(total=repeat_count * len(queries)) as progress:
        for _ in range(repeat_count):
            pool = SolverPool(max_solvers=max_solvers)
            pool.selection_strategy = strategy
            for query in queries:
                pool.solve(query)
                progress.update()

            run_summaries.append((
                summarize_times(pool.statistics.solvers),
                summarize_times(pool.statistics.trees),
            ))

    solvers_times = average([solver_summary for solver_summary, _ in run_summaries])
    trees_times = average([tree_summary for _, tree_summary in run_summaries])

    # Counting statistics is same for each run, so the last pool is used.
    cache_stats = pool.statistics.cache
    raw_counts = pd.DataFrame([asdict(stats) for stats in pool.statistics.solvers.values()])
    solvers_counts = describe(raw_counts.drop(columns=["times"], inplace=False))

    del pool
    return {
        "cache": cache_stats,
        "solvers_counts": solvers_counts,
        "solvers_times": solvers_times,
        "trees_times": trees_times,
    }


def evaluate_and_save(name, queries, strategy, max_solvers=200, repeat_count=5):
    """Run ``evaluate`` and write its result to ``./evaluations/<name>.json``.

    Args:
        name: File name (without extension) of the JSON report.
        queries: Forwarded to ``evaluate``.
        strategy: Forwarded to ``evaluate``.
        max_solvers: Forwarded to ``evaluate``.
        repeat_count: Forwarded to ``evaluate``.

    The ``./evaluations`` directory is created when missing; an existing
    report with the same name is overwritten.
    """
    import json
    from pathlib import Path

    result = evaluate(queries, strategy, max_solvers, repeat_count)

    # The cache statistics are a dataclass; the remaining entries are frames.
    to_write = {"cache": asdict(result["cache"])}
    for key in ("solvers_counts", "solvers_times", "trees_times"):
        to_write[key] = result[key].to_dict()

    Path("./evaluations").mkdir(exist_ok=True)
    with open(f"./evaluations/{name}.json", "w") as f:
        json.dump(to_write, f)


In [None]:
# Evaluate the negation strategy (default settings, i.e. without copying) on
# the first 100 queries and store the report under ./evaluations.
evaluate_and_save(
    "negnocopy__quicksort_medium__400",
    queries[:100],
    NegationSolverSelectionStrategy(),
    max_solvers=400,
    repeat_count=3,
)