In [None]:
def read_queries_from_file(file_path):
    """Split a SQL script into individual statements.

    Every line is stripped of surrounding whitespace; blank lines and lines
    that begin with ``--`` are discarded.  The remaining lines are buffered
    until one ends with ``;``, at which point the buffered pieces are joined
    with single spaces and emitted as one statement.  Text after the last
    ``;`` that is never terminated is not returned.

    Args:
        file_path: Path of the SQL file to read.

    Returns:
        list[str]: The statements in file order, each ending with ``;``.
    """
    statements = []
    pending = []

    with open(file_path, 'r') as sql_file:
        for text in (raw.strip() for raw in sql_file):
            is_blank_or_comment = text == '' or text.startswith('--')
            if is_blank_or_comment:
                continue

            pending.append(text)
            if not text.endswith(';'):
                continue

            # Statement terminator reached: flush the buffer.
            statements.append(' '.join(pending))
            pending = []

    return statements


# Load every statement from the TPC-H query stream file and report how many
# were found.
queries = read_queries_from_file('tpch-kit/dbgen/tpch-stream.sql')
print(f"Total Queries Read: {len(queries)}")
# NOTE(review): this prints the complete text of every query; consider showing
# only a few entries (e.g. queries[:3]) to keep the cell output readable.
print(queries)


In [None]:
import psycopg2
import pandas as pd
import json

def get_query_plan(query):
    """Return the JSON execution plan PostgreSQL reports for ``query``.

    Opens a fresh connection to the ``tpch`` database, runs
    ``EXPLAIN (ANALYZE, FORMAT JSON) <query>`` and returns the first element
    of the JSON document found in the first column of the single result row.
    Because ``ANALYZE`` is used, the statement is actually executed.

    Args:
        query: SQL text of one statement.  It is interpolated into the
            EXPLAIN command verbatim.

    Returns:
        The first element of the returned JSON plan document, or ``None``
        when connecting or running the statement fails (the error is printed).
    """
    # Bind both handles up front so the ``finally`` clause stays safe even when
    # ``psycopg2.connect`` itself raises; otherwise the cleanup would fail with
    # an UnboundLocalError and hide the error that was just handled.
    conn = None
    cur = None
    try:
        conn = psycopg2.connect(
            database="tpch",
            # NOTE(review): the other connections in this notebook use
            # "ruiqiwang"; confirm whether "ruiqiwan" is intentional.
            user="ruiqiwan",
            password="admin",
            host="127.0.0.1",
            port="5432"
        )
        cur = conn.cursor()
        cur.execute(f"EXPLAIN (ANALYZE, FORMAT JSON) {query}")
        plan = cur.fetchone()[0]  # fetchone() returns a tuple
        return plan[0]
    except Exception as e:
        print(f"Error getting plan for query: {query}\n{e}")
        return None
    finally:
        # Close only what was actually opened.
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()

# Collect the EXPLAIN ANALYZE plan of every query.  Queries for which
# get_query_plan returned a falsy value (e.g. None after an error) are
# skipped, so "id" is the position of the query in `queries` and may have gaps.
query_plans = []
for i, query in enumerate(queries):
    plan = get_query_plan(query)
    if plan:
        query_plans.append({"id": i, "json": json.dumps(plan)})
        # query_plans.append({"id": i, "json": json.dumps(plan), "sql": query})


# One row per collected plan, with the columns "id" and "json".
full_train_df = pd.DataFrame(query_plans)

# Persist the collected plans to query_plans.csv (without the index column).
full_train_df.to_csv('query_plans.csv', index=False)


In [None]:
# Quick sanity check: number of collected plans and a preview of the frame.
print(full_train_df.shape)

print(full_train_df.head())


In [None]:
import json

