In [None]:
import os
import sys

# The notebook is expected to be started from its own experiment directory,
# so every other location is derived from the current working directory.
EXPERIMENT_PATH = os.getcwd()
ARTIFACTS_PATH = os.path.join(EXPERIMENT_PATH, "artifacts")

# The root directory is two levels above the experiment directory.
ROOT_PATH = os.path.dirname(os.path.dirname(EXPERIMENT_PATH))
BTCNN_PATH = os.path.join(ROOT_PATH, "btcnn/src/btcnn")
HBO_BENCH_PATH = os.path.join(ROOT_PATH, "hbo_bench/src/hbo_bench")

# Give the root directory top priority on the module search path.
sys.path.insert(0, ROOT_PATH)

In [None]:
from collections import defaultdict
from json import load, dumps, dump
from tqdm import tqdm
import numpy as np
import pandas as pd

from hbo_bench.oracle import Oracle, OracleRequest, TIMEOUT
from hbo_bench.data_config import HINTSETS, DOPS, HINTS, DEFAULT_HINTSET, DEFAULT_DOP
from hbo_bench.utils import get_logical_tree, get_full_plan

In [None]:
# Benchmark name -> directory name under `<HBO_BENCH_PATH>/data/processed`.
_PROCESSED_DATASETS = {
    "JOB": "JOB",
    "TPCH": "tpch_10gb",
    "SQ": "sample_queries",
}

# One Oracle per benchmark, built once and reused by every experiment below.
cached_oracles = {
    bench: Oracle(f"{HBO_BENCH_PATH}/data/processed/{dataset}")
    for bench, dataset in _PROCESSED_DATASETS.items()
}

# Local Search

Given the vastness of the search space, even with boolean hints alone ($2^{\# \text{hints}}$ hintsets), and our desire to also manage the degree of parallelism (the `DOP` value), we are compelled to find ways to reduce the search space.

Initially, a **greedy algorithm** was considered, which sequentially disables hints one at a time. However, it was observed in practice that disabling a single operation is sometimes insufficient to achieve the desired result. This led to the decision to implement additional actions such as "disable `BMS` & `IS` & `NL`" and to parameterize the action space itself, which allows balancing exploration speed against performance. As a result, the search algorithm looks like a local search algorithm.

Since it is not predetermined which actions are most promising, we **parameterize** the general algorithm scheme and **empirically determine the most suitable combinations of actions for each scenario**. It is evident that the more actions we add, the more extensively we will explore, thereby potentially finding better solutions, but at the cost of increased search expenses.

Given that a) often very good solutions exist near the initial state, b) the number of unique plans is relatively small, and c) the quality of `SearchingState` is determined solely by the plan, we propose the following techniques to reduce training time:
- limit the number of iterations in local search.
- pre-plan neighbors and avoid executing the same plans repeatedly.
- use timeouts when exploring neighbors (there is no point in waiting for a request to complete if it will take longer than the best known solution).
- implement aggressive timeout
- use only subset of moves (i.e., consider only a specific part of the neighborhood, for example, limit to just turning off `INL` or decreasing `DOP`)

In [None]:
# Bitmask combining three hint bits (values 2, 8 and 64).
# NOTE(review): presumably the hints that together switch off index nested
# loops -- confirm against the bit layout of HINTS.
OFF_INL_HINT = 2 | 8 | 64

# Number of scan-related and join-related hints; together they must account
# for every entry of HINTS.
N_SCANS, N_JOINS = 4, 3
assert len(HINTS) == N_SCANS + N_JOINS

In [None]:
from query_explorer import QueryExplorer, SearchingSettings, SearchingState

In [None]:
from collections import namedtuple

# SearchingSettings extended with two extra switches that are only used by the
# experiments in this notebook; both are off by default.
_EXTRA_FIELDS = ("avoid_duplicates", "prioritize_neighbors")
_BASE_DEFAULTS = tuple(SearchingSettings._field_defaults.values())

ExtendedSearchingSettings = namedtuple(
    "ExtendedSearchingSettings",
    SearchingSettings._fields + _EXTRA_FIELDS,
    defaults=_BASE_DEFAULTS + (False, False),
)

In [None]:
from itertools import product

