# Helpers

In [None]:
import time

# Shared stopwatch state: clock_reset() starts the timer, clock() reads and advances it.
# Both are None until clock_reset() has been called.
time_end, time_start = None, None

def clock_reset():
    """Restart the stopwatch: mark "now" as the start and clear the end mark."""
    global time_end, time_start
    time_start = time.perf_counter()
    time_end = None

def clock(label):
    """Print the seconds elapsed since the previous mark and start a new lap.

    The stopwatch must have been started (``time_start`` is not None),
    otherwise the assertion fails. After printing, the end of this lap
    becomes the start of the next one.

    Args:
        label: Text printed in front of the elapsed time.
    """
    global time_end, time_start
    assert time_start is not None
    now = time.perf_counter()
    time_end = now
    elapsed = now - time_start
    print("\r{}: {:.2f} s".format(label, elapsed))
    time_start = now

In [None]:
def describe_df(df):
    """Print a one-line summary of a DataFrame: shape, memory use in whole MB, and columns."""
    shape = df.shape
    megabytes = df.memory_usage().sum() // 1024 // 1024
    columns = df.columns
    print(f"Shape: {shape}, Memory: {megabytes} MB, Columns: {columns}")

# MySQL to PostgreSQL

In [None]:
# Convert MySQL trace to PostgreSQL trace.
import csv
import pandas as pd
import re
import sys

from pathlib import Path


# Column names, in order, of the PostgreSQL-style frame produced by
# convert_mysql_df_to_postgresql_df (it reindexes to exactly these columns, and
# the CSV is later written without a header, so position is what matters).
# NOTE(review): presumably mirrors the column layout of PostgreSQL's csvlog
# format -- confirm against the PostgreSQL version the preprocessor expects.
PG_COLS = [
    "log_time",
    "user_name",
    "database_name",
    "process_id",
    "connection_from",
    "session_id",
    "session_line_num",
    "command_tag",
    "session_start_time",
    "virtual_transaction_id",
    "transaction_id",
    "error_severity",
    "sql_state_code",
    "message",
    "detail",
    "hint",
    "internal_query",
    "internal_query_pos",
    "context",
    "query",
    "query_pos",
    "location",
    "application_name",
    "backend_type",
]

def convert_mysql_log_to_mysql_df(log_file):
    """Parse a MySQL general query log into a DataFrame.

    A log record starts on a line beginning with ``<timestamp>Z<thread id>``;
    any following lines without that prefix are continuations of the record and
    are joined to it with single spaces. Lines before the first record are
    treated as the log header.

    Args:
        log_file: Path of the query log; it is read as latin-1.

    Returns:
        DataFrame with string columns ``Time``, ``Id``, ``Command`` and
        ``Argument``, one row per log record, in file order.

    Raises:
        AssertionError: If a buffered entry is neither a recognised record nor
            the log header ("Bad line"), or if an unknown command shows up.
    """
    # Regexes for recognizing various MySQL query log constructs.
    header_regex = re.compile(r'[\s\S]*Time\s+Id\s+Command\s+Argument')
    date_id_regex = re.compile(r'^(\d+.*)Z(\d+)')
    full_regex = re.compile(r'(\d+.*)Z(\d+) (Connect|Init DB|Query|Quit|Statistics)\t([\s\S]*)')

    rows = []

    def flush(buffer):
        """Parse one buffered entry (header or record) and store a record's fields."""
        joined_buf = ' '.join(buffer)
        # Nothing buffered yet (e.g. the log has no header) or only blank lines.
        if not joined_buf.strip():
            return

        # PostgreSQL vs MySQL things.
        # NOTE(review): "\'" and "'" are the same Python string, so this replace
        # is a no-op. If the intent was to turn MySQL's backslash-escaped quote
        # into PostgreSQL's doubled quote, it needs "\\'" -> "''"; left unchanged
        # until the intended output is confirmed.
        joined_buf = joined_buf.replace("\'", "'")

        match = full_regex.match(joined_buf)
        if match is None:
            assert header_regex.match(joined_buf) is not None, f"Bad line: {joined_buf}"
        else:
            rows.append(match.groups())

    # Extract the rows from the query log.
    with open(log_file, 'r', encoding='latin-1') as f:
        buffer = []
        # Iterate over each line in the query log as delimited by \n.
        for line in f:
            # First, remove any trailing \n's.
            line = line.rstrip('\n')

            # If there is no date, this is _probably_ part of the previous line.
            if date_id_regex.match(line) is None:
                buffer.append(line)
                continue

            # Otherwise, finish the current entry and start the next one.
            flush(buffer)
            buffer = [line]

        # The final entry has no following timestamped line to trigger a flush;
        # without this the last record of the log would be dropped.
        flush(buffer)

    # Construct a dataframe out of the rows.
    raw_df = pd.DataFrame(rows, columns=['Time', 'Id', 'Command', 'Argument'])
    command_set = set(raw_df['Command'])
    known_command_set = set('Connect|Init DB|Query|Quit|Statistics'.split('|'))
    assert command_set.issubset(known_command_set), f"Bad command set: {command_set}"
    return raw_df

