In [1]:
import pandas as pd
import numpy as np
import copy
import sys
import os
sys.path.append("../")
from parser.utils import load_json, dfs_cardinality, estimate_scan_in_mb
from models.feature.single_xgboost_feature import find_top_k_operators, featurize_one_plan, get_top_k_table_by_size
from utils.load_brad_trace import load_trace, create_concurrency_dataset, load_trace_all_version
from models.concurrency.utils import pre_info_train_test_seperation
import torch
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence, pad_sequence
from models.single.stage import SingleStage
from models.concurrency.complex_models import ConcurrentRNN
np.set_printoptions(suppress=True)

In [2]:
import pandas as pd
import numpy as np
from typing import Optional, Tuple
from utils.load_brad_trace import (
    load_trace,
    create_concurrency_dataset,
    load_trace_all_version,
)
from models.single.stage import SingleStage
from models.concurrency.complex_models import ConcurrentRNN
from scheduler.base_scheduler import BaseScheduler


class QueryBank:
    """A pool of SQL queries paired with their runtimes, kept sorted by runtime.

    Used to draw queries for a simulated workload, either uniformly at random or
    by a target runtime.
    """

    def __init__(
        self, sql_query_file: str, query_runtime_path: str, seed: int = 0
    ) -> None:
        """Load the queries and runtimes and sort both by ascending runtime.

        :param sql_query_file: text file with one SQL query per line. Lines are kept
            verbatim, including their trailing newline.
        :param query_runtime_path: file readable by ``np.load`` holding one runtime per
            query, in the same order as ``sql_query_file``.
        :param seed: seed applied to NumPy's *global* RNG, which ``random_sample`` draws from.
        """
        with open(sql_query_file, "r") as f:
            sql_queries = f.readlines()
        query_runtime = np.load(query_runtime_path)
        assert len(sql_queries) == len(query_runtime)
        order = np.argsort(query_runtime)
        # sorted runtimes allow binary search in sample_by_runtime
        self.query_runtime = query_runtime[order]
        self.sql_queries = [sql_queries[i] for i in order]
        self.query_len = len(self.query_runtime)
        np.random.seed(seed)

    def random_sample(self) -> Tuple[str, float]:
        """Return a uniformly random ``(query, runtime)`` pair drawn from NumPy's global RNG."""
        idx = np.random.randint(self.query_len)
        return self.sql_queries[idx], self.query_runtime[idx]

    def sample_by_runtime(self, runtime: float) -> Tuple[str, float]:
        """Return the ``(query, runtime)`` pair whose runtime is closest to ``runtime``.

        A binary search finds the first stored runtime >= ``runtime``. Its left
        neighbour is then checked in case it is closer; ties go to the shorter query.
        Targets outside the stored range clamp to the shortest/longest query.
        """
        idx = int(np.searchsorted(self.query_runtime, runtime))
        # searchsorted returns query_len when runtime exceeds every stored value
        idx = min(idx, self.query_len - 1)
        if idx > 0 and abs(self.query_runtime[idx - 1] - runtime) <= abs(
            self.query_runtime[idx] - runtime
        ):
            idx -= 1
        return self.sql_queries[idx], self.query_runtime[idx]


