In [None]:

import os
import queue
import random
import warnings

import sqlalchemy
from sqlalchemy import create_engine, inspect, MetaData
from sqlalchemy.engine.reflection import Inspector
from neuralprophet import NeuralProphet

import matplotlib
import matplotlib.pyplot as plt

import pandas as pd
import pmdarima as pm
import numpy as np

import sklearn
from sktime.forecasting.arima import AutoARIMA
from sktime.forecasting.ets import AutoETS
from sktime.forecasting.trend import PolynomialTrendForecaster
from scipy.stats._continuous_distns import _distn_names
import scipy.stats

from distfit import distfit
import numpy as np

In [5]:
# Module-level results populated by resize_db():
#   scaled_tables       -- table name -> synthesized DataFrame
#   scaled_tables_order -- table names in the order they were synthesized
scaled_tables = dict()
scaled_tables_order = list()

In [7]:
def resize_db(engine, scalefactor=2):
    global scaled_tables
    inspector = Inspector.from_engine(engine)
    table_names = inspector.get_table_names()
    
    # All the tables must be processed.
    
    workqueue = queue.Queue()
    for table_name in table_names:
        workqueue.put(table_name)

    while not workqueue.empty():
        table_name = workqueue.get()
        display(table_name)
        
        # Get schema information on primary keys and foreign keys.  
        info_pk = inspector.get_pk_constraint(table_name)
        info_unique = inspector.get_unique_constraints(table_name)
        info_fk = inspector.get_foreign_keys(table_name)
        
        pkey_columns = info_pk['constrained_columns']
        fkey_columns = {
            src_col: (fk['referred_table'], dst_col)
            for fk in info_fk
            for src_col, dst_col in zip(fk['constrained_columns'], fk['referred_columns'])
        }
        unique_columns_list = [ukey['column_names'] for ukey in info_unique]
        if len(unique_columns_list) > 0:
            raise Exception(f"Can't handle unique: {unique_columns_list}")
        
        # If this table has a foreign key constraint on a yet-unscaled table,
        # then this table cannot be processed yet.
        # Move this table to the back of the queue.
        # TODO(WAN): Replace with a topological sort on fkey dependencies.
        skippy = False
        for fkey_src, fkey_dst in fkey_columns.items():
            fkey_table_name = fkey_dst[0]
            if fkey_table_name not in scaled_tables:
                skippy = True
                print(f'Moving {table_name} to end because of {fkey_dst}.')
        if skippy:
            workqueue.put(table_name)
            continue
        
        df = pd.read_sql_table(table_name, engine)

        scaled_data = {}
        scaled_len = int(df.shape[0] * scalefactor)

        to_generate = scaled_len

        for col in df.columns:
            series = df[col]

            is_unique = any(col in unique_columns for unique_columns in unique_columns_list)
            if is_unique:
                is_fkey = col in fkey_columns
                if is_fkey:
                    fkey_table, fkey_col = fkey_columns[col]
                    scaled_data[col] = scaled_tables[fkey_table][fkey_col].sample(n=to_generate, replace=True).reset_index(drop=True)
                else:
                    scaled_data[col] = series.max() + np.arange(1, 1 + to_generate)
                print('unique', col, scaled_data[col].dtype)
                assert len(scaled_data[col]) == to_generate
                continue

            # For columns that are a foreign key, values must be drawn from the referenced table.
            is_fkey = col in fkey_columns
            if is_fkey:
                fkey_table_name, fkey_table_col = fkey_columns[col]
                fkey_values = scaled_tables[fkey_table_name][fkey_table_col]
                sample = fkey_values.sample(n=to_generate, replace=True).reset_index(drop=True)
                scaled_data[col] = sample
                print('fkey', col, scaled_data[col].dtype)
                assert len(scaled_data[col]) == to_generate
                continue

            # For columns that are a primary key and monotonically increasing, build a linear model.
            is_pkey = col in pkey_columns
            is_mono_inc = series.is_monotonic_increasing
            if is_pkey and is_mono_inc:
                # Edge case, e.g., TPC-C warehouse w_id SF1 has one row.
                if series.shape[0] == 1:
                    scaled_data[col] = series.max() + np.arange(1, 1 + to_generate)
                else:
                    ptf = PolynomialTrendForecaster(degree=1)
                    ptf.fit(series)
                    horizon = np.arange(1, 1 + to_generate)
                    predicted_ptf = ptf.predict(horizon)
                    scaled_data[col] = predicted_ptf.reset_index(drop=True)
                print('ptf', col, scaled_data[col].dtype)
                assert len(scaled_data[col]) == to_generate
                continue

            # For columns that are all constant or all NA, keep it that way.
            # For columns that are object-typed, e.g., strings, sample the original values.
            # The code looks identical for both.
            is_constant = series.nunique() == 1
            is_na = all(series.isna())
            is_obj = series.dtype == "object"
            if is_constant or is_na or is_obj:
                sample = series.sample(n=to_generate, replace=True).reset_index(drop=True)
                scaled_data[col] = sample
                print('const', col, scaled_data[col].dtype)
                assert len(scaled_data[col]) == to_generate
                continue

            # Otherwise, we sure hope we have a numeric column...
            # Otherwise, we hope we have a numeric column that we can fit a distribution to.
            # We then draw samples from that distribution.

            dist = distfit()
            dist.fit_transform(series.dropna(), verbose=1)
            generated = pd.Series(dist.generate(to_generate, verbose=1)).astype('Float64')
            na_frac = series.isna().sum() / series.shape[0]
            scaled_data[col] = generated
            indexes = scaled_data[col].sample(frac=na_frac).index
            scaled_data[col].iloc[indexes] = pd.NA
            print('distfit', col, scaled_data[col].dtype)
            assert len(scaled_data[col]) == to_generate

        assert set(scaled_data.keys()) == set(df.columns)
        scaled_df = pd.DataFrame(scaled_data)

        # Fix up datatypes.
        print('Fixing datatypes.')
        for col in inspector.get_columns(table_name):
            col_name = col['name']
            if col['type'].python_type == int:
                scaled_df[col_name] = scaled_df[col_name].round(0).astype('Int64', errors='ignore')

        print('Fixing multicol fkeys.')
        for fk in info_fk:
            # Pick one to be a source of truth.
            fkc = list(zip(fk['constrained_columns'], fk['referred_columns']))
            choice = random.choice(fkc)
            fkc.remove(choice)
            if len(fkc) == 0:
                # Not multicol.
                continue

            src_main_col, dst_main_col = choice
            src_other_cols = [c[0] for c in fkc]
            dst_other_cols = [c[1] for c in fkc]
            dfo = scaled_tables[fk['referred_table']]

            idxs = []
            for val in scaled_df[src_main_col]:
                matches = dfo[dst_main_col] == val
                idx = np.random.choice(matches.index[matches.values].values)
                idxs.append(idx)
            scaled_df[src_other_cols] = dfo.iloc[idxs][dst_other_cols].reset_index(drop=True)

