# Data preprocessing
Here we preprocess the 2020 stock data.
This script creates 4 Parquet output folders:
* extrapolated_data.parquet - data preprocessed up to and including extrapolation
* hourly_data.parquet - data aggregated to an hourly frequency (built from extrapolated_data.parquet)
* pearson_corr.parquet - data preprocessed specifically for use with Pearson correlation
* MI_corr.parquet - data preprocessed specifically for use with mutual information (MI) correlation

In [43]:
from datetime import datetime
from operator import attrgetter
import os

import pandas as pd
from pyspark import SparkContext, SQLContext
from pyspark.sql import functions as fn
from pyspark.sql.types import StructField, StructType, DoubleType, StringType
from pyspark.sql.window import Window
from pyspark.ml.feature import QuantileDiscretizer

In [44]:
# Reuse an already running SparkContext (e.g. when this cell is executed a
# second time in the notebook) instead of failing with
# "Cannot run multiple SparkContexts at once".
SC = SparkContext.getOrCreate()
print(SC.version)
# SQL entry point used below for reading CSV and Parquet data.
SQL_CONTEXT = SQLContext(SC)

2.4.5


In [45]:
# Glob pattern of the raw 2020 stock data files (CSV content, .txt extension).
DATA_LOCATION = './2020/*.txt'

In [46]:
# Column layout of the raw stock files: date and time as strings, followed
# by the four prices and the transaction total as doubles. Every field is
# declared non-nullable.
STOCKS_SCHEMA = StructType([
    StructField(column_name, column_type, False)
    for column_name, column_type in (
        ('date', StringType()),
        ('time', StringType()),
        ('opening_price', DoubleType()),
        ('highest_price', DoubleType()),
        ('lowest_price', DoubleType()),
        ('closing_price', DoubleType()),
        ('sum_of_transactions', DoubleType()),
    )
])

In [47]:
def read_files_into_df(file_location):
    """ Returns a dataframe with the raw stock rows of all CSV files
    matching 'file_location'.

    The files have no header row; column names and types come from
    STOCKS_SCHEMA (date, time, four prices, sum_of_transactions).

    Keyword arguments:
    file_location -- path or glob pattern of the files to read
    """
    # The schema must be set with .schema(): passed as a reader option (as
    # it was before) the CSV source silently ignores it and infers every
    # column type instead, which also costs an extra full pass over the data.
    df = (
        SQL_CONTEXT
        .read
        .format('csv')
        .schema(STOCKS_SCHEMA)
        .options(header=False, delimiter=',')
        .load(file_location)
    )
    return df

In [48]:
# Splitting a file path on this pattern leaves the stock name as element [1]:
# the first alternative removes everything up to and including the last '/'
# plus a numeric prefix and '_', the second removes the '_NoExpiry.txt' suffix.
# Raw string: the old '\/' was an invalid Python escape, and the '.' before
# 'txt' is now escaped so it only matches a literal dot.
STOCK_NAME_REGEX = r'(.*/[0-9]*_)|(_NoExpiry\.txt)'


def get_stock_name(df):
    """ Returns the dataframe with an extra 'stock' column holding the
    stock name parsed from the file each row was read from.

    Presumably the files are named '<digits>_<STOCK>_NoExpiry.txt'
    (inferred from STOCK_NAME_REGEX).
    """
    df = df.withColumn("filename", fn.input_file_name())
    df = df.withColumn(
        "stock",
        fn.split(fn.col('filename'), STOCK_NAME_REGEX)[1],
    )
    df = df.drop('filename')
    return df

In [49]:
def get_timestamp(df):
    """ Returns a dataframe with a 'timestamp' column parsed from the
    'date' and 'time' string columns (format 'MM/dd/yyyy HH:mm').
    The 'time' column is removed; 'date' is kept.
    """
    joined_date_time = fn.concat(fn.col('date'), fn.lit(' '), fn.col('time'))
    parsed = fn.to_timestamp(joined_date_time, "MM/dd/yyyy HH:mm")
    return df.withColumn('timestamp', parsed).drop('time', 'date_time')

In [50]:
def filter_months(df, no_months):
    """Returns a dataframe where stocks have at least
    data of 'no_months' different months.

    The comparison was '==', which contradicted this contract and would
    drop stocks with data in more months than requested.
    Note: months are compared by month number only (fn.month), so data
    spanning several years would merge equal months of different years.
    """
    months_per_stock = df.groupBy(["stock"]).agg(
        fn.countDistinct(fn.month('timestamp')).alias('no_months')
    )
    eligible_stocks = months_per_stock.filter(
        months_per_stock['no_months'] >= no_months
    )
    return df.join(eligible_stocks, on=['stock'], how='inner').drop('no_months')