# Example of loading JSON to ensure it's correctly formatted
# Every stored plan string is parsed and re-serialized; rows whose string is
# not valid JSON are reported and removed from full_train_df.
# NOTE(review): rows are dropped from full_train_df in place while the loop is
# iterating over it; collecting the bad labels and dropping them once after the
# loop would be easier to reason about.
for i, row in full_train_df.iterrows():
    try:
        json_obj = json.loads(row['json'])
        # Optionally, re-serialize the object for standard formatting
        full_train_df.at[i, 'json'] = json.dumps(json_obj)
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON for id {row['id']}: {e}")
        # Optionally, drop the row if the JSON is not valid
        full_train_df.drop(i, inplace=True)


In [None]:
# Table name -> short alias used as the prefix of qualified column names
# (e.g. "l.l_shipdate").
db_alias = {'nation' : 'n',
            'region' : 'r',
            'supplier' : 's',
            'part' : 'p',
            'partsupp' : 'ps',
            'customer' : 'c',
            'orders' : 'o',
            'lineitem' : 'l'}

# Reverse lookup: alias -> table name.
alias_to_db = {v: k for k, v in db_alias.items()}

# Table name -> {0-based column position -> column name}.  The positions are
# used to name the columns of the header-less .tbl files.
# NOTE(review): the order is presumably the TPC-H schema order; verify against
# the DDL used to create the database.
column_mapping = {
    "nation": {
        0: "n_nationkey",
        1: "n_name",
        2: "n_regionkey",
        3: "n_comment"
    },
    "region": {
        0: "r_regionkey",
        1: "r_name",
        2: "r_comment"
    },
    "part": {
        0: "p_partkey",
        1: "p_name",
        2: "p_mfgr",
        3: "p_brand",
        4: "p_type",
        5: "p_size",
        6: "p_container",
        7: "p_retailprice",
        8: "p_comment"
    },
    "supplier": {
        0: "s_suppkey",
        1: "s_name",
        2: "s_address",
        3: "s_nationkey",
        4: "s_phone",
        5: "s_acctbal",
        6: "s_comment"
    },
    "partsupp": {
        0: "ps_partkey",
        1: "ps_suppkey",
        2: "ps_availqty",
        3: "ps_supplycost",
        4: "ps_comment"
    },
    "customer": {
        0: "c_custkey",
        1: "c_name",
        2: "c_address",
        3: "c_nationkey",
        4: "c_phone",
        5: "c_acctbal",
        6: "c_mktsegment",
        7: "c_comment"
    },
    "orders": {
        0: "o_orderkey",
        1: "o_custkey",
        2: "o_orderstatus",
        3: "o_totalprice",
        4: "o_orderdate",
        5: "o_orderpriority",
        6: "o_clerk",
        7: "o_shippriority",
        8: "o_comment"
    },
    "lineitem": {
        0: "l_orderkey",
        1: "l_partkey",
        2: "l_suppkey",
        3: "l_linenumber",
        4: "l_quantity",
        5: "l_extendedprice",
        6: "l_discount",
        7: "l_tax",
        8: "l_returnflag",
        9: "l_linestatus",
        10: "l_shipdate",
        11: "l_commitdate",
        12: "l_receiptdate",
        13: "l_shipinstruct",
        14: "l_shipmode",
        15: "l_comment"
    }
}



In [None]:
import pandas as pd



# Module-level state filled in by extract_minmax():
#   column_min_max_vals: "<alias>.<column>" -> [min, max]
#   col2idx:             "<alias>.<column>" -> integer index (plus an 'NA' entry)
#   current_index:       next index to hand out
column_min_max_vals = {}
col2idx = {}
current_index = 0

# def convert_to_numeric(val):
#     if isinstance(val, str):
#         # Convert string to numeric using a hash-based encoding (you could use other schemes too)
#         numeric_value = int.from_bytes(val.encode('utf-8'), 'big') % int(1e9)
#     elif pd.isnull(val):
#         # Handle NaN values
#         numeric_value = 0
#     else:
#         numeric_value = val
#     return numeric_value

