In [1]:
import glob
import hashlib
import re

import pandas as pd
import sqlparse

In [2]:
pgfiles = glob.glob('data/extracted/simple/postgresql*.csv')
display(pgfiles)

['data/extracted/simple/postgresql-2021-12-06_160118.csv',
 'data/extracted/simple/postgresql-2021-12-06_160210.csv',
 'data/extracted/simple/postgresql-2021-12-06_160202.csv',
 'data/extracted/simple/postgresql-2021-12-06_160154.csv',
 'data/extracted/simple/postgresql-2021-12-06_160132.csv',
 'data/extracted/simple/postgresql-2021-12-06_160149.csv',
 'data/extracted/simple/postgresql-2021-12-06_160207.csv',
 'data/extracted/simple/postgresql-2021-12-06_160205.csv',
 'data/extracted/simple/postgresql-2021-12-06_160121.csv',
 'data/extracted/simple/postgresql-2021-12-06_160048.csv',
 'data/extracted/simple/postgresql-2021-12-06_160138.csv',
 'data/extracted/simple/postgresql-2021-12-06_160127.csv',
 'data/extracted/simple/postgresql-2021-12-06_160146.csv',
 'data/extracted/simple/postgresql-2021-12-06_160135.csv',
 'data/extracted/simple/postgresql-2021-12-06_160129.csv',
 'data/extracted/simple/postgresql-2021-12-06_160124.csv',
 'data/extracted/simple/postgresql-2021-12-06_160151.csv

In [5]:
# https://www.postgresql.org/docs/13/runtime-config-logging.html#RUNTIME-CONFIG-LOGGING-CSVLOG
PG_LOG_COLUMNS = [
    'log_time',
    'user_name',
    'database_name',
    'process_id',
    'connection_from',
    'session_id',
    'session_line_num',
    'command_tag',
    'session_start_time',
    'virtual_transaction_id',
    'transaction_id',
    'error_severity',
    'sql_state_code',
    'message',
    'detail',
    'hint',
    'internal_query',
    'internal_query_pos',
    'context',
    'query',
    'query_pos',
    'location',
    'application_name',
    'backend_type',
]


df = pd.concat(
    pd.read_csv(pgfile,
                names=PG_LOG_COLUMNS,
                parse_dates=['log_time', 'session_start_time'],
                usecols=['log_time', 'session_start_time', 'command_tag', 'message'],
                header=None,
                index_col=False)
    for pgfile in pgfiles
)
print(df.shape)
print(df.columns)
print(set(df['command_tag']))

(603434, 4)
Index(['log_time', 'command_tag', 'session_start_time', 'message'], dtype='object')
{nan, 'SELECT', 'DELETE', 'SHOW', 'SET', 'INSERT', 'BEGIN', 'COMMIT', 'ROLLBACK', 'UPDATE'}


## Extracting the relevant queries.

In [6]:
commands = ['SELECT', 'INSERT', 'UPDATE', 'DELETE']

def extract_query(message):
    for command in commands:
        idx = message.find(command)
        if idx != -1:
            query = message[idx:]
            return query
    return ''

df['query'] = df['message'].apply(extract_query)
df['query']

0        UPDATE stock   SET S_QUANTITY = 38 ,        S_...
1        UPDATE stock   SET S_QUANTITY = 76 ,        S_...
2        UPDATE stock   SET S_QUANTITY = 43 ,        S_...
3        UPDATE stock   SET S_QUANTITY = 73 ,        S_...
4        UPDATE stock   SET S_QUANTITY = 89 ,        S_...
                               ...                        
28203    INSERT INTO order_line (OL_O_ID, OL_D_ID, OL_W...
28204    INSERT INTO order_line (OL_O_ID, OL_D_ID, OL_W...
28205    UPDATE stock   SET S_QUANTITY = 89 ,        S_...
28206    UPDATE stock   SET S_QUANTITY = 43 ,        S_...
28207    UPDATE stock   SET S_QUANTITY = 47 ,        S_...
Name: query, Length: 603434, dtype: object

## Anonymizer: salt and hash non-date non-digit strings.

In [7]:
ANONYMIZE = False

SALT = 'andycannotsay.com'.encode('utf-8')
DATE_REGEX = re.compile(r'\d{4}-\d{2}-\d{2}.*')
DIGITS_REGEX = re.compile(r'\d+\.?\d*')

def anonymize(sql):
    cleaned_tokens = []

    # TODO(WAN): sqlparse.parse is actually quite slow.
    # Do we really need this?
    parsed = sqlparse.parse(sql)
    if len(parsed) == 0:
        return ''
    
    assert len(parsed) == 1
    tokens = parsed[0].flatten()
    for token in tokens:
        token = str(token)

        single_quoted = token.startswith("'") and token.endswith("'")
        double_quoted = token.startswith('"') and token.endswith('"')
        not_quoted = not single_quoted and not double_quoted

        is_date = DATE_REGEX.search(token) is not None
        is_digits = DIGITS_REGEX.search(token) is not None

        if not_quoted or is_date or is_digits:
            cleaned_tokens.append(token)
            continue

        sha = hashlib.sha256(SALT + token.encode('utf-8')).hexdigest()
        clean_token = "'{}\\{}'".format(len(token) - 2, sha)
        cleaned_tokens.append(clean_token)

    return ''.join(cleaned_tokens)

if ANONYMIZE:
    df['query_anon'] = df['query'].apply(anonymize)
    df['query_anon']

## Pre-processor: extracting query templates.

In [8]:
STRING_REGEX = r'([^\\])\'((\')|(.*?([^\\])\'))'
DOUBLE_QUOTE_STRING_REGEX = r'([^\\])"((")|(.*?([^\\])"))'
INT_REGEX = r'([^a-zA-Z])-?\d+(\.\d+)?'
HASH_REGEX = r'(\'\d+\\.*?\')'

def extract_template(query):
    template = query
    template = re.sub(HASH_REGEX, r"@@@", template)
    template = re.sub(STRING_REGEX, r"\1&&&", template)
    template = re.sub(DOUBLE_QUOTE_STRING_REGEX, r"\1&&&", template)
    template = re.sub(INT_REGEX, r"\1#", template)
    return template

query_column = 'query_anon' if ANONYMIZE else 'query'

df['query_template'] = df[query_column].apply(extract_template)
df['query_template']

0        UPDATE stock   SET S_QUANTITY = # ,        S_Y...
1        UPDATE stock   SET S_QUANTITY = # ,        S_Y...
2        UPDATE stock   SET S_QUANTITY = # ,        S_Y...
3        UPDATE stock   SET S_QUANTITY = # ,        S_Y...
4        UPDATE stock   SET S_QUANTITY = # ,        S_Y...
                               ...                        
28203    INSERT INTO order_line (OL_O_ID, OL_D_ID, OL_W...
28204    INSERT INTO order_line (OL_O_ID, OL_D_ID, OL_W...
28205    UPDATE stock   SET S_QUANTITY = # ,        S_Y...
28206    UPDATE stock   SET S_QUANTITY = # ,        S_Y...
28207    UPDATE stock   SET S_QUANTITY = # ,        S_Y...
Name: query_template, Length: 603434, dtype: object

In [15]:
df['log_time_s'] = df['log_time'].round('S')
df['log_time_s']

0       2021-12-06 16:01:18-05:00
1       2021-12-06 16:01:18-05:00
2       2021-12-06 16:01:18-05:00
3       2021-12-06 16:01:18-05:00
4       2021-12-06 16:01:18-05:00
                   ...           
28203   2021-12-06 16:01:18-05:00
28204   2021-12-06 16:01:18-05:00
28205   2021-12-06 16:01:18-05:00
28206   2021-12-06 16:01:18-05:00
28207   2021-12-06 16:01:18-05:00
Name: log_time_s, Length: 603434, dtype: datetime64[ns, pytz.FixedOffset(-300)]

In [16]:
gb = df.groupby(['query_template', 'log_time_s']).size()
grouped_df = pd.DataFrame(gb, columns=['count'])
grouped_df.drop('', axis=0, level=0, inplace=True)
grouped_df

Unnamed: 0_level_0,Unnamed: 1_level_0,count
query_template,log_time_s,Unnamed: 2_level_1
DELETE FROM new_order WHERE NO_O_ID = # AND NO_D_ID = # AND NO_W_ID = #,2021-12-06 16:01:12-05:00,110
DELETE FROM new_order WHERE NO_O_ID = # AND NO_D_ID = # AND NO_W_ID = #,2021-12-06 16:01:13-05:00,158
DELETE FROM new_order WHERE NO_O_ID = # AND NO_D_ID = # AND NO_W_ID = #,2021-12-06 16:01:14-05:00,75
DELETE FROM new_order WHERE NO_O_ID = # AND NO_D_ID = # AND NO_W_ID = #,2021-12-06 16:01:15-05:00,137
DELETE FROM new_order WHERE NO_O_ID = # AND NO_D_ID = # AND NO_W_ID = #,2021-12-06 16:01:16-05:00,90
...,...,...
UPDATE warehouse SET W_YTD = W_YTD + # WHERE W_ID = #,2021-12-06 16:02:08-05:00,159
UPDATE warehouse SET W_YTD = W_YTD + # WHERE W_ID = #,2021-12-06 16:02:09-05:00,174
UPDATE warehouse SET W_YTD = W_YTD + # WHERE W_ID = #,2021-12-06 16:02:10-05:00,161
UPDATE warehouse SET W_YTD = W_YTD + # WHERE W_ID = #,2021-12-06 16:02:11-05:00,169


## Clusterer

In [None]:
# TODO(WAN): Port online_clustering.py.
# TODO(WAN): I would be somewhat surprised if sklearn doesn't have this built in... We'll see

In [21]:
min_date = grouped_df.index.get_level_values(1).min()
max_date = grouped_df.index.get_level_values(1).max()

rho = 0.8
cluster_gap = 1440
n = (max_date - min_date).seconds // 60 + (max_date - min_date).days * 1440 + 1
num_gaps = n // cluster_gap

centers, cluster_totals, cluster_sizes = {}, {}, {}

assignments = [(min_date, {template: -1 for template in grouped_df.index.get_level_values(0)})]

current_date = min_date
next_cluster = 0

n_sample = 10000



# AdjustCluster(month_min_date, current_date, next_date, data, assignments[-1][1],
#                next_cluster, centers, cluster_totals, total_queries, cluster_sizes, rho)

# def AdjustCluster(min_date, current_date, next_date, data, last_ass,
#        next_cluster, centers, cluster_totals, total_queries, cluster_sizes, rho):

2

## Forecaster

In [None]:
# TODO(WAN): Port exp_multi_online_continuous.py