# Candidate values for every search parameter.  The key order matters: the
# first key varies slowest and the last key fastest, which fixes the order of
# `settings_pool` (and therefore tie-breaking when settings are ranked later).
SETTINGS_GRID = {
    "disable_scans": [False, True],
    "disable_joins": [False, True],
    "decrease_dop": [False, True],
    "disable_inl": [False, True],
    "relative_boost_threshold": [1.0, 1.1, 1.2, 1.5, 2.0],
    "max_iter": [1, 2, float("inf")],
    "avoid_duplicates": [False, True],
    "use_joined_search": [False, True],
    "prioritize_neighbors": [False, True],
}

# Full Cartesian product of the grid: one settings tuple per combination.
settings_pool = [
    ExtendedSearchingSettings(**dict(zip(SETTINGS_GRID, combo)))
    for combo in product(*SETTINGS_GRID.values())
]

In [None]:
def get_full_plan(plan: "ExplainPlan") -> "str":
    """Serialize an explain plan into a single-line textual signature.

    The tree rooted at ``plan.plan`` is walked in pre-order.  Every node
    contributes its type, relation name, index name and estimated cardinality,
    followed by its children wrapped in ``[`` ... ``]``.  All tokens are joined
    with single spaces, so two plans share a signature only if they agree on
    structure and on all of those node attributes.
    """

    def tokens(node: "ExplainNode"):
        # Node description first, then the bracketed list of its sub-plans.
        yield f"{node.node_type} (Rel={node.relation_name}|Index={node.index_name}|Cards={node.estimated_cardinality})"
        yield "["
        for sub_plan in node.plans:
            yield from tokens(sub_plan)
        yield "]"

    return " ".join(tokens(plan.plan))

In [None]:
class SequentialQueryExplorer(QueryExplorer):
    """Local search that accounts for the cost of exploring states one by one.

    In addition to returning the best state found, the explorer accumulates:

    * ``sequential_planning_time`` -- sum of the (timeout-capped) planning times
      of every explored neighbor;
    * ``sequential_e2e_time`` -- sum of the (timeout-capped) end-to-end times
      charged for every explored neighbor;
    * ``seen_plans`` -- textual signatures (see ``get_full_plan``) of the plans
      encountered so far.

    NOTE(review): ``settings``, ``query_name``, ``tried_states``,
    ``get_neighbors``, ``get_planning_time``, ``get_e2e_time`` and
    ``_get_explain_plan`` are not defined in this class; they are presumably
    provided by ``QueryExplorer`` -- confirm there.
    """

    def run(self) -> "SearchingState":
        """Run the local search and return the best state found.

        Starting from the default state, each iteration explores the not yet
        tried neighbors of the current record.  A neighbor replaces the record
        only if its time beats ``record_time / relative_boost_threshold``.
        The search stops after ``max_iter`` iterations, when an iteration does
        not change the record, or when no untried neighbors remain.
        """
        self.sequential_planning_time = 0
        self.sequential_e2e_time = 0
        self.seen_plans = set()

        def_state = SearchingState(self.settings.default_hintset, self.settings.default_dop)
        prev_state, record_state, record_time = None, def_state, float("inf")
        it = 0
        while it < self.settings.max_iter and prev_state != record_state:
            prev_state = record_state
            # A neighbor must be faster than the record by the configured factor.
            timeout = record_time / self.settings.relative_boost_threshold
            neighbors = [
                st for st in self.get_neighbors(state=record_state) if st not in self.tried_states
            ]
            if not neighbors:
                break  # pragma: no cover
            best_ngb_time, best_ngb = self.explore_sequentially(neighbors, timeout)
            if best_ngb_time < timeout:
                record_state, record_time = best_ngb, best_ngb_time
            it += 1
        # Sanity check: the search must never return something worse than the default.
        assert self.get_e2e_time(record_state) <= self.get_e2e_time(def_state), (self.query_name, record_state)
        return record_state

    def explore_sequentially(self, neighbors: "List[SearchingState]", timeout: "Time") -> "Tuple[Time, SearchingState]":
        """Explore ``neighbors`` in order and return ``(best_time, best_state)``.

        ``neighbors`` must be non-empty.  Every visited neighbor is added to
        ``self.tried_states`` and its cost is added to the sequential time
        counters.  The running timeout shrinks to the fastest end-to-end time
        observed so far; the default state is always given the global
        ``TIMEOUT`` instead.  If no neighbor is fully evaluated, the returned
        time is ``inf`` and the returned state is the first neighbor passed in.
        """
        def_state = SearchingState(self.settings.default_hintset, self.settings.default_dop)
        record_time, record_state = float("inf"), neighbors[0]

        if self.settings.prioritize_neighbors:
            # Oracle-style ordering: visit the neighbors from fastest to slowest.
            neighbors = sorted(neighbors, key=self.get_e2e_time)

        for ngb_state in neighbors:
            self.tried_states.add(ngb_state)

            # Per-neighbor timeout.  It is kept apart from the running `timeout`
            # so that the relaxed limit used for the default state can never
            # leak to the following neighbors (previously it did when the
            # planning step below hit the limit and skipped the restore).
            ngb_timeout = TIMEOUT if ngb_state == def_state else timeout

            ngb_planning_time = min(self.get_planning_time(ngb_state), ngb_timeout)
            self.sequential_planning_time += ngb_planning_time
            if ngb_planning_time == ngb_timeout:
                # Planning alone exhausted the budget: charge it and move on.
                self.sequential_e2e_time += ngb_planning_time
                continue

            plan_signature = get_full_plan(self._get_explain_plan(state=ngb_state))
            ngb_e2e_time = self.get_e2e_time(ngb_state)

            if self.settings.avoid_duplicates and plan_signature in self.seen_plans:
                # The plan was already met, so only the planning time is charged.
                self.sequential_e2e_time += ngb_planning_time
            else:
                self.sequential_e2e_time += min(ngb_e2e_time, ngb_timeout)
            self.seen_plans.add(plan_signature)

            # There is no point in waiting longer than the fastest time seen so far.
            timeout = min(timeout, ngb_e2e_time)

            if ngb_e2e_time < record_time:
                record_state, record_time = ngb_state, ngb_e2e_time
                if self.settings.prioritize_neighbors:
                    # Neighbors are sorted by time, so the first hit is the best one.
                    break

        return record_time, record_state

