### Installing Dependencies

**Step 1: Install Dependencies**
We need to install following components to run pyspark seamlessly:
OpenJDK 8,
Spark Environment,
FindSpark package

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
!tar xf spark-3.2.0-bin-hadoop3.2.tgz
!pip install -q findspark

**Step 2: Add environment variables**
After installing dependencies, we need to some variables to the environment so that pyspark knows where to look for using dependencies. We can do that using following commands:

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "spark-3.2.0-bin-hadoop3.2"

**Step 3: Initilize pyspark**
Finally, we just need to initilize pyspark which can be easily achieved using third-party package named findspark as shown below:

In [3]:
import findspark
findspark.init()

You can try running following commands to check if pyspark is properly installed or not:

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext
sc

## Mounting Google Drive

In [5]:
from google.colab import drive

# Mount your Google Drive
drive.mount('/content/drive')

Mounted at /content/drive


In [6]:
%cd /content/drive/MyDrive/cloud_computing/Project/Preprocessed Data/

/content/drive/MyDrive/cloud_computing/Project/Preprocessed Data


# Merging stocks with tweets

In [7]:
import pandas as pd

ggl_process_df = pd.read_csv('google_preprocessed.csv')
ms_process_df = pd.read_csv('microsoft_preprocessed.csv')
tsla_process_df = pd.read_csv('tesla_preprocessed.csv')
amzn_process_df = pd.read_csv('amazon_preprocessed.csv')
apple_process_df = pd.read_csv('apple_preprocessed.csv')

In [8]:
ggl_process_df['weighted_avg_sentiment_score'][0]

0.1831923999999999

In [9]:
ms_process_df['weighted_avg_sentiment_score'][0]

0.5562448275862069

In [10]:
tsla_process_df['weighted_avg_sentiment_score'][0]

0.3480822370617696

In [11]:
amzn_process_df['weighted_avg_sentiment_score'][0]

0.2102843575418994

In [12]:
apple_process_df['weighted_avg_sentiment_score'][0]

0.5865996240601504

In [13]:
# convert the Pandas DataFrame to a PySpark DataFrame
ggl_process_df = spark.createDataFrame(ggl_process_df)
ms_process_df = spark.createDataFrame(ms_process_df)
tsla_process_df = spark.createDataFrame(tsla_process_df)
amzn_process_df = spark.createDataFrame(amzn_process_df)
apple_process_df = spark.createDataFrame(apple_process_df)

  for column, series in pdf.iteritems():


In [14]:
%cd /content/drive/MyDrive/cloud_computing/Project/Stock Prices

/content/drive/MyDrive/cloud_computing/Project/Stock Prices


In [15]:
# Reading stocks data
ggl_stocks_df = pd.read_csv('Google Stock Prices.csv')
ms_stocks_df = pd.read_csv('Microsoft Stock Prices.csv')
tsla_stocks_df = pd.read_csv('Tesla Stock Prices.csv')
amzn_stocks_df = pd.read_csv('Amazon Stock Prices.csv')
apple_stocks_df = pd.read_csv('Apple Stock Prices.csv')

In [16]:
# convert the Pandas DataFrame to a PySpark DataFrame
ggl_stocks_df = spark.createDataFrame(ggl_stocks_df)
ms_stocks_df = spark.createDataFrame(ms_stocks_df)
tsla_stocks_df = spark.createDataFrame(tsla_stocks_df)
amzn_stocks_df = spark.createDataFrame(amzn_stocks_df)
apple_stocks_df = spark.createDataFrame(apple_stocks_df)

  for column, series in pdf.iteritems():


In [18]:
# Renaming Date column to date
ggl_stocks_df = ggl_stocks_df.withColumnRenamed("Date", "date")
ms_stocks_df = ms_stocks_df.withColumnRenamed("Date", "date")
tsla_stocks_df = tsla_stocks_df.withColumnRenamed("Date", "date")
amzn_stocks_df = amzn_stocks_df.withColumnRenamed("Date", "date")
apple_stocks_df = apple_stocks_df.withColumnRenamed("Date", "date")

