In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
# Libraries used
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt

In [None]:
# Reference Module 10.
# Point PySpark's Python (PYSPARK_PYTHON) and its driver Python
# (PYSPARK_DRIVER_PYTHON) at this kernel's own interpreter so the versions match.
# This must run before the Spark session is created.
import os
import sys

for env_var in ('PYSPARK_PYTHON', 'PYSPARK_DRIVER_PYTHON'):
    os.environ[env_var] = sys.executable

In [None]:
# Start (or reuse) the Spark session and keep a handle to its SparkContext.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

print(f"Using Apache Spark Version {spark.version}")

Using Apache Spark Version 3.4.0


In [None]:
# The cleaned AAPL news CSV lives on Google Drive; mount Drive so Spark can read it.
from google.colab import drive

news_file = "/content/drive/MyDrive/clean_AAPL.csv"
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Redundant: news_file is already set in the Google Drive cell; this cell does nothing.
# news_file = "/content/drive/MyDrive/clean_AAPL.csv"

In [None]:
# Reference Module 10.
# Load the cleaned news articles into a Spark DataFrame, using the first row as the
# header and letting Spark infer column types.
# NOTE(review): `treatEmptyValuesAsNulls` does not appear in the built-in Spark CSV
# option list and is presumably ignored. If article bodies contain embedded quotes or
# newlines, consider `multiLine` / `escape='"'`. Confirm against the row count.
data = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("treatEmptyValuesAsNulls", "true")
    .load(news_file)
)

# Number of articles loaded (displayed as the cell output).
data.count()

4838

In [None]:
from pyspark.sql.functions import col

# Narrow the DataFrame to the columns used for NLP: date, headline, and article body.
data = data.select("date", "title", "body")
data.show()