def extract_minmax(file_path):
    """Record per-column min/max values and column indices for one table.

    Reads ``tpch-kit/dbgen/outputs/<file_path>.tbl`` as a ``|``-delimited,
    header-less table.  For every column it stores ``[min, max]`` in the
    module-level ``column_min_max_vals`` under the key ``"<alias>.<column>"``
    and, if that key has no index yet, assigns it the next free index in the
    module-level ``col2idx``.  Finally ``col2idx['NA']`` is set to the next
    free index.

    String-valued extremes are converted to integers by interpreting their
    UTF-8 bytes as a big-endian number.
    NOTE(review): the histogram code elsewhere in this notebook additionally
    reduces the same encoding modulo 1e9; confirm which scale the consumer of
    ``column_min_max_vals`` expects.

    Args:
        file_path: Table name; used as the file stem and as the key into
            ``db_alias`` and ``column_mapping``.

    Returns:
        dict: The module-level ``column_min_max_vals`` (updated in place).
    """
    global current_index

    # index_col=False keeps pandas from using the first column as the index
    # (presumably needed because dbgen rows end with a trailing '|').
    df = pd.read_table('tpch-kit/dbgen/outputs/' + file_path + ".tbl", delimiter='|', header=None, index_col=False)

    alias = db_alias[file_path]
    table_columns = column_mapping[file_path]

    for col in df.columns:
        key = alias + '.' + table_columns[col]

        # Dedicated names so the built-in min()/max() are not shadowed.
        col_min = df[col].min()
        col_max = df[col].max()
        if isinstance(col_min, str) or isinstance(col_max, str):
            col_min = int.from_bytes(col_min.encode('utf-8'), 'big')
            col_max = int.from_bytes(col_max.encode('utf-8'), 'big')
        column_min_max_vals[key] = [col_min, col_max]

        # Test the qualified key that is actually stored; testing the bare
        # column name never matched, so repeated calls for the same table
        # kept handing out fresh indices.
        if key not in col2idx:
            col2idx[key] = current_index
            current_index += 1

    col2idx['NA'] = current_index
    return column_min_max_vals

# Gather statistics for all eight tables.  The call order determines the
# indices assigned in col2idx.  As the last expression of the cell, the final
# call's return value (the full column_min_max_vals dict) is displayed.
extract_minmax('customer')
extract_minmax('lineitem')
extract_minmax('nation')
extract_minmax('orders')
extract_minmax('part')
extract_minmax('partsupp')
extract_minmax('region')
extract_minmax('supplier')



In [None]:
from model.database_util import Encoding

# Bundle the column statistics, the column indices and a comparison-operator
# index table into the project's Encoding helper.
# NOTE(review): parse_predicate in this notebook can also emit '<>' and 'LIKE',
# which have no entry in op2idx; confirm how Encoding treats unknown operators.
encoding = Encoding(column_min_max_vals, col2idx, op2idx = {'=': 0, '>': 1, '<': 2, '>=': 3, '<=': 4, 'NA' : 5})

In [None]:

def traverse_plan(plan):
    """Register the node type of ``plan`` and of all its descendants.

    Nodes are visited parent first, children in the order they are listed
    under ``'Plans'``, and each node's ``'Node Type'`` is passed to
    ``encoding.encode_type``.
    """
    stack = [plan]
    while stack:
        node = stack.pop()
        encoding.encode_type(node['Node Type'])
        if 'Plans' in node:
            # Push children reversed so the first child is processed next.
            stack.extend(reversed(node['Plans']))


for json_string in full_train_df['json']:
    query_plan = json.loads(json_string)['Plan']
    traverse_plan(query_plan)



In [None]:

# Dump the lookup tables held by the encoding object for manual inspection.
print(encoding.col2idx)
print(encoding.idx2col)
print(encoding.column_min_max_vals)
print(encoding.idx2type)
print(encoding.type2idx)

# column_mapping lists 61 columns in total, so after all eight tables were
# processed index 61 should be the 'NA' entry of col2idx.
# NOTE(review): assumes idx2col is the inverse of col2idx — confirm in Encoding.
encoding.idx2col[61]

In [None]:
import torch


import os

