# Introduction to Big Data Modern Technologies course

## TOPIC 5: Data lake concept and tools
### Part 2. Apache Spark for ML models

### 1. Libraries

In [None]:
import os
import sys
import json
import boto3
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
pd.set_option('display.max_columns', None)

In [None]:
def access_data(file_path):
    """Load a JSON document (here: the storage credentials) from disk.

    Args:
        file_path: Path to the JSON file.

    Returns:
        The decoded JSON content (a dict when the file holds a JSON object).

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # Read as UTF-8 explicitly instead of relying on the platform default encoding.
    with open(file_path, encoding='utf-8') as file:
        # Return directly: no local variable that shadows the function name.
        return json.load(file)

# Load the storage credentials from a local JSON file instead of hard-coding them.
creds = access_data(file_path='access_bucket.json')
# Print only the key names, not the secret values.
print(creds.keys())

<font color='red'>__IMPORTANT NOTE__</font>
1. Do not set credentials (keys, secrets, passwords) explicitly in your code
2. Do not print out variables with credentials in your code

### 2. Browse files at S3

In [None]:
# S3 client pointed at a custom endpoint and authenticated with the keys loaded into `creds`.
session = boto3.session.Session()
s3 = session.client(
    service_name='s3',
    aws_access_key_id=creds['aws_access_key_id'],
    aws_secret_access_key=creds['aws_secret_access_key'],
    endpoint_url='https://storage.yandexcloud.net'
)

In [None]:
OPTS_DATA_BUCKET = 'apid-data-options'

In [None]:
# Page through the listing: a single list call returns at most 1000 keys, and the
# response for an empty bucket has no 'Contents' entry at all.
paginator = s3.get_paginator('list_objects')
all_files = [
    obj['Key']
    for page in paginator.paginate(Bucket=OPTS_DATA_BUCKET)
    for obj in page.get('Contents', [])
]
print('files in storage:', all_files[:10])  # preview of the first 10 keys only

### 3. Data preprocessing with Spark

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import udf, struct, count_distinct, from_unixtime

In [None]:
# link to the Spark web UI

def uiWebUrl(self):
    """Return a notebook-friendly link to the Spark web UI.

    When ``JUPYTERHUB_SERVICE_PREFIX`` is set, the link has the form
    ``<prefix>proxy/<ui port>/jobs/``. When it is not set (e.g. outside
    JupyterHub) the address reported by Spark is returned unchanged instead of
    failing with ``KeyError``.

    Args:
        self: The ``SparkContext`` the property is read from.

    Returns:
        The link as a string.
    """
    from urllib.parse import urlparse

    web_url = self._jsc.sc().uiWebUrl().get()
    prefix = os.environ.get('JUPYTERHUB_SERVICE_PREFIX')
    if prefix is None:
        return web_url
    # Only the port of Spark's own URL is needed to build the prefixed path.
    return '{}proxy/{}/jobs/'.format(prefix, urlparse(web_url).port)

# Replace the built-in property so `sc.uiWebUrl` returns the notebook-friendly link.
SparkContext.uiWebUrl = property(uiWebUrl)

# Spark settings
conf = SparkConf()
conf.set('spark.master', 'local[*]')    # max 5 cores available, use `local[*]` for all cores
conf.set('spark.driver.memory', '16G')  # max 16 GB available
conf.set('spark.driver.maxResultSize', '4G')
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

# Spark's access for object storage settings
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set('fs.s3a.access.key', creds['aws_access_key_id'])
hadoop_conf.set('fs.s3a.secret.key', creds['aws_secret_access_key'])
hadoop_conf.set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
hadoop_conf.set('fs.s3a.multipart.size', '104857600')
hadoop_conf.set('fs.s3a.block.size', '33554432')
hadoop_conf.set('fs.s3a.threads.max', '256')
# Use HTTPS, like the boto3 client above, so requests to the storage are encrypted.
hadoop_conf.set('fs.s3a.endpoint', 'https://storage.yandexcloud.net')
hadoop_conf.set('fs.s3a.aws.credentials.provider',
                'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')

spark

#### 3.1. Read base data

Our data describes [options](https://www.investopedia.com/terms/o/option.asp) traded on stock markets and contains gigabytes of records from foreign stock markets. The data was kindly provided by VTB experts for educational purposes only.

In [None]:
# Start with a single year: the glob matches every 2016 parquet file under data/.
file_path = f's3a://{OPTS_DATA_BUCKET}/' + 'data/L3_options_2016*.parquet'
options = spark.read.parquet(file_path)

In [None]:
options.limit(5).toPandas()

In [None]:
options.count()

In [None]:
# The preprocessing step parses `date` with the 'MM/dd/yyyy' format, i.e. the raw
# column is text. Parse it before aggregating so min/max compare calendar dates
# rather than strings, and compute both bounds in a single job.
parsed_date = F.to_date(F.col('date'), 'MM/dd/yyyy')
date_bounds = options.agg(
    F.min(parsed_date).alias('min_date'),
    F.max(parsed_date).alias('max_date'),
).first()
min_date, max_date = date_bounds['min_date'], date_bounds['max_date']
print('from', min_date, 'to', max_date)

#### 3.2. Filter by assets

In [None]:
# Number of option records per underlying asset, largest groups first.
per_asset = options.groupBy('base_symbol').count()
assets_count = per_asset.orderBy(F.desc('count'))
assets_count.limit(10).toPandas()

In [None]:
assets_selected = assets_count.limit(10).collect()

In [None]:
assets_selected[0].asDict()

In [None]:
# Keep just the ticker from each collected row.
assets_selected = [row['base_symbol'] for row in assets_selected]

In [None]:
assets_selected

#### 3.3. Data preprocessing

In [None]:
# Declare the result as an integer: the default UDF return type is string, which
# only works here because the caller casts the column afterwards.
@udf(returnType='int')
def check_if_out_of_money(option_type, base_price, strike):
    """Flag an option as out of the money (1) or not (0).

    A call is out of the money when the underlying price is below the strike,
    a put when the underlying price is above the strike.

    Args:
        option_type: 'call' or 'put'.
        base_price: Price of the underlying asset.
        strike: Strike price of the option.

    Returns:
        1 or 0, or None (NULL in Spark) when an input is missing, the option
        type is neither 'call' nor 'put', or the prices cannot be ordered.
    """
    # NULL inputs would otherwise raise TypeError on comparison and fail the job.
    if option_type is None or base_price is None or strike is None:
        return None
    if option_type == 'call':
        lower, upper = base_price, strike
    elif option_type == 'put':
        lower, upper = strike, base_price
    else:
        return None
    if lower < upper:
        return 1
    if lower >= upper:
        return 0
    # Neither comparison holds (e.g. NaN): leave the flag undefined.
    return None

In [None]:
# Keep only the selected assets and derive the per-option columns used later as features.
options_add_cols = (
    options 
        .filter(F.col('base_symbol').isin(assets_selected))
        # parse the 'MM/dd/yyyy' text dates into real dates
        .withColumn('date_parsed', F.to_date(F.col('date'), 'MM/dd/yyyy')) 
        # NOTE(review): day, month, year and weeks_diff are computed here but are
        # not part of the final select below
        .withColumn('day', F.dayofmonth(F.col('date_parsed'))) 
        .withColumn('month', F.month(F.col('date_parsed'))) 
        .withColumn('year', F.year(F.col('date_parsed'))) 
        .withColumn('exp_date_parsed', F.to_date(F.col('expiration'), 'MM/dd/yyyy')) 
        # days from the quote date to the expiration date
        .withColumn('days_diff', F.datediff(F.col('exp_date_parsed'), F.col('date_parsed'))) 
        .withColumn('weeks_diff', F.col('days_diff') / 7) 
        # midpoint between bid and ask
        .withColumn('bid_ask_mean', (F.col('bid') + F.col('ask')) / 2) 
        .withColumn('is_call_option', (F.col('type') == 'call').cast(IntegerType())) 
        .withColumn('strike_over_base', F.col('strike') / F.col('base_price')) 
        # integer flag produced by the `check_if_out_of_money` UDF
        .withColumn(
            'out_of_money', check_if_out_of_money(
                F.col('type'),
                F.col('base_price'),
                F.col('strike')
            ).cast(IntegerType())
        )
        # remove the raw columns first so the parsed dates can take over their names
        .drop('date', 'expiration', 'aka') 
        .withColumnRenamed('exp_date_parsed', 'expiration_date') 
        .withColumnRenamed('date_parsed', 'date') 
        .select(
            'base_symbol',
            'base_price',
            'option_symbol',
            'type',
            'is_call_option',
            'date',
            'expiration_date',
            'days_diff',
            'bid_ask_mean',
            'strike',
            'strike_over_base',
            'out_of_money',
            'volume',
        )
        .orderBy('date')
)

In [None]:
options_add_cols.limit(3).toPandas()

In [None]:
options_add_cols.count()

#### 3.4. Volatilities

In [None]:
# Price of each underlying asset per day: one row per (asset, date) pair.
stocks_data = (
    options
        .select(
            'base_symbol',
            'base_price',
            # parse the text date in place instead of add / drop / rename
            F.to_date(F.col('date'), 'MM/dd/yyyy').alias('date'),
        )
        .groupBy('base_symbol', 'date')
        .agg(F.first('base_price').alias('base_price'))
        .orderBy('date')
)

In [None]:
stocks_data.limit(3).toPandas()

In [None]:
from pyspark.sql.window import Window

In [None]:
# The window ordering column is a UNIX timestamp in seconds, so offsets given in
# days have to be converted to seconds.
def days(x):
    """Return the number of seconds in `x` days."""
    return x * 86400

In [None]:
def _lag_window(first_day):
    """Window over one asset's rows dated from `first_day` days up to 1 day before the current row."""
    seconds = F.col('date').cast('timestamp').cast('long')
    return (
        Window
        .partitionBy(F.col('base_symbol'))
        .orderBy(seconds)
        .rangeBetween(-days(first_day), -days(1))
    )


