In [2]:
# Mount my Google Drive to access datasets
from google.colab import drive
drive.mount('/content/drive')

# Preperation environment for Colab + PySpark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

Mounted at /content/drive


In [3]:
"""
2. Data understanding
2.1. Collecting Initial Data
"""
#!pip install yfinance
import yfinance as yf
import pandas as pd

# Source is from internet - Nasdaq 100 companies from Wikipedia
source_url = "https://en.wikipedia.org/wiki/Nasdaq-100"
source_url_tables = pd.read_html(source_url)
source_url_onetable = source_url_tables[4]
ticker_list = source_url_onetable['Ticker'].to_list()


In [None]:
# Define date, collect 1 year data = 365 days
from datetime import date
from datetime import timedelta
end_date = date.today()
start_date = end_date - timedelta(days=365 * 6)
# Define timestamp
previous_30date = pd.to_datetime(end_date - timedelta(days=30), format='%Y-%m-%d')
df_list = list() # Use list to save dataframe from each loop
# Download data ordered by ticker
for ticker in ticker_list:
    data = yf.download(ticker, period="max", start=start_date, end=end_date)
    data['Ticker'] = ticker
    df_list.append(data)
# Merge the dataframe row by row and, use Index as the first column
df = pd.concat(df_list).reset_index()
# Export dataframe to csv
df.to_csv("nasdaq.csv")


In [None]:
"""
2.2. Describing Data
"""
# Schemas can only be inferred for CSV files.
df = spark.read.csv('nasdaq.csv', inferSchema=True, header=True)
df.printSchema()

In [54]:
# Let's get a better look at the data.
# We know that we can show a DataFrame, but that's resulted in a mess!
df.show()

+---+----------+------------------+------------------+------------------+------------------+------------------+--------+------+
|_c0|      Date|              Open|              High|               Low|             Close|         Adj Close|  Volume|Ticker|
+---+----------+------------------+------------------+------------------+------------------+------------------+--------+------+
|  0|2017-10-11|152.08999633789062|             154.0|151.35000610351562|153.64999389648438|153.64999389648438| 2791000|  ADBE|
|  1|2017-10-12|153.72000122070312|154.85000610351562|153.38999938964844|153.61000061035156|153.61000061035156| 2430000|  ADBE|
|  2|2017-10-13| 153.9600067138672|154.58999633789062|153.05999755859375|153.92999267578125|153.92999267578125| 2515600|  ADBE|
|  3|2017-10-16|151.14999389648438|153.07000732421875|149.27999877929688| 150.4600067138672| 150.4600067138672| 3833200|  ADBE|
|  4|2017-10-17|150.52000427246094|150.92999267578125|148.14999389648438| 150.3800048828125| 150.3800048

In [9]:
# Instead, let's just grab the first row. Much neater!
df.head(1)

[Row(_c0=0, Date='2017-10-11', Open=152.08999633789062, High=154.0, Low=151.35000610351562, Close=153.64999389648438, Adj Close=153.64999389648438, Volume=2791000, Ticker='ADBE')]

In [49]:
# Let's get a better look at the data from other perspective, like how many rows (count=145492), max and min for the Close price.
df.describe().show()

+-------+----------------+----------+------------------+------------------+------------------+------------------+------------------+--------------------+------+
|summary|             _c0|      Date|              Open|              High|               Low|             Close|         Adj Close|              Volume|Ticker|
+-------+----------------+----------+------------------+------------------+------------------+------------------+------------------+--------------------+------+
|  count|          145492|    145492|            145492|            145492|            145492|            145492|            145492|              145492|145492|
|   mean|         72745.5|      null| 178.8071887418212|181.30678334575651|176.26222506107905|178.83487157117764|175.85999965906484|1.0784285464121738E7|  null|
| stddev|42000.0670197878|      null| 258.0155303436888| 261.4380846861706|254.51924366303192| 257.9917186679015| 258.1051181763564|2.4557877943180673E7|  null|
|    min|               0|2017-10-

In [20]:
# We can also use Python within the DataFrame filter method, have a look on few rows when AAPL did break through the 150 price.
df.filter((df['Open'] < 150) & (df['Close'] > 150) & (df['Ticker'] == 'AAPL')).select('Open','Close','High','Low').show()