#             print('Pruning uniques.')
#             for unique_columns in unique_columns_list + [pkey_columns]:
#                 raise Exception('gg')
#                 # TODO(WAN): !! this doesn't actually work! violation?
#                 print('predrop', scaled_df.shape)
#                 scaled_df.drop_duplicates(subset=unique_columns, inplace=True, ignore_index=True)
#                 print('postdrop', scaled_df.shape)

        print(f'Result: {scaled_df.shape[0]}')
            
        print('Final fix of datatypes.')
        for col in inspector.get_columns(table_name):
            col_name = col['name']
            if str(col['type']) in ['DATE', 'TIMESTAMP']:
                scaled_df[col_name] = pd.to_datetime(scaled_df[col_name])
            
        scaled_tables[table_name] = scaled_df
        scaled_tables_order.append(table_name)

# This engine is only read from (pd.read_sql_table), so stream_results is safe here;
# it is pandas to_sql that stream_results breaks.
execution_options = {'stream_results': True}
# Set BENCHBASE_DB_URL to point at another database without editing hard-coded credentials.
engine = create_engine(
    os.environ.get('BENCHBASE_DB_URL', 'postgresql://benchbaseuser:benchbasepass@localhost/benchbase'),
    execution_options=execution_options,
)
resize_db(engine, 2)

'region'

ptf r_regionkey float64
const r_name object
const r_comment object
Fixing datatypes.
Fixing multicol fkeys.
Result: 10
Final fix of datatypes.


'nation'

ptf n_nationkey float64
const n_name object
fkey n_regionkey Int64
const n_comment object
Fixing datatypes.
Fixing multicol fkeys.
Result: 50
Final fix of datatypes.


'supplier'

distfit s_suppkey Float64
const s_name object
const s_address object
fkey s_nationkey Int64
const s_phone object
distfit s_acctbal Float64
const s_comment object
Fixing datatypes.
Fixing multicol fkeys.
Result: 2000
Final fix of datatypes.


'part'

