In [None]:
import csv, re
import numpy as np
import os
import json
from sklearn.preprocessing import normalize
import pickle
import math
from random import shuffle

## Read data

In [None]:
def parse_table_metadata(file_name="imdb-metadata.json"):
    """Load table/column statistics and scale them for use as features.

    The file is read line by line; every non-blank line must be a JSON object
    with the keys ``name``, ``length``, ``size`` and ``columns``. Each value of
    ``columns`` must provide ``numerical``, ``minValue``, ``maxValue``,
    ``nulls`` and ``distinct``.

    Args:
        file_name: Path of the metadata file.

    Returns:
        ``(normalizes_column, normalizes_table)`` where

        * ``normalizes_column`` maps ``"<table>.<column>"`` to a 5-element
          array ``(numerical, min, max, nulls, distinct)``. Each statistic is
          scaled with ``normalize(..., norm="max")`` over all columns; for
          columns whose ``numerical`` flag is not 1, min/max are replaced by
          the constants 0 and 1.
        * ``normalizes_table`` maps a table name to a 2-element array holding
          its ``(length, size)`` after ``normalize(..., axis=0)``.
    """
    columns_all = []
    tables_all = []
    # The context manager closes the handle even if a line fails to parse.
    with open(file_name) as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank / trailing empty lines
            metadata = json.loads(line)
            table_name = metadata['name']
            columns = metadata['columns']
            tables_all.append((table_name, metadata['length'], metadata['size']))
            for k in columns.keys():
                columns_all.append((table_name + '.' + k, columns[k]))

    # normalize columns: collect each statistic across all columns first
    numerical = []
    minValues = []
    maxValues = []
    nullss = []
    distincts = []
    for _, i in columns_all:
        numerical.append(i['numerical'])
        minValues.append(i['minValue'])
        maxValues.append(i['maxValue'])
        nullss.append(i['nulls'])
        distincts.append(i['distinct'])
    # Each statistic is passed as a single row, so the result is indexed [0][i].
    minValues = normalize([minValues], norm="max")
    maxValues = normalize([maxValues], norm="max")
    nullss = normalize([nullss], norm="max")
    distincts = normalize([distincts], norm="max")

    normalizes_column = {}
    for i, (k, _) in enumerate(columns_all):
        if(numerical[i] == 1):
            normalizes_column[k] = np.array((numerical[i], minValues[0][i], maxValues[0][i], nullss[0][i], distincts[0][i]))
        else:
            # non-numerical columns get the fixed range (0, 1)
            normalizes_column[k] = np.array((numerical[i], 0, 1, nullss[0][i], distincts[0][i]))

    # axis=0 normalizes length and size independently across tables
    normalized_table = normalize([x[1:] for x in tables_all], axis=0)
    normalizes_table = {}
    for i in range(len(normalized_table)):
        normalizes_table[tables_all[i][0]] = np.array((normalized_table[i][0],normalized_table[i][1]))
    return (normalizes_column, normalizes_table)

# Parse the metadata once, then build a reverse lookup from bare column name
# to the table that owns it.
normalizes_column, normalizes_table = parse_table_metadata("imdb-metadata.json")
table_column_dict = {}
for qualified_name in normalizes_column:
    name_parts = qualified_name.split(".")
    # A column name that occurs in several tables keeps only the table seen last.
    table_column_dict[name_parts[1]] = name_parts[0]

table_column_dict # column -> table

In [None]:
# Length of the predicate (condition) embedding; used for the FastText vector
# size and for the all-zero vector of operators without a condition.
condition_dim = 20

# Directories that hold the execution logs (one CSV per file).
data_dirs = ['../../imdb-6-16-2-3-48-one-round-1/', '../../imdb-6-16-2-3-48-one-round-missing-1/','../../imdb-6-16-2-3-48-one-round-2/', '../../imdb-6-16-2-3-48-one-round-missing-2/']
# Resource setting per directory, looked up by the directory's position in data_dirs.
# The list has six entries while data_dirs has four; the last two are never indexed.
dir_resources = [{'n_executor': 6, 'g_mem': 16, 'n_core':2, 'n_worker':3, 'parallelism': 48},{'n_executor': 6, 'g_mem': 16, 'n_core':2, 'n_worker':3, 'parallelism': 48},\
{'n_executor': 6, 'g_mem': 16, 'n_core':2, 'n_worker':3, 'parallelism': 48},{'n_executor': 6, 'g_mem': 16, 'n_core':2, 'n_worker':3, 'parallelism': 48},\
    {'n_executor': 6, 'g_mem': 16, 'n_core':2, 'n_worker':3, 'parallelism': 48},{'n_executor': 6, 'g_mem': 16, 'n_core':2, 'n_worker':3, 'parallelism': 48}]

## Change to the data generated under your own environment

# Parallel lists: position k in every list describes the same logged plan.
query_ids = []
plan_ids = []
physical_plans = []
times = []
resources = []
group_idxs = [] # record the dir that a data is from

# First pass: collect every (query id, plan id) that has a row with more than
# three fields, i.e. a row that carries a time and a plan.
valid = []
for idx, data_dir in enumerate(data_dirs):
    for file in os.listdir(data_dir):
        # `with` closes each log file instead of leaking the handle.
        with open(os.path.join(data_dir, file)) as csv_file:
            for row in csv.reader(csv_file):
                if len(row) > 3:
                    valid.append((row[0], row[1]))
# Set copy for O(1) membership tests; `valid` itself stays a list because its
# length (duplicates included) is reported further down.
valid_keys = set(valid)