class Simulator:
    """Replays a recorded workload through a scheduler, one query arrival at a time."""

    def __init__(
        self,
        scheduler: "BaseScheduler",
        query_bank: Optional["QueryBank"] = None,
        pause_wait_s: float = 1.0,
        immediate_retry_s: float = 0.001,
    ):
        """
        :param scheduler: scheduler under test. ``replay_one_query`` needs
            ``ingest_query_simulation``. ``replay_workload`` additionally uses
            ``make_original_prediction``, ``finish_query``, ``queued_queries``,
            ``current_time`` and ``all_query_runtime``.
        :param query_bank: optional pool of queries; stored, not used by the methods here.
        :param pause_wait_s: simulated delay before retrying when the scheduler asks to pause.
        :param immediate_retry_s: simulated delay before re-planning when the scheduler
            asks for an immediate retry.
        """
        self.scheduler = scheduler
        self.query_bank = query_bank
        self.pause_wait_s = pause_wait_s
        self.immediate_retry_s = immediate_retry_s

    def replay_one_query(
        self,
        start_time: float,
        next_query_start_time: Optional[float] = None,
        query_str: Optional[int] = None,
        query_idx: Optional[int] = None,
    ) -> None:
        """Feed one arrival to the scheduler and retry until it stops asking to.

        The scheduler places at most one queued query per call when others are
        running, and may ask to pause when at capacity. Retrying stops once the
        simulated clock would reach ``next_query_start_time``: the next arrival's
        own ingestion re-plans the queue, and retrying past it would make that
        arrival appear to happen in the scheduler's past.

        :param start_time: arrival time of this query.
        :param next_query_start_time: arrival time of the following query, or None if
            this is the last one (retry until the scheduler is satisfied).
        :param query_str: identifier of the arriving query, or None to only re-plan.
        :param query_idx: index used by the scheduler to featurize the query.
        """
        # TODO: this retry logic arguably belongs in the scheduler
        current_time = start_time
        while True:
            should_immediate_re_ingest, should_pause_and_re_ingest = self.scheduler.ingest_query_simulation(
                current_time, query_str=query_str, query_idx=query_idx
            )
            # the first call enqueues the new query; later calls only re-plan the queue
            query_str, query_idx = None, None
            if should_immediate_re_ingest:
                current_time += self.immediate_retry_s
            elif should_pause_and_re_ingest:
                current_time += self.pause_wait_s
            else:
                return
            if next_query_start_time is not None and next_query_start_time <= current_time:
                return

    def _drain_queue(self) -> None:
        """Keep re-planning (advancing the clock by ``pause_wait_s``) until the scheduler's queue is empty."""
        while self.scheduler.queued_queries:
            self.replay_one_query(self.scheduler.current_time + self.pause_wait_s)

    def replay_workload(self, directory: str) -> Tuple[np.ndarray, np.ndarray]:
        """Replay every query of the trace in ``directory`` through the scheduler.

        :returns: ``(original_runtime, new_runtime)``, both ordered by query start time.
            ``original_runtime`` holds the scheduler's ``make_original_prediction`` output.
            ``new_runtime`` holds the per-query runtime the scheduler recorded in
            ``all_query_runtime`` during the simulated replay.
        """
        _, all_trace = load_trace(directory, 8, concat=True)
        concurrency_df = create_concurrency_dataset(all_trace, engine=None, pre_exec_interval=200)
        concurrency_df = concurrency_df.sort_values(by=["start_time"], ascending=True)
        original_predictions = self.scheduler.make_original_prediction(concurrency_df)
        assert len(concurrency_df) == len(original_predictions)

        all_start_time = concurrency_df["start_time"].values
        all_query_idx = concurrency_df["query_idx"].values
        num_queries = len(concurrency_df)
        for i in range(num_queries):
            # the last query has no successor, so it retries until it is placed
            next_query_start_time = all_start_time[i + 1] if i + 1 < num_queries else None
            self.replay_one_query(all_start_time[i], next_query_start_time, i, all_query_idx[i])

        # queries can still be waiting (e.g. more than max concurrency arrived at once);
        # schedule them, then let everything run to completion
        self._drain_queue()
        self.scheduler.finish_query(np.inf)

        original_runtime = np.asarray([original_predictions[i] for i in range(num_queries)])
        new_runtime = np.asarray([self.scheduler.all_query_runtime[i] for i in range(num_queries)])
        return original_runtime, new_runtime


In [16]:
import pandas as pd
import numpy as np
import torch
from typing import Optional, Tuple, List, Union, MutableMapping
from models.single.stage import SingleStage
from models.concurrency.complex_models import ConcurrentRNN


def reverse_index_list(lst: List, pop_index: List[int]) -> List:
    """Return a new list holding the elements of ``lst`` whose positions are NOT in ``pop_index``.

    Despite the name, nothing is reversed: this is "drop these indices". ``lst`` is
    not modified. Indices in ``pop_index`` that are out of range (or negative) match
    no position and are ignored.
    """
    drop = set(pop_index)  # O(1) membership instead of scanning pop_index per element
    return [item for i, item in enumerate(lst) if i not in drop]