distfit p_partkey Float64
const p_name object
const p_mfgr object
const p_brand object
const p_type object
distfit p_size Float64
const p_container object
distfit p_retailprice Float64
const p_comment object
Fixing datatypes.
Fixing multicol fkeys.
Result: 40000
Final fix of datatypes.


'partsupp'

fkey ps_partkey Int64
fkey ps_suppkey Int64
distfit ps_availqty Float64
distfit ps_supplycost Float64
const ps_comment object
Fixing datatypes.
Fixing multicol fkeys.
Result: 160000
Final fix of datatypes.


'orders'

distfit o_orderkey Float64
distfit o_custkey Float64
const o_orderstatus object
distfit o_totalprice Float64
distfit o_orderdate Float64
const o_orderpriority object
const o_clerk object
const o_shippriority int64
const o_comment object
Fixing datatypes.
Fixing multicol fkeys.
Result: 300000
Final fix of datatypes.


'lineitem'

fkey l_orderkey Int64
fkey l_partkey Int64
fkey l_suppkey Int64
distfit l_linenumber Float64
distfit l_quantity Float64
distfit l_extendedprice Float64
distfit l_discount Float64
distfit l_tax Float64
const l_returnflag object
const l_linestatus object
distfit l_shipdate Float64
distfit l_commitdate Float64
distfit l_receiptdate Float64
const l_shipinstruct object
const l_shipmode object
const l_comment object
Fixing datatypes.
Fixing multicol fkeys.
Result: 1201144
Final fix of datatypes.


'warehouse'

ptf w_id int64
const w_ytd float64
const w_tax float64
const w_name object
const w_street_1 object
const w_street_2 object
const w_city object
const w_state object
const w_zip object
Fixing datatypes.
Fixing multicol fkeys.
Result: 2
Final fix of datatypes.


'stock'

fkey s_w_id Int64
fkey s_i_id Int64
distfit s_quantity Float64
const s_ytd float64
const s_order_cnt int64
const s_remote_cnt int64
const s_data object
const s_dist_01 object
const s_dist_02 object
const s_dist_03 object
const s_dist_04 object
const s_dist_05 object
const s_dist_06 object
const s_dist_07 object
const s_dist_08 object
const s_dist_09 object
const s_dist_10 object
Fixing datatypes.
Fixing multicol fkeys.
Result: 200000
Final fix of datatypes.


'item'

distfit i_id Float64
const i_name object
distfit i_price Float64
const i_data object
distfit i_im_id Float64
Fixing datatypes.
Fixing multicol fkeys.
Result: 200000
Final fix of datatypes.


'district'

fkey d_w_id Int64
ptf d_id float64
const d_ytd float64
distfit d_tax Float64
const d_next_o_id int64
const d_name object
const d_street_1 object
const d_street_2 object
const d_city object
const d_state object
const d_zip object
Fixing datatypes.
Fixing multicol fkeys.
Result: 20
Final fix of datatypes.


'history'

fkey h_c_id Int64
fkey h_c_d_id Int64
fkey h_c_w_id Int64
fkey h_d_id Int64
fkey h_w_id Int64
distfit h_date Float64
const h_amount float64
const h_data object
Fixing datatypes.
Fixing multicol fkeys.
Result: 60000
Final fix of datatypes.


'customer'

fkey c_w_id Int64
fkey c_d_id Int64
distfit c_id Float64
distfit c_discount Float64
const c_credit object
const c_last object
const c_first object
const c_credit_lim float64
const c_balance float64
const c_ytd_payment float64
const c_payment_cnt int64
const c_delivery_cnt int64
const c_street_1 object
const c_street_2 object
const c_city object
const c_state object
const c_zip object
const c_phone object
distfit c_since Float64
const c_middle object
const c_data object
Fixing datatypes.
Fixing multicol fkeys.
Result: 60000
Final fix of datatypes.


'oorder'

fkey o_w_id Int64
fkey o_d_id Int64
distfit o_id Float64
fkey o_c_id Int64
distfit o_carrier_id Float64
distfit o_ol_cnt Float64
const o_all_local int64
distfit o_entry_d Float64
Fixing datatypes.
Fixing multicol fkeys.
Result: 60000
Final fix of datatypes.


'new_order'

fkey no_w_id Int64
fkey no_d_id Int64
fkey no_o_id Int64
Fixing datatypes.
Fixing multicol fkeys.
Result: 18000
Final fix of datatypes.


'order_line'