+------------------+------------------+------------------+------------------+
|              Open|             Close|              High|               Low|
+------------------+------------------+------------------+------------------+
| 148.5399932861328| 151.1199951171875|151.19000244140625|146.47000122070312|
|             149.0| 153.1199951171875|153.49000549316406|148.61000061035156|
|149.82000732421875|152.57000732421875| 153.1699981689453|149.72000122070312|
|148.66000366210938|150.02000427246094|151.57000732421875|148.64999389648438|
|149.94000244140625|             151.0|151.49000549316406|149.33999633789062|
| 147.8300018310547| 151.2100067138672|151.27000427246094|146.86000061035156|
|149.77999877929688| 150.1699981689453|150.86000061035156| 148.1999969482422|
| 147.9199981689453|             151.0|151.22999572753906|146.91000366210938|
|149.30999755859375|154.47999572753906|154.55999755859375|149.10000610351562|
|149.66000366210938|150.77000427246094|153.77000427246094|149.63

In [23]:
# Let's pick a row of data for AAPL with a low of $157.5 and collect it.
employeeResult = df.filter((df['High'] == 157.5) & (df['Ticker'] == 'AAPL')).collect()

In [24]:
# When we collect it, you may notice an interesting format.
employeeResult

[Row(_c0=18570, Date='2022-10-28', Open=148.1999969482422, High=157.5, Low=147.82000732421875, Close=155.74000549316406, Adj Close=154.82154846191406, Volume=164762400, Ticker='AAPL')]

In [27]:
# We can select the first row of data to shed the outer brackets.
employeeRow = employeeResult[0]

employeeRow

Row(_c0=18570, Date='2022-10-28', Open=148.1999969482422, High=157.5, Low=147.82000732421875, Close=155.74000549316406, Adj Close=154.82154846191406, Volume=164762400, Ticker='AAPL')

In [28]:
# And then visualise it simply as a dictionary.
employeeRow.asDict()

{'_c0': 18570,
 'Date': '2022-10-28',
 'Open': 148.1999969482422,
 'High': 157.5,
 'Low': 147.82000732421875,
 'Close': 155.74000549316406,
 'Adj Close': 154.82154846191406,
 'Volume': 164762400,
 'Ticker': 'AAPL'}

In [29]:
# Why convert it into a dictionary? Because dictionaries have a lot of methods available.
# For example, we can simply call volume from the dictionary.
employeeRow.asDict()['Volume']

164762400

In [30]:
# Let's import the relevant functions.
from pyspark.sql.functions import dayofmonth,month,hour,year,format_number

In [58]:
# And create a new column using the year function to manipulate date.
df_with_year = df.withColumn("Year",year(df["Date"]))

df_with_year.head(1)

[Row(_c0=0, Date='2017-10-11', Open=152.08999633789062, High=154.0, Low=151.35000610351562, Close=153.64999389648438, Adj Close=153.64999389648438, Volume=2791000, Ticker='ADBE', Year=2017)]

In [32]:
# Now let's sumamrise the data by year, find the mean of each year and select the two columns we'd like to see.
df_summary = df_with_year.groupBy("Year").mean().select(['Year','avg(Close)'])
df_summary.show()

+----+------------------+
|Year|        avg(Close)|
+----+------------------+
|2018|123.54981350337941|
|2023|224.49420418284572|
|2022| 200.9489254671644|
|2019| 133.9607567078729|
|2020|168.76878581905427|
|2017|  113.061506074052|
|2021|238.16316005920643|
+----+------------------+



In [33]:
# To make it more visually appealing, let's format the mean to two decimal places.
df_formatted = df_summary.select(['Year', format_number("avg(Close)",2)])
df_formatted.show()

+----+----------------------------+
|Year|format_number(avg(Close), 2)|
+----+----------------------------+
|2018|                      123.55|
|2023|                      224.49|
|2022|                      200.95|
|2019|                      133.96|
|2020|                      168.77|
|2017|                      113.06|
|2021|                      238.16|
+----+----------------------------+



In [57]:
# Let's change the name of the column to something that makes sense.
df_renamed = df_formatted.withColumnRenamed("format_number(avg(Close), 2)","Average Closing Price")
df_renamed.show()

+----+---------------------+
|Year|Average Closing Price|
+----+---------------------+
|2018|               123.55|
|2023|               224.49|
|2022|               200.95|
|2019|               133.96|
|2020|               168.77|
|2017|               113.06|
|2021|               238.16|
+----+---------------------+



In [56]:
# And finally order it by year.
df_renamed.orderBy('Year').show()

+----+---------------------+
|Year|Average Closing Price|
+----+---------------------+
|2017|               113.06|
|2018|               123.55|
|2019|               133.96|
|2020|               168.77|
|2021|               238.16|
|2022|               200.95|
|2023|               224.49|
+----+---------------------+



In [52]:
# We can also group, and show the averages of each group, let's see which company on average worth the most money.
df.groupBy('Ticker').mean().orderBy('avg(Volume)', ascending=False).show()