class BaseScheduler:
    """Greedy admission scheduler driven by a concurrency-aware runtime predictor.

    State lives in parallel lists. Index ``i`` of every ``existing_*`` list describes
    ``running_queries[i]``; index ``j`` of every ``queued_*`` list describes
    ``queued_queries[j]``. ``ingest_query_simulation`` is the simulation entry point:
    it advances the clock, retires finished queries and decides what to submit.
    """

    def __init__(
        self,
        stage_model: "SingleStage",
        predictor: "ConcurrentRNN",
        max_concurrency_level: int = 5,
    ):
        """
        :param stage_model: featurizes arriving queries via ``featurize_online``.
        :param predictor: provides ``predict`` (whole trace) and ``online_inference``.
        :param max_concurrency_level: at most this many queries are submitted at once
            when nothing is running; no query is admitted while this many are running.
        """
        self.stage_model = stage_model
        self.predictor = predictor
        self.max_concurrency_level = max_concurrency_level

        # --- running queries (parallel lists) ---
        self.running_queries: Union[List[str], List[int]] = []
        self.existing_query_features: List[np.ndarray] = []
        self.existing_query_concur_features: List[Optional[torch.Tensor]] = []
        self.existing_pre_info_length: List[int] = []
        # time each query was submitted for execution
        self.existing_start_time: List[float] = []
        # predicted finish time; replaced when another query is admitted next to it
        self.existing_finish_time: List[float] = []
        # not read or written elsewhere in this class
        self.existing_runtime_prediction_dict: MutableMapping[Union[str, int], float] = dict()
        self.existing_runtime_prediction: List[float] = []
        # time each query entered the queue
        self.existing_enter_time: List[float] = []

        # --- queued queries (parallel lists) ---
        self.queued_queries: Union[List[str], List[int]] = []
        self.queued_query_features: List[np.ndarray] = []
        self.queued_queries_enter_time: List[float] = []

        self.current_time = 0
        # completed query -> finish time minus queue-entry time (queueing delay included)
        self.all_query_runtime: MutableMapping[Union[str, int], float] = dict()

    def make_original_prediction(self, trace: pd.DataFrame) -> np.ndarray:
        """Return the predictor's runtime prediction for every row of ``trace``."""
        all_pred, _ = self.predictor.predict(trace, return_per_query=False)
        return all_pred

    def ingest_query(self, start_t: float, query_idx: int):
        """Placeholder for a non-simulated ingestion path; currently does nothing."""
        return None

    def print_state(self):
        """Print the clock, running queries with predicted runtimes, and the queue."""
        print("current time: ", self.current_time)
        print("running_queries: ", list(zip(self.running_queries, self.existing_runtime_prediction)))
        print("queued_queries: ", self.queued_queries)

    def submit_query(
        self,
        pos_in_queue: int,
        query_rep: Union[str, int],
        pred_runtime: float,
        query_feature: np.ndarray,
        submit_time: float,
        enter_time: float,
        finish_t: float,
        query_concur_features: Optional[torch.Tensor],
        pre_info_length: int,
        new_existing_finish_time: Optional[List[float]] = None,
        new_existing_runtime_prediction: Optional[List[float]] = None,
        new_existing_query_concur_features: Optional[List[Optional[torch.Tensor]]] = None
    ):
        """Move the queued query at ``pos_in_queue`` into the running set.

        If the optional ``new_existing_*`` lists are given, they first replace the
        running queries' state (re-predicted with the new query admitted). The list
        objects are stored as-is, not copied, and the new query is then appended to them.
        """
        if new_existing_finish_time is not None:
            self.existing_finish_time = new_existing_finish_time
        if new_existing_runtime_prediction is not None:
            self.existing_runtime_prediction = new_existing_runtime_prediction
        if new_existing_query_concur_features is not None:
            self.existing_query_concur_features = new_existing_query_concur_features
        self.running_queries.append(query_rep)
        self.existing_query_features.append(query_feature)
        self.existing_start_time.append(submit_time)
        self.existing_finish_time.append(finish_t)
        self.existing_query_concur_features.append(query_concur_features)
        self.existing_pre_info_length.append(pre_info_length)
        self.existing_enter_time.append(enter_time)
        self.existing_runtime_prediction.append(pred_runtime)
        self.queued_queries.pop(pos_in_queue)
        self.queued_query_features.pop(pos_in_queue)
        self.queued_queries_enter_time.pop(pos_in_queue)

    def finish_query(self, current_time: Optional[float] = None) -> None:
        """Retire every running query whose predicted finish time is <= the clock.

        :param current_time: if given, the clock is first set to this value.
        Each retired query's runtime (finish time minus queue-entry time) is recorded
        in ``all_query_runtime``.
        """
        if current_time is not None:
            self.current_time = current_time
        finished = set()
        for i, finish_t in enumerate(self.existing_finish_time):
            if finish_t <= self.current_time:
                finished.add(i)
                self.all_query_runtime[self.running_queries[i]] = finish_t - self.existing_enter_time[i]
        if not finished:
            return
        keep = [i for i in range(len(self.existing_finish_time)) if i not in finished]

        def _keep(values: List) -> List:
            # filter one parallel list down to the surviving positions
            return [values[i] for i in keep]

        self.running_queries = _keep(self.running_queries)
        self.existing_enter_time = _keep(self.existing_enter_time)
        self.existing_query_features = _keep(self.existing_query_features)
        self.existing_runtime_prediction = _keep(self.existing_runtime_prediction)
        self.existing_start_time = _keep(self.existing_start_time)
        self.existing_finish_time = _keep(self.existing_finish_time)
        # TODO: the concurrency features / pre-info lengths of the survivors still
        #  describe the retired queries; revisit when a query is removed from pre-info
        #  or when training uses sufficient sequence length
        self.existing_query_concur_features = _keep(self.existing_query_concur_features)
        self.existing_pre_info_length = _keep(self.existing_pre_info_length)

    def ingest_query_simulation(
        self,
        start_t: float,
        query_str: Optional[Union[str, int]] = None,
        query_idx: Optional[int] = None,
    ) -> Tuple[bool, bool]:
        """Advance the clock to ``start_t``, optionally enqueue a new query, and schedule.

        If ``query_str`` is None, no new query is submitted and only the current queue
        is re-planned.

        :param start_t: current simulated time.
        :param query_str: identifier of a newly arrived query, or None.
        :param query_idx: index passed to ``stage_model.featurize_online`` for the new query.
        :returns: ``(should_immediate_re_ingest, should_pause_and_re_ingest)``.
            ``(False, False)`` is returned when the queue is empty, or when the system
            was idle and up to ``max_concurrency_level`` shortest-predicted queries were
            submitted. ``(False, True)`` is returned at full capacity. ``(True, False)``
            is returned after one query is admitted next to running ones, so the caller
            should re-plan the rest of the queue.
        """
        self.current_time = start_t
        self.finish_query()
        if query_str is not None:
            self.queued_queries.append(query_str)
            self.queued_queries_enter_time.append(start_t)
            query_feature = self.stage_model.featurize_online(query_idx)
            self.queued_query_features.append(query_feature)

        if len(self.queued_query_features) == 0:
            return False, False

        predictions, global_x, global_pre_info_length = self.predictor.online_inference(
            self.existing_query_features,
            self.existing_query_concur_features,
            self.existing_pre_info_length,
            self.queued_query_features,
            self.existing_start_time,
            start_t,
        )
        predictions = predictions.reshape(-1).detach().numpy()

        # TODO: smarter policies (e.g. hold back queries that are all "bad" for the current load)
        if len(self.running_queries) == 0:
            self._submit_shortest_first(start_t, predictions, global_pre_info_length)
            return False, False
        if len(self.running_queries) >= self.max_concurrency_level:
            return False, True
        self._submit_min_delta(start_t, predictions, global_x, global_pre_info_length)
        return True, False

    def _submit_shortest_first(self, start_t: float, predictions: np.ndarray, global_pre_info_length) -> None:
        """Idle system: submit up to ``max_concurrency_level`` queued queries, shortest predicted first."""
        # with nothing running, predictions holds exactly one entry per queued query
        assert len(predictions) == len(self.queued_queries)
        chosen = [int(i) for i in np.argsort(predictions)[: self.max_concurrency_level]]
        popped: List[int] = []
        for idx in chosen:
            # submit_query pops from the queue, so shift idx left past earlier pops
            pos = idx - sum(1 for p in popped if p < idx)
            pred_runtime = float(predictions[idx])
            # NOTE(review): the other branch stores global_x for the new query while this one
            #  stores None; confirm the predictor accepts None in existing_query_concur_features
            self.submit_query(
                pos,
                self.queued_queries[pos],
                pred_runtime,
                self.queued_query_features[pos],
                start_t,
                self.queued_queries_enter_time[pos],
                start_t + pred_runtime,
                None,
                int(global_pre_info_length[idx]),
            )
            popped.append(idx)

    def _submit_min_delta(self, start_t: float, predictions: np.ndarray, global_x, global_pre_info_length) -> None:
        """Admit the one queued query that least increases the running queries' total predicted runtime.

        Prediction layout: for queued query ``i`` the block starting at
        ``i * (1 + n_existing)`` holds that query's own prediction, followed by the
        re-predicted runtime of each running query if ``i`` were admitted.
        """
        n_existing = len(self.existing_query_concur_features)
        stride = 1 + n_existing
        assert len(predictions) == stride * len(self.queued_queries)
        old_existing_pred = np.asarray(self.existing_runtime_prediction)

        delta_sums = []
        for i in range(len(self.queued_queries)):
            start = i * stride
            # realistically positive; smaller means less slowdown for running queries
            delta_sums.append(np.sum(predictions[start + 1: start + stride] - old_existing_pred))

        selected_idx = int(np.argmin(delta_sums))
        pred_idx = selected_idx * stride
        curr_pred = float(predictions[pred_idx])
        new_existing_pred = [float(p) for p in predictions[pred_idx + 1: pred_idx + stride]]
        new_existing_finish_time = [
            pred + submitted for pred, submitted in zip(new_existing_pred, self.existing_start_time)
        ]
        new_existing_concur = [global_x[j] for j in range(pred_idx + 1, pred_idx + stride)]
        self.submit_query(
            selected_idx,
            self.queued_queries[selected_idx],
            curr_pred,
            self.queued_query_features[selected_idx],
            start_t,
            self.queued_queries_enter_time[selected_idx],
            start_t + curr_pred,
            global_x[pred_idx],
            # global arrays are laid out per prediction, not per queue position
            int(global_pre_info_length[pred_idx]),
            new_existing_finish_time,
            new_existing_pred,
            new_existing_concur,
        )