# Second pass: keep every complete row; keep an incomplete row only when the
# same (query id, plan id) never completed anywhere, and mark it with time -1.
for idx, data_dir in enumerate(data_dirs):
    for file in os.listdir(data_dir):
        with open(os.path.join(data_dir, file)) as csv_file:
            for row in csv.reader(csv_file):
                if not row:
                    continue  # csv.reader yields [] for blank lines
                if len(row) > 3:
                    physical_plans.append(row[4])
                    query_ids.append(row[0])
                    plan_ids.append(row[1])
                    times.append(row[3])
                    resources.append(dir_resources[idx])
                    group_idxs.append(idx)
                elif (row[0], row[1]) not in valid_keys:
                    print(row[0], row[1])
                    physical_plans.append(row[2])
                    query_ids.append(row[0])
                    plan_ids.append(row[1])
                    times.append(-1)
                    resources.append(dir_resources[idx])
                    group_idxs.append(idx)

# Break every plan text into lines and drop the lines that start with "==".
physical_plans = [
    list(filter(lambda plan_line: not plan_line.startswith("=="), plan_text.split("\n")))
    for plan_text in physical_plans
]

print("=== Number of valid physical plans: {}".format(len(valid)))
print("=== Number of physical plans: {}".format(len(physical_plans)))

zipped = list(zip(physical_plans, query_ids, plan_ids, times, resources, group_idxs))

# Index the records by (query id, plan id); a pair that was logged more than
# once keeps only the record seen last.
d = {}
for record in zipped:
    d[(record[1], record[2])] = record


def _load_split(path, records_by_key):
    """Return the records named in a split file.

    Each non-blank line of the file is "<query id> <plan id> ..." separated by
    single spaces; the first two fields are looked up in ``records_by_key``.
    Raises KeyError when a listed pair has no record.
    """
    selected = []
    with open(path, 'r') as f:
        for line in f:
            fields = line.strip().split(" ")
            if fields == [""]:
                continue  # skip blank lines instead of failing on fields[1]
            selected.append(records_by_key[(fields[0], fields[1])])
    return selected


# Named split_dir rather than `dir` so the builtin dir() is not shadowed.
split_dir = 'imdb-sets/'
trains = _load_split(split_dir + 'train.txt', d)
vals = _load_split(split_dir + 'val.txt', d)
tests = _load_split(split_dir + 'test.txt', d)

# Re-order all parallel sequences as train, then validation, then test.
zipped = trains + vals + tests
physical_plans, query_ids, plan_ids, times, resources, group_idxs = list(zip(*zipped))

In [None]:
# Locate the first record whose time is not the -1 sentinel. `i` is read after
# the loop on purpose; if every time is -1 it is simply the last index.
for i, e in enumerate(times):
    if e != -1:
        print(i)
        break
# Print every field of that record, followed by its plan line by line.
print("== Example:")
print("Query Id: {}".format(query_ids[i]))
print("Plan Id: {}".format(plan_ids[i]))
print("Time： {}".format(times[i]))
print("Resources: {}".format(resources[i]))
print("Group Idx: {}".format(group_idxs[i]))
for e in physical_plans[i]:
    print(e)


In [None]:
def split_plan(operations): # operations: list of string
    """Split plan lines into the tree skeleton and the per-operator details.

    Empty lines are dropped. Everything before the first line that begins
    with "(" is skeleton; that line and all later non-empty lines are detail.

    Returns:
        (skeleton, detail): two lists of strings.
    """
    skeleton, detail = [], []
    in_detail = False
    for line in operations:
        if not line:
            continue
        # once a "(" line has been seen, the switch never flips back
        in_detail = in_detail or line[0] == "("
        (detail if in_detail else skeleton).append(line)
    return skeleton, detail

class TreeNode:
    """Node of the operator tree built from a plan skeleton."""

    def __init__(self, value, idx):
        self.value = value      # operator name as parsed from the skeleton line
        self.children = []      # attached TreeNode objects, in attachment order
        self.idx = idx          # 0-based operator index

    def add_child(self, child_node):
        """Append ``child_node`` to this node's children."""
        self.children.append(child_node)

    def __repr__(self, level=0):
        """Render the subtree, one node per line, indented two spaces per level."""
        ret = "  "*level+repr(self.value)+"\n"
        for child in self.children:
            ret += child.__repr__(level+1)
        return ret

# build a tree from the strings
def parse_skeleton(skeleton):
    """Build the operator tree from the skeleton lines of a plan.

    Every line is one node. The node name is the line with the characters
    ``: * + -`` and spaces removed, cut at the last "("; the number inside the
    trailing parentheses, minus one, is the node index.

    Parent lookup:
      * a line whose content starts with "-" (after stripping spaces and ":")
        but not with "+-" is attached to the node of the previous line;
      * a line whose content starts with "+-" is attached to the closest
        earlier line that has no ":" in the column of the "+".

    Args:
        skeleton: list of skeleton lines, root first.

    Returns:
        ``(root, nodes)`` with ``nodes`` in line order, or ``(None, [])`` for
        an empty skeleton.
    """
    if not skeleton:
        return None, []
    nodes = []
    for (i, o) in enumerate(skeleton):
        # column of the first "-" (0 when the line has none)
        level = 0
        for (j, l) in enumerate(o):
            if(l == "-"):
                level = j
                break
        # construct node
        name = re.sub(r"[:* +-]", '', o)
        idx = int(name.split("(")[-1][:-1]) - 1 # the index starts from 1, make it start from 0
        name = name.split("(")[0]
        node = TreeNode(name, idx)
        # find its parent
        content = o.strip(" :")
        if(content.startswith("+-")):
            plus_col = level - 1
            x = i-1
            # Walk upwards INCLUDING line 0: stopping at line 1 attached a
            # "+-" child of the root to the root's first child instead.
            for x in range(i-1, -1, -1):
                candidate = skeleton[x]
                # a line too short to reach the column cannot carry the ":" rail
                if(len(candidate) <= plus_col or candidate[plus_col] != ":"):
                    break
            nodes[x].add_child(node)
        elif(content.startswith("-")):
            nodes[-1].add_child(node)
        # add node to nodes
        nodes.append(node)
    return nodes[0], nodes # the root of the tree and a list of all nodes