def convert_mysql_df_to_postgresql_df(mysql_df):
    """Reshape a parsed MySQL query log into the PostgreSQL-style column layout.

    Each MySQL thread id becomes a session. Within a session the records are
    numbered in ``Time`` order, and every record gets its own virtual
    transaction id ``AAC/<thread id>/<line number>``.

    Args:
        mysql_df: DataFrame with columns ``Time``, ``Id``, ``Command`` and
            ``Argument``. It is not modified.

    Returns:
        DataFrame with exactly the columns in ``PG_COLS``, ordered by
        ``log_time`` (ties broken by thread id). Columns with no MySQL
        counterpart are filled with ``''``. An empty input gives an empty frame.
    """
    # Rows without a thread id cannot be assigned to a session (groupby would
    # skip them as well), so drop them up front.
    ordered = mysql_df.dropna(subset=['Id']).sort_values(['Time', 'Id'])

    # Position of each record within its session, in Time order.
    line_nums = ordered.groupby('Id').cumcount()
    thread_ids = ordered['Id']

    pg_df = (
        ordered.assign(
            session_id=thread_ids,
            session_line_num=line_nums,
            # TODO(WAN): Right now, we assume autocommit=1. But maybe we can parse this out.
            virtual_transaction_id='AAC/' + thread_ids.astype(str) + '/' + line_nums.astype(str),
            # TODO(WAN): This is kind of an abuse of PostgreSQL portal names.
            message='execute ' + ordered['Command'] + ': ' + ordered['Argument'],
        )
        .drop(columns=['Id', 'Command', 'Argument'])
        .rename(columns={'Time': 'log_time'})
    )

    # `ordered` is already sorted by (Time, Id), so no further sort is needed.
    return pg_df.reindex(columns=PG_COLS, fill_value='')

# Input MySQL query log (hard-coded absolute path) and the CSV derived from it.
# csv_file is a relative path, so it is written to the current working directory.
log_file = Path('/home/kapi/admissions/magneto.log.2016-09-04')
csv_file = Path(f'postgresql_{log_file.name}.csv')

# Stage 1: parse the raw log into a DataFrame; each stage is timed separately.
clock_reset()
mysql_df = convert_mysql_log_to_mysql_df(log_file)
clock("Convert MySQL log to MySQL df")
describe_df(mysql_df)

# Stage 2: reshape into the PG_COLS layout, then drop the intermediate frame.
clock_reset()
postgresql_df = convert_mysql_df_to_postgresql_df(mysql_df)
clock("Convert MySQL df to PostgreSQL df")
describe_df(postgresql_df)
del mysql_df

# Stage 3: write the CSV with no header and no index, every field quoted.
clock_reset()
postgresql_df.to_csv(csv_file, index=False, header=False, quoting=csv.QUOTE_ALL)
clock("Write PostgreSQL df to CSV")
del postgresql_df

# Invoke preprocessor