d1 = _lag_window(1 + 1)
d2 = _lag_window(2 + 1)

# HOME ASSIGNMENT
# create `d3` (three days before lag)
# try to use the following code:
# d3 = _lag_window(<DAYS_BEFORE_LAG> + 1)

w1 = _lag_window(7 + 1)

# HOME ASSIGNMENT
# create `w2` (two weeks before lag)
# try to use the following code:
# w2 = _lag_window(<DAYS_BEFORE_LAG> * <NUM_WEEKS_BEFORE_LAG> + 1)

In [None]:
# Rolling statistics of the underlying price over each look-back window.
# Volatility is computed as standard deviation divided by mean for the same window.
stocks_volatilities = (
    stocks_data
        # rolling means
        .withColumn('1d_mean', F.mean('base_price').over(d1))
        .withColumn('2d_mean', F.mean('base_price').over(d2))
        # HOME ASSIGNMENT
        # add a `mean` column for `d3`
        # you may use code like:
        # .withColumn('3d_mean', F.mean('base_price').over(<WINDOW_D3>))
        .withColumn('1w_mean', F.mean('base_price').over(w1))
        # HOME ASSIGNMENT
        # add a `mean` column for `w2`
        # <YOUR_CODE_HERE>
        # rolling standard deviations
        .withColumn('1d_std', F.stddev('base_price').over(d1))
        .withColumn('2d_std', F.stddev('base_price').over(d2))
        # HOME ASSIGNMENT
        # add a `std` column for `d3`
        # <YOUR_CODE_HERE>
        .withColumn('1w_std', F.stddev('base_price').over(w1))
        # HOME ASSIGNMENT
        # add a `std` column for `w2`
        # <YOUR_CODE_HERE>
        # volatility = std / mean
        .withColumn('1d_volatility', F.col('1d_std') / F.col('1d_mean'))
        .withColumn('2d_volatility', F.col('2d_std') / F.col('2d_mean'))
        # HOME ASSIGNMENT
        # add a `volatility` column for `d3`
        # <YOUR_CODE_HERE>
        .withColumn('1w_volatility', F.col('1w_std') / F.col('1w_mean'))
        # HOME ASSIGNMENT
        # add a `volatility` column for `w2`
        # <YOUR_CODE_HERE>
        # the intermediate *_std columns are not kept
        .select(
            'base_symbol',
            'date',
            '1d_mean',
            '2d_mean',
            '1w_mean',
            '1d_volatility',
            '2d_volatility',
            '1w_volatility'
        )
)

