In [None]:
# Import necessary libraries
import pyspark
import os
import sys
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Set environment for PySpark
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# Initialize Spark session with specific memory allocation
spark = SparkSession.builder.config("spark.driver.memory", "16g").appName('chapter_8').getOrCreate()

# Load stock data from multiple CSV files
stocks = spark.read.csv(["data/stocksA/ABAX.csv", "data/stocksA/AAME.csv", "data/stocksA/AEPI.csv"], 
                        header='true', inferSchema='true')
stocks.show(2)

# Import functions from PySpark
from pyspark.sql import functions as fun

# Add Symbol column and extract symbol from file name
stocks = stocks.withColumn("Symbol", fun.input_file_name()).\
    withColumn("Symbol", fun.element_at(fun.split("Symbol", "/"), -1)).\
    withColumn("Symbol", fun.element_at(fun.split("Symbol", "\\."), 1))
stocks.show(2)

# Load factor data from CSV files
factors = spark.read.csv(["data/stocksA/ABAX.csv", "data/stocksA/AAME.csv", "data/stocksA/AEPI.csv"], 
                         header='true', inferSchema='true')

# Add Symbol column and extract symbol from file name for factors
factors = factors.withColumn("Symbol", fun.input_file_name()).\
    withColumn("Symbol", fun.element_at(fun.split("Symbol", "/"), -1)).\
    withColumn("Symbol", fun.element_at(fun.split("Symbol", "\\."), 1))

# Filter stocks based on a minimum count threshold
from pyspark.sql import Window
stocks = stocks.withColumn('count', fun.count('Symbol').over(Window.partitionBy('Symbol'))).\
    filter(fun.col('count') > 260 * 5 + 10)

# Configure time parser policy and convert date format
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
stocks = stocks.withColumn('Date', fun.to_date(fun.to_timestamp(fun.col('Date'), 'dd-MMM-yy')))
stocks.printSchema()

# Filter data within specified date range
from datetime import datetime
stocks = stocks.filter(fun.col('Date') >= datetime(2009, 10, 23)).\
    filter(fun.col('Date') <= datetime(2014, 10, 23))
factors = factors.withColumn('Date', fun.to_date(fun.to_timestamp(fun.col('Date'), 'dd-MMM-yy')))
factors = factors.filter(fun.col('Date') >= datetime(2009, 10, 23)).\
    filter(fun.col('Date') <= datetime(2014, 10, 23))

# Convert to Pandas DataFrames
stocks_pd_df = stocks.toPandas()
factors_pd_df = factors.toPandas()
factors_pd_df.head(5)

# Define rolling window length for returns calculation
n_steps = 10

# Function to calculate return over time period
def my_fun(x):
    return ((x.iloc[-1] - x.iloc[0]) / x.iloc[0])

# Calculate stock and factor returns using rolling window
stock_returns = stocks_pd_df.groupby('Symbol').Close.rolling(window=n_steps).apply(my_fun)
factors_returns = factors_pd_df.groupby('Symbol').Close.rolling(window=n_steps).apply(my_fun)

# Reset index for further processing
stock_returns = stock_returns.reset_index().sort_values('level_1').reset_index()
factors_returns = factors_returns.reset_index().sort_values('level_1').reset_index()

# Create combined DataFrames with calculated returns
stocks_pd_df_with_returns = stocks_pd_df.assign(stock_returns=stock_returns['Close'])
factors_pd_df_with_returns = factors_pd_df.assign(factors_returns=factors_returns['Close'],
                                                  factors_returns_squared=factors_returns['Close']**2)

# Pivot factors DataFrame to get structured columns
factors_pd_df_with_returns = factors_pd_df_with_returns.pivot(index='Date', columns='Symbol', 
                                                              values=['factors_returns', 'factors_returns_squared'])
factors_pd_df_with_returns.columns = factors_pd_df_with_returns.columns.to_series().str.join('_').reset_index()[0]
factors_pd_df_with_returns = factors_pd_df_with_returns.reset_index()

# Display first row and columns of factors DataFrame
print(factors_pd_df_with_returns.head(1))
print(factors_pd_df_with_returns.columns)

# Import required libraries for linear regression
import pandas as pd
from sklearn.linear_model import LinearRegression

# Merge stocks and factors data for linear regression training
stocks_factors_combined_df = pd.merge(stocks_pd_df_with_returns, factors_pd_df_with_returns, how="left", on="Date")
feature_columns = list(stocks_factors_combined_df.columns[-6:])