In [4]:
# Root of the AWS trace data; override with the AWS_TRACE_ROOT environment variable.
DATA_ROOT = os.environ.get("AWS_TRACE_ROOT", "/Users/ziniuw/Desktop/research/Data/AWS_trace")
folder_name = "mixed_aurora"
directory = f"{DATA_ROOT}/{folder_name}/"
parsed_queries_path = f"{directory}aurora_mixed_parsed_queries.json"
plans = load_json(parsed_queries_path, namespace=False)

all_raw_trace, all_trace = load_trace_all_version(directory, 8, concat=True)
per_trace_concurrency = [
    create_concurrency_dataset(trace, engine=None, pre_exec_interval=200)
    for trace in all_trace
]
concurrency_df = pd.concat(per_trace_concurrency, ignore_index=True)

train_trace_df_sep, eval_trace_df_sep = pre_info_train_test_seperation(concurrency_df)
print(len(train_trace_df_sep), len(eval_trace_df_sep))

# Random 80/20 row split (legacy global seed kept so the split is unchanged).
np.random.seed(0)
n_rows = len(concurrency_df)
train_idx = np.random.choice(n_rows, size=int(0.8 * n_rows), replace=False)
# sorted complement of train_idx, in O(n log n) instead of a per-row scan of train_idx
test_idx = np.setdiff1d(np.arange(n_rows), train_idx)
train_trace_df = concurrency_df.iloc[train_idx].copy()
eval_trace_df = concurrency_df.iloc[test_idx]
# keep only evaluation queries that actually ran concurrently with others
eval_trace_df = eval_trace_df[eval_trace_df["num_concurrent_queries"] > 0].copy()
print(len(train_trace_df), len(eval_trace_df))

