In [1]:
import pandas as pd
import numpy as np
import json
from sklearn.preprocessing import OneHotEncoder
from sklearn.cluster import KMeans
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.neural_network import MLPRegressor, MLPClassifier
# import lightgbm as lgb
# from xgboost import XGBClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

import seaborn as sns
import matplotlib.pyplot as plt 

def json_dump(filename, data):
    with open(filename, 'w') as f:
        json.dump(data, f)

In [2]:
import ast
from selection.index_selection_evaluation import DBMSYSTEMS
from selection.what_if_index_creation import WhatIfIndexCreation
from selection.data_preparation import read_csv, index_conversion
from selection.table_generator import TableGenerator
from selection.workload import Query, Index, Column, Table

config_file = "config.json"
with open(config_file) as f:
    config = json.load(f)

In [3]:
config_file = "config.json"
with open(config_file) as f:
    config = json.load(f)
dbms_class = DBMSYSTEMS[config["database_system"]]
generating_connector = dbms_class(None, autocommit=True)
table_generator = TableGenerator(config["benchmark_name"], config["scale_factor"], generating_connector)

In [4]:
def convert_configuration_to_obj(confg_string):
    configs, tables = [], set()
    for config_s in confg_string:
        if config_s == "[]": continue
        config = []
        indexes_s = config_s.split('I')
        for index_s in indexes_s:
            if index_s == '': continue
            table_columns = index_s.split('C')
            columns = []
            for table_column in table_columns:
                table_column = table_column.strip('(), ')
                if table_column == '': continue
                table = table_column.split('.')[0]
                column = Column(table_column.split('.')[-1])
                column.table = Table(table)
                column.table.add_column(column)
                columns.append(column)
                tables.add(column.table)
            config.append(Index(columns))
        configs.append(config)
    return configs, list(tables)

In [5]:
data_table_info = read_csv("../tpcds50trow.csv")[2:-2]
tables = {}
for table_info in data_table_info:
    table_info_tuple = table_info[0].split('|')
    if len(table_info_tuple) < 2: continue
    table_name = table_info_tuple[0].strip()
    row_count = float(table_info_tuple[1].strip())
    table = Table(table_name)
    table.set_row_count(row_count)
    tables[table_name] = table

In [6]:
TPC_DS_TABLE_PREFIX = {
    "dv": "dbgen_version",
    "ca": "customer_address",
    "cd": "customer_demographics",
    "d": "date_dim",
    "w": "warehouse",
    "sm": "ship_mode",
    "t": "time_dim",
    "r": "reason",
    "ib": "income_band",
    "i": "item",
    "s": "store",
    "cc": "call_center",
    "c": "customer",
    "web": "web_site",
    "sr": "store_returns",
    "hd": "household_demographics",
    "wp": "web_page",
    "p": "promotion",
    "cp": "catalog_page",
    "inv": "inventory",
    "cr": "catalog_returns",
    "wr": "web_returns",
    "ws": "web_sales",
    "cs": "catalog_sales",
    "ss": "store_sales"
}

data_column_info = read_csv("../tpcds50stats.csv")[2:-2]
columns = []
for column_info in data_column_info:
    column_info_tuple = column_info[0].split('|')
    if len(column_info_tuple) < 3: continue
    column_name = column_info_tuple[0].strip()
    n_distinct = float(column_info_tuple[2].strip())
    column = Column(column_name)
    column.set_cardinality(n_distinct)
    if (prefix := column_name.split('_')[0]) in TPC_DS_TABLE_PREFIX.keys():
        table_name = TPC_DS_TABLE_PREFIX[prefix]
        if table_name in tables.keys():
            column.table = tables[table_name]
            column.table.add_column(column)      
    columns.append(column)

In [7]:
# data_list[i][0]: Query ID and Query text for the i-th query
# data_list[i][1]: Index configurations for the i-th query
# data_list[i][2]: Average cost of each configuration for the i-th query
# data_list[i][3]: Query execution plan of each configuration for the i-th query
# data_list[i][4]: Details execution costs (each query is executed 4 times and the last 3 times are recorded) of each configuration for the i-th query