In [51]:
def filter_hours(df, start, end):
    """ Returns the rows whose timestamp falls within working hours:
    any minute of hours 'start' .. 'end' - 1, plus exactly 'end':00.
    """
    hour = fn.hour(df['timestamp'])
    within_hours = hour.between(start, end - 1)
    at_end_sharp = (hour == end) & (fn.minute(df['timestamp']) == 0)
    return df.filter(within_hours | at_end_sharp)

In [52]:
def deduplicate_data(df):
    """ Returns a dataframe with one row per (stock, timestamp, date).
    Rows sharing a timestamp are aggregated: opening/closing prices are
    averaged, highest takes the max and lowest the min. sum_of_transactions
    is not carried over because it is not used later on.
    Output is sorted by stock and timestamp.
    """
    aggregations = [
        fn.avg('opening_price').alias('opening_price'),
        fn.max('highest_price').alias('highest_price'),
        fn.min('lowest_price').alias('lowest_price'),
        fn.avg('closing_price').alias('closing_price'),
    ]
    return (
        df
        .groupBy(["stock", "timestamp", "date"])
        .agg(*aggregations)
        .orderBy(["stock", "timestamp"])
    )

In [53]:
def filter_frequency(df, freq):
    """ Returns a dataframe keeping only the stocks that have strictly
    more than 'freq' rows; stocks with 'freq' rows or fewer are dropped.
    """
    record_counts = df.groupBy(["stock"]).agg(
        fn.count('stock').alias('nr_of_records')
    )
    frequent_stocks = record_counts.where(fn.col('nr_of_records') > freq)
    return (
        df
        .join(frequent_stocks, on=['stock'], how='inner')
        .drop('nr_of_records')
    )

In [54]:
def resample(schema, freq, timestamp_col="timestamp", **kwargs):
    """ Returns a grouped-map pandas UDF that resamples a group at a fixed
    frequency and fills the new rows.

    Numeric columns are linearly interpolated on the new grid; whatever is
    still missing afterwards (e.g. non-numeric columns) is forward-filled.
    Output columns are sorted by name to match the UDF's declared schema.

    Keyword arguments:
    schema -- dataframe schema
    freq -- frequency at which need to be resampled
    timestamp_col -- column used for resampling/interpolation (default 'timestamp')
    kwargs -- accepted but unused
    """
    output_schema = StructType(sorted(schema, key=attrgetter("name")))

    @fn.pandas_udf(output_schema, fn.PandasUDFType.GROUPED_MAP)
    def _(pdf):
        upsampled = pdf.set_index(timestamp_col).resample(freq).interpolate()
        return upsampled.ffill().reset_index().sort_index(axis=1)

    return _

In [55]:
def interpolate(df, freq_range='60S'):
    """ Returns a dataframe in which every (stock, date) group is
    resampled at 'freq_range' and the gaps are filled by interpolation
    (see resample).
    """
    resampler = resample(df.schema, freq_range)
    return df.groupBy(["stock", "date"]).apply(resampler)

In [56]:
def get_days_that_have_enough_stocks(df, limit):
    """ Returns a pandas dataframe with a single 'date' column listing,
    in ascending order, the dates on which at least 'limit' distinct
    stocks have data.
    """
    stocks_per_day = df.groupBy(["date"]).agg(
        fn.countDistinct('stock').alias('diff_stocks')
    )
    busy_days = stocks_per_day.where(fn.col('diff_stocks') >= limit)
    return busy_days.orderBy("date").select("date").toPandas()

In [57]:
def get_day_range(days, start_hour, end_hour, freq, index_name='timestamp'):
    """ Returns every timestamp of the working day for each given day,
    as a sorted DatetimeIndex.

    Keyword arguments:
    days -- pandas dataframe with a 'date' column (parsed with
            pd.to_datetime); it is not modified
    start_hour -- time at which a working day starts as string (using 24h)
    end_hour -- time at which a working day ends as string (using 24h), inclusive
    freq -- frequency of the timestamps within a day (e.g. '1min')
    index_name -- name of the returned index (default 'timestamp')

    An empty 'days' yields an empty index. Previously an empty 'days'
    crashed, the caller's 'days' was mutated, and 'index_name' was ignored.
    """
    # Only the calendar day matters; any time part is dropped.
    dates = pd.DatetimeIndex(pd.to_datetime(days['date'])).normalize()
    # date_range on bare times anchors them to today's date; subtracting
    # that midnight leaves each time as an offset within a day.
    times = pd.date_range(start_hour, end_hour, freq=freq)
    offsets = times - times.normalize()
    # Every (day, offset) combination via broadcasting: shape (days, times).
    stamps = dates.values[:, None] + offsets.values[None, :]
    return pd.DatetimeIndex(stamps.ravel(), name=index_name).sort_values()