In [None]:
stocks_volatilities.limit(5).toPandas()

In [None]:
# Compute both bounds in one aggregation so the windowed DataFrame is evaluated
# once instead of once per bound.
date_bounds = stocks_volatilities.agg(
    F.min('date').alias('min_date'),
    F.max('date').alias('max_date'),
).first()
min_date, max_date = date_bounds['min_date'], date_bounds['max_date']
print(min_date, max_date)

In [None]:
import datetime

In [None]:
# Keep rows dated after the first two weeks (presumably so the look-back windows
# have history to work with — confirm) and up to the last available date.
start_date = min_date + datetime.timedelta(weeks=2)
end_date = max_date

in_period = (F.col('date') > start_date) & (F.col('date') <= end_date)

# Attach the per-asset, per-day volatility columns to every option record.
features = (
    options_add_cols
        .join(stocks_volatilities, on=['base_symbol', 'date'], how='left')
        .filter(in_period)
        .orderBy('date')
        .drop('option_symbol', 'expiration_date', 'type', 'date')
)

In [None]:
features.limit(5).toPandas()

### 4. Dataset

#### 4.1. External data

In [None]:
!ls ~/__DATA/IBDT_Spring_2024/topic_5/

In [None]:
# Per-ticker reference table (sector, industry, country, market cap — judging by
# the file name); the first line of the CSV is the header.
markets_path = '/home/jovyan/__DATA/IBDT_Spring_2024/topic_5/Sector_Industry_Country_MarketCap.csv'
markets = spark.read.csv(markets_path, header=True, sep=',')
markets.printSchema()