# torch.save does not create missing parent directories, so make sure the
# checkpoint folder exists before writing.
os.makedirs('checkpoints', exist_ok=True)
torch.save({'encoding': encoding}, 'checkpoints/tpch_encoding.pt')

# Read the checkpoint back so the rest of the notebook works on exactly the
# object that was persisted.
# NOTE(review): the checkpoint pickles a project-defined Encoding object.
# PyTorch releases that default torch.load to weights_only=True refuse such
# objects; passing weights_only=False may be required there — confirm against
# the installed version, and only load checkpoint files you trust.
encoding_ckpt = torch.load('checkpoints/tpch_encoding.pt')
encoding = encoding_ckpt['encoding']



In [None]:
# Long-lived connection and cursor on the "tpch" database, used by the
# following cells.
# NOTE(review): credentials are hard-coded; consider reading them from
# environment variables instead.
conn = psycopg2.connect(
        database="tpch",
        user="ruiqiwang",
        password="admin",
        host="127.0.0.1",
        port="5432"
    )
cur = conn.cursor()

In [None]:
import numpy as np
import pandas as pd

# Empty result table for the per-column histograms: one row per table column,
# with the bin edges ('bins') and the per-bin counts ('freq').
hist_file = pd.DataFrame(columns=['table', 'column', 'bins', 'table_column', 'freq'])

def to_vals(data_list):
    """Convert single-column query rows into a numeric NumPy array.

    The first non-NULL value decides the strategy:

    * If it can be converted with ``float()``, the whole input is converted
      with ``np.array(..., dtype=float).squeeze()`` (NULLs become NaN).
    * Otherwise every row is mapped individually: NULL -> 0; ``str`` -> its
      UTF-8 bytes read as a big-endian integer, modulo 1e9; objects with a
      ``timestamp()`` method -> that timestamp; date-like objects that only
      offer ``timetuple()`` (e.g. ``datetime.date``) -> seconds since the
      epoch at midnight UTC; anything else -> 0.

    Args:
        data_list: Sequence of row tuples as returned by ``cursor.fetchall()``;
            only the first element of each row is used.

    Returns:
        numpy.ndarray: The numeric values (an empty float array for empty
        input).
    """
    import calendar  # local import: only needed for date conversion

    # An empty result set has no value to probe.
    if len(data_list) == 0:
        return np.array([], dtype=float)

    # Probe the first non-NULL value to decide how the column is converted.
    probe = None
    for dat in data_list:
        probe = dat[0]
        if probe is not None:
            break

    try:
        float(probe)
        return np.array(data_list, dtype=float).squeeze()
    except (ValueError, TypeError):
        res = []
        for dat in data_list:
            val = dat[0]
            if val is None:
                res.append(0)
            elif isinstance(val, str):
                hex_value = int.from_bytes(val.encode('utf-8'), 'big')
                normalized_hex = hex_value % int(1e9)
                res.append(normalized_hex)
            elif hasattr(val, 'timestamp'):
                res.append(val.timestamp())
            elif hasattr(val, 'timetuple'):
                # datetime.date has no timestamp(); without this branch every
                # DATE column silently collapsed to 0.
                res.append(float(calendar.timegm(val.timetuple())))
            else:
                res.append(0)

        return np.array(res)

# Build one histogram per table column.  The bin edges are the 0th, 2nd, ...,
# 100th percentiles of the column values (51 edges, i.e. 50 bins).
# Rows are collected in a list and turned into a DataFrame once; growing the
# frame with pd.concat inside the loop copies all previous rows every time.
hist_records = []
for table, alias in db_alias.items():
    for column in column_mapping[table].values():
        cmd = f'SELECT {column} FROM {table}'
        cur.execute(cmd)
        col = cur.fetchall()
        col_array = to_vals(col)

        hists = np.nanpercentile(col_array, range(0, 101, 2), axis=0)

        freq = np.histogram(col_array, bins=hists)[0]

        # Counts are stored as the hex dump of their float64 byte representation.
        freq_hex = freq.astype('float').tobytes().hex()

        hist_records.append({
            'table': table,
            'column': column,
            'bins': repr((' ').join([str(int(i)) for i in hists])),
            'freq': freq_hex,
            'table_column': f'{alias}.{column}'
        })