In [58]:
def extrapolate_fn(schema, working_range, timestamp_col="timestamp", **kwargs):
    """ Returns a grouped-map pandas UDF that aligns a group onto
    'working_range' and fills every timestamp from the nearest available
    row.

    Keyword arguments:
    schema -- dataframe schema
    working_range -- range of timestamps for which values need to be generated for each stock
    timestamp_col -- column used for resampling/extrapolation (default 'timestamp')
    kwargs -- accepted but unused
    """
    output_schema = StructType(sorted(schema, key=attrgetter("name")))

    @fn.pandas_udf(output_schema, fn.PandasUDFType.GROUPED_MAP)
    def _(pdf):
        # method='nearest' needs a sorted index.
        # NOTE(review): 'nearest' copies *all* columns of the neighbouring
        # row, so non-time columns such as 'date' keep the neighbour's value.
        aligned = (
            pdf
            .set_index(timestamp_col)
            .sort_index()
            .reindex(working_range, method='nearest')
        )
        return aligned.reset_index().sort_index(axis=1)

    return _

In [59]:
def extrapolate(df, min_no_of_stocks, start_hour, end_hour):
    """ Returns a dataframe in which every stock is aligned onto a
    one-minute grid from 'start_hour' to 'end_hour' for each day that has
    at least 'min_no_of_stocks' stocks. Missing values are taken from the
    nearest available row.
    """
    eligible_days = get_days_that_have_enough_stocks(df, min_no_of_stocks)
    working_range = get_day_range(
        eligible_days,
        start_hour=start_hour,
        end_hour=end_hour,
        freq='1min',
        index_name='timestamp',
    )
    aligner = extrapolate_fn(df.schema, working_range)
    return df.groupBy(["stock"]).apply(aligner)

In [60]:
def turn_data_to_hourly(df):
    """ Returns a dataframe where stock data is aggregated by hour.

    For every stock and hour (timestamp truncated to the hour):
    opening_price -- opening price of the earliest row within the hour
    highest_price -- maximum highest_price within the hour
    lowest_price -- minimum lowest_price within the hour
    closing_price -- closing price of the latest row within the hour
    Output is sorted by stock and timestamp.
    """
    # Keep the original timestamp so the first/last row of each hour can be
    # picked by time. fn.first/fn.last (used before) depend on row order,
    # which Spark does not guarantee after a shuffle.
    minute_col = 'minute_timestamp'
    df_with_hours = (
        df
        .withColumn(minute_col, fn.col('timestamp'))
        .withColumn('timestamp', fn.date_trunc('Hour', fn.col('timestamp')))
    )
    # min/max over a struct compare its fields in order, so the struct with
    # the earliest/latest minute wins; then its price field is extracted.
    hourly_df = (
        df_with_hours
        .groupBy(["stock", "timestamp"])
        .agg(
            fn.min(fn.struct(minute_col, 'opening_price'))
            .getField('opening_price').alias('opening_price'),
            fn.max('highest_price').alias('highest_price'),
            fn.min('lowest_price').alias('lowest_price'),
            fn.max(fn.struct(minute_col, 'closing_price'))
            .getField('closing_price').alias('closing_price'),
        )
        .orderBy(["stock", "timestamp"])
    )
    return hourly_df

In [61]:
def convert_for_MI(df, value):
    """ Returns a dataframe with an extra column 'MI_<value>' encoding the
    change of 'value' from the previous row of the same stock in the same
    month (ordered by timestamp): 1 if it rose, 0 if unchanged or there is
    no previous row, -1 otherwise.
    """
    previous_col = "prev_value_" + value
    per_stock_month = (
        Window
        .partitionBy(['stock', fn.month('timestamp')])
        .orderBy('timestamp')
    )
    with_previous = df.withColumn(
        previous_col, fn.lag(fn.col(value)).over(per_stock_month),
    )
    current, previous = fn.col(value), fn.col(previous_col)
    direction = (
        fn.when(previous.isNull() | (previous == current), 0)
        .when(current > previous, 1)
        .otherwise(-1)
    )
    return with_previous.withColumn("MI_" + value, direction).drop(previous_col)