fkey ol_w_id Int64
fkey ol_d_id Int64
fkey ol_o_id Int64
distfit ol_number Float64
fkey ol_i_id Int64
distfit ol_delivery_d Float64
distfit ol_amount Float64
fkey ol_supply_w_id Int64
const ol_quantity float64
const ol_dist_info object
Fixing datatypes.
Fixing multicol fkeys.
Result: 600080
Final fix of datatypes.


In [10]:
# stream_results breaks pandas to_sql, so the writer engine gets no extra execution options.
# Both URLs can be overridden through the environment instead of editing hard-coded credentials.
execution_options = {}
engine = create_engine(
    os.environ.get('BENCHBASE_SCALED_DB_URL', 'postgresql://benchbaseuser:benchbasepass@localhost/benchbasescaled'),
    execution_options=execution_options,
)

original_engine = create_engine(
    os.environ.get('BENCHBASE_DB_URL', 'postgresql://benchbaseuser:benchbasepass@localhost/benchbase'),
    execution_options=execution_options,
)
# Column SQL types are taken from the original schema so to_sql writes matching types.
inspector = inspect(original_engine)

metadata = MetaData()
metadata.reflect(bind=engine)

# sorted_tables is ordered by foreign-key dependency, so parents load before children.
for table in metadata.sorted_tables:
    # Use table.name everywhere; a stale global `table_name` from another cell must not leak in.
    print(table.name)
    df = scaled_tables[table.name]
    columns = inspector.get_columns(table.name)
    dtype = {col['name']: col['type'] for col in columns}
    df.to_sql(table.name, con=engine, index=False, if_exists='append', dtype=dtype)

item


IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "item_pkey"
DETAIL:  Key (i_id)=(41044) already exists.