hist_file = pd.DataFrame(hist_records, columns=['table', 'column', 'bins', 'table_column', 'freq'])



In [None]:
# NOTE(review): only the last expression of a notebook cell is displayed, so
# the bare `hist_file` and `hist_file.head()` lines below produce no output.
hist_file
# Persist the histograms and read them back from disk.
hist_file.to_csv('histograms.csv', index=False)

hist_file = pd.read_csv('histograms.csv')
hist_file.head()
# Length, in characters, of the first row's stored 'bins' string.
len(hist_file['bins'][0])

In [None]:
# tsm_system_rows provides the SYSTEM_ROWS sampling method used below.
# IF NOT EXISTS keeps this cell re-runnable: a plain CREATE EXTENSION raises
# once the extension is installed, and that error also leaves the open
# transaction on this connection in an aborted state.
cmd = 'CREATE EXTENSION IF NOT EXISTS tsm_system_rows'
cur.execute(cmd)

# Draw 1000 single-row samples from every table.
tables = list(db_alias.keys())
sample_data = {}
for table in tables:
    # A zero-row SELECT is enough to obtain the column names from
    # cur.description.
    cur.execute("Select * FROM {} LIMIT 0".format(table))
    colnames = [desc[0] for desc in cur.description]

    ts = pd.DataFrame(columns = colnames)

    for num in range(1000):
        cmd = 'SELECT * FROM {} TABLESAMPLE SYSTEM_ROWS(1)'.format(table)
        cur.execute(cmd)
        samples = cur.fetchall()
        # Each draw yields at most one row, stored at position `num`.
        for row in samples:
            ts.loc[num]=row

    sample_data[table] = ts

In [None]:
from sqlalchemy import create_engine


# SQLAlchemy engine for the "tpch_sample" database that receives the sampled
# rows.
# NOTE(review): credentials are embedded in the URL; consider reading them
# from environment variables instead.
engine = create_engine('postgresql://ruiqiwang:admin@localhost:5432/tpch_sample')


In [None]:
# Tag every sampled row with a sample id ("sid") and append the samples to the
# table of the same name through `engine`.
# NOTE(review): `cur` belongs to the connection opened on the "tpch" database,
# whereas `engine` targets "tpch_sample", so the ALTER TABLE below runs against
# a different database than the one receiving the rows, and it is never
# committed — confirm which database is supposed to get the sid column.
for k,v in sample_data.items():
    # Derive the ids from the actual number of sampled rows instead of
    # assuming exactly 1000.
    v['sid'] = list(range(len(v)))
    try:
        cmd = 'ALTER TABLE {} ADD COLUMN IF NOT EXISTS sid INTEGER'.format(k)
        cur.execute(cmd)
        v.to_sql(k,engine,if_exists='append',index=False)
    except Exception as e:
        print(f"Error: {e}")
        # A failed statement leaves the transaction aborted; without a
        # rollback every later cur.execute on this connection fails as well.
        conn.rollback()

In [None]:
def parse_predicate(predicate):
    """Split a filter expression into ``"column,operator,value"`` strings.

    The expression is split on ``' AND '``; all parentheses are removed from
    each condition.  For each condition the first operator from the list
    ``>=, <=, <>, =, >, <, LIKE`` (tried in that order) that occurs in the text
    is used.  If splitting on that operator yields exactly two parts, the
    condition is emitted; otherwise it is skipped.  Columns containing an
    underscore are prefixed with the text before the first underscore and a
    dot (``l_shipdate`` -> ``l.l_shipdate``).

    Args:
        predicate: Filter text, e.g. ``"(a_x > 5) AND (b_y = 3)"``.

    Returns:
        list[str]: One ``"column,operator,value"`` string per parsed condition.
    """
    parsed_conditions = []

    for condition in predicate.split(' AND '):
        condition = condition.replace('(', '').replace(')', '')
        # Two-character operators are tried first so that '>=' is not
        # mistaken for '=' or '>'.
        for operator in ['>=', '<=', '<>', '=', '>', '<', 'LIKE']:
            if operator not in condition:
                continue
            parts = condition.split(operator)
            if len(parts) == 2:
                column, value = (part.strip() for part in parts)
                prefixed_column = column
                if '_' in column:
                    prefixed_column = column.split('_')[0] + '.' + column
                parsed_conditions.append(f"{prefixed_column},{operator},{value}")
            # Only the first matching operator is considered.
            break
    return parsed_conditions

