# Applied Project in Big Data on Industrial Dataset

## DATA PROCESSING TECHNIQUES
## Part I. Using Spark to create the dataset

### 1. Libraries

In [None]:
import os
import sys
import json
import boto3
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
pd.set_option('display.max_columns', None)

In [None]:
def access_data(file_path):
    """Load a JSON document from disk and return the parsed object.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        The value produced by ``json.load`` for the file's top-level element.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open(file_path, encoding='utf-8') as file:
        return json.load(file)

creds = access_data(file_path='access_bucket.json')
print(creds.keys())

### 2. Browse files in S3

In [None]:
# S3 client that talks to the Yandex Object Storage endpoint using the
# key pair loaded into `creds`.
session = boto3.session.Session()
s3 = session.client(
    's3',
    endpoint_url='https://storage.yandexcloud.net',
    aws_access_key_id=creds['aws_access_key_id'],
    aws_secret_access_key=creds['aws_secret_access_key'],
)

In [None]:
OPTS_DATA_BUCKET = 'apid-data-options'

In [None]:
# A single list_objects call returns at most 1000 keys and carries no
# 'Contents' entry when nothing matches, so walk every result page and
# treat a page without objects as empty.
paginator = s3.get_paginator('list_objects')
all_files = [
    obj['Key']
    for page in paginator.paginate(Bucket=OPTS_DATA_BUCKET)
    for obj in page.get('Contents', [])
]
print('files in storage:', all_files[:10])  # preview of the first 10 keys only

### 3. Spark processing

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import udf, struct, count_distinct, from_unixtime

In [None]:
def uiWebUrl(self):
    """Return the JupyterHub-proxied path of this context's Spark jobs page.

    The port is taken from the UI URL reported by the underlying JVM
    context and appended to the ``JUPYTERHUB_SERVICE_PREFIX`` environment
    variable as ``<prefix>proxy/<port>/jobs/``.

    Raises:
        KeyError: If ``JUPYTERHUB_SERVICE_PREFIX`` is not set.
    """
    from urllib.parse import urlsplit

    spark_ui_address = self._jsc.sc().uiWebUrl().get()
    ui_port = urlsplit(spark_ui_address).port
    hub_prefix = os.environ['JUPYTERHUB_SERVICE_PREFIX']
    return f'{hub_prefix}proxy/{ui_port}/jobs/'

# Replace SparkContext.uiWebUrl with a property backed by the function above.
SparkContext.uiWebUrl = property(uiWebUrl)

# Local-mode Spark with explicit memory limits.
conf = (
    SparkConf()
    .set('spark.master', 'local[*]')
    .set('spark.executor.memory', '24G')
    .set('spark.driver.memory', '8G')
    .set('spark.driver.maxResultSize', '4G')
)
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

# S3A connector settings. The endpoint uses HTTPS, matching the boto3 client
# configured for the same host, so requests are not sent in plaintext.
_s3a_options = {
    'fs.s3a.access.key': creds['aws_access_key_id'],
    'fs.s3a.secret.key': creds['aws_secret_access_key'],
    'fs.s3a.impl': 'org.apache.hadoop.fs.s3a.S3AFileSystem',
    'fs.s3a.multipart.size': '104857600',
    'fs.s3a.block.size': '33554432',
    'fs.s3a.threads.max': '256',
    'fs.s3a.endpoint': 'https://storage.yandexcloud.net',
}
_hadoop_conf = spark._jsc.hadoopConfiguration()
for _option, _value in _s3a_options.items():
    _hadoop_conf.set(_option, _value)
spark

#### 3.1. Read base data

In [None]:
# Start with a single year of data: only the 2017 parquet files are read.
file_path = f's3a://{OPTS_DATA_BUCKET}/data/L3_options_2017*.parquet'
options = spark.read.parquet(file_path)

In [None]:
options.limit(5).toPandas()

In [None]:
options.count()

In [None]:
# The raw 'date' column is parsed elsewhere in this notebook with the
# 'MM/dd/yyyy' pattern, i.e. it is text; min/max on that text would compare
# lexicographically and give wrong bounds once more than one year is loaded.
# Parse it first, and compute both bounds in a single pass over the data.
_parsed_date = F.to_date(F.col('date'), 'MM/dd/yyyy')
_date_range = options.agg(
    F.min(_parsed_date).alias('min_date'),
    F.max(_parsed_date).alias('max_date'),
).first()
min_date, max_date = _date_range['min_date'], _date_range['max_date']
print('from', min_date, 'to', max_date)

#### 3.2. Filter by assets

In [None]:
# Number of option rows per underlying symbol, largest count first.
assets_count = options.groupBy('base_symbol').count().sort(F.col('count').desc())
assets_count.limit(10).toPandas()

In [None]:
assets_selected = assets_count.limit(10).collect()

In [None]:
assets_selected[0].asDict()

In [None]:
# Keep only the symbol from each collected Row.
assets_selected = [row.asDict()['base_symbol'] for row in assets_selected]

In [None]:
assets_selected

#### 3.3. Data preprocessing

In [None]:
@udf
def check_if_out_of_money(option_type, base_price, strike):
    """Flag whether an option is out of the money.

    A call is out of the money when the underlying price is below the strike;
    a put is out of the money when the underlying price is above the strike.

    Args:
        option_type: ``'call'`` or ``'put'``.
        base_price: Price of the underlying asset.
        strike: Strike price of the option.

    Returns:
        1 if the option is out of the money, 0 if it is not, and ``None`` when
        the option type is neither ``'call'`` nor ``'put'``, when a price is
        missing, or when the prices cannot be ordered (e.g. NaN).
    """
    if option_type == 'call':
        is_put = False
    elif option_type == 'put':
        is_put = True
    else:
        return None

    # Null prices reach the UDF as None; comparing None with a number raises
    # TypeError and would fail the whole Spark job.
    if base_price is None or strike is None:
        return None

    if is_put:
        out_of_money, not_out_of_money = base_price > strike, base_price <= strike
    else:
        out_of_money, not_out_of_money = base_price < strike, base_price >= strike

    if out_of_money:
        return 1
    if not_out_of_money:
        return 0
    # Neither comparison held (NaN input): no flag can be assigned.
    return None

In [None]:
# Per-option feature table restricted to the selected underlying symbols.
# Only columns that survive the final select are computed; the calendar
# parts and the week difference built previously were discarded by that
# select and are no longer created.
options_add_cols = (
    options
        .filter(F.col('base_symbol').isin(assets_selected))
        # Parse the textual dates in place.
        .withColumn('date', F.to_date(F.col('date'), 'MM/dd/yyyy'))
        .withColumn('expiration_date', F.to_date(F.col('expiration'), 'MM/dd/yyyy'))
        # Calendar days from the quote date to expiration.
        .withColumn('days_diff', F.datediff(F.col('expiration_date'), F.col('date')))
        # Mid price between bid and ask.
        .withColumn('bid_ask_mean', (F.col('bid') + F.col('ask')) / 2)
        .withColumn('is_call_option', (F.col('type') == 'call').cast(IntegerType()))
        .withColumn('strike_over_base', F.col('strike') / F.col('base_price'))
        .withColumn(
            'out_of_money',
            check_if_out_of_money(
                F.col('type'),
                F.col('base_price'),
                F.col('strike'),
            ).cast(IntegerType()),
        )
        .select(
            'base_symbol',
            'base_price',
            'option_symbol',
            'type',
            'is_call_option',
            'date',
            'expiration_date',
            'days_diff',
            'bid_ask_mean',
            'strike',
            'strike_over_base',
            'out_of_money',
            'volume',
        )
        .orderBy('date')
)

In [None]:
options_add_cols.limit(3).toPandas()

In [None]:
options_add_cols.count()

#### 3.4. Volatilities

In [None]:
# One row per (symbol, date) holding a price of the underlying asset,
# taken from the first option row encountered in each group.
stocks_data = (
    options
        .select(
            'base_symbol',
            'base_price',
            F.to_date(F.col('date'), 'MM/dd/yyyy').alias('date'),
        )
        .groupBy('base_symbol', 'date')
        .agg(F.first('base_price').alias('base_price'))
        .orderBy('date')
)

In [None]:
stocks_data.limit(3).toPandas()

In [None]:
from pyspark.sql.window import Window

In [None]:
def days(x):
    """Convert a number of days to seconds (UNIX timestamps are in seconds)."""
    return x * 86400

In [None]:
# Order rows by a whole-day number instead of epoch seconds. Casting a date
# to a timestamp goes through the session time zone, so in a zone with
# daylight saving two consecutive dates can be 23 or 25 hours apart, and a
# frame expressed in multiples of 86400 seconds can then lose or gain a
# boundary day.
_day_number = F.datediff(F.col('date'), F.to_date(F.lit('1970-01-01')))


def _lookback_window(n_days):
    """Build a per-symbol look-back frame that excludes the current day.

    The frame spans the calendar days from ``n_days + 1`` days before the
    current row's date up to and including the day before it.
    """
    return (
        Window
        .partitionBy(F.col('base_symbol'))
        .orderBy(_day_number)
        .rangeBetween(-(n_days + 1), -1)
    )


d1 = _lookback_window(1)
d2 = _lookback_window(2)
d3 = _lookback_window(3)
w1 = _lookback_window(7)
w2 = _lookback_window(2 * 7)

In [None]:
# For every look-back frame compute the mean and standard deviation of the
# underlying price, and their ratio (std / mean) as the volatility measure.
# Only the mean and volatility columns are kept in the result.
_frames = {'1d': d1, '2d': d2, '3d': d3, '1w': w1, '2w': w2}

stocks_volatilities = stocks_data
for _tag, _frame in _frames.items():
    stocks_volatilities = (
        stocks_volatilities
            .withColumn(f'{_tag}_mean', F.mean('base_price').over(_frame))
            .withColumn(f'{_tag}_std', F.stddev('base_price').over(_frame))
            .withColumn(
                f'{_tag}_volatility',
                F.col(f'{_tag}_std') / F.col(f'{_tag}_mean'),
            )
    )

stocks_volatilities = stocks_volatilities.select(
    'base_symbol',
    'date',
    *[f'{_tag}_mean' for _tag in _frames],
    *[f'{_tag}_volatility' for _tag in _frames],
)

In [None]:
stocks_volatilities.limit(5).toPandas()

In [None]:
# Compute both bounds in one aggregation so the windowed frame is evaluated
# once instead of once per bound.
_date_bounds = stocks_volatilities.agg(
    F.min('date').alias('min_date'),
    F.max('date').alias('max_date'),
).first()
min_date, max_date = _date_bounds['min_date'], _date_bounds['max_date']
print(min_date, max_date)

In [None]:
import datetime

In [None]:
# Keep rows dated strictly after the first two weeks and up to the last date;
# presumably this gives the longest (2-week) look-back some history.
start_date = min_date + datetime.timedelta(weeks=2)
end_date = max_date

_in_date_range = (F.col('date') > start_date) & (F.col('date') <= end_date)

# Attach the per-symbol, per-day statistics to every option row, then drop
# the identifier and date columns.
features = (
    options_add_cols
        .join(stocks_volatilities, on=['base_symbol', 'date'], how='left')
        .where(_in_date_range)
        .orderBy('date')
        .drop('option_symbol', 'expiration_date', 'type', 'date')
)

In [None]:
features.limit(5).toPandas()

### 4. Dataset

#### 4.1. Save features

In [None]:
# Collect the Spark result into a pandas DataFrame. `features` is rebound to
# the pandas object, so this cell cannot be re-run without rebuilding the
# Spark DataFrame first.
features = features.toPandas()
# NOTE(review): to_csv writes the pandas index as an unnamed first column by
# default — confirm that readers of features.csv expect it.
features.to_csv('features.csv')
features.info()

#### 4.2. External data

In [None]:
# Reference table with sector, industry and country per ticker; the ticker
# column is renamed to match the key used in the feature table.
markets = (
    pd.read_csv('../../__OPTIONS/Sector_Industry_Country_MarketCap.csv')
      .rename(columns={'Ticker': 'base_symbol'})
      .loc[:, ['base_symbol', 'Sector', 'Industry', 'Country']]
      .copy()
)
print(markets.shape)
markets.describe()

In [None]:
display(markets.head())

In [None]:
markets_one_hot = pd.get_dummies(markets[['Sector', 'Country']])

In [None]:
markets = markets.join(markets_one_hot)
print(markets.shape)
markets.describe()

In [None]:
display(markets.head())

#### 4.3. Create dataset

In [None]:
features = features.join(
    markets.set_index('base_symbol'),
    on=['base_symbol'], 
    how='left'
)
print(features.shape)
display(features.head())

In [None]:
features = features.dropna()
print(features.shape)
display(features.head())

In [None]:
features['Sector'].groupby(features['Sector']).count()

In [None]:
markets['Country'].groupby(markets['Country']).count()

In [None]:
# Remove the symbol and the raw categorical labels from the feature table.
features = features.drop(columns=['base_symbol', 'Sector', 'Industry', 'Country'])
print(features.shape)
display(features.head())

#### 4.4. Train-test split

In [None]:
target_col = 'bid_ask_mean'
# Compare names for equality: `x not in target_col` is a substring test on
# the string and would also discard any column whose name is contained in
# 'bid_ask_mean' (for example 'bid', 'ask' or 'mean').
feats_cols = [col for col in features.columns if col != target_col]

In [None]:
from sklearn.model_selection import train_test_split

In [None]:
# Random 70/30 split of feature columns and target, with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    features[feats_cols],
    features[target_col],
    test_size=.3,
    random_state=2022,
)
for _name, _part in (
    ('X_train', X_train),
    ('y_train', y_train),
    ('X_test', X_test),
    ('y_test', y_test),
):
    print(f"{_name} shape: {_part.shape}")

### 5. Model

Next topic...