# generate the structure matrix
def gen_struct_matrix(nodes):
    """Encode the tree edges as a square matrix indexed by node ``idx``.

    For every parent/child pair, ``matrix[parent, child]`` is 1 and
    ``matrix[child, parent]`` is -1; all other entries are 0.
    """
    size = len(nodes)
    matrix = np.zeros((size, size))
    for parent in nodes:
        row = parent.idx
        for child in parent.children:
            col = child.idx
            matrix[row, col] = 1   # is parent
            matrix[col, row] = -1  # is child
    return matrix

In [None]:
# Largest logged time after converting every entry (including the -1 sentinel) to int.
max(int(logged_time) for logged_time in times)

## Parse physical plans

In [None]:
from torchtext.vocab import vocab
from collections import Counter, OrderedDict

class Operation:
    """One operator entry from the detail section of a plan.

    ``s`` is the list of lines of a single entry: the first line looks like
    "(<idx>) <operator> [...]" and each following line is "<key>: <value>".
    """

    def __init__(self, s):
        self.idx, self.operator = self.get_idx_operator(s)
        self.auxi = self.get_auxi(s)

    def __repr__(self):
        return "({}) {} \n  {}\n".format(self.idx, self.operator, self.auxi)

    def get_idx_operator(self, s):
        """Return (index, operator name) parsed from the header line ``s[0]``."""
        header_parts = s[0].split(")")
        idx = int(header_parts[0][1:])  # drop the leading "("
        operator = header_parts[1].split("[")[0].strip()
        return idx, operator

    def get_auxi(self, s):
        """Return {key: list of strings} for the lines after the header.

        Keys keep letters only. Values lose "[", "]" and every "#<digits>"
        tag; a "Condition" value stays one string, any other value is split
        on ", ".
        """
        auxi_dict = {}
        for entry in s[1:]:
            raw_key, _, raw_value = entry.partition(":")
            key = re.sub(r'[^a-zA-Z]', '', raw_key)
            value = raw_value.strip().replace("[", "").replace("]", "")
            value = re.sub(r'#[0-9]+', '', value) # remove "#xx"
            auxi_dict[key] = [value] if key == "Condition" else value.split(", ")
        return auxi_dict
    
def gen_operator_vocab(operations):
    """Build a vocabulary over the operator names of all plans.

    Names are ordered by descending frequency (ties keep first-seen order);
    "<unk>" is added as a special token and used as the default index.
    """
    counter = Counter(op.operator for plan in operations for op in plan)
    ordered_dict = OrderedDict(counter.most_common())
    operator_vocab = vocab(ordered_dict, specials=["<unk>"])
    operator_vocab.set_default_index(operator_vocab["<unk>"])
    return operator_vocab

def gen_detail_vocab(operations):
    """Build a vocabulary over the auxiliary keys and values of all plans.

    For every operator each auxi key is counted, followed by each of its
    values. Words are ordered by descending frequency; "<unk>" is the special
    token and the default index.
    """
    words = [
        word
        for plan in operations
        for op in plan
        for key, values in op.auxi.items()
        for word in (key, *values)
    ]
    ordered_dict = OrderedDict(Counter(words).most_common())
    detail_vocab = vocab(ordered_dict, specials=["<unk>"])
    detail_vocab.set_default_index(detail_vocab["<unk>"])
    return detail_vocab

def gen_all_vocab(operations):
    """Build one vocabulary over operator names, auxi keys and auxi values.

    Per plan, all operator names are counted first, then each operator's
    keys and values. Words are ordered by descending frequency; "<unk>" is
    the special token and the default index.
    """
    counter = Counter()
    for plan in operations:
        counter.update([op.operator for op in plan])
        for op in plan:
            for key, values in op.auxi.items():
                counter.update([key])
                counter.update(list(values))
    ordered_dict = OrderedDict(counter.most_common())
    full_vocab = vocab(ordered_dict, specials=["<unk>"])
    full_vocab.set_default_index(full_vocab["<unk>"])
    return full_vocab

# parse details
def parse_detail(detail):
    """Group detail lines into operator entries and parse each one.

    A line that starts with "(" opens a new entry; every other line is added
    to the entry opened last. The first line always opens the first entry.

    Args:
        detail: list of detail lines (as returned by ``split_plan``).

    Returns:
        A list of ``Operation`` objects, one per entry; ``[]`` when ``detail``
        is empty.
    """
    if not detail:
        return []
    operations = [[detail[0]]]
    for line in detail[1:]:
        if(line.startswith("(")):
            operations.append([line])
        else:
            # Append to the current entry directly (the previous slice/append
            # construct only worked through the in-place side effect).
            operations[-1].append(line)
    # each Operation carries the operator and its auxiliary info
    return [Operation(entry_lines) for entry_lines in operations]

# operation to words
def op_to_vector(operation, v):
    """Translate an operation into a flat list of vocabulary indices.

    The list holds the index of the operator name, then for every auxi entry
    the index of its key followed by the indices of its values.
    ``v`` must offer ``lookup_indices(list_of_tokens)``.
    """
    indices = list(v.lookup_indices([operation.operator]))
    for key, values in operation.auxi.items():
        indices.extend(v.lookup_indices([str(key)]))
        indices.extend(v.lookup_indices(values))
    return indices

In [None]:
# test all
# take the information of each operator (not formatted)
# One entry per plan: its structure matrix, its parsed operators and the
# number of skeleton lines.
structures, details, skeleton_lens = [], [], []
for plan in physical_plans:
    skeleton, detail = split_plan(plan)
    nodes = parse_skeleton(skeleton)[1]
    skeleton_lens.append(len(skeleton))
    structures.append(gen_struct_matrix(nodes))
    details.append(parse_detail(detail))