In [None]:
import json
import pandas as pd
import csv

def parse_query_plan(query_plan):
    """Summarize a plan tree as ``"tables#joins#predicates#cardinality"``.

    The tree is walked parent first, children in listed order:

    * tables: ``"<Relation Name> <Alias>"`` of every scan node that carries
      both fields, de-duplicated, in order of first appearance;
    * joins: the first non-empty of ``Join Filter``, ``Hash Cond`` and
      ``Merge Cond`` on every join node;
    * predicates: every ``Filter`` string, parsed by ``parse_predicate``;
    * cardinality: the row count of the first visited node that reports one
      (normally the root), preferring ``Actual Rows`` over ``Plan Rows``.

    Args:
        query_plan: The ``'Plan'`` dictionary of a JSON-format EXPLAIN result.

    Returns:
        str: The four comma-joined sections separated by ``#``; the last
        section is ``'N/A'`` when no node reports a row count.
    """
    scan_nodes = ('Seq Scan', 'Index Scan', 'Index Only Scan', 'Bitmap Heap Scan')
    join_nodes = ('Nested Loop', 'Hash Join', 'Merge Join')
    join_condition_keys = ('Join Filter', 'Hash Cond', 'Merge Cond')

    tables = {}  # dict used as an insertion-ordered set for stable output
    joins = []
    predicates = []
    cardinality = None

    def traverse_plan(plan):
        nonlocal cardinality
        node_type = plan.get('Node Type', '')

        if node_type in scan_nodes:
            relation_name = plan.get('Relation Name', '')
            alias = plan.get('Alias', '')
            if relation_name and alias:
                tables[f"{relation_name} {alias}"] = None

        if node_type in join_nodes:
            join_condition = next(
                (plan[key] for key in join_condition_keys if plan.get(key)), '')
            if join_condition:
                joins.append(join_condition)

        if 'Filter' in plan:
            predicates.extend(parse_predicate(plan['Filter']))

        # Keep the first row count encountered; overwriting it on every node
        # would report the last visited leaf instead of the query result.
        if cardinality is None:
            cardinality = plan.get('Actual Rows', plan.get('Plan Rows'))

        for subplan in plan.get('Plans', ()):
            traverse_plan(subplan)

    traverse_plan(query_plan)

    # Format the result
    table_str = ','.join(tables)
    join_str = ','.join(joins)
    predicate_str = ','.join(predicates)
    # A cardinality of 0 is a valid result and must not be reported as 'N/A'.
    card_str = str(cardinality) if cardinality is not None else 'N/A'

    return f"{table_str}#{join_str}#{predicate_str}#{card_str}"

# Convert every collected plan into its "tables#joins#predicates#cardinality"
# summary line.
parsed_results = []

for query_plan in query_plans:
    plan_dict = json.loads(query_plan['json'])
    parsed_result = parse_query_plan(plan_dict['Plan'])
    parsed_results.append(parsed_result)

df = pd.DataFrame(parsed_results, columns=["parsed_plan"])


# Write one summary per line without quoting.  With QUOTE_NONE the writer has
# to escape the commas inside the field, and escapechar=' ' makes it do so by
# putting a space in front of them.
df.to_csv('tpch.csv', index=False, header=False, quoting=csv.QUOTE_NONE, escapechar=' ')

# Report the file that was actually written.
print("Parsed query plans have been saved to 'tpch.csv'")