In [None]:
# Split the PostgreSQL CSV into numbered chunk files of at most 100000 lines each
# (<stem>_1.csv, <stem>_2.csv, ...), written to the current working directory.
# The files are opened as UTF-8 explicitly: pandas' to_csv writes UTF-8 by
# default, whereas open() without an encoding falls back to the platform locale.
# NOTE: the split is on physical lines, so it assumes no quoted field contains
# a newline.
with open(csv_file, 'r', encoding='utf-8') as f:
    ctr = 1
    lines = []
    for i, line in enumerate(f, 1):
        lines.append(line)
        if i % 100000 == 0:
            with open(f"{csv_file.stem}_{ctr}.csv", "w", encoding='utf-8') as outfile:
                outfile.writelines(lines)
            lines = []
            ctr += 1
    # Flush the final, partially filled chunk.
    if lines:
        with open(f"{csv_file.stem}_{ctr}.csv", "w", encoding='utf-8') as outfile:
            outfile.writelines(lines)
        lines = []

In [None]:
%%script env csv_file="postgresql_magneto.log.2016-09-04_1.csv" bash

# Abort on the first failing command or unset variable, and echo each command.
set -eux
# Recreate ./tmp from scratch so it contains only the file copied below.
rm -rf ./tmp/
mkdir -p ./tmp/
# Only the single chunk named in csv_file (set on the %%script line) is used.
cp "$csv_file" ./tmp/
# Run the preprocessor over ./tmp; its stdout is redirected to preprocessor.log.
python3 ./forecast/preprocessor.py --query-log-folder ./tmp --output-parquet out.parquet.gzip --output-timestamp out.timestamp.txt --output-query-templates out.templates.txt > preprocessor.log

# Load preprocessor output back in

In [None]:
from forecast.preprocessor import Preprocessor

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import warnings

In [None]:
def load_preprocessed(parquet):
    """Load the preprocessor's parquet output and drop rows with an empty query template.

    Args:
        parquet: Path of the parquet file, passed to ``Preprocessor(parquet_path=...)``.

    Returns:
        The preprocessor's DataFrame without the rows whose ``query_template``
        equals ``''``. The number of removed rows is printed.
    """
    preprocessor = Preprocessor(parquet_path=parquet)
    df = preprocessor.get_dataframe()
    empties = df['query_template'] == ''
    # Series.sum() counts the True values without a Python-level loop over rows.
    print(f"Removing {empties.sum()} empty query template values.")
    return df[~empties]

# Choose which preprocessed trace to analyse; the alternative is left commented out.
parquet = r'./out.parquet.gzip' # admissions
# parquet = r'./preprocessed.parquet.gzip' # tpcc
df = load_preprocessed(parquet)
describe_df(df)
# NOTE(review): display is not defined in this file; presumably the
# IPython/Jupyter builtin, so this cell only runs inside a notebook.
display(df)
print(f"{len(set(df['query_template']))} unique query templates.")

In [None]:
from sklearn.preprocessing import LabelEncoder
from pomegranate import DiscreteDistribution, ConditionalProbabilityTable, MarkovChain

from multiprocessing import Pool, Manager, cpu_count

# Selects how rows are grouped into Markov-chain sequences:
#   True  -> one sequence per virtual_transaction_id;
#   False -> one sequence per session_id, wrapped in SESSION_BEGIN / SESSION_END markers.
mc_vtxid = False

def build_label_encoder(df):
    """Fit a LabelEncoder on the query templates and store the encoded column on ``df``.

    When ``mc_vtxid`` is false, the artificial ``SESSION_BEGIN`` and
    ``SESSION_END`` labels are added to the vocabulary as well. The encoded
    templates are written to ``df['query_template_enc']`` in place, and the
    elapsed time is reported through ``clock``.

    Args:
        df: DataFrame with a ``query_template`` column; gains ``query_template_enc``.

    Returns:
        The fitted ``LabelEncoder``.
    """
    clock_reset()
    vocabulary = df['query_template'].values
    if not mc_vtxid:
        vocabulary = np.concatenate([['SESSION_BEGIN', 'SESSION_END'], vocabulary])
    encoder = LabelEncoder()
    encoder.fit(vocabulary)
    encoded_templates = encoder.transform(df['query_template'])
    df['query_template_enc'] = encoded_templates
    clock('LabelEncoder')
    return encoder