print(len(structures))
print(len(details))

# Vocabulary of operator names only, and one over operators plus all auxi words.
v = gen_operator_vocab(details)
all_v = gen_all_vocab(details)
print("operator vocab length: {}".format(len(v)))
print(v.get_itos())
print("all vocab length: {}".format(len(all_v)))

# Keep an untouched copy: the reformat step below rewrites the auxi of every
# operation and restarts from this copy.
from copy import deepcopy
details_copy =deepcopy(details)

# Shortest and longest skeleton, in lines.
print(min(skeleton_lens))
print(max(skeleton_lens))

In [None]:
# extract key information and reformat it
# key information: table, column, predicate
class Condition:
    """A single predicate: ``<column> <operator> <operand>``."""

    def __init__(self, column, operator, operand):
        self.column, self.operator, self.operand = column, operator, operand

    def __repr__(self):
        parts = (self.column, self.operator, self.operand)
        return "Condition <{} {} {}>".format(*parts)

def reformat_scanparquet(operation):
    """Replace the auxi of a parquet scan by its table, columns and filters.

    The table name is the last path component of the first ``Location``
    entry. The new auxi contains:
      * ``Table``: one-element list with the table name,
      * ``Columns``: every ``Output`` entry prefixed with "<table>.",
      * ``Condition``: the ``PushedFilters`` strings (each split on ", "),
        present only when ``PushedFilters`` exists and is non-empty.

    The per-filter ``Condition`` objects that used to be built here were never
    stored (the assignment was commented out), so that dead work is gone.

    Returns:
        The same ``operation`` object, with ``operation.auxi`` replaced.
    """
    auxi = operation.auxi
    table = auxi["Location"][0].split("/")[-1]
    new_auxi = {
        "Table": [table],
        "Columns": [table + "." + column for column in auxi["Output"]],
    }
    pushed_filters = auxi.get("PushedFilters")
    if pushed_filters:
        new_auxi["Condition"] = [
            part for pushed in pushed_filters for part in pushed.split(", ")
        ]
    operation.auxi = new_auxi
    return operation

def reformat_scancsv(operation):
    """Replace the auxi of a CSV scan by its table, columns and filters.

    The table name is the last path component of the first ``Location``
    entry, cut at the first ".". The new auxi contains:
      * ``Table``: one-element list with the table name,
      * ``Columns``: every ``Output`` entry with all "L" characters removed
        and prefixed with "<table>.",
      * ``Condition``: the ``PushedFilters`` strings exactly as stored (each
        split on ", "), present only when ``PushedFilters`` is non-empty.

    The previous body also re-joined split ``In(...)`` filters and built
    ``Condition`` objects from them, but never stored the result; that dead
    code raised IndexError on a trailing ``In...`` fragment or on a filter
    without "(", so it has been removed. The stored ``Condition`` list is
    unchanged.

    Returns:
        The same ``operation`` object, with ``operation.auxi`` replaced.
    """
    auxi = operation.auxi
    table = auxi["Location"][0].split("/")[-1].split(".")[0]
    columns = [name.replace("L", "") for name in auxi["Output"]]
    new_auxi = {
        "Table": [table],
        "Columns": [table + "." + column for column in columns],
    }
    pushed_filters = auxi.get('PushedFilters')
    if pushed_filters:
        new_auxi["Condition"] = [
            part for pushed in pushed_filters for part in pushed.split(", ")
        ]
    operation.auxi = new_auxi
    return operation

def reformat_logicalrelation(operation, parents):
    """Add ``Table`` and ``Columns`` to a LogicalRelation from its arguments.

    Arguments equal to 'parquet', 'true' or 'false' and arguments that do not
    end in "L" are ignored; the remaining ones lose their last character and
    are resolved to a table through the module-level ``table_column_dict``.
    ``parents`` is not used.
    """
    auxi = operation.auxi
    skipped = ('parquet', 'true', 'false')
    columns = []
    for argument in auxi['Arguments']:
        if argument in skipped or not argument.endswith("L"):
            continue
        columns.append(argument[:-1])
    owners = [table_column_dict[column] for column in columns]
    auxi['Table'] = list(dict.fromkeys(owners))  # unique tables, first-seen order
    auxi['Columns'] = [owner + '.' + column for owner, column in zip(owners, columns)]
    return operation

def reformat_filter(operation, parents):
    """Fill ``Table``, ``Columns`` and ``Condition`` of a Filter.

    The table list is taken from the first parent. Columns come from the
    operation's own ``Input`` (all "L" removed, prefixed with the first table)
    or, without ``Input``, are the first parent's ``Columns`` list itself.
    Every "L" is removed from the condition strings; a missing or empty
    condition becomes ``[]``.
    """
    parent_auxi = parents[0].auxi
    table = parent_auxi["Table"]
    auxi = operation.auxi
    auxi["Table"] = table
    inputs = auxi.get('Input')
    if inputs:
        auxi['Columns'] = [table[0] + "." + name.replace("L", "") for name in inputs]
    else:
        auxi['Columns'] = parent_auxi["Columns"]
    # todo parse conditions
    conditions = auxi.get("Condition")
    auxi['Condition'] = [text.replace("L", "") for text in conditions] if conditions else []
    return operation

def reformat_project(operation, parents):
    """Fill ``Table`` and ``Columns`` of a Project from its parents.

    Project acts like glue: it records the tables of all parents, and for
    each entry of ``Output`` (or ``Arguments`` when ``Output`` is missing or
    empty) the first parent column that contains the entry with every "L"
    removed.
    """
    tables, columns = [], []
    for parent in parents:
        tables.extend(parent.auxi["Table"])
        columns.extend(parent.auxi["Columns"])
    auxi = operation.auxi
    auxi["Table"] = tables
    auxi['Columns'] = []
    output = auxi.get("Output") or auxi["Arguments"]
    for name in output:
        wanted = name.replace("L", "")
        hit = next((column for column in columns if wanted in column), None)
        if hit is not None:
            auxi['Columns'].append(hit)
    return operation