28925 25561
43967 10907


In [5]:
# Rebuild concurrency_df from only the first trace in all_trace; this replaces the
# combined frame from the previous cell and feeds the cells below.
first_trace = all_trace[0]
concurrency_df = create_concurrency_dataset(
    first_trace,
    engine=None,
    pre_exec_interval=200,
)
len(concurrency_df)

3157

In [6]:
# Fit the single-query stage model on concurrency_df, then load the concurrent RNN
# from the "checkpoints" directory and evaluate it.
# NOTE(review): the stage model sees only the first trace, while evaluation uses
#  eval_trace_df_sep built from all traces; confirm this is intended.
ss = SingleStage(use_table_features=True, true_card=True)
df = ss.featurize_data(concurrency_df, parsed_queries_path)
ss.train(df)

rnn_kwargs = dict(
    # NOTE(review): presumably 2 x per-query feature size + 7 extra inputs; confirm in ConcurrentRNN
    input_size=len(ss.all_feature[0]) * 2 + 7,
    embedding_dim=128,
    hidden_size=256,
    num_layers=2,
    loss_function="q_loss",
    last_output=True,
    use_seperation=False,
)
rnn = ConcurrentRNN(ss, **rnn_kwargs)
rnn.load_model("checkpoints")
preds, labels = rnn.predict(eval_trace_df_sep, use_pre_info_only=False)

