In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

# Create a SparkContext and an SQLContext to read the signal parquet file
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

# Read the signal parquet file
signalDF = sqlContext.read.parquet("C:\\Users\\This PC\\Desktop\\bnbm\\hw2_signal.parquet")

# Filter the data to include only lag(prc) greater than $5, hexcd (1,2,3), sharecode (10,11) and lag market cap not in the bottom 20%
filteredDF = signalDF.filter((signalDF.lag_prc > 5) & (signalDF.hexc in (1,2,3)) & (signalDF.sharecode in (10,11)) & (signalDF.lag_mktcap_pct > 0.2))

# Compute the equal-weighted and value-weighted portfolios re-balancing on a monthly basis
equalWeightedPortfolio = filteredDF.groupBy("date").agg({"return": "avg"}).withColumnRenamed("avg(return)", "equal_weighted_return")
valueWeightedPortfolio = filteredDF.groupBy("date").agg({"lag_mktcap": "avg", "return": "avg"}).withColumnRenamed("avg(lag_mktcap)", "value_weighted_mktcap").withColumnRenamed("avg(return)", "value_weighted_return")
valueWeightedPortfolio = valueWeightedPortfolio.withColumn("value_weighted_return", valueWeightedPortfolio["value_weighted_return"]/valueWeightedPortfolio["value_weighted_mktcap"])

# Does the equal or value-weighted strategy have a higher average return?
equalWeightedAverageReturn = equalWeightedPortfolio.agg({"equal_weighted_return": "avg"}).collect()[0][0]
valueWeightedAverageReturn = valueWeightedPortfolio.agg({"value_weighted_return": "avg"}).collect()[0][0]

if equalWeightedAverageReturn > valueWeightedAverageReturn:
    print("The equal-weighted strategy has a higher average return")
else:
    print("The value-weighted strategy has a higher average return")

# Present the bar-plot of equal-weighted returns and value-weighted returns
import matplotlib.pyplot as plt

equalWeightedReturns = equalWeightedPortfolio.collect()
valueWeightedReturns = valueWeightedPortfolio.collect()

equalWeightedDates = [x[0] for x in equalWeightedReturns]
equalWeightedReturns = [x[1] for x in equalWeightedReturns]
valueWeightedDates = [x[0] for x in valueWeightedReturns]
valueWeightedReturns = [x[2] for x in valueWeightedReturns]

plt.bar(equalWeightedDates, equalWeightedReturns, width=0.5, label="Equal-Weighted Returns")
plt.bar(valueWeightedDates, valueWeightedReturns, width=0.5, label="Value-Weighted Returns")
plt.legend()
plt.show()

# P&L curve
equalWeightedCumReturns = [sum(equalWeightedReturns[0:i+1]) for i in range(len(equalWeightedReturns))]
valueWeightedCumReturns = [sum(valueWeightedReturns[0:i+1]) for i in range(len(valueWeightedReturns))]

plt.plot(equalWeightedDates, equalWeightedCumReturns, label="Equal-Weighted Cumulative Returns")
plt.plot(valueWeightedDates, valueWeightedCumReturns, label="Value-Weighted Cumulative Returns")
plt.legend()
plt.show()

# Present the factor loadings to the four factor model and five factor model.
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Vectorize the feature columns
vectorAssembler = VectorAssembler(inputCols=["lag_prc_pct_change", "lag_mktcap_pct_change", "lag_beta", "lag_divyield", "mom"], outputCol="features")
vectorizedDF = vectorAssembler.transform(filteredDF)

# Create a LinearRegression model
lr = LinearRegression(featuresCol="features", labelCol="return", predictionCol="prediction")

# Fit the model to the data
lrModel = lr.fit(vectorizedDF)

# Print the factor loadings
print("Intercept:", lrModel.intercept)
print("Coefficients:", lrModel.coefficients)



AttributeError: 'DataFrame' object has no attribute 'lag_prc'

In [1]:
import pandas as pd
import numpy as np

# Read file
data = pd.read_parquet("C:\\Users\\This PC\\Desktop\\bnbm\\hw2_signal.parquet")

# Filter data according to lag(prc) greater than $5, hexcd (1,2,3), sharecode (10,11) and lag market cap not in the bottom 20% that month
filtered_data = data[(data['lag_prc'] > 5) & ((data['hexcds'] == 1) | (data['hexcds'] == 2) | (data['hexcds'] == 3)) & ((data['sharecode'] == 10) | (data['sharecode'] == 11)) & (data['lag_mkvalt'] > data['lag_mkvalt'].quantile(0.2))]

# Compute the equal-weighted and value-weighted portfolios re-balancing on a monthly basis
# Equal-Weighted Portfolio
ew_port = filtered_data.groupby(['date']).apply(lambda x: x/x.sum())
ew_port['ew_port_ret'] = ew_port.groupby(['date'])['ret'].transform('mean')

#Value-Weighted Portfolio
vw_port = filtered_data.groupby(['date']).apply(lambda x: x*x['mkvalt']/x['mkvalt'].sum())
vw_port['vw_port_ret'] = vw_port.groupby(['date'])['ret'].transform('mean')

# Does the equal or value-weighted strategy have a higher average return?
print(f'Equal-Weighted Portfolio Average Return: {ew_port["ew_port_ret"].mean():.2f}')
print(f'Value-Weighted Portfolio Average Return: {vw_port["vw_port_ret"].mean():.2f}')

# Is it common for the equal or value-weighted strategies to have higher returns?
print('It is common for the value-weighted strategies to have higher returns.')

# Present the bar-plot of equal-weighted returns and value-weighted returns
import matplotlib.pyplot as plt
plt.bar(["Equal-Weighted","Value-Weighted"], [ew_port["ew_port_ret"].mean(), vw_port["vw_port_ret"].mean()])
plt.title("Average Returns")
plt.show()

# P&L curve 
plt.plot(ew_port.groupby(['date'])['ew_port_ret'].cumsum(), label="Equal-Weighted")
plt.plot(vw_port.groupby(['date'])['vw_port_ret'].cumsum(), label="Value-Weighted")
plt.title("Cumulative Returns")
plt.legend()
plt.show()

ImportError: Unable to find a usable engine; tried using: 'pyarrow', 'fastparquet'.
A suitable version of pyarrow or fastparquet is required for parquet support.
Trying to import the above resulted in these errors:
 - Missing optional dependency 'pyarrow'. pyarrow is required for parquet support. Use pip or conda to install pyarrow.
 - Missing optional dependency 'fastparquet'. fastparquet is required for parquet support. Use pip or conda to install fastparquet.