[SQL: INSERT INTO item (i_id, i_name, i_price, i_data, i_im_id) VALUES (%(i_id)s, %(i_name)s, %(i_price)s, %(i_data)s, %(i_im_id)s)]
[parameters: ({'i_id': 71873, 'i_name': 'tdjtylgvvwtwgyxhj', 'i_price': 72.91484610436777, 'i_data': 'hdduohbpzmtpqcxxiaycdzefgntboifehfgalndewaznutnin', 'i_im_id': 8609}, {'i_id': 43783, 'i_name': 'xseowiihvcuoyjuncbdzril', 'i_price': 98.10781507092857, 'i_data': 'hdbklonizbqtywyjdxdvrbsbvfuk', 'i_im_id': 8364}, {'i_id': 47077, 'i_name': 'uqwguivweumqsbcntrvpglz', 'i_price': 65.60060731611074, 'i_data': 'jsuktpybndupslilzuyburjazxygajpbhunvloc', 'i_im_id': 749}, {'i_id': 50014, 'i_name': 'npyhkbushoybrzv', 'i_price': 72.86345612233407, 'i_data': 'auyhziljwvayekrxntiwdnazcynexga', 'i_im_id': 2899}, {'i_id': 85504, 'i_name': 'fsbwffneoyzyqriemtlgerz', 'i_price': 14.49870262950143, 'i_data': 'tyqyiwdkxxaqkyxleppjommfdzyojoqqyxfx', 'i_im_id': 4390}, {'i_id': 86847, 'i_name': 'knnjczehdtwwockwpv', 'i_price': 28.037320560710167, 'i_data': 'jncaoukfpvwphirxtfexszzckngpeauz', 'i_im_id': 1662}, {'i_id': 59275, 'i_name': 'joxapwcoottbarfjo', 'i_price': 71.20878869458735, 'i_data': 'tweekhdvzgwkrmntpcrtgngdzskzhfckfeqbio', 'i_im_id': 7604}, {'i_id': 88783, 'i_name': 'utuplvtoanopyzhykkvxxm', 'i_price': 82.65415910826509, 'i_data': 'ytjsvoklpkdweazjxhchzfcddbsbqbxi', 'i_im_id': 2906}  ... displaying 10 of 200000 total bound parameter sets ...  {'i_id': 60351, 'i_name': 'kjhxwxjszmtpeulpktqmp', 'i_price': 15.31778719710216, 'i_data': 'wuzmzzxthobhpgwcfwhckbtlf', 'i_im_id': 8846}, {'i_id': 91272, 'i_name': 'xzaiytcwaddpgye', 'i_price': 56.20498711072217, 'i_data': 'sfbuegxyzjipsmbjhkutkvvhnaxwhtekrgdhqrfwgxzrhn', 'i_im_id': 4700})]
(Background on this error at: https://sqlalche.me/e/14/gkpj)

In [None]:
# Exploratory: forecast synthetic "future" rows for every table with NeuralProphet.
# NOTE(review): this reads through the global `engine`, which the loading cell points
# at the benchbasescaled database -- confirm that is the intended source.
N_TAIL_ROWS = 420  # only the last rows returned by read_sql_table are modelled, to keep fitting fast

# Bound metadata (MetaData(engine)) is deprecated in SQLAlchemy 1.4 and removed in 2.0;
# reflect with an explicit bind, as the loading cell does.
metadata = MetaData()
metadata.reflect(bind=engine)

for table_name in metadata.tables:
    df = pd.read_sql_table(table_name, engine).tail(N_TAIL_ROWS)

    # Split columns into four mutually exclusive categories, each synthesized differently.
    unchanging = np.array([df[c].nunique() == 1 or df[c].isna().all() for c in df.columns], dtype=bool)
    objects = (~unchanging) & np.array([df[c].dtype == 'object' for c in df.columns], dtype=bool)
    monotonics = (~objects) & (~unchanging) & np.array([df[c].is_monotonic_increasing for c in df.columns], dtype=bool)
    randoms = ~(unchanging | objects | monotonics)

    # Every column must be in exactly one category (a chained XOR only checks for an odd count).
    category_count = unchanging.astype(int) + objects + monotonics + randoms
    assert (category_count == 1).all(), f"Error:\n{unchanging}\n{objects}\n{monotonics}\n{randoms}"

    # Constant / all-NA columns are carried over as-is.
    u = df.loc[:, unchanging].reset_index(drop=True)
    # Object (e.g. string) columns keep their values; the rows are shuffled together.
    o = df.loc[:, objects].sample(frac=1).reset_index(drop=True)
    m = df.loc[:, monotonics].reset_index(drop=True)
    r = df.loc[:, randoms].reset_index(drop=True)

    # Monotonic and remaining numeric columns are each forecast len(df) steps ahead.
    forecast = {}
    for df_subset in [m, r]:
        for col in df_subset:
            tsdf = pd.DataFrame()
            # Synthetic daily timestamps: row order stands in for time.
            tsdf['ds'] = pd.date_range(start=0, periods=len(df.index), freq='1D')
            tsdf['y'] = df_subset[col]

            prophet = NeuralProphet(daily_seasonality=True, weekly_seasonality=True, yearly_seasonality=True)
            prophet.fit(tsdf, freq="D")
            dff = prophet.make_future_dataframe(tsdf, periods=len(df.index))
            predicted_prophet = prophet.predict(dff)['yhat1']
            # pmdarima AutoARIMA and sktime AutoETS were also tried here; see the toy cells below.
            forecast[col] = predicted_prophet

    forecast = pd.DataFrame(forecast)

    fake = pd.concat([u, o, forecast], axis=1)[df.columns]

    table = metadata.tables[table_name]
    for col in table.columns:
        try:
            is_int_column = col.type.python_type is int
        except NotImplementedError:  # SQLAlchemy type with no Python equivalent
            continue
        if is_int_column:
            fake[col.name] = fake[col.name].astype(int, errors='ignore')

In [None]:
# Toy check: AutoETS with additive trend and period-3 seasonality on a repeating 1,2,3 pattern.
s = pd.Series([1, 2, 3] * 100)
ets = AutoETS(auto=True, trend='add', seasonal='add', sp=3)
ets.fit(y=s)
# Horizon of 1..len(s) steps past the end of the training data.
predicted_ets = ets.predict(fh=s.index.values + 1)
display(predicted_ets)

In [None]:
# Toy check: pmdarima's AutoARIMA on a repeating 1,2,3 pattern, forecasting len(s) periods.
s = pd.Series([1, 2, 3] * 100)
arima = pm.AutoARIMA()
arima.fit(y=s)
predicted_arima = arima.predict(n_periods=len(s))
display(predicted_arima)

In [None]:
# Toy check: sktime's AutoARIMA on a repeating 1,2,3 pattern.
s = pd.Series([1, 2, 3] * 100)
arima = AutoARIMA()
arima.fit(y=s)
# Horizon of 1..len(s) steps past the end of the training data.
predicted_arima = arima.predict(fh=s.index.values + 1)
display(predicted_arima)

In [None]:
# Toy check: a cubic trend fitted to a repeating 1..5 pattern.
s = pd.Series([1, 2, 3, 4, 5] * 10)
ptf = PolynomialTrendForecaster(degree=3)
ptf.fit(y=s)
# Horizon of 1..len(s) steps past the end of the training data.
predicted_ptf = ptf.predict(fh=s.index.values + 1)
display(predicted_ptf)