In [None]:
# Read the summaries back, splitting each line on '#' into its four sections.
query_file = pd.read_csv('tpch.csv',sep='#',header=None)
query_file.columns = ['table','join','predicate','card']

query_file.head()

In [None]:
# Autocommit connection to the "tpch_sample" database.
# NOTE: `cur` is rebound here, so from this cell on it refers to the sample
# database rather than the cursor created earlier on "tpch".
conm = psycopg2.connect(database="tpch_sample", user="ruiqiwang", host="127.0.0.1",password="admin", port="5432")
conm.set_session(autocommit=True)
cur = conm.cursor()

In [None]:
import numpy as np

def _sample_bitmap(cursor, sql, n_samples=1000):
    """Run ``sql`` and return a uint8 vector holding 1 at every returned sid."""
    cursor.execute(sql)
    bitmap = np.zeros(n_samples).astype('uint8')
    sids = np.array(cursor.fetchall()).squeeze()
    # size > 0 (not > 1) so that a single matching sample row is recorded too.
    if sids.size > 0:
        bitmap[sids] = 1
    return bitmap


def _merge_bitmap(table_sample, table, bitmap):
    """AND ``bitmap`` into the entry for ``table``, creating it if missing."""
    if table in table_sample:
        table_sample[table] = table_sample[table] & bitmap
    else:
        table_sample[table] = bitmap


# For every query, build one bitmap per referenced table that marks which of
# the sampled rows satisfy the query's predicates and joins.
table_samples = []
for _, row in query_file.iterrows():
    table_sample = {}

    # An empty predicate section is read back as NaN, which has no .split().
    if not pd.isnull(row['predicate']):
        preds = row['predicate'].split(',')
        # Predicates are stored as flat (column, operator, value) triples.
        for start in range(0, len(preds), 3):
            left, op, right = preds[start:start+3]
            alias, col = left.strip().split('.')
            table = alias_to_db[alias]
            pred_string = ''.join((col, op, right))
            q = 'select sid from {} where {}'.format(table, pred_string)
            _merge_bitmap(table_sample, table, _sample_bitmap(cur, q))

    # The dict is appended before the joins are processed; it is the same
    # object, so the join bitmaps added below are still part of the entry.
    table_samples.append(table_sample)

    if pd.isnull(row['join']):
        continue

    for join in row['join'].split(','):
        # A join condition looks like "(a.x = b.y)": take the qualifier of each
        # side and drop the opening parenthesis from the left one.
        sides = join.split('=')
        alias1 = sides[0].strip().split(' ')[0].split('.')[0][1:]
        alias2 = sides[1].strip().split(' ')[0].split('.')[0]
        # PostgreSQL uses the table name itself as the qualifier when the query
        # defines no alias, so fall back to the name as given.
        table1 = alias_to_db.get(alias1, alias1)
        table2 = alias_to_db.get(alias2, alias2)

        join_sql = 'select {}.sid from {} {} join {} {} on {}'
        # One bitmap per join side, each built from a fresh zero vector and
        # stored under the table name so that it is intersected with the
        # predicate bitmap of the same table.
        for side_alias, side_table in ((alias1, table1), (alias2, table2)):
            q = join_sql.format(side_alias, table1, alias1, table2, alias2, join)
            _merge_bitmap(table_sample, side_table, _sample_bitmap(cur, q))

# Serialize the bitmaps: per query a 4-byte little-endian table count,
# followed by the bit-packed bitmap of each table in dictionary order.
bitmap_csv_file = "tpch.bitmaps"
with open(bitmap_csv_file, 'wb') as out:
    for table_sample in table_samples:
        table_count = len(table_sample)
        out.write(table_count.to_bytes(4, byteorder='little'))
        for bitmap in table_sample.values():
            # Number of bytes needed to hold one bit per sample row.
            byte_count = (len(bitmap) + 7) // 8
            packed = np.packbits(bitmap[:byte_count * 8])
            out.write(packed)

print(f"Bitmap CSV saved as {bitmap_csv_file}")


In [None]:

# cur.close()
# conn.close()