ds_data_list_string = read_csv("../TPC_DS_50GB.csv")
def read_data(data_list_string):
    data, queries = [], []
    for i in range(len(data_list_string)):
        data_list_string[i][0] = ast.literal_eval(data_list_string[i][0])
        query = Query(data_list_string[i][0][0], data_list_string[i][0][1])
        query.columns = [column for column in columns if column.name in query.text]
        queries.append(query)
        indexes_string = index_conversion(data_list_string[i][1])
        index_configuration = convert_configuration_to_obj(indexes_string)
        average_cost = ast.literal_eval(data_list_string[0][2])
        data.append([query, index_configuration, average_cost, ast.literal_eval(data_list_string[i][3])])
    return data, queries

ds_data, queries = read_data(ds_data_list_string)

[]
I(C date_dim.d_year,C date_dim.d_week_seq)
[]
I(C date_dim.d_year)
I(C store_sales.ss_sold_date_sk,C store_sales.ss_item_sk)
I(C item.i_manufact_id,C item.i_item_sk)
[]
I(C date_dim.d_date,C date_dim.d_date_sk)
I(C catalog_sales.cs_sold_date_sk)
I(C store_returns.sr_returned_date_sk)
I(C catalog_returns.cr_returned_date_sk)
I(C date_dim.d_date_sk,C date_dim.d_date)
I(C store_sales.ss_sold_date_sk)
I(C web_site.web_site_id)
[]
I(C customer_demographics.cd_education_status,C customer_demographics.cd_marital_status)
I(C date_dim.d_year,C date_dim.d_date_sk)
I(C item.i_item_id)
I(C promotion.p_promo_sk)
[]
I(C customer_address.ca_address_sk,C customer_address.ca_zip)
I(C date_dim.d_year)
[]
I(C web_sales.ws_sold_date_sk)
I(C date_dim.d_date_sk,C date_dim.d_date)
[]
I(C customer_address.ca_state)
I(C household_demographics.hd_dep_count)
I(C date_dim.d_year,C date_dim.d_date_sk)
I(C customer_demographics.cd_demo_sk)
[]
I(C date_dim.d_date_sk)
I(C catalog_sales.cs_sold_date_sk)
[]
I(C cust

In [8]:
import data_gen
query_plans_with_index, query_costs_with_index, query_plans = data_gen.run()

### Index Filter

#### Labels

In [9]:
from selection.cost_evaluation import CostEvaluation
from selection.index_selection_evaluation import DBMSYSTEMS

db_connector = DBMSYSTEMS["postgres"]("indexselection_tpch___10")
cost_evaluation = CostEvaluation(db_connector)

# workload_cost_with_index = []
# for index in all_syntactically_relevant_indexes:
#     cost = cost_evaluation.calculate_cost(workload, set([index]))
#     workload_cost_with_index.append(round(cost, 2))
#     cost_evaluation._unsimulate_or_drop_index(index)

# print(workload_cost_with_index)

In [10]:
PHYISCAL_TO_LOGICAL_OPERATOR_MAP = {
    "Seq Scan": "Scan",
    "Bitmap Index Scan": "Scan",
    "Bitmap Heap Scan": "Scan",
    "Index Scan": "Scan",
    "Index Only Scan": "Scan",
    "CTE Scan": "Scan",
    "Subquery Scan": "Scan",
    "Sort": "Sort",
    "Incremental Sort": "Sort",
    "Hash Join": "Join",
    "Merge Join": "Join",
    "Nested Loop": "Join",
    "Aggregate": "Aggregate",
    "WindowAgg": "Aggregate",
    "Group": "Aggregate",
    "Gather Merge": "",
    "Gather": "",
    "BitmapOr": "",
    "BitmapAnd": "",
    "Limit": "",
    "Hash": "",
    "Result": "",
    "SetOp": "",
    "Append": "",
    "Materialize": "",
    "Unique": "",
    "Merge Append": "",
    
}

LOGICAL_OPERATORS = ["Scan", "Join", "Aggregate", "Sort"]

In [11]:
def has_child_node(query_plan):
    return "Plans" in query_plan.keys()

def has_filtering_property(query_plan):
    if "Filter" in query_plan.keys():
        return query_plan["Filter"]
    if "Hash Cond" in query_plan.keys():
        return query_plan["Hash Cond"]
    if "Join Filter" in query_plan.keys():
        return query_plan["Join Filter"]
    return ""

def is_join_operator(operator):
    return PHYISCAL_TO_LOGICAL_OPERATOR_MAP[operator] == "Join"

def is_sort_operator(operator):
    return PHYISCAL_TO_LOGICAL_OPERATOR_MAP[operator] == "Sort"

def is_aggregate_operator(operator):
    return PHYISCAL_TO_LOGICAL_OPERATOR_MAP[operator] == "Aggregate"

def is_scan_operator(operator):
    return PHYISCAL_TO_LOGICAL_OPERATOR_MAP[operator] == "Scan"

def check_indexed_column_in_condition(index, condition):
    for column in index.columns:
        if column.name in condition:
            return True

def get_table_from_plan_node(query_plan):
    table = ""
    if "Relation Name" in query_plan.keys():
        table = query_plan["Relation Name"]
    return table

In [12]:
physical_operators = set()

def collect_physical_operators(physical_operators, query_plan): 
    physical_operators.add(query_plan["Node Type"])
    if has_child_node(query_plan):
        for child_node in query_plan["Plans"]:
            collect_physical_operators(physical_operators, child_node)
            
# for _, query_plan in query_plans_with_index.items():
#     collect_physical_operators(physical_operators, query_plan) 

for sample in ds_data:
    query_plans = sample[3]
    for query_plan in query_plans:
        collect_physical_operators(physical_operators, query_plan)

physical_operators = list(physical_operators)  
print(physical_operators)

['Index Only Scan', 'Result', 'BitmapAnd', 'Gather', 'Materialize', 'WindowAgg', 'Merge Join', 'BitmapOr', 'Merge Append', 'Limit', 'Bitmap Heap Scan', 'Aggregate', 'CTE Scan', 'Nested Loop', 'Unique', 'Hash Join', 'Subquery Scan', 'Hash', 'Append', 'Incremental Sort', 'Group', 'Bitmap Index Scan', 'Gather Merge', 'Index Scan', 'Sort', 'SetOp', 'Seq Scan']


#### Feautures

In [13]:
# get cardinatlity statistics
def generate_cardinality_statistics(db_connector, tables):
    for table in tables:
        row_count = db_connector.table_row_count(table.name)
        table.set_row_count(row_count)
        for column in table.columns:
            card = db_connector.get_column_cardinality(column)
            column.set_cardinality(-card * row_count if card < 0 else card)

In [14]:
# signal 1
def estimate_index_utility(index, original_query_plan, indexed_query_plan):
    total_cost = 0
    if has_child_node(original_query_plan):
        for original_child_node, indexed_child_node in zip(indexed_query_plan["Plans"], indexed_query_plan["Plans"]):
            total_cost += estimate_index_utility(index, original_child_node, indexed_child_node)
    current_operator = indexed_query_plan["Node Type"]
    current_cost = original_query_plan["Total Cost"]
    if (condition := has_filtering_property(indexed_query_plan)) != "":
        if is_join_operator(current_operator):
            join_output_rows = indexed_query_plan["Plan Rows"]
            left_input_rows = indexed_query_plan["Plans"][0]["Plan Rows"]
            right_input_rows = indexed_query_plan["Plans"][1]["Plan Rows"]
            if check_indexed_column_in_condition(index, condition):    
                current_cost = (1-np.sqrt(join_output_rows/(left_input_rows*right_input_rows)))*original_query_plan["Total Cost"]
        else:
            selectivities = [indexed_query_plan["Plan Rows"]/column.table.row_count for column in index.columns if column.name in condition]
            average_selectivity = sum(selectivities)/len(selectivities) if len(selectivities) > 0 else 0
            current_cost = (1-average_selectivity)*original_query_plan["Total Cost"]
    elif is_sort_operator(current_operator) and "Sort Key" in indexed_query_plan.keys():
        sort_conditions = indexed_query_plan["Sort Key"]
        for sort_condition in sort_conditions:
            if check_indexed_column_in_condition(index, sort_condition):
                current_cost = indexed_query_plan["Total Cost"]
    elif is_aggregate_operator(current_operator) and "Group Key" in indexed_query_plan.keys():
        group_conditions = indexed_query_plan["Group Key"]
        for group_condition in group_conditions:
            if check_indexed_column_in_condition(index, group_condition):
                current_cost = indexed_query_plan["Total Cost"]
    return total_cost+current_cost

In [25]:
# signal 2
def extract_shape_of_query_and_index(index, original_query_plan, indexed_query_plan):
    query_shape, index_shape = {}, []
    _extract_query_shape(query_shape, original_query_plan)
    visited = set()
    _extract_index_shape(index_shape, index, indexed_query_plan, visited)
    return query_shape, index_shape

def _extract_query_shape(query_shape, query_plan):
    current_operator = query_plan["Node Type"]
    logical_operator = PHYISCAL_TO_LOGICAL_OPERATOR_MAP[current_operator]
    if is_scan_operator(current_operator):
        table = get_table_from_plan_node(query_plan)
        if table in query_shape.keys():
            query_shape[table].append(logical_operator)
        else:
            query_shape[table] = [logical_operator]
        return table
    
    tables = []    
    if has_child_node(query_plan):
        for child_node in query_plan["Plans"]:
            table = _extract_query_shape(query_shape, child_node)
            if table and logical_operator:
                tables.append(table)
                query_shape[table].append(logical_operator)
    return tables[0] if 0<len(tables)<2 else ""
    
    
def convert_query_shape_to_df(query_shape):
    data = []
    table_names = []
    for table_name, shape in query_shape.values():
        data.append(shape)
        table_names.append(table_name)
    return pd.DataFrame(data, index = table_names)

def _extract_index_shape(index_shape, index, query_plan, visited):
    current_operator = query_plan["Node Type"]
    logical_operator = PHYISCAL_TO_LOGICAL_OPERATOR_MAP[current_operator]
    if has_child_node(query_plan):
        for child_node in query_plan["Plans"]:
            _extract_index_shape(index_shape, index, child_node, visited)
            
    if (condition := has_filtering_property(query_plan)) != "":
        for column in index.columns:
            if column in visited: continue
            elif column.name in condition:
                index_shape.append(logical_operator)
                visited.add(column)
    elif is_sort_operator(current_operator) and "Sort Key" in query_plan.keys():
        sort_conditions = query_plan["Sort Key"]
        for sort_condition in sort_conditions:
            for column in index.columns:
                if column in visited: continue
                elif column.name in sort_condition:
                    index_shape.append(logical_operator)
                    visited.add(column)
    elif is_aggregate_operator(current_operator) and "Group Key" in query_plan.keys():
        aggregate_conditions = query_plan["Group Key"]
        for aggregate_condition in aggregate_conditions:
            for column in index.columns:
                if column in visited: continue
                elif column.name in aggregate_condition:
                    index_shape.append(logical_operator)
                    visited.add(column)

In [16]:
# signal 3
def evaluate_operator_relevance(operator_relevance, index, query_plan):
    if has_child_node(query_plan):
        for child_node in query_plan["Plans"]:
            evaluate_operator_relevance(operator_relevance, index, child_node)
            
    current_operator = query_plan["Node Type"]
    relevance = 0
    if (condition := has_filtering_property(query_plan)) != "":
        selectivities = [query_plan["Plan Rows"]/column.table.row_count for column in index.columns if column.name in condition]
        relevance = sum(selectivities)/len(selectivities) if len(selectivities) > 0 else 0
    elif is_sort_operator(current_operator) and "Sort Key" in query_plan.keys():
        densities = []
        conditions = query_plan["Sort Key"]
        for condition in conditions:
            for column in index.columns:
                if column.name in condition:
                    densities.append(column.cardinality/column.table.row_count)
        relevance = sum(densities)/len(densities) if len(densities) > 0 else 0
    elif is_aggregate_operator(current_operator) and "Group Key" in query_plan.keys():
        densities = []
        conditions = query_plan["Group Key"]
        for condition in conditions:
            for column in index.columns:
                if column.name in condition:
                    densities.append(column.cardinality/column.table.row_count)
        relevance = sum(densities)/len(densities) if len(densities) > 0 else 0
    if current_operator not in operator_relevance: 
        operator_relevance[current_operator] = []
    operator_relevance[current_operator].append(relevance)

In [17]:
# signal 4
def get_number_of_pages(query_plan):
    return query_plan["Shared Hit Blocks"] + query_plan["Shared Read Blocks"] + query_plan["Local Hit Blocks"] + query_plan["Local Read Blocks"]

# seems not supported by postgres:
# https://stackoverflow.com/questions/20410444/postgres-ignoring-clustered-index-on-date-query
def count_clustered_index(db_connector, table_name):
    count = db_connector.count_clustered_indexes(table_name)
    return count

def check_using_bitmap(query_plan):
    current_operator = query_plan["Node Type"]
    use = False
    if is_scan_operator(current_operator):
        use = "Bitmap" in current_operator
    
    if has_child_node(query_plan):
        for child_node in query_plan["Plans"]:
            use |= check_using_bitmap(child_node)
    return use

In [18]:
## testing
# import pandas as pd
# from sklearn.preprocessing import OneHotEncoder
# # create an example dataframe to work with
# df = pd.DataFrame([
#     ["Scan", "Join"],
#     ["Scan", "Aggregate", "Join"],
#     ["Aggregate", "Sort"],
#     ["Sort", "Join"],
#     ["Aggregate", "Scan"]
# ], columns=["operator1", "operator2", "operator3"])

# # create a OneHotEncoder that ignores (0 encodes) unseen categories
# # and encode the categorical features for the example dataframe
# encoder = OneHotEncoder(sparse_output=False)
# X_encoded = encoder.fit_transform(df)
# print(X_encoded)
# print(encoder.categories_)

In [86]:
labels = []
feature_columns = ["utility", "num_pages", "use_bitmap"]
operator_relevance_columns = [f"relevance_{operator}" for operator in LOGICAL_OPERATORS]
feature_columns.extend(operator_relevance_columns)
query_shape_columns = [f"query_shape_operator{i}_on_{table.name}" for table in table_generator.tables for i in range(5)]
feature_columns.extend(query_shape_columns)
index_shape_columns = [f"index_shape_operator{i}" for i in range(5)]
feature_columns.extend(index_shape_columns)
features = pd.DataFrame(columns=feature_columns, index=range(len([index for (_, index) in query_plans_with_index.keys() if index])))

i = 0
for (query, index), indexed_query_plan in query_plans_with_index.items():
    if index == None: continue
    labels.append(query_costs_with_index[(query, index)])
    original_query_plan = query_plans_with_index[(query, None)]
    original_query_cost = query_costs_with_index[(query, None)]
    utility = estimate_index_utility(index, original_query_plan, indexed_query_plan)/original_query_cost
    query_shape, index_shape = extract_shape_of_query_and_index(index, original_query_plan, indexed_query_plan)
    for table, operator_seq in query_shape.items():
        for j, operator in enumerate(operator_seq):
            features.iloc[i][f"query_shape_operator{j}_on_{table}"] = operator
    for j, operator in enumerate(index_shape):
        features.iloc[i][f"index_shape_operator{i}"] = operator
    result = {}
    evaluate_operator_relevance(result, index, original_query_plan)
    for operator in LOGICAL_OPERATORS:
        if operator in result: features.iloc[i][f"relevance_{operator}"] = sum(result[operator])/len(result[operator])
        else: features.iloc[i][f"relevance_{operator}"] = 0
    num_pages = get_number_of_pages(indexed_query_plan)
    use_bitmap = check_using_bitmap(indexed_query_plan)
    features.iloc[i]["utility"] = utility
    features.iloc[i]["num_pages"] = num_pages
    features.iloc[i]["use_bitmap"] = int(use_bitmap)
    i+=1
    
features[["utility", "num_pages"]+operator_relevance_columns] = features[["utility", "num_pages"]+operator_relevance_columns].apply(pd.to_numeric)
features["use_bitmap"] = features["use_bitmap"].astype('int')

print("labels:\n", labels)
features


labels:
 [10637611.65, 19835309.08, 13060075.58, 18130059.14, 6758734.57]


Unnamed: 0,utility,num_pages,use_bitmap,relevance_Scan,relevance_Join,relevance_Aggregate,relevance_Sort,query_shape_operator0_on_nation,query_shape_operator1_on_nation,query_shape_operator2_on_nation,...,query_shape_operator0_on_lineitem,query_shape_operator1_on_lineitem,query_shape_operator2_on_lineitem,query_shape_operator3_on_lineitem,query_shape_operator4_on_lineitem,index_shape_operator0,index_shape_operator1,index_shape_operator2,index_shape_operator3,index_shape_operator4
0,1.0,1125158,0,0,0,0.0,0.0,,,,...,Scan,Aggregate,Sort,,,,,,,
1,0.999975,1422550,0,0,0,6.666612e-08,3.333306e-08,,,,...,Scan,Join,Sort,Aggregate,,,Sort,,,
2,0.999563,1386556,0,0,0,0.0,0.0,,,,...,Scan,,,,,,,Join,,
3,0.999936,1424810,0,0,0,0.0,0.0,Scan,Join,,...,Scan,Join,Join,Join,Sort,,,,Join,
4,0.998083,1125144,0,0,0,0.0,0.0,,,,...,Scan,Aggregate,,,,,,,,Scan


In [94]:
encoder = OneHotEncoder(sparse_output=False)
features_cat = features.select_dtypes(include="object")
features_num = features.select_dtypes(exclude="object")
features_cat_encoded = encoder.fit_transform(features_cat)
features_encoded = np.concatenate((features_num.to_numpy(), features_cat_encoded), axis=1)


#### Modeling

##### Random Forest

In [96]:
regr = RandomForestRegressor(max_depth=2, random_state=0).fit(features_encoded, labels)

##### MLP

### Index Cost Model

In [None]:
filtered_query_index_pairs = []
configurations = {}
for (query, index) in filtered_query_index_pairs:
    if query not in configurations:
        configurations[query] = set()
    configurations[query].add(index)

In [None]:
def calculate_parameter_selectivity(query_plan):
    param_selectivities = []
    calculate_parameter_selectivity(param_selectivities, query_plan)
    return param_selectivities
    
def _calculate_parameter_selectivity(param_selectivities, query_plan):
    if has_child_node(original_query_plan):
        for child_node in query_plan["Plans"]:
            _calculate_parameter_selectivity(child_node)
    if (condition := has_filtering_property(query_plan)) != "":
        for column in index.columns:
            if column.name in condition:
                param_selectivities.append(column, query_plan["Plan Rows"]/column.table.row_count)

def evaluate_configuration(configuration_features, configuration, query_plan):
    current_operator = query_plan["Node Type"]
    if has_child_node(original_query_plan):
        for child_node in query_plan["Plans"]:
            evaluate_configuration(configuration_features, configuration, child_node)
    if (condition := has_filtering_property(query_plan)) != "":
        for index in configuration:
            for column in index.columns:
                if column in condition:
                    configuration_features[f"selectivity_{column.name}"] = query_plan["Plan Rows"]/column.table.row_count
                    if column.name not in configuration_features.keys():
                        configuration_features[column.name] = []
                    configuration_features[column.name].append(current_operator)
    elif is_aggregate_operator(current_operator) and "Group Key" in query_plan.keys():
        conditions = query_plan["Group Key"]
        for index in configuration:
            for column in index.columns:
                for condition in conditions:
                    if column in condition:
                        if column.name not in configuration_features.keys():
                            configuration_features[column.name] = []
                        configuration_features[column.name].append(current_operator)
    elif is_sort_operator(current_operator) and "Sort Key" in query_plan:
        conditions = query_plan["Sort Key"]
        for index in configuration:
            for column in index.columns:
                for condition in conditions:
                    if column in condition:
                        if column.name not in configuration_features.keys():
                            configuration_features[column.name] = []
                        configuration_features[column.name].append(current_operator)

In [None]:
# training 

# parameters
error_threshold = 0.05
training_sample_size_alpha = None
training_sample_size_beta = None
stopping_threshold = None

In [None]:
features = []