Top 20 operators contains 0.9650782102582758 total operators


100%|████████████████████████████████████████| 200/200 [00:01<00:00, 160.91it/s]


50% absolute error is 0.7304885387420654, q-error is 1.622496247291565
90% absolute error is 7.360652923583984, q-error is 4.764204025268555
95% absolute error is 17.918014526367188, q-error is 7.269602298736572


In [24]:
# first five rows (head() defaults to n=5)
concurrency_df.head()

Unnamed: 0,index,query_idx,runtime,start_time,end_time,pre_exec_info,concur_info,num_concurrent_queries,concur_info_train,num_concurrent_queries_train
0,0,29,110.210543,0.0,110.210543,[],"[(143, 0.0, 3.958360195159912), (135, 0.887088...",24,[],0
1,1,143,3.95836,0.0,3.95836,[],"[(29, 0.0, 110.210542678833), (135, 0.88708800...",2,"[(29, 0.0, 110.210542678833)]",1
2,2,135,3.47903,0.887088,4.366118,[],"[(29, 0.0, 110.210542678833), (143, 0.0, 3.958...",2,"[(29, 0.0, 110.210542678833), (143, 0.0, 3.958...",2
3,3,75,63.228388,6.449771,69.678159,"[(143, 0.0, 3.958360195159912), (135, 0.887088...","[(29, 0.0, 110.210542678833), (36, 6.515793, 7...",9,"[(29, 0.0, 110.210542678833)]",1
4,4,36,0.853417,6.515793,7.36921,"[(143, 0.0, 3.958360195159912), (135, 0.887088...","[(29, 0.0, 110.210542678833), (75, 6.449771, 6...",2,"[(29, 0.0, 110.210542678833), (75, 6.449771, 6...",2


In [17]:
scheduler = BaseScheduler(ss, rnn)
simulator = Simulator(scheduler)
# Order by arrival so positional index i matches the replay order used next.
concurrency_df = concurrency_df.sort_values("start_time")
original_predictions = scheduler.make_original_prediction(concurrency_df)
assert len(original_predictions) == len(concurrency_df)

100%|███████████████████████████████████████████| 25/25 [00:00<00:00, 64.38it/s]

50% absolute error is 1.7421255111694336, q-error is 1.2857015132904053
90% absolute error is 20.836120605468754, q-error is 3.267798900604248
95% absolute error is 38.862800598144496, q-error is 4.820914173126212





In [18]:

# Replay arrivals one at a time and print the scheduler state after each.
DEBUG_MAX_QUERIES = 20  # stop after this query index; set to None to replay everything

original_runtime = []
all_start_time = concurrency_df["start_time"].values
all_query_idx = concurrency_df["query_idx"].values
num_queries = len(concurrency_df)
for i in range(num_queries):
    original_runtime.append(original_predictions[i])
    # the last query has no successor (indexing i + 1 there would raise IndexError)
    next_query_start_time = all_start_time[i + 1] if i + 1 < num_queries else None
    simulator.replay_one_query(all_start_time[i], next_query_start_time, i, all_query_idx[i])
    print("==============================", i)
    scheduler.print_state()
    if DEBUG_MAX_QUERIES is not None and i >= DEBUG_MAX_QUERIES:
        break

current time:  0.0
running_queries:  [(0, 4.87758731842041)]
queued_queries:  []
current time:  0.001
running_queries:  [(0, 6.818284), (1, 3.0237453)]
queued_queries:  []


RuntimeError: Tensors must have same number of dimensions: got 2 and 1

In [None]:
# Ad-hoc debugging: replay a single arrival by hand and inspect the scheduler.
simulator.replay_one_query(
    start_time=1.87,
    next_query_start_time=2.76,
    query_str=143,
    query_idx=143,
)
scheduler.print_state()