ggl_stocks_df = ggl_stocks_df.withColumnRenamed("Close", "stock_price")
ms_stocks_df = ms_stocks_df.withColumnRenamed("Close", "stock_price")
tsla_stocks_df = tsla_stocks_df.withColumnRenamed("Close", "stock_price")
amzn_stocks_df = amzn_stocks_df.withColumnRenamed("Close", "stock_price")
apple_stocks_df = apple_stocks_df.withColumnRenamed("Close", "stock_price")

In [19]:
# Perform a left join on the "id" column
ggl_process_stocks_df = ggl_process_df.join(ggl_stocks_df, on="date", how="left")
ms_process_stocks_df = ms_process_df.join(ms_stocks_df, on="date", how="left")
tsla_process_stocks_df = tsla_process_df.join(tsla_stocks_df, on="date", how="left")
amzn_process_stocks_df = amzn_process_df.join(amzn_stocks_df, on="date", how="left")
apple_process_stocks_df = apple_process_df.join(apple_stocks_df, on="date", how="left")

In [20]:
# Drop multiple columns
ggl_final_df = ggl_process_stocks_df.drop("Open", "High", "Low", "Adj Close", "Volume")
ms_final_df = ms_process_stocks_df.drop("Open", "High", "Low", "Adj Close", "Volume")
tsla_final_df = tsla_process_stocks_df.drop("Open", "High", "Low", "Adj Close", "Volume")
amzn_final_df = amzn_process_stocks_df.drop("Open", "High", "Low", "Adj Close", "Volume")
apple_final_df = apple_process_stocks_df.drop("Open", "High", "Low", "Adj Close", "Volume")

In [21]:
ggl_final_df = ggl_final_df.orderBy('date')
ms_final_df = ms_final_df.orderBy('date')
tsla_final_df = tsla_final_df.orderBy('date')
amzn_final_df = amzn_final_df.orderBy('date')
apple_final_df = apple_final_df.orderBy('date')

In [22]:
ggl_final_df.count()

364

In [23]:
ggl_final_df.show(10)

+----------+----------------------------+-----------------------------+-----------------------------+----------------------------+----------------------------+----------------------+------------------------+---------------------+----------------------+-------+-----------+
|      date|weighted_avg_sentiment_score|daywise_median_positive_score|daywise_median_negative_score|daywise_positive_tweet_count|daywise_negative_tweet_count|daywise_avg_replyCount|daywise_avg_retweetCount|daywise_avg_likeCount|daywise_avg_quoteCount|company|stock_price|
+----------+----------------------------+-----------------------------+-----------------------------+----------------------------+----------------------------+----------------------+------------------------+---------------------+----------------------+-------+-----------+
|2022-01-01|          0.1831923999999999|                       0.3818|                       0.3182|                           9|                           1|                     4

### Filling null values of stock prices

We fill the null values of stock prices with the average value of last five day's stock prices. Since starting two columns doesn't have last five days stock prices, we will remove the those rows

In [25]:
# Removing first two rows from the dataframe

ggl_final_df = ggl_final_df.filter(ggl_final_df.date >= "2022-01-03")
ms_final_df = ms_final_df.filter(ms_final_df.date >= "2022-01-03")
tsla_final_df = tsla_final_df.filter(tsla_final_df.date >= "2022-01-03")
amzn_final_df = amzn_final_df.filter(amzn_final_df.date >= "2022-01-03")
apple_final_df = apple_final_df.filter(apple_final_df.date >= "2022-01-03")

In [26]:
# assume your PySpark DataFrame is loaded into a variable called `df`
# and the column containing the stock prices is named `stock_prices` 

from pyspark.sql.functions import udf, collect_list, coalesce
from pyspark.sql.types import FloatType
from pyspark.sql.window import Window

# assume your PySpark DataFrame is loaded into a variable called `df`
# and the column containing the stock prices is named `stock_prices`

