# Day Trading

The dataset for this problem can be downloaded in zipped form from:

https://rotman-cloud-data.s3.amazonaws.com/finance/finance.zip

Alternatively, if you're running on an S3-enabled cluster, you can operate directly on the files in S3 by using the wildcard path:

s3://rotman-cloud-data/finance/Stocks/*.txt

## Initializing Spark

In [1]:
!pip install pyspark



In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [3]:
spark = SparkSession.builder \
    .appName("Day Trading") \
    .getOrCreate()

## Loading the Data

Each stock ticker can be found in the corresponding CSV in `finance/Stocks/<ticker>.us.txt`. The ETF we're benchmarking against is in `finance/ETFs/spy.us.txt`. Each individual file can be loaded into a Spark DataFrame as follows:

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [5]:
# MODIFY THIS LINE with the path to where you saved the unzipped datafiles on your Google drive
path = '/content/drive/MyDrive/ColabNotebooks/8431/finance'

In [6]:
!unzip /content/drive/MyDrive/ColabNotebooks/8431/finance.zip > /dev/null

In [5]:
stocks = spark.read.csv('Stocks/*.txt')
etfs = spark.read.csv('ETFs/*.txt')

# S & P 500 Benchmark Development

In [6]:
sp500 = spark.read.option("header", True).option("inferSchema", True).csv("ETFs/spy.us.txt")

In [7]:
#Create month and year variables
sp500=sp500.withColumn('Month', F.split(sp500['Date'],'-').getItem(1))
sp500=sp500.withColumn('Year', F.split(sp500['Date'],'-').getItem(0))

In [8]:
#register a temporary table for further analysis
sp500.registerTempTable("SP500")



In [9]:
sp500=spark.sql("""
SELECT Year,Month,AVG(Close-Open) AS Avg_Performance 
from sp500
WHERE DATE BETWEEN '2007-01-01' AND '2018-12-31'
GROUP BY Year,Month
ORDER BY Year,MONTH ASC
""")

In [10]:
sp500.show()

+----+-----+--------------------+
|Year|Month|     Avg_Performance|
+----+-----+--------------------+
|2007|   01| 0.06749999999999971|
|2007|   02|-0.13000000000000292|
|2007|   03|0.040909090909089875|
|2007|   04| 0.19399999999999765|
|2007|   05| 0.11954545454545305|
|2007|   06|-0.11571428571428874|
|2007|   07| -0.2904761904761943|
|2007|   08|-0.04000000000000131|
|2007|   09| 0.21631578947368643|
|2007|   10|-0.01434782608695...|
|2007|   11| -0.3123809523809518|
|2007|   12|-0.28400000000000036|
|2008|   01|-0.07333333333333363|
|2008|   02|0.013500000000000512|
|2008|   03| -0.0855000000000004|
|2008|   04|   0.114545454545455|
|2008|   05| 0.03666666666666783|
|2008|   06| -0.3961904761904752|
|2008|   07|-0.15136363636363692|
|2008|   08|  0.1833333333333344|
+----+-----+--------------------+
only showing top 20 rows



average performance on a monthly basis was determined by taking the daily  performance (close-open price), and then aggregating by month and finding the average of all of those values.

In [11]:
#convert to pandas dataframe
sp500=sp500.toPandas()

In [12]:
sp500=sp500.rename(columns={"Avg_Performance":"sp500_Avg_Performance"})

In [13]:
sp500

Unnamed: 0,Year,Month,sp500_Avg_Performance
0,2007,01,0.067500
1,2007,02,-0.130000
2,2007,03,0.040909
3,2007,04,0.194000
4,2007,05,0.119545
...,...,...,...
126,2017,07,0.098500
127,2017,08,-0.044783
128,2017,09,0.118500
129,2017,10,0.129545


Thus we must find 100 stocks who's monthly average performance exceeds the S&P 500 every month for the past 10 years

# Top 100 performing stocks

In [14]:
stocks = spark.read \
    .option("header", True) \
    .option("inferSchema", True) \
    .csv("Stocks/*.us.txt") \
    .withColumn("filename", F.input_file_name())

In [15]:
stocks.show(truncate=False)

+----------+------+------+------+------+-------+-------+------------------------------+
|Date      |Open  |High  |Low   |Close |Volume |OpenInt|filename                      |
+----------+------+------+------+------+-------+-------+------------------------------+
|1962-01-02|0.6277|0.6362|0.6201|0.6201|2575579|0      |file:/content/Stocks/ge.us.txt|
|1962-01-03|0.6201|0.6201|0.6122|0.6201|1764749|0      |file:/content/Stocks/ge.us.txt|
|1962-01-04|0.6201|0.6201|0.6037|0.6122|2194010|0      |file:/content/Stocks/ge.us.txt|
|1962-01-05|0.6122|0.6122|0.5798|0.5957|3255244|0      |file:/content/Stocks/ge.us.txt|
|1962-01-08|0.5957|0.5957|0.5716|0.5957|3696430|0      |file:/content/Stocks/ge.us.txt|
|1962-01-09|0.5957|0.6037|0.5878|0.5957|2778285|0      |file:/content/Stocks/ge.us.txt|
|1962-01-10|0.5957|0.6037|0.5957|0.5957|2337096|0      |file:/content/Stocks/ge.us.txt|
|1962-01-11|0.5957|0.5957|0.5878|0.5957|1943605|0      |file:/content/Stocks/ge.us.txt|
|1962-01-12|0.5957|0.6037|0.5878

In [16]:
#create a month and year variable
stocks=stocks.withColumn('Year', F.split(stocks['Date'],'-').getItem(0))
stocks=stocks.withColumn('Month', F.split(stocks['Date'],'-').getItem(1))