# Drop NaNs in feature and target columns
with pd.option_context('mode.use_inf_as_na', True):
    stocks_factors_combined_df = stocks_factors_combined_df.dropna(subset=feature_columns + ['stock_returns'])

# Function to compute OLS coefficients
def find_ols_coef(df):
    y = df[['stock_returns']].values
    X = df[feature_columns]
    regr = LinearRegression()
    regr_output = regr.fit(X, y)
    return list(df[['Symbol']].values[0]) + list(regr_output.coef_[0])

# Apply function to get coefficients for each stock
coefs_per_stock = stocks_factors_combined_df.groupby('Symbol').apply(find_ols_coef)
coefs_per_stock = pd.DataFrame(coefs_per_stock).reset_index()
coefs_per_stock.columns = ['symbol', 'factor_coef_list']
coefs_per_stock = pd.DataFrame(coefs_per_stock.factor_coef_list.tolist(), 
                               index=coefs_per_stock.index, columns=['Symbol'] + feature_columns)
coefs_per_stock

# Plot KDE of sample stock returns
samples = factors_returns.loc[factors_returns.Symbol == factors_returns.Symbol.unique()[0]]['Close']
samples.plot.kde()

# Select returns for first three factors
f_1 = factors_returns.loc[factors_returns.Symbol == factors_returns.Symbol.unique()[0]]['Close']
f_2 = factors_returns.loc[factors_returns.Symbol == factors_returns.Symbol.unique()[1]]['Close']
f_3 = factors_returns.loc[factors_returns.Symbol == factors_returns.Symbol.unique()[2]]['Close']
print(f_1.size, len(f_2), f_3.size)

# Compute covariance and mean for returns
factors_returns_cov = pd.DataFrame({'f1': list(f_1)[1:1040], 'f2': list(f_2)[1:1040], 'f3': list(f_3]}).cov().to_numpy()
factors_returns_mean = pd.DataFrame({'f1': list(f_1)[1:1040], 'f2': list(f_2)[1:1040], 'f3': list(f_3)}).mean()

# Generate random samples based on multivariate normal distribution
from numpy.random import multivariate_normal
multivariate_normal(factors_returns_mean, factors_returns_cov)

# Broadcast data for parallel processing
b_coefs_per_stock = spark.sparkContext.broadcast(coefs_per_stock)
b_feature_columns = spark.sparkContext.broadcast(feature_columns)
b_factors_returns_mean = spark.sparkContext.broadcast(factors_returns_mean)
b_factors_returns_cov = spark.sparkContext.broadcast(factors_returns_cov)

# Setup parameters for parallelism and trials
from pyspark.sql.types import IntegerType
parallelism = 1000
num_trials = 1000000
base_seed = 1496
seeds = [b for b in range(base_seed, base_seed + parallelism)]
seedsDF = spark.createDataFrame(seeds, IntegerType()).repartition(parallelism)

# Function to calculate trial returns
import random
from numpy.random import seed
from pyspark.sql.types import LongType, ArrayType, DoubleType
from pyspark.sql.functions import udf

def calculate_trial_return(x):
    trial_return_list = []
    for i in range(int(num_trials / parallelism)):
        random_int = random.randint(0, num_trials * num_trials)
        seed(x)
        random_factors = multivariate_normal(b_factors_returns_mean.value, b_factors_returns_cov.value)
        coefs_per_stock_df = b_coefs_per_stock.value
        returns_per_stock = (coefs_per_stock_df[b_feature_columns.value] * (list(random_factors) + list(random_factors**2)))
        trial_return_list.append(float(returns_per_stock.sum(axis=1).sum() / b_coefs_per_stock.value.size))
    return trial_return_list

# Register UDF for trial returns calculation
udf_return = udf(calculate_trial_return, ArrayType(DoubleType()))

# Run trials and calculate quantiles
from pyspark.sql.functions import col, explode
trials = seedsDF.withColumn("trial_return", udf_return(col("value")))
trials = trials.select('value', explode('trial_return').alias('trial_return'))
trials.cache()
trials.approxQuantile('trial_return', [0.05], 0.0)

# Calculate and display average trial return for lowest 5%
trials.orderBy(col('trial_return').asc()).\
    limit(int(trials.count() / 20)).\
    agg(fun.avg(col("trial_return"))).show()

# Convert trials to Pandas and plot results
import pandas
mytrials = trials.toPandas()
mytrials.plot.line()