def reformat_exchange(operation, parents):
    """Fill ``Table`` and ``Columns`` of an Exchange.

    The table list is the first parent's; each ``Input`` entry has leading
    and trailing "L" characters stripped and is prefixed with the first table.
    """
    auxi = operation.auxi
    auxi["Table"] = parents[0].auxi["Table"]
    qualified = []
    for name in auxi["Input"]:
        qualified.append(auxi["Table"][0] + "." + name.strip('L'))
    auxi['Columns'] = qualified
    return operation

def reformat_reusedexchange(operation, parents):
    """Add ``Table`` and ``Columns`` to a ReusedExchange from its ``Output``.

    Each output name has leading/trailing "L" stripped and is resolved to a
    table through the module-level ``table_column_dict``. ``parents`` is not
    used.
    """
    auxi = operation.auxi
    columns = []
    owners = []
    for name in auxi["Output"]:
        columns.append(name.strip('L'))
    for column in columns:
        owners.append(table_column_dict[column])
    auxi['Table'] = list(dict.fromkeys(owners))  # unique tables, first-seen order
    auxi['Columns'] = [owner + '.' + column for owner, column in zip(owners, columns)]
    return operation

def reformat_hashagg(operation, parents):
    """Fill ``Table``, ``Columns`` and ``Condition`` of a HashAggregate.

    Tables and columns are the concatenation of all parents' lists. The
    condition is one string: ``Functions`` followed by ``AggregateAttributes``
    joined with " AND ", with every "L" removed.
    """
    tables, columns = [], []
    for parent in parents:
        tables.extend(parent.auxi["Table"])
        columns.extend(parent.auxi["Columns"])
    auxi = operation.auxi
    auxi["Table"] = tables
    auxi['Columns'] = columns
    joined = " AND ".join(auxi['Functions'] + auxi['AggregateAttributes'])
    auxi['Condition'] = [joined.replace("L", "")]
    return operation

def reformat_sortagg(operation, parents):
    """Fill ``Table``, ``Columns`` and ``Condition`` of a SortAggregate.

    Same treatment as for HashAggregate: parents' tables and columns are
    concatenated, and the condition is ``Functions`` plus
    ``AggregateAttributes`` joined with " AND " after removing every "L".
    """
    gathered = {"Table": [], "Columns": []}
    for parent in parents:
        for field in ("Table", "Columns"):
            gathered[field] += parent.auxi[field]
    auxi = operation.auxi
    auxi["Table"] = gathered["Table"]
    auxi['Columns'] = gathered["Columns"]
    pieces = auxi['Functions'] + auxi['AggregateAttributes']
    auxi['Condition'] = [" AND ".join(pieces).replace("L", "")]
    return operation

def reformat_agg(operation, parents):
    """Fill an Aggregate with its parents' tables/columns and an empty condition."""
    tables, columns = [], []
    for parent in parents:
        parent_auxi = parent.auxi
        tables.extend(parent_auxi["Table"])
        columns.extend(parent_auxi["Columns"])
    operation.auxi.update({"Table": tables, "Columns": columns, "Condition": []})
    return operation

def reformat_sort(operation, parents):
    """Fill ``Table``, ``Columns`` and ``Condition`` of a Sort.

    The table list is the first parent's. Columns are the ``Input`` entries
    with every "L" removed, prefixed with the first table. The condition is
    the ``Arguments`` joined with " AND " (kept verbatim).
    """
    table = parents[0].auxi["Table"]
    auxi = operation.auxi
    cleaned = [name.replace("L", "") for name in auxi["Input"]]
    auxi["Table"] = table
    auxi['Columns'] = [table[0] + "." + name for name in cleaned]
    auxi["Condition"] = [" AND ".join(auxi['Arguments'])]
    return operation

def reformat_smjoin(operation, parents):
    """Fill ``Table`` and ``Columns`` of a SortMergeJoin.

    Tables are all parents' tables. For each key in ``Leftkeys`` then
    ``Rightkeys`` (every "L" removed), ALL parent columns that contain the key
    are recorded, so a column may appear more than once.
    """
    tables, columns = [], []
    for parent in parents:
        tables.extend(parent.auxi["Table"])
        columns.extend(parent.auxi["Columns"])
    auxi = operation.auxi
    auxi["Table"] = tables
    auxi['Columns'] = []
    join_keys = [key.replace("L", "") for key in auxi["Leftkeys"] + auxi["Rightkeys"]]
    auxi['Columns'].extend(
        column for key in join_keys for column in columns if key in column
    )
    return operation

def reformat_join(operation, parents):
    """Fill ``Table`` and ``Columns`` of a Join.

    The join keys are read from the second argument, split on " = " and
    stripped of "L", "(" and ")" (todo: currently hardcoded). For each key
    the first parent column containing it is recorded.
    """
    tables, columns = [], []
    for parent in parents:
        tables.extend(parent.auxi["Table"])
        columns.extend(parent.auxi["Columns"])
    auxi = operation.auxi
    auxi["Table"] = tables
    auxi['Columns'] = []
    sides = auxi['Arguments'][1].split(" = ")
    keys = [side.replace("L", "").replace("(", "").replace(")", "") for side in sides]
    for key in keys:
        hit = next((column for column in columns if key in column), None)
        if hit is not None:
            auxi['Columns'].append(hit)
    return operation
    