def grouper(item):
    """Turn one ``(group id, group frame)`` pair into ``(group id, encoded sequence)``.

    The group's rows are ordered by ``log_time`` and then ``session_line_num``.
    When ``mc_vtxid`` is false, the sequence is wrapped in the encoded
    ``SESSION_BEGIN`` / ``SESSION_END`` markers. The encoder is read from the
    module-level ``le``, not passed in.
    """
    key, frame = item
    ordered = frame.sort_values(['log_time', 'session_line_num'])
    sequence = ordered['query_template_enc'].values
    if mc_vtxid:
        return key, sequence
    begin_code, end_code = le.transform(['SESSION_BEGIN', 'SESSION_END'])
    return key, np.concatenate([[begin_code], sequence, [end_code]])

def build_markov_chain(df, le):
    """Fit a first-order Markov chain over the encoded query-template sequences.

    Rows are grouped by ``virtual_transaction_id`` (``mc_vtxid`` true) or by
    ``session_id`` (``mc_vtxid`` false); each group becomes one sample sequence
    via ``grouper``. After fitting, states whose outgoing probabilities all
    share a single value are rewritten to loop onto themselves with
    probability 1. Progress and timings are printed along the way.

    Note that ``grouper`` reads the module-level ``le`` rather than this
    function's ``le`` argument, so that global must already be set.

    Args:
        df: DataFrame carrying ``query_template_enc`` plus the grouping and
            ordering columns that ``grouper`` uses.
        le: Fitted label encoder; supplies the number of states and decodes
            the patched states for the final printout.

    Returns:
        The fitted ``MarkovChain`` with its transition table patched.
    """
    clock_reset()
    
    if mc_vtxid:
        groups = df.groupby('virtual_transaction_id')
    else:
        groups = df.groupby('session_id')
    clock(f'Grouping ({len(groups)} groups)')
    
    # Build each group's encoded sequence in a pool of worker processes.
    ret_list = None
    with Pool(cpu_count()) as pool:
        ret_list = pool.map(grouper, groups)
    # group id -> encoded sequence
    data = {k: v for k, v in ret_list}
    print(len(data), " unique transactions")
    clock('Grouping and building up data')

    # Diagnostic only: count the distinct sequences by their comma-joined form.
    trajs = set()
    for values in data.values():
        trajs.add(','.join(str(v) for v in values.tolist()))
    print(len(trajs), " unique trajectories")
    clock('Computing trajectories (optional)')

    # Start distribution: all zeros, except BEGIN = 1.0 in per-transaction mode.
    n = len(le.classes_)
    startdist = {i : 0 for i in range(n)}
    if mc_vtxid:
        if 'BEGIN' in le.classes_:
            startdist[le.transform(['BEGIN'])[0]] = 1.0
    dist = DiscreteDistribution(startdist)

    # Identity transition table: every state moves to itself with probability 1.
    cptt = [[i, j, 1 if i == j else 0] for i in range(n) for j in range(n)]
    cpt = ConditionalProbabilityTable(cptt, [dist])

    mc = MarkovChain([dist, cpt])
    clock("Building Markov Chain")
    
    samples = list(data.values())
    # NOTE(review): mc is rebound to the result of from_samples; presumably that
    # fits a fresh chain from the samples, making the hand-built dist/cpt above
    # placeholders only -- confirm against the pomegranate documentation.
    mc = mc.from_samples(samples)

    # Pull the fitted transition table into a DataFrame so it can be patched.
    cpt = mc.distributions[1]
    cptd = cpt.to_dict()
    dada = pd.DataFrame(cptd['table'], columns=['src','dst','val'])
    # Collect the source states whose outgoing values are all identical.
    # src/dst are compared as strings here -- presumably how to_dict()
    # serialises the state keys; verify if the pomegranate version changes.
    accs = []
    for i in range(n):
        dasub = dada[dada['src'] == str(i)]
        if dasub['val'].nunique() == 1:
            accs.append(str(i))

    # Replace those rows with a pure self-loop: zero everywhere, 1 on src == dst.
    dada.loc[dada['src'].isin(accs), 'val'] = 0
    for i in accs:
        dada.loc[(dada['src'] == str(i)) & (dada['dst'] == str(i)), 'val'] = 1
    dadal = dada.to_dict(orient='list')

    # Write the patched table back into the chain.
    sdv = [[s,d,v] for s,d,v in zip(dadal['src'], dadal['dst'], dadal['val'])]
    cptd['table'] = sdv
    mc.distributions[1] = cpt.from_dict(cptd)

    print('Had to fix distribution for ')
    print(le.inverse_transform([int(x) for x in accs]))
    clock("Fixing Markov Chain")
    
    return mc