## Step 1. Collecting performance for each settings configuration

In [None]:
# bench_name -> settings -> total end-to-end time of the states chosen by the search
e2e_times = defaultdict(dict)
# bench_name -> settings -> total sequential time spent exploring (learning)
learning_times = defaultdict(dict)

# NOTE: the loop variable is deliberately named `settings`; the Step 2 cell
# reuses whatever value it holds after this loop finishes.
for settings in tqdm(settings_pool):
    for bench_name, oracle in cached_oracles.items():
        e2e_time, learning_time = 0, 0
        for query_name in oracle.get_query_names():
            explorer = SequentialQueryExplorer(query_name=query_name, oracle=oracle, settings=settings)
            best_state = explorer.run()
            learning_time += explorer.sequential_e2e_time
            e2e_time += explorer.get_e2e_time(best_state)
        e2e_times[bench_name][settings] = round(e2e_time, 3)
        learning_times[bench_name][settings] = round(learning_time, 3)

## Step 2. Collecting info for baseline and ideal case

In [None]:
# bench_name -> total time when every query runs in its fastest state (ideal case)
# bench_name -> total time when every query runs in the default state (baseline)
best_e2e_times, def_times = {}, {}

# The exhaustive search space: every hintset under every degree of parallelism.
all_states = [SearchingState(hintset, dop) for dop in DOPS for hintset in HINTSETS]
default_state = SearchingState(DEFAULT_HINTSET, DEFAULT_DOP)

for bench_name, oracle in cached_oracles.items():
    optimum_total, baseline_total = 0, 0
    for query_name in oracle.get_query_names():
        # NOTE(review): `settings` is the variable left over from an earlier
        # cell; the explorer is used here only to look up execution times.
        explorer = SequentialQueryExplorer(query_name=query_name, oracle=oracle, settings=settings)
        optimum_total += min(map(explorer.get_e2e_time, all_states))
        baseline_total += explorer.get_e2e_time(default_state)

    best_e2e_times[bench_name] = round(optimum_total, 3)
    def_times[bench_name] = round(baseline_total, 3)