+----------+--------------------+--------------------+
|      date|               title|                body|
+----------+--------------------+--------------------+
|2020-02-18|Apple To Miss Q2 ...|Apple Inc. (NASDA...|
|2020-02-18|9 Stocks To Watch...|Some of the stock...|
|2020-02-18|A Peek Into The M...|Pre-open movers U...|
|2020-02-18|30 Stocks Moving ...|Gainers Senmiao T...|
|2020-02-18|EU Rejects Facebo...|Facebook Inc.’s (...|
|2020-02-18|Global Stocks Dro...|Markets in Asia a...|
|2020-02-18|Apple Experts Rea...|"Shares of Apple ...|
|2020-02-18|Mid-Morning Marke...|Following the mar...|
|2020-02-18|Market Taking A S...|It’s shaping up t...|
|2020-02-18|3 ETFs To Watch T...|U.S. markets were...|
|2020-02-18|Walmart's Exposur...|"The coronavirus ...|
|2020-02-18|3 Apple Analysts ...|Apple Inc. (NASDA...|
|2020-02-18|Mid-Day Market Up...|Midway through tr...|
|2020-02-18|46 Stocks Moving ...|Gainers Belleroph...|
|2020-02-18|Mid-Afternoon Mar...|Toward the end of...|
|2020-02-1

In [None]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover

# Stage 1: lower-case `body` and split it on whitespace into the array column `words`.
tokenizer = Tokenizer(outputCol="words", inputCol="body")
# Stage 2: drop stop words (the remover's default list) from `words` into `sw_removed`.
stopwordFilter = StopWordsRemover(outputCol='sw_removed', inputCol='words')

# Run the two stages in order: tokenize first, then remove stop words.
data_token = tokenizer.transform(data)
data_token_woStop = stopwordFilter.transform(data_token)

data_token_woStop.show()

+----------+--------------------+--------------------+--------------------+--------------------+
|      date|               title|                body|               words|          sw_removed|
+----------+--------------------+--------------------+--------------------+--------------------+
|2020-02-18|Apple To Miss Q2 ...|Apple Inc. (NASDA...|[apple, inc., (na...|[apple, inc., (na...|
|2020-02-18|9 Stocks To Watch...|Some of the stock...|[some, of, the, s...|[stocks, may, gra...|
|2020-02-18|A Peek Into The M...|Pre-open movers U...|[pre-open, movers...|[pre-open, movers...|
|2020-02-18|30 Stocks Moving ...|Gainers Senmiao T...|[gainers, senmiao...|[gainers, senmiao...|
|2020-02-18|EU Rejects Facebo...|Facebook Inc.’s (...|[facebook, inc.’s...|[facebook, inc.’s...|
|2020-02-18|Global Stocks Dro...|Markets in Asia a...|[markets, in, asi...|[markets, asia, e...|
|2020-02-18|Apple Experts Rea...|"Shares of Apple ...|["shares, of, app...|["shares, apple, ...|
|2020-02-18|Mid-Morning Marke.

Machine Learning

Vectorize the stop-word-filtered tokens into numeric feature vectors with Word2Vec.


In [None]:
from pyspark.ml.feature import Word2Vec

# Learn word embeddings from the filtered tokens. Words appearing fewer than 3 times
# are ignored; the fixed seed is for repeatability.
w2v = Word2Vec(inputCol='sw_removed', outputCol='wordvector', minCount=3, seed=100)
w2v_model = w2v.fit(data_token_woStop)

# Each article becomes one vector (the average of its word vectors) in `wordvector`.
w2v_data = w2v_model.transform(data_token_woStop)
w2v_data.show()

+----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|      date|               title|                body|               words|          sw_removed|          wordvector|
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|2020-02-18|Apple To Miss Q2 ...|Apple Inc. (NASDA...|[apple, inc., (na...|[apple, inc., (na...|[-0.0193461150184...|
|2020-02-18|9 Stocks To Watch...|Some of the stock...|[some, of, the, s...|[stocks, may, gra...|[0.00624538739266...|
|2020-02-18|A Peek Into The M...|Pre-open movers U...|[pre-open, movers...|[pre-open, movers...|[-0.0686045814689...|
|2020-02-18|30 Stocks Moving ...|Gainers Senmiao T...|[gainers, senmiao...|[gainers, senmiao...|[-0.0263180864146...|
|2020-02-18|EU Rejects Facebo...|Facebook Inc.’s (...|[facebook, inc.’s...|[facebook, inc.’s...|[-0.0330649789683...|
|2020-02-18|Global Stocks Dro...|Markets in Asia a...|[m

Use Facebook's Prophet model for time-series stock price prediction.

In [None]:
# Daily price history CSV on the mounted Drive (filename spelled as stored).
stock_data = '/content/drive/MyDrive/dawjones_tenyears.csv'

In [None]:
# Load the stock-price CSV into its own DataFrame.
# Assign only to `stock_data_spark`. The previous chained `... = data = ...` overwrote
# the news DataFrame `data`, so re-running the NLP cells above would break.
stock_data_spark = (
    spark.read.format("csv")
    .options(header='true', inferschema='true', treatEmptyValuesAsNulls='true')
    .load(stock_data)
)

In [None]:
from pyspark.sql.functions import col

# Column names of the price file, shown as this cell's output.
stock_data_spark.schema.names

['date', 'open', 'high', 'low', 'close', 'adjclose', 'volume', 'ticker']

In [None]:
# Keep only Apple's rows and reduce them to the (date, close) pair used for forecasting.
apple = (
    stock_data_spark
    .filter(col('ticker') == 'AAPL')
    .select(col('date'), col('close'))
)

# Number of AAPL trading-day rows.
apple.count()

2518

In [None]:
# Peek at both ends of the AAPL series to confirm its date range.
# Only a cell's last expression is displayed, so return both peeks as one tuple.
# A bare `apple.head()` line would run a Spark job whose result is thrown away.
apple.head(2), apple.tail(2)

[Row(date=datetime.date(2023, 3, 28), close=157.64999389648438),
 Row(date=datetime.date(2023, 3, 29), close=160.77000427246094)]

In [None]:
# import time series model
from prophet import Prophet

In [None]:
# Prophet expects the timestamp column to be named `ds` and the target column `y`.
for old_name, new_name in (('date', 'ds'), ('close', 'y')):
    apple = apple.withColumnRenamed(old_name, new_name)

apple.show()

+----------+------------------+
|        ds|                 y|
+----------+------------------+
|2013-04-01|15.318214416503906|
|2013-04-02|15.349642753601074|
|2013-04-03|15.428214073181152|
|2013-04-04|15.275713920593262|
|2013-04-05|15.114286422729492|
|2013-04-08|15.221785545349121|
|2013-04-09|15.249285697937012|
|2013-04-10|15.560357093811035|
|2013-04-11|15.511786460876465|
|2013-04-12|15.350000381469727|
|2013-04-15|14.994643211364746|
|2013-04-16|15.222857475280762|
|2013-04-17|14.385713577270508|
|2013-04-18|14.001786231994629|
|2013-04-19|13.947500228881836|
|2013-04-22|14.238213539123535|
|2013-04-23|14.504643440246582|
|2013-04-24|14.480713844299316|
|2013-04-25|14.585000038146973|
|2013-04-26|14.899999618530273|
+----------+------------------+
only showing top 20 rows



In [None]:
# Prophet works on pandas, so collect the Spark frame to the driver first.
df_apple = apple.toPandas()

# Fit a default Prophet model to the AAPL closing-price history.
model = Prophet()
model.fit(df_apple)

INFO:prophet:Disabling daily seasonality. Run prophet with daily_seasonality=True to override this.
DEBUG:cmdstanpy:input tempfile: /tmp/tmpmzf40cus/9qa_a5ex.json
DEBUG:cmdstanpy:input tempfile: /tmp/tmpmzf40cus/z2o236a9.json
DEBUG:cmdstanpy:idx 0
DEBUG:cmdstanpy:running CmdStan, num_threads: None
DEBUG:cmdstanpy:CmdStan args: ['/usr/local/lib/python3.10/dist-packages/prophet/stan_model/prophet_model.bin', 'random', 'seed=13511', 'data', 'file=/tmp/tmpmzf40cus/9qa_a5ex.json', 'init=/tmp/tmpmzf40cus/z2o236a9.json', 'output', 'file=/tmp/tmpmzf40cus/prophet_modelg68s5ijw/prophet_model-20230501195549.csv', 'method=optimize', 'algorithm=lbfgs', 'iter=10000']
19:55:49 - cmdstanpy - INFO - Chain [1] start processing
INFO:cmdstanpy:Chain [1] start processing
19:55:50 - cmdstanpy - INFO - Chain [1] done processing
INFO:cmdstanpy:Chain [1] done processing


<prophet.forecaster.Prophet at 0x7ffaf0451c60>

In [None]:
# Build the prediction frame: the full history plus 365 further days.
# NOTE(review): freq='D' means calendar days, so non-trading days (e.g. weekends) are
# included in the forecast horizon.
future_date = model.make_future_dataframe(periods=365, freq='D', include_history=True)
future_date.head()

Unnamed: 0,ds
0,2013-04-01
1,2013-04-02
2,2013-04-03
3,2013-04-04
4,2013-04-05


In [None]:
# Predict over history + horizon; the result includes `yhat` for every row of future_date.
forecast = model.predict(df=future_date)

In [None]:
# Reduce the forecast to (date, predicted price) and tag each row with the ticker.
# `rename` returns a new frame, so `assign` writes to an independent copy instead of
# to a slice of `forecast`, which is what raised SettingWithCopyWarning.
# Resulting column order: date, price, ticker.
apple_forecast = (
    forecast[['ds', 'yhat']]
    .rename(columns={'ds': 'date', 'yhat': 'price'})
    .assign(ticker='AAPL')
)

apple_forecast

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  apple_forecast['ticker'] = 'AAPL'


Unnamed: 0,date,price,ticker
0,2013-04-01,13.361847,AAPL
1,2013-04-02,13.359300,AAPL
2,2013-04-03,13.430811,AAPL
3,2013-04-04,13.368473,AAPL
4,2013-04-05,13.316242,AAPL
...,...,...,...
2878,2024-03-24,161.638114,AAPL
2879,2024-03-25,161.025396,AAPL
2880,2024-03-26,161.070085,AAPL
2881,2024-03-27,161.185748,AAPL


In [None]:
# Bring both Spark frames into pandas with a shared `date` column of dtype datetime64,
# so they can be merged on it. The stock columns go back to `date` / `price`.
w2v_pandas = (
    w2v_data.toPandas()
    .assign(date=lambda frame: pd.to_datetime(frame['date']))
)
apple_close_pandas = (
    apple.toPandas()
    .rename(columns={'ds': 'date', 'y': 'price'})
    .assign(date=lambda frame: pd.to_datetime(frame['date']))
)

In [None]:
# Confirm both frames carry a datetime64 `date` column before merging on it.
for frame in (w2v_pandas, apple_close_pandas):
    print(frame.dtypes)

date          datetime64[ns]
title                 object
body                  object
words                 object
sw_removed            object
wordvector            object
dtype: object
date     datetime64[ns]
price           float64
dtype: object


In [None]:
# Inner-join articles with the recorded closing price on `date`. Articles whose date
# has no price row (presumably non-trading days) are dropped.
combined_data = w2v_pandas.merge(apple_close_pandas, on='date')
combined_data.dtypes

date          datetime64[ns]
title                 object
body                  object
words                 object
sw_removed            object
wordvector            object
price                float64
dtype: object

In [None]:
# Outer-join the articles (with actual closes) and the Prophet forecast on `date`.
# Dates present in only one side are kept, e.g. future forecast dates get NaN article
# columns. Both inputs have a `price` column; explicit suffixes replace pandas' default
# `price_x` / `price_y`, which hid which one is actual and which is predicted.
full_data_prediction = pd.merge(
    combined_data,
    apple_forecast,
    on='date',
    how='outer',
    suffixes=('_actual', '_predicted'),
)

full_data_prediction

Unnamed: 0,date,title,body,words,sw_removed,wordvector,price_x,price_y,ticker
0,2020-02-18,Apple To Miss Q2 Earnings As Coronavirus Outbr...,Apple Inc. (NASDAQ:AAPL) on Monday said that i...,"[apple, inc., (nasdaq:aapl), on, monday, said,...","[apple, inc., (nasdaq:aapl), monday, said, exp...","[-0.019346115018432853, 0.021623999184857197, ...",79.75,74.043672,AAPL
1,2020-02-18,"9 Stocks To Watch For February 18, 2020",Some of the stocks that may grab investor focu...,"[some, of, the, stocks, that, may, grab, inves...","[stocks, may, grab, investor, focus, today, ar...","[0.006245387392668567, -0.027449340964704166, ...",79.75,74.043672,AAPL
2,2020-02-18,A Peek Into The Markets: US Stock Futures Drop...,Pre-open movers U.S. stock futures traded lowe...,"[pre-open, movers, u.s., stock, futures, trade...","[pre-open, movers, u.s., stock, futures, trade...","[-0.06860458146894373, 0.05131916658438202, 0....",79.75,74.043672,AAPL
3,2020-02-18,30 Stocks Moving in Tuesday's Pre-Market Session,Gainers Senmiao Technology Limited (NASDAQ:AIH...,"[gainers, senmiao, technology, limited, (nasda...","[gainers, senmiao, technology, limited, (nasda...","[-0.026318086414659412, 0.04042497831580705, -...",79.75,74.043672,AAPL
4,2020-02-18,EU Rejects Facebook Proposal On Content Modera...,Facebook Inc.’s (NASDAQ:FB) proposed framework...,"[facebook, inc.’s, (nasdaq:fb), proposed, fram...","[facebook, inc.’s, (nasdaq:fb), proposed, fram...","[-0.033064978968368824, 0.06496041013910243, 0...",79.75,74.043672,AAPL
...,...,...,...,...,...,...,...,...,...
6621,2024-03-24,,,,,,,161.638114,AAPL
6622,2024-03-25,,,,,,,161.025396,AAPL
6623,2024-03-26,,,,,,,161.070085,AAPL
6624,2024-03-27,,,,,,,161.185748,AAPL


In [None]:
# Output the full dataframe to CSV (uncomment to run).
# NOTE(review): this target filename has no `.csv` extension, and to_csv also writes the
# pandas index as an extra column by default; consider appending '.csv' and index=False.
# full_data_prediction.to_csv('NLP Data with predicted Stock Price')