def reformat_bchjoin(operation, parents):
    """Fill ``Table`` and ``Columns`` of a BroadcastHashJoin.

    Tables are all parents' tables. For each key in ``Leftkeys`` then
    ``Rightkeys`` (every "L" removed) the first parent column containing the
    key is recorded.
    """
    tables, columns = [], []
    for parent in parents:
        tables.extend(parent.auxi["Table"])
        columns.extend(parent.auxi["Columns"])
    auxi = operation.auxi
    auxi["Table"] = tables
    auxi['Columns'] = []
    for raw_key in auxi["Leftkeys"] + auxi["Rightkeys"]:
        key = raw_key.replace("L", "")
        matching = [column for column in columns if key in column]
        if matching:
            auxi['Columns'].append(matching[0])
    return operation

def reformat_schjoin(operation, parents):
    """Fill ``Table`` and ``Columns`` of a ShuffledHashJoin.

    Same treatment as for BroadcastHashJoin: all parents' tables, and per
    join key (every "L" removed) the first parent column that contains it.
    """
    gathered = {"Table": [], "Columns": []}
    for parent in parents:
        for field in ("Table", "Columns"):
            gathered[field] += parent.auxi[field]
    candidates = gathered["Columns"]
    auxi = operation.auxi
    auxi["Table"] = gathered["Table"]
    auxi['Columns'] = []
    for raw_key in auxi["Leftkeys"] + auxi["Rightkeys"]:
        key = raw_key.replace("L", "")
        hit = next((column for column in candidates if key in column), None)
        if hit is not None:
            auxi['Columns'].append(hit)
    return operation

def reformat_cartesianproduct(operation, parents):
    """Fill ``Table`` and ``Columns`` of a CartesianProduct.

    Tables are the concatenation of all parents' tables. The keys to look for
    are derived from the FIRST column string of the LAST parent: it is
    stripped of "L", "(" and ")", split on "=", and each piece is trimmed.
    For each key the first parent column containing it is recorded.

    Returns the same ``operation`` object.
    """
    tables = []
    columns = []
    for parent in parents:
        tables += parent.auxi["Table"]
        columns += parent.auxi["Columns"]
    auxi = operation.auxi
    auxi["Table"] = tables
    auxi['Columns'] = []
    # `parent` is the loop variable left over from above, i.e. the last parent;
    # with an empty `parents` it is undefined and this line fails.
    # NOTE(review): reading join keys from a parent's first column (rather than
    # from this operation's own auxi) looks unintended — confirm.
    c = [i.strip() for i in parent.auxi["Columns"][0].replace("L","").replace("(","").replace(")","").split("=")]
    for i in c:
        for j in columns:
            if i in j:
                auxi['Columns'].append(j)
                break
    return operation

def reformat_bcexchange(operation, parents):
    """Fill ``Table`` and ``Columns`` of a BroadcastExchange.

    Tables are all parents' tables. For each ``Input`` entry (used verbatim)
    the first parent column that contains it is recorded.
    """
    tables, columns = [], []
    for parent in parents:
        tables.extend(parent.auxi["Table"])
        columns.extend(parent.auxi["Columns"])
    auxi = operation.auxi
    auxi["Table"] = tables
    auxi['Columns'] = []
    for name in auxi["Input"]:
        hit = next((column for column in columns if name in column), None)
        if hit is not None:
            auxi['Columns'].append(hit)
    return operation


def reformat(operation, structure):
    """Dispatch an operation to the reformat helper for its operator.

    The operations handed to the helper as ``parents`` are those whose
    position holds a 1 in this operation's row of ``structure``; they are
    looked up (1-based) in the module-level ``operation_dict``. The two scan
    helpers receive the operation only. Unknown operators are reported with
    ``print`` and left untouched. Returns None.
    """
    row = structure[operation.idx - 1]
    parent_operations = [operation_dict[pos + 1] for pos, flag in enumerate(row) if flag == 1]

    scan_handlers = {
        "Scan csv": reformat_scancsv,
        "Scan parquet": reformat_scanparquet,
    }
    parent_handlers = {
        "Filter": reformat_filter,
        "Project": reformat_project,
        "Exchange": reformat_exchange,
        "ReusedExchange": reformat_reusedexchange,
        "HashAggregate": reformat_hashagg,
        "SortAggregate": reformat_sortagg,
        "Aggregate": reformat_agg,
        "Sort": reformat_sort,
        "SortMergeJoin": reformat_smjoin,
        "BroadcastExchange": reformat_bcexchange,
        "BroadcastHashJoin": reformat_bchjoin,
        "ShuffledHashJoin": reformat_schjoin,
        "LogicalRelation": reformat_logicalrelation,
        "Join": reformat_join,
        "CartesianProduct": reformat_cartesianproduct,
    }

    operator_name = operation.operator
    if operator_name in scan_handlers:
        scan_handlers[operator_name](operation)
    elif operator_name in parent_handlers:
        parent_handlers[operator_name](operation, parent_operations)
    else:
        print("Unseen operation: {}".format(operation.operator))
        print(operation)
        

# Restart from the untouched copy so this cell can be run repeatedly.
details = deepcopy(details_copy)
for plan_position, plan_operations in enumerate(details):
    # reformat() resolves parents through this module-level dict, so it must
    # be complete for the plan before the first operation is reformatted.
    operation_dict = {op.idx: op for op in plan_operations}
    for op in plan_operations:
        reformat(op, structures[plan_position])

In [None]:
# test
print(len(details))
# Report every column name that still contains an "L" after reformatting.
for plan_operations in details:
    for op in plan_operations:
        for column in (op.auxi.get('Columns') or []):
            if 'L' in column:
                print(column)

## Word2Vec for predicate keywords

In [None]:
# File the Word2Vec model trained below is saved to.
model_dir = "condition_word2vec_imdb.model"

from os.path import exists