In [None]:
markets.count()

In [None]:
markets.show(5)

In [None]:
# One row per ticker and one column per distinct `Sector` value: the pivot puts 1
# where the ticker has that sector, and the remaining cells are filled with 0.
sectors_dummy = markets.groupBy("Ticker").pivot("Sector").agg(F.lit(1)).na.fill(0)
sectors_dummy.limit(5).toPandas()

In [None]:
# One row per ticker and one column per distinct `Country` value: the pivot puts 1
# where the ticker has that country, and the remaining cells are filled with 0.
countries_dummy = markets.groupBy("Ticker").pivot("Country").agg(F.lit(1)).na.fill(0)
countries_dummy.limit(5).toPandas()

#### 4.2. Add external data and finalizing dataset

In [None]:
# Attach the sector indicator columns, then remove the duplicated join key `Ticker`.
joined = features.join(sectors_dummy, features.base_symbol == sectors_dummy.Ticker)
features = joined.drop(joined.Ticker)
features.limit(5).toPandas()

In [None]:
# Attach the country indicator columns, then remove the duplicated join key `Ticker`.
joined = features.join(countries_dummy, features.base_symbol == countries_dummy.Ticker)
features = joined.drop(joined.Ticker)
features.limit(5).toPandas()

In [None]:
features.columns

In [None]:
# that is what we are trying to predict
y_col = 'bid_ask_mean'

In [None]:
# Every column is a model input except the asset identifier and the target itself.
non_feature_cols = ('base_symbol', y_col)
x_cols = [name for name in features.columns if name not in non_feature_cols]
print(x_cols)

### 5. Modelling with Spark

#### 5.1. Training

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor, LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
vecAssembler = VectorAssembler(
    inputCols=x_cols,     # all X columns except y-column
    outputCol='features'  # name for assembled rows
)

In [None]:
features = features.dropna()

In [None]:
features_vec = vecAssembler.transform(features)
features_vec.limit(5).toPandas()

In [None]:
features_data = features_vec.select('bid_ask_mean', 'features')
features_data.limit(5).toPandas()

In [None]:
features_data = features_data.withColumnRenamed('bid_ask_mean', 'label')
features_data.limit(5).toPandas()

In [None]:
# Random forest regressor. The seed matches the one used for the train/test split
# so that the forest's random sampling is repeatable between sessions.
rf = RandomForestRegressor(
    labelCol='label',
    featuresCol='features',
    seed=2024
)