# define a window that will order the rows by date and consider the previous five rows
w = Window.orderBy("date").rowsBetween(-5, -1)

# define a function that returns the rolling average of the last five non-null values
def rolling_avg(values):
    non_null_values = [v for v in values if v is not None]
    if len(non_null_values) == 0:
        return None
    return sum(non_null_values) / len(non_null_values)

# create a UDF from the rolling_avg function
rolling_avg_udf = udf(rolling_avg, FloatType())

# use the UDF to replace null values with the rolling average
ggl_final_df = ggl_final_df.withColumn("stock_price", coalesce("stock_price", rolling_avg_udf(collect_list("stock_price").over(w))))
ms_final_df = ms_final_df.withColumn("stock_price", coalesce("stock_price", rolling_avg_udf(collect_list("stock_price").over(w))))
tsla_final_df = tsla_final_df.withColumn("stock_price", coalesce("stock_price", rolling_avg_udf(collect_list("stock_price").over(w))))
amzn_final_df = amzn_final_df.withColumn("stock_price", coalesce("stock_price", rolling_avg_udf(collect_list("stock_price").over(w))))
apple_final_df = apple_final_df.withColumn("stock_price", coalesce("stock_price", rolling_avg_udf(collect_list("stock_price").over(w))))

In [27]:
ggl_final_df.select('stock_price').show(20)

+------------------+
|       stock_price|
+------------------+
|        144.991501|
|        144.399506|
|        137.774994|
|        137.747498|
|        137.016998|
|140.38609313964844|
|139.23475646972656|
|        138.669495|
|        139.735992|
|        141.430496|
|        138.587006|
|        139.480499|
|139.58070373535156|
|139.80850219726562|
|139.83267211914062|
|        135.998001|
|        135.116501|
|        133.307495|
|        130.351501|
|133.69337463378906|
+------------------+
only showing top 20 rows



### Adding next day stock prices to the current day

In [28]:
from pyspark.sql.functions import col, avg, when, lag
from pyspark.sql.window import Window

# define a window that will order the rows by date and consider the previous five rows
w = Window.orderBy("date").rowsBetween(0, 1)

# define a function that returns the stock price for the next day
def get_next_day_price(prices):
    if len(prices) == 1:
        return None
    return prices[1]

# create a UDF from the get_next_day_price function
get_next_day_price_udf = udf(get_next_day_price, FloatType())
 
# use the UDF to create a new column with the stock price for the next day
ggl_final_df = ggl_final_df.withColumn("next_day_stock_price", get_next_day_price_udf(collect_list("stock_price").over(w)))
ms_final_df = ms_final_df.withColumn("next_day_stock_price", get_next_day_price_udf(collect_list("stock_price").over(w)))
tsla_final_df = tsla_final_df.withColumn("next_day_stock_price", get_next_day_price_udf(collect_list("stock_price").over(w)))
amzn_final_df = amzn_final_df.withColumn("next_day_stock_price", get_next_day_price_udf(collect_list("stock_price").over(w)))
apple_final_df = apple_final_df.withColumn("next_day_stock_price", get_next_day_price_udf(collect_list("stock_price").over(w)))

In [30]:
ggl_final_df.tail(1)

[Row(date='2022-12-30', weighted_avg_sentiment_score=0.0263022075055187, daywise_median_positive_score=0.4728, daywise_median_negative_score=0.6705, daywise_positive_tweet_count=9, daywise_negative_tweet_count=6, daywise_avg_replyCount=4, daywise_avg_retweetCount=5, daywise_avg_likeCount=29, daywise_avg_quoteCount=2, company=1, stock_price=88.230003, next_day_stock_price=None)]

### Replacing last row with median value

In [31]:
from pyspark.sql.functions import col
from pyspark.sql.functions import when

df = ggl_final_df
# assume your PySpark DataFrame is loaded into a variable called `df`
# and the column containing the values to fill is called `my_column`