# build a word2vec model for keywords in a predicate
def gen_condition_sentences(queries):
    """Turn the predicates of every operator into token lists ("sentences").

    For each operation that has a ``Condition`` entry, its strings are joined
    with spaces, the substrings "AND" and "OR" are removed, parentheses become
    spaces, and the text is split on single spaces. Empty tokens are dropped.

    Args:
        queries: iterable of plans; each plan is an iterable of objects with
            an ``auxi`` dict.

    Returns:
        One token list per operation that has a ``Condition`` key (the list
        may be empty), in plan order.
    """
    trimmed_conditionss = []
    for q in queries:
        conditions = [i.auxi.get("Condition") for i in q]
        # NOTE(review): "AND"/"OR" are removed as substrings, so they are also
        # cut out of longer upper-case words — confirm this is acceptable.
        conditions = [' '.join(i).replace("AND","").replace("OR", "").replace("(", " ").replace(")", " ") for i in conditions if i is not None]
        conditions = [re.sub(' +', ' ', i).split(" ") for i in conditions]
        # Build filtered copies: removing from a list while iterating over it
        # skips the element after each removal and could leave a "" behind.
        trimmed_conditionss += [[token for token in c if token != ""] for c in conditions]
    return trimmed_conditionss

# Sentences are built from the leading 80% of `details` only.
sentences = gen_condition_sentences(details[:int(0.8* len(details))])
print(len(sentences))
print(len(details))
# print([i for i in details])
print(sentences[0])

from gensim.test.utils import common_texts
from gensim.models import Word2Vec

# vector_size is the literal 20 here, not condition_dim; words seen fewer than
# 10 times are left out (min_count=10).
model = Word2Vec(sentences=sentences, vector_size=20, window=5, min_count=10, workers=4)
model.save(model_dir)



In [None]:
from gensim.models import FastText
# FastText word vectors of size condition_dim, keeping every token (min_count=1).
ft = FastText(sentences, min_count=1, vector_size=condition_dim)

from fse import Average, IndexedList
from fse.models import uSIF
# Sentence-embedding model on top of the FastText vectors; encode_operation
# reads it through the module-level name `sif_model`.
sif_model = uSIF(ft)
sif_model.train(IndexedList(sentences))

sif_model.save("usif_imdb")

## One-hot to encode operators, tables, and columns

In [None]:
# index the keywords

def onehot(l):
    """Map every item of ``l`` to a one-hot row of length ``len(l)``.

    The item at position k gets a float vector with a 1 at position k; an
    item that appears more than once keeps the row of its last position.
    """
    identity = np.eye(len(l))
    return {item: identity[position] for position, item in enumerate(l)}
    
tables = []
columns = []
operations = []
predicates = []
for o in details:
    for operation in o:
        operations.append(operation.operator)
        # extend() grows the lists in place; `a = a + b` copied them every time
        columns.extend(operation.auxi["Columns"])
        # table names are taken from scan operators only
        if operation.operator in ("Scan csv", "Scan parquet"):
            tables.extend(operation.auxi["Table"])

# De-duplicate and sort: the position in these lists becomes the one-hot index,
# and the iteration order of a set of strings is not stable between runs.
columns = sorted(set(columns))
tables = sorted(set(tables))
operations = sorted(set(operations))

print(len(columns))
print(len(tables))
print(len(operations))

print(columns)

column_onehot = onehot(columns)
table_onehot = onehot(tables)
operation_onehot = onehot(operations)
table_onehot

In [None]:
# save the results 
import pickle
# Persist the three one-hot dictionaries.
with open('column_onehot_imdb.pkl', 'wb') as f:
    pickle.dump(column_onehot, f)
with open('table_onehot_imdb.pkl', 'wb') as f:
    pickle.dump(table_onehot, f)
with open('operation_onehot_imdb.pkl', 'wb') as f:
    pickle.dump(operation_onehot, f)

# Read two of them back as a check; `with` closes the handles after loading.
with open('table_onehot_imdb.pkl', 'rb') as f:
    print(len(pickle.load(f)))
with open('operation_onehot_imdb.pkl', 'rb') as f:
    print(len(pickle.load(f)))
print(len(column_onehot))

In [None]:
# Parse the metadata file again and show how many tables / columns it describes.
normalizes_column, normalizes_table = parse_table_metadata("imdb-metadata.json")

print(len(normalizes_table))
print(len(normalizes_column))

## Encode the entire operator

In [None]:
import pickle
# Reload the saved encoders; `with` closes every file once it has been read.
# Only load pickle files you produced yourself: unpickling runs arbitrary code.
with open('column_onehot_imdb.pkl', 'rb') as f:
    column_onehot = pickle.load(f)
with open('table_onehot_imdb.pkl', 'rb') as f:
    table_onehot = pickle.load(f)
with open('operation_onehot_imdb.pkl', 'rb') as f:
    operation_onehot = pickle.load(f)