# HOME ASSIGNMENT
# you may want to implement linear regression
# with the following code:
# lr = LinearRegression(
#     labelCol=<LABEL_COLUMN>, 
#     featuresCol=<FEATURES_COLUMN>
# )

# pipeline may include many steps to prepare data
pipeline = Pipeline(stages=[rf])

# HOME ASSIGNMENT
# you may want to implement a new
# pipeline for linear regression
# with the following code:
# pipeline = Pipeline(stages=[<YOUR_NEW_REGRESSOR>])

# search for best parameters
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [5, 10]) \
    .addGrid(rf.maxDepth, [1, 2]) \
    .build()

# HOME ASSIGNMENT
# you may want to implement a new
# parameter grid search
# with the following code:
# paramGrid = ParamGridBuilder() \
#     .addGrid(<PARAMETER_TO_TUNE>, <LIST_OF_VALUES>) \
#     .build()

# cross-validation strategy: 4 folds scored by RMSE (the evaluator's default
# settings, spelled out here); seeded so the fold assignment is repeatable
cross_val = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=RegressionEvaluator(
        labelCol='label',
        predictionCol='prediction',
        metricName='rmse'
    ),
    numFolds=4,
    seed=2024)

In [None]:
%%time

# takes about 5 min

# Cache the assembled dataset before splitting. Otherwise each split and every
# cross-validation fit re-runs the whole upstream pipeline (reads, joins, windows),
# and a re-evaluated input is not guaranteed to produce disjoint train/test sets.
features_data = features_data.cache()
feat_train, feat_test = features_data.randomSplit([.8, .2], seed=2024)
model = cross_val.fit(feat_train)

#### 5.2. Evaluation

In [None]:
predictions = model.transform(feat_test)

In [None]:
evaluator = RegressionEvaluator(
    labelCol="label", 
    predictionCol="prediction", 
    metricName="rmse"
)
rmse = evaluator.evaluate(predictions)
print('RMSE:', rmse)

In [None]:
# Only the label and the prediction are used below. Leaving the assembled
# `features` vectors in Spark keeps the copy collected to the driver small.
df = predictions.select('label', 'prediction').toPandas()
df.head()

In [None]:
df.shape

In [None]:
def WMAPE(y_true, y_pred):
    """Weighted mean absolute percentage error, in percent.

    Computed as ``100 * sum(|y_true - y_pred|) / sum(|y_true|)``.

    Args:
        y_true: Actual values (array-like of numbers).
        y_pred: Predicted values, same length as ``y_true``.

    Returns:
        The error as a percentage, or ``nan`` when the absolute actual values
        sum to zero (including empty input), where the metric is undefined.
    """
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    # Normalise by the absolute actuals: with a signed sum, negative values cancel
    # positive ones and can even flip the sign of the result.
    total_actual = np.sum(np.abs(y_true))
    if total_actual == 0:
        return float('nan')
    return np.sum(np.abs(y_true - y_pred)) / total_actual * 100

wmape = WMAPE(df.label.to_list(), df.prediction.to_list())

In [None]:
# Predicted vs. actual prices as blue dots, with both error metrics in the title.
actual = df.label.to_list()
predicted = df.prediction.to_list()

plt.figure(figsize=(16, 8))
plt.plot(actual, predicted, 'bo')
plt.xlabel('Bid-ask mean price')
plt.ylabel('Prediction')
plt.suptitle(f'RMSE: {rmse:.1f},  WMAPE: {wmape:.1f}')
plt.show()

### 6. Home assignment

Your home assignment is to implement `LinearRegression` to predict `bid_ask_mean`, following the same steps as for the Random Forest regression model. Here is [a manual for linear regression with Spark ML](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.LinearRegression.html). Note that you only have to tune one parameter of the linear regression model, `regParam`, in the `ParamGridBuilder`.

You will also have to:
- use data for year 2017, e.g. `file_path = f's3a://{OPTS_DATA_BUCKET}/' + 'data/L3_options_2017*.parquet'`, as the data to load (year 2017 has more observations)
- add more stock volatility features to the dataset in `3.4. Volatilities`: `d3` (three days before lag) and `w2` (two weeks before lag)