# calculate the median of the column
quantile_value = df.approxQuantile("next_day_stock_price", [0.5], 0.01)[0]

# replace null values with the median value
ggl_final_df = ggl_final_df.withColumn("next_day_stock_price", when(col("next_day_stock_price").isNull(), quantile_value).otherwise(col("next_day_stock_price")))
ms_final_df = ms_final_df.withColumn("next_day_stock_price", when(col("next_day_stock_price").isNull(), quantile_value).otherwise(col("next_day_stock_price")))
tsla_final_df = tsla_final_df.withColumn("next_day_stock_price", when(col("next_day_stock_price").isNull(), quantile_value).otherwise(col("next_day_stock_price")))
amzn_final_df = amzn_final_df.withColumn("next_day_stock_price", when(col("next_day_stock_price").isNull(), quantile_value).otherwise(col("next_day_stock_price")))
apple_final_df = apple_final_df.withColumn("next_day_stock_price", when(col("next_day_stock_price").isNull(), quantile_value).otherwise(col("next_day_stock_price")))

### Adding trend column (Increased or Decreased)

In [32]:
from pyspark.sql.functions import col, when

# Assume your DataFrame is called 'df' and it contains columns 'stock_price' and 'next_day_stock_price'
# Create a new column called 'trend' which is '1' if stock price increased and '0' if stock price decreased
ggl_final_df = ggl_final_df.withColumn('trend', when(col('stock_price') < col('next_day_stock_price'), 1).otherwise(0))
ms_final_df = ms_final_df.withColumn('trend', when(col('stock_price') < col('next_day_stock_price'), 1).otherwise(0))
tsla_final_df = tsla_final_df.withColumn('trend', when(col('stock_price') < col('next_day_stock_price'), 1).otherwise(0))
amzn_final_df = amzn_final_df.withColumn('trend', when(col('stock_price') < col('next_day_stock_price'), 1).otherwise(0))
apple_final_df = apple_final_df.withColumn('trend', when(col('stock_price') < col('next_day_stock_price'), 1).otherwise(0))

In [33]:
ggl_final_df.show(5)

+----------+----------------------------+-----------------------------+-----------------------------+----------------------------+----------------------------+----------------------+------------------------+---------------------+----------------------+-------+-----------+--------------------+-----+
|      date|weighted_avg_sentiment_score|daywise_median_positive_score|daywise_median_negative_score|daywise_positive_tweet_count|daywise_negative_tweet_count|daywise_avg_replyCount|daywise_avg_retweetCount|daywise_avg_likeCount|daywise_avg_quoteCount|company|stock_price|next_day_stock_price|trend|
+----------+----------------------------+-----------------------------+-----------------------------+----------------------------+----------------------------+----------------------+------------------------+---------------------+----------------------+-------+-----------+--------------------+-----+
|2022-01-03|          0.2750561151079136|                       0.5574|                       0.2732

### Saving Data Frames

In [34]:
%cd /content/drive/MyDrive/cloud_computing/Project

/content/drive/MyDrive/cloud_computing/Project


In [35]:
ggl_final_df.toPandas().to_csv('Merged Data/ggl_twts_stcks.csv', index = False)
ms_final_df.toPandas().to_csv('Merged Data/ms_twts_stcks.csv', index = False)
tsla_final_df.toPandas().to_csv('Merged Data/tsla_twts_stcks.csv', index = False)
amzn_final_df.toPandas().to_csv('Merged Data/amzn_twts_stcks.csv', index = False)
apple_final_df.toPandas().to_csv('Merged Data/apple_twts_stcks.csv', index = False)

### Merging all dataframe into single dataframe

In [36]:
# merge the dataframes using union
merged_final_df = ggl_final_df.union(ms_final_df).union(tsla_final_df).union(amzn_final_df).union(apple_final_df)

In [37]:
merged_final_df.count()

1809

In [38]:
merged_final_df.toPandas().to_csv('Merged Data/merged_twts_stcks.csv', index = False)