## Step 3. Measuring the performance for default settings

In [None]:
def evaluate_settings(settings, bench_name):
    """Build one report row describing how `settings` performed on `bench_name`.

    The row contains the move switches of the settings, the total end-to-end
    time reached, the share (in percent) of the maximum possible boost over
    the default configuration that was achieved, and the learning time.  All
    figures are read from the module-level result tables and rounded to one
    decimal place.
    """
    achieved_time = e2e_times[bench_name][settings]
    baseline_time = def_times[bench_name]
    share_of_optimum = 100 * (baseline_time - achieved_time) / (baseline_time - best_e2e_times[bench_name])

    moves = ("disable_scans", "disable_joins", "decrease_dop", "disable_inl", "use_joined_search")
    row = {f"`{move}`": getattr(settings, move) for move in moves}
    row["E2E Time (sec)"] = round(achieved_time, 1)
    row["Boost (% of optimum)"] = round(share_of_optimum, 1)
    row["Learning Time (sec)"] = round(learning_times[bench_name][settings], 1)
    return row

In [None]:
MAX_ITER = float("inf")

# Move combinations to report; every listed move is switched on, everything
# else keeps its default value, and the number of iterations is unlimited.
_BASIC_MOVE_SETS = [
    ("disable_scans",),
    ("disable_joins",),
    ("disable_scans", "disable_joins"),
    ("disable_inl",),
    ("decrease_dop",),
    ("disable_scans", "decrease_dop"),
    ("disable_joins", "decrease_dop"),
    ("disable_scans", "disable_joins", "decrease_dop"),
    ("disable_inl", "decrease_dop"),
    ("disable_joins", "disable_inl", "decrease_dop"),
    ("disable_joins", "disable_scans", "disable_inl", "decrease_dop"),
    ("disable_joins", "disable_scans", "use_joined_search", "decrease_dop"),
    ("disable_inl", "use_joined_search", "decrease_dop"),
    ("disable_joins", "use_joined_search", "decrease_dop"),
    ("disable_joins", "disable_inl", "use_joined_search", "decrease_dop"),
    ("disable_scans", "disable_joins", "disable_inl", "decrease_dop", "use_joined_search"),
]
settings_list = [
    ExtendedSearchingSettings(max_iter=MAX_ITER, **dict.fromkeys(moves, True))
    for moves in _BASIC_MOVE_SETS
]

# One CSV report per benchmark, one row per move combination.
for bench_name in cached_oracles:
    results = [evaluate_settings(settings, bench_name) for settings in settings_list]
    pd.DataFrame(results).to_csv(f"{ARTIFACTS_PATH}/{bench_name}_basic_settings.csv", index=False)

# *Which moves are the most important to get high boost*?

To determine which of these techniques are the most effective, we will introduce a *score* for the search parameters $x$ similar to the F-Score -- this metric balances between the proportion of the boost obtained from the maximum possible acceleration and the proportion of saved training time:


$$\text{score}_{\beta}(\text{x}) = F_{\beta}(\text{boost\_coeff}(\text{x}), \text{learning\_coeff}(\text{x}))$$

where $\text{boost\_coeff}(\text{x}) = \frac{\text{boost}(\text{x})}{\text{max\_possible\_boost}}$ and $\text{learning\_coeff}(\text{x}) = \frac{\text{max\_possible\_time} - \text{learning\_time}(\text{x})}{\text{max\_possible\_time}}$