condition_model = Word2Vec.load('condition_word2vec_imdb.model')
normalizes_column, normalizes_table = parse_table_metadata("imdb-metadata.json")
# encode the key information of each operation
def encode_operation(operation, operation_onehot, table_onehot, column_onehot, condition_model, normalizes_column, normalizes_table):
    """Encode one operation as (operator, table, column, condition) vectors.

    Args:
        operation: object with ``operator`` and an ``auxi`` dict.
        operation_onehot, table_onehot, column_onehot: one-hot lookups.
        condition_model: accepted for interface compatibility but not used;
            the predicate embedding comes from the module-level ``sif_model``.
        normalizes_column, normalizes_table: scaled statistics per
            "table.column" and per table.

    Returns:
        ``(operator_v, table_v, column_v, condition_v)``:
          * ``table_v``: one-hot plus statistics of the FIRST table, or zeros
            when the operation has no table;
          * ``column_v``: for every column its one-hot followed by its
            statistics (zeros when unknown), all concatenated;
          * ``operator_v``: one-hot of the operator;
          * ``condition_v``: ``sif_model.infer`` of the condition strings, or
            ``condition_dim`` zeros when there is no condition.
        If a column or the operator is unknown, or there is nothing to
        concatenate, BOTH ``column_v`` and ``operator_v`` fall back to zeros
        and a message is printed.
    """
    auxi = operation.auxi
    if (auxi.get('Table')):
        first_table = auxi["Table"][0]
        table_v = np.concatenate((table_onehot[first_table], normalizes_table[first_table]))
    else:
        table_v = np.zeros(len(table_onehot)+len(list(normalizes_table.values())[0]))
    normalizes_column_size = len(list(normalizes_column.values())[0])
    try:
        column_v = [np.concatenate((column_onehot[i], normalizes_column.get(i, np.zeros(normalizes_column_size)))) for i in auxi["Columns"]]
        column_v = np.concatenate(column_v, axis=0)
        operator_v = operation_onehot[operation.operator]
    except (KeyError, ValueError):
        # KeyError: missing "Columns", unknown column or unknown operator.
        # ValueError: empty column list (nothing to concatenate).
        # Anything else (e.g. KeyboardInterrupt) is no longer swallowed.
        column_v = np.zeros(list(column_onehot.values())[0].size)
        operator_v = np.zeros(list(operation_onehot.values())[0].size)
        print("Dealing with unseen/problematic operator: {}".format(operation))

    if(auxi.get("Condition")):
        condition = auxi["Condition"]
        condition_v = sif_model.infer([(condition, 0)])

    else:
        condition_v = np.zeros(condition_dim)
    return operator_v, table_v, column_v, condition_v

# Encode the first operation of the first plan and show the shape of each part.
operator_v, table_v, column_v, condition_v = encode_operation(details[0][0], operation_onehot, table_onehot, column_onehot, condition_model, normalizes_column, normalizes_table)
print(operator_v.shape)
print(table_v.shape)
print(column_v.shape)
print(condition_v.shape)

In [None]:
# details to vectors
encoded = []
# Widest structure matrix; every connectivity row is zero-padded to this length.
max_seq_len = max([i.shape[0] for i in structures])
print(max_seq_len)
for query, structure in zip(details, structures):
    q = []
    for i, operation in enumerate(query):
        operator_v, table_v, column_v, condition_v = encode_operation(operation, operation_onehot, table_onehot, column_onehot, condition_model,normalizes_column, normalizes_table)
        # The row is chosen by the operation's position in the list.
        # NOTE(review): assumes position i equals operation.idx - 1 — confirm.
        connectivity = structure[i]
        connectivity = np.pad(connectivity, (0,max_seq_len - len(connectivity)), "constant", constant_values = (0))
        vs = {"operator": operator_v, "column": column_v, "condition": condition_v, "table": table_v, "structure": connectivity}
        q.append(vs)
    encoded.append(q)


In [None]:
# 0 padding for each operator
# Shapes of all column vectors; only used by the commented-out maximum below.
column_len = []
for i in encoded:
    for j in i:
        column_len.append(j["column"].shape)
# max_column_len = max(column_len)[0]
# Fixed target length for the column part of every operator vector.
# NOTE(review): a column vector longer than 100 gives np.pad a negative width,
# which presumably raises — confirm against the data.
max_column_len=100
print(max_column_len)
vs = []
for i, plan_id, query_id, t, r, g in zip(encoded, plan_ids, query_ids, times, resources, group_idxs):
    v = []
    for j in i:
        # pads in place, so the dicts inside `encoded` are modified too
        j["column"] = np.pad(j["column"], (0,max_column_len - len(j["column"])), "constant", constant_values = (0))
        # print(j["condition"])
        # axis=None flattens every part before joining them into one row
        v.append(np.concatenate((j["operator"], j["table"], j["column"], j["condition"], j["structure"]), axis=None))
        # print(j["operator"].shape)
        # print(j["table"].shape)
        # print(j["column"].shape)
        # print(j["condition"].shape)
        # print(j["structure"].shape)
    # one row per operator
    v = np.vstack(v)
    print(v.shape)
    vs.append((plan_id, query_id, v, t, r, g))

In [None]:
# Length of the table vector of the last operator processed by the previous
# cell (`j` is that cell's leftover loop variable).
len(j["table"])

## Save the parsed plans

In [None]:
# import torch
# d = {}
# vs2 = []
# for plan_id, query_id, v, t in vs:
#     v = torch.tensor(v, dtype=torch.float32)
#     d[(int(query_id), int(plan_id))] = (v, float(t))
#     vs2.append(v)
# vs2 = np.array(vs2)
# np.save("encoded_plans_job_2.npy", vs2)
# with open('encoded.pkl_2', 'wb') as f:
#     pickle.dump(d, f)

#     # d: {(qId, pId) -> (v, t)}

In [None]:
import torch
# Convert every plan matrix to a float32 tensor and index it for pickling.
d = {}
vs2 = []
for i, (plan_id, query_id, v, t, r, g) in enumerate(vs):
    v = torch.tensor(v, dtype=torch.float32)
    # the position i is part of the key, so repeated (query, plan, group)
    # combinations do not overwrite each other
    d[(query_id, plan_id, g, i)] = (v, float(t), r)
    vs2.append(v)
# NOTE(review): the tensors have one row per operator, so their shapes differ
# between plans; confirm np.array accepts such a ragged list in this environment.
vs2 = np.array(vs2)
np.save("encoded_plans_imdb.npy", vs2)
with open('encoded_imdb.pkl', 'wb') as f:
    pickle.dump(d, f)

    # d: {(qId, pId, group, position in vs) -> (v, t, resources)}