# Fit the encoder (this also adds df['query_template_enc']) and bind it to the
# module-level `le`, which grouper() reads directly; then fit the Markov chain.
le = build_label_encoder(df)
mc = build_markov_chain(df, le)

In [None]:
%matplotlib inline
from graphviz import Digraph, escape


def printer(string, every=24):
    """Hard-wrap ``string`` by inserting a newline after every ``every`` characters."""
    starts = range(0, len(string), every)
    chunks = [string[begin:begin + every] for begin in starts]
    return '\n'.join(chunks)

def draw_graph(table, label_encoder, readable_labels=False):
    """Build a graphviz ``Digraph`` from a Markov-chain transition table.

    One node is created per distinct source state and one edge per transition
    whose probability exceeds 0.00001. Edges are labelled with the probability
    and drawn with a pen width proportional to it.

    In per-transaction mode (``mc_vtxid`` true), transitions leaving ``COMMIT``
    or ``ROLLBACK`` and transitions entering ``BEGIN`` are not drawn. Such a
    transition must be a self-loop or have probability <= 0.001, otherwise the
    assertion fails.

    Args:
        table: 2-D array whose rows are ``(src, dst, probability)``; ``src``
            and ``dst`` are encoded state ids.
        label_encoder: Fitted encoder, used to find the special states and,
            optionally, to decode node names.
        readable_labels: If true, nodes are named by the decoded query template
            (wrapped by ``printer``) instead of the raw encoded id. Defaults to
            False, the raw ids.

    Returns:
        The ``Digraph`` (named ``graph``, PDF format); it is not rendered here.
    """
    graph = Digraph('graph', filename='graph', format='pdf')

    def node_name(state):
        """Return the node identifier used for an encoded state."""
        if not readable_labels:
            return state
        template = label_encoder.inverse_transform([int(state)])[0]
        return printer(template)

    def encoded(labels):
        """Return the encoded ids of those labels the encoder actually knows."""
        present = [label for label in labels if label in label_encoder.classes_]
        if not present:
            return set()
        return {int(code) for code in label_encoder.transform(present)}

    # Look the special states up once instead of once per table row.
    hidden_sources = encoded(['COMMIT', 'ROLLBACK']) if mc_vtxid else set()
    hidden_targets = encoded(['BEGIN']) if mc_vtxid else set()

    for state in np.unique(table[:, 0]):
        graph.node(node_name(state))

    for row in table:
        src, dst, prob = row[0], row[1], float(row[2])
        # Written as "not >" so that a NaN probability is skipped as well.
        if not prob > 0.00001:
            continue

        hidden = (
            (hidden_sources and int(src) in hidden_sources)
            or (hidden_targets and int(dst) in hidden_targets)
        )
        if hidden:
            assert src == dst or prob <= 0.001, row
            continue

        graph.edge(
            node_name(src),
            node_name(dst),
            label=f'{prob:.2f}',
            penwidth=f'{2.5 * prob}',
        )

    return graph


import json
# Round-trip the fitted transition table through JSON to get a plain array of
# rows, which draw_graph reads as (src, dst, probability). Then build the graph
# and render it into the current directory.
table = np.array(json.loads(mc.distributions[1].to_json())['table'])
g = draw_graph(table, le)
g.render(directory='./')

In [None]:
# Decode a hand-picked sequence of encoded ids back into the labels they stand for.
le.inverse_transform([139, 6, 14, 47, 55, 167, 116, 48, 172, 156, 140])