In [62]:
def discretize_for_MI(df, no_buckets, value):
    """ Returns the dataframe with an extra column 'MI_<value>' holding
    the quantile bucket (out of 'no_buckets') that 'value' falls into,
    with bucket boundaries fitted on 'df' itself.
    """
    fitted = QuantileDiscretizer(
        numBuckets=no_buckets,
        inputCol=value,
        outputCol="MI_" + value,
    ).fit(df)
    return fitted.transform(df)

In [63]:
def preprocess_MI(df, values, no_buckets):
    """ Returns the dataframe with an 'MI_<value>' quantile-bucket column
    added for each column name in 'values' (see discretize_for_MI).
    """
    discretized = df
    for column in values:
        discretized = discretize_for_MI(discretized, no_buckets, column)
    return discretized

In [64]:
def preprocess_pearson(df, values):
    """ Returns the dataframe with a 'p_<value>' column for each name in
    'values': the value minus that stock's mean of the column.
    """
    stock_means = df.groupBy(["stock"]).agg(
        *[fn.avg(column).alias('avg_' + column) for column in values]
    )
    centred = df.join(stock_means, on=['stock'], how='inner')
    for column in values:
        centred = centred.withColumn(
            "p_" + column, centred[column] - centred['avg_' + column],
        )
    return centred.drop(*['avg_' + column for column in values])

In [65]:
# Build the minute-level, time-aligned dataset only when it has not been
# written yet; otherwise the existing Parquet output is reused.
if not os.path.isdir('extrapolated_data.parquet'):
    df = read_files_into_df(DATA_LOCATION)
    df = get_stock_name(df)
    df = get_timestamp(df)

    # Keep stocks with data in 4 different months, rows in hours 9..16 plus
    # exactly 17:00, and a single row per (stock, timestamp, date).
    df = filter_months(df, 4)
    df = filter_hours(df, 9, 17)
    df = deduplicate_data(df)

    # 12000 is around the minimum nr of records you 
    # should have for ~22 working days per month, 
    # 3 months + 10 more days in April, 8h a day, 5min freq interval
    # after this filtering we have around 666 different stocks left.
    # NOTE(review): those figures give (3 * 22 + 10) * 8 * 12 = 7296 rows,
    # not 12000 -- confirm how the threshold was derived.
    df = filter_frequency(df, 12000)

    # Conda install -c conda-forge pyarrow=0.13.
    # (interpolate() runs a pandas_udf, which requires pyarrow; it resamples
    # each stock/day to the default '60S' frequency.)
    df = interpolate(df)
    
    # We checked how many different stocks each day has and 
    # decided that taking 650 as a minimum stock limit is the best
    # Aligns every stock on a 1-minute 09:00-17:00 grid of those days.
    df = extrapolate(df, 650, '09:00:00', '17:00:00')
    
    df.write.parquet('extrapolated_data.parquet')
    
# Always continue from the persisted data.
df = SQL_CONTEXT.read.parquet('extrapolated_data.parquet')

In [66]:
# Aggregate the minute-level data into hourly rows, written once and reused.
if not os.path.isdir('hourly_data.parquet'):
    df = turn_data_to_hourly(df)
    df.write.parquet('hourly_data.parquet')
    
# From here on 'df' holds the hourly data used by the correlation cells.
df = SQL_CONTEXT.read.parquet('hourly_data.parquet')

In [None]:
# Price columns prepared for both correlation measures.
columns_for_corr = [
    'opening_price', 'highest_price', 
    'lowest_price', 'closing_price',
]

# Pearson input: each price minus the stock's mean ('p_<column>').
if not os.path.isdir('pearson_corr.parquet'):   
    pearson_df = preprocess_pearson(df, columns_for_corr)
    pearson_df.write.parquet('pearson_corr.parquet')
    
# MI input: each price discretized into quantile buckets ('MI_<column>').
if not os.path.isdir('MI_corr.parquet'):   
    # 200 bins is the optimal no of bins 
    # NOTE(review): the rationale for 200 is not recorded here.
    MI_df = preprocess_MI(df, columns_for_corr, 200)
    MI_df.write.parquet('MI_corr.parquet')

In [75]:
# Shut down the Spark context once all outputs have been written.
SC.stop()