+------+--------+------------------+------------------+------------------+------------------+------------------+--------------------+
|Ticker|avg(_c0)|         avg(Open)|         avg(High)|          avg(Low)|        avg(Close)|    avg(Adj Close)|         avg(Volume)|
+------+--------+------------------+------------------+------------------+------------------+------------------+--------------------+
|  TSLA|130145.5| 137.9955623459753|141.13868465474178|134.62149467455615|137.97069846540293|137.97069846540293| 1.321278587984085E8|
|  AAPL| 18052.5|104.95773725484347|106.17488244863657|103.82332218483842|105.05079079180244| 103.5167384084403| 1.093933996438992E8|
|  AMZN|  9004.5| 116.9237798746448|118.33910384797923|115.37476767279425| 116.8727095019596| 116.8727095019596|  8.27053654383289E7|
|   AMD| 10512.5|63.909641896381935| 65.22427057334535| 62.54358097729063|  63.9053315357441|  63.9053315357441|  6.92286457301061E7|
|  NVDA|101691.5|141.92695115921669|144.59982080737856|139.170

In [63]:
# Other than grouping by Ticker, we can group by Year and use aggregation which represents the entire dataset.
df_with_year.groupBy('Year').agg({"Close":"mean"}).orderBy('avg(Close)', ascending=False).show()

+----+------------------+
|Year|        avg(Close)|
+----+------------------+
|2021|238.16316005920643|
|2023|224.49420418284572|
|2022| 200.9489254671644|
|2020|168.76878581905427|
|2019| 133.9607567078729|
|2018|123.54981350337941|
|2017|  113.061506074052|
+----+------------------+



In [66]:
# We can also import SQL functions.
from pyspark.sql.functions import countDistinct,avg,stddev,format_number
df.select(avg('Close').alias('Average Close')).show()

+------------------+
|     Average Close|
+------------------+
|178.83487157117764|
+------------------+



In [67]:
# We can use SQL to do some basic manipulation.
# Here we're getting the standard deviation in sales, formatting it to two decimal places and changing the column name.
dev = df.select(stddev("Close"))

dev.select(format_number('stddev_samp(Close)',2).alias('Close_Standard_Deviation')).show()

+------------------------+
|Close_Standard_Deviation|
+------------------------+
|                  257.99|
+------------------------+



In [109]:
"""
LinearRegression
"""
# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('linear_regression_adv').getOrCreate()

# If you're getting an error with numpy, please type 'sudo pip install numpy --user' into the EC2 console.
from pyspark.ml.regression import LinearRegression


In [110]:
# Use Spark to read in the Ecommerce Customers csv file. You can infer csv schemas.
data = spark.read.csv("nasdaq.csv",inferSchema=True,header=True)

# Print the schema of the DataFrame. You can see potential features as well as the predictor - String is not supported.
data.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Ticker: string (nullable = true)



In [79]:
# A simple for loop allows us to give an example what column values that each rows contain.
for item in data.head():
    print(item)

0
2017-10-11
152.08999633789062
154.0
151.35000610351562
153.64999389648438
153.64999389648438
2791000
ADBE


In [95]:
"""
Setting Up a DataFrame for Machine Learning (MLlib)
We need to do a few things before Spark can accept the data for machine learning. First of all, it needs to be in the form of two columns: label and features. Unlike the documentation example, this data is messy. We'll need to combine all of the features into a single vector. VectorAssembler simplifies the process.
"""
# Import VectorAssembler and Vectors
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# The input columns are the feature column names, and the output column is what you'd like the new column to be named.
assembler = VectorAssembler(
    inputCols=["Open", "High", "Low",
               "Volume"],
    outputCol="features")


In [96]:
# Now that we've created the assembler variable, let's actually transform the data.
output = assembler.transform(data)

# Using print schema, you see that the features output column has been added.
output.printSchema()

# You can see that the features column is a dense vector that combines the various features as expected.
output.head(1)

root
 |-- _c0: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Ticker: string (nullable = true)
 |-- features: vector (nullable = true)



[Row(_c0=0, Date='2017-10-11', Open=152.08999633789062, High=154.0, Low=151.35000610351562, Close=153.64999389648438, Adj Close=153.64999389648438, Volume=2791000, Ticker='ADBE', features=DenseVector([152.09, 154.0, 151.35, 2791000.0]))]

In [97]:
# Let's select two columns (the feature and predictor).
# This is now in the appropriate format to be processed by Spark.
final_data = output.select("features",'Close')
final_data.show()