In [17]:
stocks.registerTempTable('UpdatedStocks')



In [18]:
df = spark.sql("""
SELECT filename,Year,Month,AVG(Close-Open) as Avg_Performance
FROM UpdatedStocks
WHERE Date BETWEEN '2007-01-01' AND '2017-12-31'
GROUP BY filename,Year,Month
ORDER BY Month ASC
""")


In [19]:
#get the ticker symbol
df=df.withColumn('splitticker', F.split(df['filename'], '/').getItem(3))
df=df.withColumn('ticker', F.split(df['splitticker'], '\.').getItem(0))

In [20]:
df=df.toPandas()

In [21]:
df

Unnamed: 0,filename,Year,Month,Avg_Performance,splitticker,ticker
0,file:/content/Stocks/utx.us.txt,2015,01,0.038500,utx.us.txt,utx
1,file:/content/Stocks/ibm.us.txt,2014,01,-0.059048,ibm.us.txt,ibm
2,file:/content/Stocks/hon.us.txt,2017,01,0.018000,hon.us.txt,hon
3,file:/content/Stocks/dis.us.txt,2013,01,0.175571,dis.us.txt,dis
4,file:/content/Stocks/hon.us.txt,2014,01,-0.028000,hon.us.txt,hon
...,...,...,...,...,...,...
587981,file:/content/Stocks/eacqu.us.txt,2016,12,0.040000,eacqu.us.txt,eacqu
587982,file:/content/Stocks/sbnyw.us.txt,2016,12,0.000000,sbnyw.us.txt,sbnyw
587983,file:/content/Stocks/lexeb.us.txt,2016,12,0.000000,lexeb.us.txt,lexeb
587984,file:/content/Stocks/dtyl.us.txt,2016,12,0.000000,dtyl.us.txt,dtyl


In [22]:
df=df.drop(['filename','splitticker'],axis=1)

In [23]:
df

Unnamed: 0,Year,Month,Avg_Performance,ticker
0,2015,01,0.038500,utx
1,2014,01,-0.059048,ibm
2,2017,01,0.018000,hon
3,2013,01,0.175571,dis
4,2014,01,-0.028000,hon
...,...,...,...,...
587981,2016,12,0.040000,eacqu
587982,2016,12,0.000000,sbnyw
587983,2016,12,0.000000,lexeb
587984,2016,12,0.000000,dtyl


In [24]:
df=df.sort_values(by=['ticker','Year','Month'])

In [25]:
df

Unnamed: 0,Year,Month,Avg_Performance,ticker
25517,2007,01,-0.082650,a
49691,2007,02,-0.002737,a
97700,2007,03,0.090091,a
169970,2007,04,0.018050,a
219094,2007,05,0.152000,a
...,...,...,...,...
314798,2017,07,-0.222000,zyne
364469,2017,08,0.090870,zyne
414906,2017,09,-0.176000,zyne
491041,2017,10,-0.027218,zyne


In [26]:
#merge the sp500 data with the monthly stock data
df=df.merge(sp500,on=['Year','Month'],how='left')
df.head(132)

Unnamed: 0,Year,Month,Avg_Performance,ticker,sp500_Avg_Performance
0,2007,01,-0.082650,a,0.067500
1,2007,02,-0.002737,a,-0.130000
2,2007,03,0.090091,a,0.040909
3,2007,04,0.018050,a,0.194000
4,2007,05,0.152000,a,0.119545
...,...,...,...,...,...
127,2017,08,0.082565,a,-0.044783
128,2017,09,-0.059000,a,0.118500
129,2017,10,0.057273,a,0.129545
130,2017,11,-0.112500,a,0.250000


In [28]:
#Check whether the stocks average performance at that year at that month beat the average performance of the sp500 at that time
df['BeatSP500']=df['Avg_Performance']>df['sp500_Avg_Performance']
df.head(132)

Unnamed: 0,Year,Month,Avg_Performance,ticker,sp500_Avg_Performance,BeatSP500
0,2007,01,-0.082650,a,0.067500,False
1,2007,02,-0.002737,a,-0.130000,True
2,2007,03,0.090091,a,0.040909,True
3,2007,04,0.018050,a,0.194000,False
4,2007,05,0.152000,a,0.119545,True
...,...,...,...,...,...,...
127,2017,08,0.082565,a,-0.044783,True
128,2017,09,-0.059000,a,0.118500,False
129,2017,10,0.057273,a,0.129545,False
130,2017,11,-0.112500,a,0.250000,False


In [29]:
#Every time a true value occurs 1 is added to the tickers Beat SP500 sum, meaning at that year at that month the stock had a higher average performance then the sp500
groupeddf=df.groupby(by=['ticker']).agg({'BeatSP500':'sum'})

In [37]:
groupeddf[groupeddf.BeatSP500>132]

Unnamed: 0_level_0,BeatSP500
ticker,Unnamed: 1_level_1


Based off my definition of the average performance on a monthly basis, it seems not a single stock was able to consistently beat the sp500 every month for 10 years!

In [38]:
groupeddf[groupeddf.BeatSP500>90]

Unnamed: 0_level_0,BeatSP500
ticker,Unnamed: 1_level_1
nvr,91


The stock able to do this most was nvr, which was able to beat the sp500 the most out of any stock in the dataset

In [58]:
groupeddf[groupeddf.BeatSP500>71]

Unnamed: 0_level_0,BeatSP500
ticker,Unnamed: 1_level_1
acn,84
adbe,73
ads,77
adsk,73
amg,73
...,...
wcg,72
wina,78
wltw,74
wtm,81


I would definitely keep an eye on the 93 stocks above, as they were all able to beat the sp500 over 71 times based on average performance on a monthly basis! Very impressive to see such consistent results! 