In [None]:
def find_best_settings(bench_name, condition=None, beta=2):
    """Pick the settings with the highest F-beta-like score on a benchmark.

    The score combines two coefficients in the F-beta formula:

    * ``boost_coef`` -- achieved boost divided by the maximum possible boost;
    * ``learning_coef`` -- share of the maximum learning time that was saved.

    Larger ``beta`` moves the score towards ``boost_coef``, smaller ``beta``
    towards ``learning_coef``.  When both coefficients are zero the score is
    defined as 0 instead of raising ``ZeroDivisionError``.  Ties are resolved
    in favour of the settings that come first in ``settings_pool``.

    Args:
        bench_name: key of the benchmark in the module-level result tables.
        condition: optional predicate; settings for which it is falsy are skipped.
        beta: weight of the boost coefficient relative to the learning one.

    Returns:
        A dict (one report row) with the score inputs, the number of visited
        states and plans, and the parameters of the winning settings.

    Raises:
        ValueError: if no settings in ``settings_pool`` satisfy ``condition``.
    """
    max_learning_time = max(learning_times[bench_name].values())
    max_speedup = def_times[bench_name] - best_e2e_times[bench_name]
    best_score, best_settings = float("-inf"), None

    for settings in settings_pool:
        if condition and not condition(settings):
            continue
        saved_learning_time = max_learning_time - learning_times[bench_name][settings]
        learning_coef = saved_learning_time / max_learning_time
        boost = def_times[bench_name] - e2e_times[bench_name][settings]
        assert boost >= 0, (boost, settings)
        boost_coef = boost / max_speedup
        denominator = beta ** 2 * learning_coef + boost_coef
        # Usual F-score convention: the score is 0 when both components are 0.
        score = (1 + beta ** 2) * learning_coef * boost_coef / denominator if denominator else 0.0
        if score > best_score:
            best_score, best_settings = score, settings

    if best_settings is None:
        raise ValueError(f"no settings satisfy the given condition for benchmark {bench_name!r}")

    speedup = def_times[bench_name] - e2e_times[bench_name][best_settings]
    speedup_coef = speedup / max_speedup
    learning_time = learning_times[bench_name][best_settings]

    # Re-run the winning search to count how much of the space it really touched.
    n_tried_states, n_seen_plans, n_plans = 0, 0, 0
    all_states = [SearchingState(hintset, dop) for dop in DOPS for hintset in HINTSETS]
    oracle = cached_oracles[bench_name]
    queries = oracle.get_query_names()

    for query_name in queries:
        explorer = SequentialQueryExplorer(query_name=query_name, oracle=oracle, settings=best_settings)
        explorer.run()
        n_tried_states += len(set(explorer.tried_states))
        n_seen_plans += len(explorer.seen_plans)
        # Number of distinct plans reachable over the whole state space.
        n_plans += len({get_full_plan(explorer._get_explain_plan(st)) for st in all_states})

    best_settings = best_settings._asdict()
    return {
        "Beta": beta,
        "Boost (% of optimum)": round(100 * speedup_coef, 1),
        "Learning Time (sec)": round(learning_time, 1),
        "Visited States": f"{n_tried_states}/{len(queries) * len(all_states)}",
        "Visited Plans": f"{n_seen_plans}/{n_plans}",
        "`disable_joins`": best_settings["disable_joins"],
        "`disable_scans`": best_settings["disable_scans"],
        "`decrease_dop`": best_settings["decrease_dop"],
        "`use_joined_search`": best_settings["use_joined_search"],
        "`disable_inl`": best_settings["disable_inl"],
        "`relative_boost_threshold`": best_settings["relative_boost_threshold"],
        "`max_iter`": best_settings["max_iter"],
        "`avoid_duplicates`": best_settings["avoid_duplicates"],
    }

In [None]:
def condition(el):
    """Keep only the settings that do not prioritize neighbors."""
    return not el.prioritize_neighbors


# For every benchmark, report the best-scoring settings for a range of betas.
for bench_name in cached_oracles:
    results = [
        find_best_settings(bench_name, condition=condition, beta=beta)
        for beta in [1/10, 1/5, 1, 2, 10]
    ]
    pd.DataFrame(results).to_csv(f"{ARTIFACTS_PATH}/{bench_name}_well_balanced_settings.csv", index=False)

# *How well are we reducing search space via local search procedure?*

In [None]:
def condition(el):
    """Keep only the settings that prioritize neighbors."""
    return el.prioritize_neighbors


# Same report as for the non-prioritized search, restricted to prioritized settings.
for bench_name in cached_oracles:
    results = [
        find_best_settings(bench_name, condition=condition, beta=beta)
        for beta in [1/10, 1/5, 1, 2, 10]
    ]
    pd.DataFrame(results).to_csv(f"{ARTIFACTS_PATH}/{bench_name}_well_balanced_settings_with_priority.csv", index=False)