+--------------------+------------------+
|            features|             Close|
+--------------------+------------------+
|[152.089996337890...|153.64999389648438|
|[153.720001220703...|153.61000061035156|
|[153.960006713867...|153.92999267578125|
|[151.149993896484...| 150.4600067138672|
|[150.520004272460...| 150.3800048828125|
|[150.330001831054...|             153.0|
|[165.5,172.149993...|171.72999572753906|
|[171.5,175.869995...|175.63999938964844|
|[175.679992675781...|172.16000366210938|
|[171.470001220703...| 171.5800018310547|
|[170.369995117187...| 171.8300018310547|
|[172.059997558593...|            173.75|
|[174.020004272460...| 177.3300018310547|
|[177.0,177.479995...|176.02999877929688|
|[176.160003662109...|175.16000366210938|
|[176.589996337890...|            176.25|
|[178.020004272460...|180.94000244140625|
|[181.759994506835...| 182.3000030517578|
|[182.009994506835...| 180.8000030517578|
|[181.229995727539...| 180.8800048828125|
+--------------------+------------

In [159]:
# Data Partioning: Split the training and testing set and verify.
train_data,test_data = final_data.randomSplit([0.7,0.3])
train_data.describe().show()
test_data.describe().show()

+-------+------------------+
|summary|             Close|
+-------+------------------+
|  count|             26757|
|   mean|231.35819262774282|
| stddev|311.98979676343623|
|    min|  5.03000020980835|
|    max|    3234.330078125|
+-------+------------------+

+-------+------------------+
|summary|             Close|
+-------+------------------+
|  count|             11388|
|   mean|238.39748356716652|
| stddev| 335.9351834119386|
|    min| 5.130000114440918|
|    max| 3243.010009765625|
+-------+------------------+



In [100]:
"""
Now we can create a Linear Regression Model object. Because the feature column is named 'features',
we don't have to worry about it. However, as the labelCol isn't the default name, we have to specify it's name (Close).
"""
lr = LinearRegression(labelCol='Close')

# Fit the model to the data.
lrModel = lr.fit(train_data)

# Print the coefficients and intercept for linear regression.
print("Coefficients: {} Intercept: {}".format(lrModel.coefficients,lrModel.intercept))

Coefficients: [-0.5835194810999815,0.8001081043837726,0.7833741653000084,1.7922392355944063e-10] Intercept: 0.03374444176056932


In [None]:
# Let's evaluate the model against the test data.
test_results = lrModel.evaluate(test_data)

# Interesting results! This shows the difference between the predicted value and the test data.
test_results.residuals.show()

# Let's get some evaluation metrics (as discussed in the previous linear regression notebook).
print("RSME: {}".format(test_results.rootMeanSquaredError))

In [102]:
# We can also get the R2 value.
print("R2: {}".format(test_results.r2))

R2: 0.9998966524807389


Conclusion:

Looking at RMSE and R2, we can see that the model is quite accurate. The RMSE shows that, on average, there's only around 2.65 price discrepancy between the actual and predicted results. Comparing this to the table below, the average close (178.8) and standard deviation (257.99), a 2.65 error is surprisingly good.

The R2 also shows that the model accounts for 99.989% of the variance in the data.

In [103]:
final_data.describe().show()

+-------+------------------+
|summary|             Close|
+-------+------------------+
|  count|            145492|
|   mean|178.83487157117764|
| stddev| 257.9917186679015|
|    min|1.2599999904632568|
|    max| 3243.010009765625|
+-------+------------------+



In [107]:
"""
But what if we didn't have the predictor data?
This isn't really relevant to your assignment, but useful in a real-world scenario.
What if you have all of these features but no predictor data? How do you actually use the model you've created? Check out the example below.
"""
# Let's just select the features column (removing the label column).
unlabeled_data = test_data.select('features')
unlabeled_data.show()

+--------------------+
|            features|
+--------------------+
|[1.37000000476837...|
|[1.39999997615814...|
|[1.39999997615814...|
|[1.5,1.5299999713...|
|[1.89999997615814...|
|[2.02999997138977...|
|[2.06999993324279...|
|[2.07999992370605...|
|[2.09999990463256...|
|[2.09999990463256...|
|[2.16000008583068...|
|[2.23000001907348...|
|[2.31999993324279...|
|[2.36999988555908...|
|[2.38000011444091...|
|[2.40000009536743...|
|[2.5,2.6900000572...|
|[2.61999988555908...|
|[2.65000009536743...|
|[2.67000007629394...|
+--------------------+
only showing top 20 rows



In [108]:
"""
Useful for visualizing the predition data later on
"""
# Now we can transform the unlabeled data.
predictions = lrModel.transform(unlabeled_data)

# It worked! Feeding the unlabeled data features into the model results in a prediction,
# which is also likely to be the "Close" price.
predictions.show()
predictions.head(1)

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[1.37000000476837...|1.3569023071146038|
|[1.39999997615814...|1.4185826278289644|
|[1.39999997615814...|1.4662686797620015|
|[1.5,1.5299999713...|1.5342861686812765|
|[1.89999997615814...| 2.046345814094968|
|[2.02999997138977...| 2.025075523344195|
|[2.06999993324279...| 2.159915926835926|
|[2.07999992370605...|2.0121529937350506|
|[2.09999990463256...| 2.016296919621521|
|[2.09999990463256...|2.2378677082473506|
|[2.16000008583068...| 2.084259981675241|
|[2.23000001907348...| 2.361236004439006|
|[2.31999993324279...|2.4185254399764817|
|[2.36999988555908...| 2.637034214927953|
|[2.38000011444091...| 2.296285537722992|
|[2.40000009536743...|2.4346616433904016|
|[2.5,2.6900000572...|2.6074871407280833|
|[2.61999988555908...| 2.504904009050538|
|[2.65000009536743...| 2.867816174665039|
|[2.67000007629394...|2.8414939198843596|
+--------------------+------------

[Row(features=DenseVector([1.37, 1.38, 1.3, 245500.0]), prediction=1.3569023071146038)]

In [112]:
"""
LogisticRegression
"""
# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import *
spark = SparkSession.builder.appName('logistic_regression_adv').getOrCreate()

# If you're getting an error with numpy, please type 'sudo pip install numpy --user' into the EC2 console.
from pyspark.ml.classification import LogisticRegression

In [None]:
"""
Amend Dataset with 0/1 predictor
"""
#!pip install yfinance
import yfinance as yf
import pandas as pd

# Source is from internet - Nasdaq 100 companies from Wikipedia
source_url = "https://en.wikipedia.org/wiki/Nasdaq-100"
source_url_tables = pd.read_html(source_url)
source_url_onetable = source_url_tables[4]
ticker_list = source_url_onetable['Ticker'].to_list()

# Start financial data download
from datetime import date
from datetime import timedelta

# Define date, collect 1 year data = 365 days
end_date = date.today()
start_date = end_date - timedelta(days=365 * 2)

# Define timestamp
previous_30date = pd.to_datetime(end_date - timedelta(days=30), format='%Y-%m-%d')

df_list = list() # Use list to save dataframe from each loop
# Download data ordered by ticker
for ticker in ticker_list:
    data = yf.download(ticker, period="max", start=start_date, end=end_date)
    # data.insert(0, 'Ticker', ticker) # To add ticker to (first) column right after index column
    data['Ticker'] = ticker

    # Initialize 'Win_Count' and 'Win_Rate'
    data['Win_Count'] = 0
    data['Win_Loss'] = ""

    # Cal ROI within the future 30 days respectively， add and save in new columns, shift() to rows up and diff
    for future_days in range(30):
        data[f"ROI_T+{future_days}"] = round(data['High'].shift(-future_days) / data['Open'] - 1, 7)
        # Count and top up 'Win_Count' when there's matched ROI >= 0.08
        data.loc[data[f"ROI_T+{future_days}"] >= 0.08, 'Win_Count'] = data['Win_Count'] + 1

    # Calculate the past MA, need to shift(past_days) rows down
    data['MA_50'] = 0
    for past_days in range(50):
        data['MA_50'] = data['MA_50'] + data['Close'].shift(past_days)

    data['MA_100'] = 0
    for past_days in range(100):
        data['MA_100'] = data['MA_100'] + data['Close'].shift(past_days)

    data['MA_200'] = 0
    for past_days in range(200):
        data['MA_200'] = data['MA_200'] + data['Close'].shift(past_days)

    df_list.append(data)
# Merge the dataframe row by row and, use Index as the first column, if column by column pls specify "axis=1"
df = pd.concat(df_list).reset_index()

# Basic Stats calculation and adding more attributes
df['MA_50'] = round(df['MA_50'] / 50, 3)
df['MA_100'] = round(df['MA_100'] / 100, 3)
df['MA_200'] = round(df['MA_200'] / 200, 3)

# Assign 'Win_Loss' and MA based on the condition from basic stats calculation
df.loc[df['Win_Count'] > 0, 'Win_Loss'] = "0"
df.loc[df['Win_Count'] == 0, 'Win_Loss'] = "1"
# 'Win_Loss' is still unknown, since no Win so far however the 30 days deadline is not dued yet
df.loc[(df['Win_Count'] == 0) & (df['Date'] > previous_30date), 'Win_Loss'] = ""

df['Yin_Yang'] = ""
df.loc[df['Open'] > df['Close'], 'Yin_Yang'] = "Yin"
df.loc[df['Open'] < df['Close'], 'Yin_Yang'] = "Yang"
df.loc[df['Open'] == df['Close'], 'Yin_Yang'] = "Ping"

df['Fluctuation'] = round(df['High'] / df['Low'] - 1, 3)

# Export dataframe to csv
df.to_csv("nasdaq.csv")

In [None]:
# Read in the CSV data.
data = spark.read.csv('nasdaq.csv',inferSchema=True,header=True)

# Print data schema.
data.printSchema()

In [143]:
# Select a list of relevant columns.
# Name, for example, is somewhat irrelevant.
my_cols = data.select(['Date',
 'Open',
 'High',
 'Low',
 'Close',
 'Volume',
 'Ticker',
 'Win_Loss'])

# Now that we've selected the relevant columns, let's drop the missing data (Data Cleaning).
my_final_data = my_cols.na.drop()
my_final_data.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Ticker: string (nullable = true)
 |-- Win_Loss: integer (nullable = true)



In [144]:
# Working with Categorical Columns - "Strings" as the values
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,
                                OneHotEncoder,StringIndexer)

# First create a string indexer (convert every string into a number, such as male = 0 and female = 1).
# A number will be assigned to every category in the column.
date_indexer = StringIndexer(inputCol='Date',outputCol='DateIndex')

# Now we can one hot encode these numbers. This converts the various outputs into a single vector.
# This makes it easier to process when you have multiple classes.
date_encoder = OneHotEncoder(inputCol='DateIndex',outputCol='DateVec')

# Similar to the above.
ticker_indexer = StringIndexer(inputCol='Ticker',outputCol='TickerIndex')
ticker_encoder = OneHotEncoder(inputCol='TickerIndex',outputCol='TickerVec')

In [145]:
# Now we can assemble all of this as one vector in the features column.
assembler = VectorAssembler(inputCols=['DateVec',
 'Open',
 'High',
 'Low',
 'Close',
 'Volume',
 'TickerVec'],outputCol='features')

In [148]:
# A pipeline sets stages for different steps. Let's see an example of how to use pipelines.
from pyspark.ml import Pipeline

# Note that Win_Loss is a categorial variable but didn't require any transformation.
# That's because it's already in the format of 1's and 0's.
log_reg_nasdaq = LogisticRegression(featuresCol='features',labelCol='Win_Loss')

In [149]:
# Lists everything we want to do. Index data, encode data, assemble data and then pass in the actual model.
pipeline = Pipeline(stages=[date_indexer,ticker_indexer,
                           date_encoder,ticker_encoder,
                           assembler,log_reg_nasdaq])

In [160]:
# Train/test split.
train_nasdaq_data, test_nasdaq_data = my_final_data.randomSplit([0.7,.3])
train_nasdaq_data.describe().show()
test_nasdaq_data.describe().show()


# Note pipeline. Call it as you would call a machine learning object.
fit_model = pipeline.fit(train_nasdaq_data)

# Transform test data.
results = fit_model.transform(test_nasdaq_data)

+-------+----------+-----------------+------------------+------------------+------------------+--------------------+------+-------------------+
|summary|      Date|             Open|              High|               Low|             Close|              Volume|Ticker|           Win_Loss|
+-------+----------+-----------------+------------------+------------------+------------------+--------------------+------+-------------------+
|  count|     34029|            34029|             34029|             34029|             34029|               34029| 34029|              34029|
|   mean|      null|214.7783421778217|218.04239514813673|211.53340979280892|214.84236586052916|   9934958.727614682|  null| 0.5306650210114902|
| stddev|      null|284.5228095344425|288.61287883587534| 280.6751856335386|284.72878395051197|1.9402971760395672E7|  null|0.49906610358023173|
|    min|2021-10-11|3.380000114440918|3.4800000190734863| 3.319999933242798|3.4200000762939453|               22200|  AAPL|             

In [161]:
# Evaluate the model using the binary classifer.
from pyspark.ml.evaluation import BinaryClassificationEvaluator

my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                       labelCol='Win_Loss')

In [162]:
# If we select the actual and predicted results, we can see that some predictions were correct while others were wrong.
results.select('Win_Loss','prediction').show()

+--------+----------+
|Win_Loss|prediction|
+--------+----------+
|       0|       0.0|
|       1|       0.0|
|       0|       0.0|
|       1|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       1|       1.0|
|       0|       0.0|
|       1|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       1|       1.0|
+--------+----------+
only showing top 20 rows



In [163]:
# We can then evaluate using AUC (area under the curve). AUC is linked to ROC.
AUC = my_eval.evaluate(results)

AUC

0.7394722450269691

In [164]:
# Not all interpretation has to be visualisations. You can also gain a lot of information with text.
# For example, here we're seeing how much variance our model could account for.
# According to this, the model got 10611/14314 correct.
totalResults = results.select('Win_Loss','prediction')

correctResults = totalResults.filter(totalResults['Win_Loss'] == totalResults['prediction'])

countTR = totalResults.count()
print("Total Result: " + str(countTR))

countTC = correctResults.count()
print("Total Correct: " + str(countTC))

Total Result: 14314
Total Correct: 10611


In [226]:
"""
Prepare Dataset with Catagorical predictor W/L
"""
#!pip install yfinance
import yfinance as yf
import pandas as pd

# Source is from internet - Nasdaq 100 companies from Wikipedia
source_url = "https://en.wikipedia.org/wiki/Nasdaq-100"
source_url_tables = pd.read_html(source_url)
source_url_onetable = source_url_tables[4]
ticker_list = source_url_onetable['Ticker'].to_list()

# Start financial data download
from datetime import date
from datetime import timedelta
import numpy as np

# Define date, collect 1 year data = 365 days
end_date = date.today()
start_date = end_date - timedelta(days=365 * 2)

# Define timestamp
previous_30date = pd.to_datetime(end_date - timedelta(days=30), format='%Y-%m-%d')

df_list = list() # Use list to save dataframe from each loop
# Download data ordered by ticker
for ticker in ticker_list:
    data = yf.download(ticker, period="max", start=start_date, end=end_date)
    # data.insert(0, 'Ticker', ticker) # To add ticker to (first) column right after index column
    data['Ticker'] = ticker

    # Initialize 'Win_Count' and 'Win_Rate'
    data['Win_Count'] = 0
    data['Win_Loss'] = ""

    # Cal ROI within the future 30 days respectively， add and save in new columns, shift() to rows up and diff
    for future_days in range(30):
        data[f"ROI_T+{future_days}"] = round(data['High'].shift(-future_days) / data['Open'] - 1, 7)
        # Count and top up 'Win_Count' when there's matched ROI >= 0.08
        data.loc[data[f"ROI_T+{future_days}"] >= 0.08, 'Win_Count'] = data['Win_Count'] + 1

    # Calculate the past MA, need to shift(past_days) rows down
    data['MA_50'] = 0
    for past_days in range(50):
        data['MA_50'] = data['MA_50'] + data['Close'].shift(past_days)

    data['MA_100'] = 0
    for past_days in range(100):
        data['MA_100'] = data['MA_100'] + data['Close'].shift(past_days)

    data['MA_200'] = 0
    for past_days in range(200):
        data['MA_200'] = data['MA_200'] + data['Close'].shift(past_days)

    df_list.append(data)
# Merge the dataframe row by row and, use Index as the first column, if column by column pls specify "axis=1"
df = pd.concat(df_list).reset_index()

# Basic Stats calculation and adding more attributes
df['MA_50'] = round(df['MA_50'] / 50, 3)
df['MA_100'] = round(df['MA_100'] / 100, 3)
df['MA_200'] = round(df['MA_200'] / 200, 3)

# Assign 'Win_Loss' and MA based on the condition from basic stats calculation
df.loc[df['Win_Count'] > 0, 'Win_Loss'] = "W"
df.loc[df['Win_Count'] == 0, 'Win_Loss'] = "L"
# 'Win_Loss' is still unknown, since no Win so far however the 30 days deadline is not dued yet, drop such rows
df.loc[(df['Win_Count'] == 0) & (df['Date'] > previous_30date), 'Win_Loss'] = ""
df['Win_Loss'] = df['Win_Loss'].replace("", np.nan)
df = df.dropna()

df['Yin_Yang'] = ""
df.loc[df['Open'] > df['Close'], 'Yin_Yang'] = "Yin"
df.loc[df['Open'] < df['Close'], 'Yin_Yang'] = "Yang"
df.loc[df['Open'] == df['Close'], 'Yin_Yang'] = "Ping"

df['Fluctuation'] = round(df['High'] / df['Low'] - 1, 3)

# Export dataframe to csv
df.to_csv("nasdaq.csv")

[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%**********************]  1 of 1 completed
[*********************100%%*******

In [227]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('tree_methods_adv').getOrCreate()

# Read in the CSV data.
data = spark.read.csv('nasdaq.csv',inferSchema=True,header=True)

In [228]:
"""
Spark Formatting of Data for Tree Methods
"""
# A few things we need to do before Spark can accept the data!
# It needs to be in the form of two columns: "label" and "features".

# Import VectorAssembler and Vectors
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# Combine all features into one vector named features.
assembler = VectorAssembler(
  inputCols=['Open',
            'High',
            'Low',
            'Close',
            'Volume'],
            outputCol='features')

In [229]:
# Let's transform the data.
output = assembler.transform(data)

In [230]:
# String Handling Part: Import the string indexer (similar to the logistic regression exercises).
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="Win_Loss", outputCol="WinIndex")
output_fixed = indexer.fit(output).transform(output)

# Let's select the two columns we want. Features (which contains vectors), and the predictor.
final_data = output_fixed.select("features","WinIndex")
final_data.show()
final_data.describe().show()

+--------------------+--------+
|            features|WinIndex|
+--------------------+--------+
|[385.079986572265...|     1.0|
|[392.760009765625...|     1.0|
|[404.690002441406...|     1.0|
|[406.510009765625...|     1.0|
|[405.339996337890...|     1.0|
|[412.809997558593...|     1.0|
|[425.450012207031...|     0.0|
|[423.0,434.540008...|     0.0|
|[435.0,440.299987...|     0.0|
|[434.339996337890...|     0.0|
|[441.420013427734...|     0.0|
|[445.260009765625...|     0.0|
|[439.609985351562...|     0.0|
|[445.070007324218...|     0.0|
|[446.119995117187...|     0.0|
|[440.869995117187...|     0.0|
|[438.609985351562...|     0.0|
|[431.579986572265...|     0.0|
|[419.410003662109...|     0.0|
|[410.369995117187...|     0.0|
+--------------------+--------+
only showing top 20 rows

+-------+-------------------+
|summary|           WinIndex|
+-------+-------------------+
|  count|              27318|
|   mean|0.47488835200234275|
| stddev| 0.4993781471597771|
|    min|                0

In [231]:
# Data Partioning: Split the training and testing set and verify.
train_data,test_data = final_data.randomSplit([0.7,0.3])

In [232]:
# Let's import the relevant classifiers.
from pyspark.ml.classification import DecisionTreeClassifier,GBTClassifier,RandomForestClassifier
from pyspark.ml import Pipeline

# Use defaults to make the comparison "fair". This simplifies the comparison process.
dtc = DecisionTreeClassifier(labelCol='WinIndex',featuresCol='features')
rfc = RandomForestClassifier(labelCol='WinIndex',featuresCol='features')
gbt = GBTClassifier(labelCol='WinIndex',featuresCol='features')

In [233]:
# Train the models (it's three models, so it might take some time).
dtc_model = dtc.fit(train_data)
rfc_model = rfc.fit(train_data)
gbt_model = gbt.fit(train_data)

In [234]:
# Test the models
dtc_predictions = dtc_model.transform(test_data)
rfc_predictions = rfc_model.transform(test_data)
gbt_predictions = gbt_model.transform(test_data)

In [237]:
# Evaluate the results
# Let's start off with binary classification.
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Note that the label column isn't named label, it's named PrivateIndex in this case.
my_binary_eval = BinaryClassificationEvaluator(labelCol = 'WinIndex')

# This is the area under the curve. This indicates that the data is highly seperable.
print("Decision Tree Accuracy")
print(my_binary_eval.evaluate(dtc_predictions))

# RFC improves accuracy but also model complexity. RFC outperforms DTC in nearly every situation.
print("Random Forest Accuracy")
print(my_binary_eval.evaluate(rfc_predictions))

# We can't repeat these exact steps for GBT. If you print the schema of all three, you may be able to notice why.
# Instead, let's redefine the object:
my_binary_gbt_eval = BinaryClassificationEvaluator(labelCol='WinIndex', rawPredictionCol='prediction')
print("GBT Accuracy")
print(my_binary_gbt_eval.evaluate(gbt_predictions))

Decision Tree Accuracy
0.4711068424108791
Random Forest Accuracy
0.5936853566265421
GBT Accuracy
0.5869874524190036


In [238]:
# Let's import the evaluator.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Select (prediction, true label) and compute test error.
acc_evaluator = MulticlassClassificationEvaluator(labelCol="WinIndex", predictionCol="prediction", metricName="accuracy")

dtc_acc = acc_evaluator.evaluate(dtc_predictions)
rfc_acc = acc_evaluator.evaluate(rfc_predictions)
gbt_acc = acc_evaluator.evaluate(gbt_predictions)

# Let's do something a bit more complex in terms of printing, just so it's formatted nicer.
print("Here are the results!")
print('-'*40)
print('A single decision tree has an accuracy of: {0:2.2f}%'.format(dtc_acc*100))
print('-'*40)
print('A random forest ensemble has an accuracy of: {0:2.2f}%'.format(rfc_acc*100))
print('-'*40)
print('An ensemble using GBT has an accuracy of: {0:2.2f}%'.format(gbt_acc*100))

Here are the results!
----------------------------------------
A single decision tree has an accuracy of: 56.79%
----------------------------------------
A random forest ensemble has an accuracy of: 57.15%
----------------------------------------
An ensemble using GBT has an accuracy of: 59.33%
