# IBM Advanced Data Science Capstone: Forecasting Stock Prices

## Extract-Transform-Load (ETL) 

### Data Cleansing

 - Ensure that both data files, the main stocks dataset and the referential sectors dataset, are in the same directory for easy access.
 - Ensure that any index or unnamed columns from the CSV file are dropped or not read into the Spark dataframes. Such columns make every row look unique, which hampers the check for duplicate records and adds noise to the dataset.
 - Ensure that the NULL values are correctly imputed for the stock prices and volumes. Based on the univariate analysis, the features are not normally distributed and are sensitive to outliers, so we will not impute them with the mean.
 - For imputation, first drop the records where Date is NULL, as they are irrelevant. Then, among the non-NULL Date records, impute the numerical features with the median.
 - Ensure that all the features in the datasets are of the correct datatype.
 - Ensure that the column names are renamed to remove whitespace, so that we can efficiently run SQL queries on the Spark dataframes.
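
To illustrate why the median is preferred over the mean here, the following is a minimal sketch using only the Python standard library. The price values are made up for illustration and do not come from the dataset.

```python
from statistics import mean, median

# Hypothetical prices: one missing value and one extreme outlier.
prices = [5.8, 5.9, None, 6.1, 6.2, 60.0]

# Use only the observed (non-missing) values to compute the fill value.
observed = [p for p in prices if p is not None]
fill_mean = mean(observed)      # pulled far upwards by the 60.0 outlier
fill_median = median(observed)  # stays close to the typical price

# Replace the missing entry with the median.
imputed = [fill_median if p is None else p for p in prices]

print("mean fill:  ", fill_mean)
print("median fill:", fill_median)
print(imputed)
```

With skewed data like this, the mean lands far away from every typical observation, while the median stays within the normal price range.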

In [1]:
# find and init the spark instance to ensure it is pip installed
import findspark
findspark.init()

# set some HTML display setting 
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

# import all the pyspark dependencies 
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType, DateType
from pyspark.sql.functions import *
import pyspark.sql.functions as F
from pyspark.sql.window import Window


from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor

# declare a spark object that we will run our spark SQL dataframes on 
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

# init a spark session (SparkSession is already imported above)
spark = SparkSession \
    .builder \
    .getOrCreate()

# import basic data analysis libraries  
import numpy as np
import pandas as pd
import scipy.stats as stats

# ignore warnings
import warnings
warnings.filterwarnings('ignore')

# import libraries for data visualization
import matplotlib.pyplot as plt
import seaborn as sns

sns.set_style("darkgrid")

#### Import the data files in spark dataframes

In [2]:
data_stocks = spark.read.csv('kaggle_stock_data.csv', header=True).drop('_c0') # ensure that the index column is not read as a feature
data_stocks.show(5)

+----------+--------------------+----------+-----------+----------+-------+
|Instrument|                Date|Price High|Price Close|Price Open| Volume|
+----------+--------------------+----------+-----------+----------+-------+
|   CBKG.DE|2019-01-02T00:00:00Z|     5.804|      5.765|     5.782|7221471|
|   CBKG.DE|2019-01-03T00:00:00Z|      5.95|      5.802|     5.748|8064658|
|   CBKG.DE|2019-01-04T00:00:00Z|     6.168|      6.143|      5.89|8772521|
|   CBKG.DE|2019-01-07T00:00:00Z|     6.249|      6.182|     6.242|6781840|
|   CBKG.DE|2019-01-08T00:00:00Z|      6.39|       6.33|     6.172|8472530|
+----------+--------------------+----------+-----------+----------+-------+
only showing top 5 rows



In [3]:
data_sectors = spark.read.csv('kaggle_stock_sector_information.csv', header=True).drop('_c0') # ensure that the index column is not read as a feature
data_sectors.show(5)

+----------+--------------------+-------------------------+-------------------------+------------------------+--------------------+--------------------+-------------------------+-------------------------+------------------------+------------------+------------------+
|Instrument| Company Common Name|TRBC Economic Sector Name|TRBC Business Sector Name|TRBC Industry Group Name|  TRBC Industry Name|  TRBC Activity Name|TRBC Economic Sector Code|TRBC Business Sector Code|TRBC Industry Group Code|TRBC Industry Code|TRBC Activity Code|
+----------+--------------------+-------------------------+-------------------------+------------------------+--------------------+--------------------+-------------------------+-------------------------+------------------------+------------------+------------------+
|   CBKG.DE|      Commerzbank AG|               Financials|     Banking & Investm...|        Banking Services|               Banks|         Banks (NEC)|                       55|                  

#### Appropriately format and/or rename the features 

In [4]:
'''
@format_cols() - a function to format the column names, replacing any whitespaces with '_'
@params : df, input parameter that takes in the dataframe whose columns need to be formatted 
@return: output dataframe with the formatted column names 
'''
def format_cols(df):
    new_columns = [col(col_name).alias(col_name.replace(" ", "_")) for col_name in df.columns]
    return df.select(*new_columns)

print("function format_cols() compiled....")

function format_cols() compiled....
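
The renaming matters because Spark SQL needs backticks around a column name that contains a space, whereas an underscore-separated name can be referenced directly. As a small sketch, the name-cleaning logic can be isolated in a plain Python helper; `clean_column_names` is a hypothetical name that does not appear in the notebook.

```python
def clean_column_names(columns):
    """Return the column names with every space replaced by an underscore."""
    return [name.replace(" ", "_") for name in columns]


# Column names as they appear in the raw stocks CSV.
raw = ["Instrument", "Date", "Price High", "Price Close", "Price Open", "Volume"]
cleaned = clean_column_names(raw)
print(cleaned)

# With a Spark dataframe, the cleaned list could be applied in one call:
#     df_formatted = df.toDF(*clean_column_names(df.columns))
```

Using `toDF` would be an alternative to the `select` with aliases used in `format_cols()`; both should yield the same column names.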


In [5]:
'''
format the dataframe for the stocks dataset 
'''
data_stocks_formatted = format_cols(data_stocks)
data_stocks_formatted.show(5)

+----------+--------------------+----------+-----------+----------+-------+
|Instrument|                Date|Price_High|Price_Close|Price_Open| Volume|
+----------+--------------------+----------+-----------+----------+-------+
|   CBKG.DE|2019-01-02T00:00:00Z|     5.804|      5.765|     5.782|7221471|
|   CBKG.DE|2019-01-03T00:00:00Z|      5.95|      5.802|     5.748|8064658|
|   CBKG.DE|2019-01-04T00:00:00Z|     6.168|      6.143|      5.89|8772521|
|   CBKG.DE|2019-01-07T00:00:00Z|     6.249|      6.182|     6.242|6781840|
|   CBKG.DE|2019-01-08T00:00:00Z|      6.39|       6.33|     6.172|8472530|
+----------+--------------------+----------+-----------+----------+-------+
only showing top 5 rows



In [6]:
'''
format the dataframe for the sectors info dataset 
'''
data_sectors_formatted = format_cols(data_sectors)
data_sectors_formatted.show(5)

+----------+--------------------+-------------------------+-------------------------+------------------------+--------------------+--------------------+-------------------------+-------------------------+------------------------+------------------+------------------+
|Instrument| Company_Common_Name|TRBC_Economic_Sector_Name|TRBC_Business_Sector_Name|TRBC_Industry_Group_Name|  TRBC_Industry_Name|  TRBC_Activity_Name|TRBC_Economic_Sector_Code|TRBC_Business_Sector_Code|TRBC_Industry_Group_Code|TRBC_Industry_Code|TRBC_Activity_Code|
+----------+--------------------+-------------------------+-------------------------+------------------------+--------------------+--------------------+-------------------------+-------------------------+------------------------+------------------+------------------+
|   CBKG.DE|      Commerzbank AG|               Financials|     Banking & Investm...|        Banking Services|               Banks|         Banks (NEC)|                       55|                  

#### Assign the appropriate data type(s)

In [7]:
# check the data types in the dataframe currently 
data_stocks_formatted.dtypes

[('Instrument', 'string'),
 ('Date', 'string'),
 ('Price_High', 'string'),
 ('Price_Close', 'string'),
 ('Price_Open', 'string'),
 ('Volume', 'string')]

Notice that all of them are strings. Let us convert them to the correct datatypes.

In [8]:
# assign the appropriate data type to each feature 
data_stocks_typeConverted = data_stocks_formatted.select(
    col("Instrument").cast(StringType()).alias("Instrument"),
    col("Date").cast(DateType()).alias("Date"),
    col("Price_High").cast(DoubleType()).alias("Price_High"),
    col("Price_Close").cast(DoubleType()).alias("Price_Close"),
    col("Price_Open").cast(DoubleType()).alias("Price_Open"),
    col("Volume").cast(IntegerType()).alias("Volume")
    
)
data_stocks_typeConverted.dtypes

[('Instrument', 'string'),
 ('Date', 'date'),
 ('Price_High', 'double'),
 ('Price_Close', 'double'),
 ('Price_Open', 'double'),
 ('Volume', 'int')]

The datatypes for each feature have been appropriately converted.
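
A side effect of casting is worth keeping in mind: assuming ANSI mode is disabled, a Spark `cast` returns NULL for a value it cannot parse instead of raising an error, so malformed strings can turn into additional NULLs. The sketch below mimics that behaviour for a single row in plain Python. The helpers `to_double`, `to_int`, and `to_date` are hypothetical and only approximate what Spark does.

```python
from datetime import datetime


def to_double(text):
    """Lenient string-to-float cast: unparseable input becomes None."""
    try:
        return float(text)
    except (TypeError, ValueError):
        return None


def to_int(text):
    """Lenient string-to-int cast: unparseable input becomes None."""
    try:
        return int(text)
    except (TypeError, ValueError):
        return None


def to_date(text):
    """Parse a timestamp such as '2019-01-02T00:00:00Z' and keep only the date."""
    try:
        return datetime.strptime(text, "%Y-%m-%dT%H:%M:%SZ").date()
    except (TypeError, ValueError):
        return None


# One raw row, with every value still a string as read from the CSV.
raw_row = {
    "Instrument": "CBKG.DE",
    "Date": "2019-01-02T00:00:00Z",
    "Price_High": "5.804",
    "Volume": "7221471",
}

typed_row = {
    "Instrument": raw_row["Instrument"],
    "Date": to_date(raw_row["Date"]),
    "Price_High": to_double(raw_row["Price_High"]),
    "Volume": to_int(raw_row["Volume"]),
}
print(typed_row)
print(to_double("n/a"))  # a malformed value silently becomes None
```

Comparing the NULL counts before and after the cast is one way to check whether the conversion introduced any new NULLs.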

In [9]:
data_stocks_typeConverted.show(5)

+----------+----------+----------+-----------+----------+-------+
|Instrument|      Date|Price_High|Price_Close|Price_Open| Volume|
+----------+----------+----------+-----------+----------+-------+
|   CBKG.DE|2019-01-02|     5.804|      5.765|     5.782|7221471|
|   CBKG.DE|2019-01-03|      5.95|      5.802|     5.748|8064658|
|   CBKG.DE|2019-01-04|     6.168|      6.143|      5.89|8772521|
|   CBKG.DE|2019-01-07|     6.249|      6.182|     6.242|6781840|
|   CBKG.DE|2019-01-08|      6.39|       6.33|     6.172|8472530|
+----------+----------+----------+-----------+----------+-------+
only showing top 5 rows



#### Drop the duplicate records

Recalling the findings from the EDA, there were a lot of duplicates in both datasets. Let us recount them. 

In [10]:
data_stocks_typeConverted.groupBy(data_stocks_typeConverted.columns).count().filter("count > 1").count()

15257

In [11]:
data_sectors_formatted.groupBy(data_sectors_formatted.columns).count().filter("count > 1").count()

12
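
Note that the `groupBy(...).count().filter("count > 1").count()` pattern reports how many distinct rows occur more than once, not how many surplus rows exist. The following standard-library sketch shows the difference on a handful of made-up rows; the values are illustrative only.

```python
from collections import Counter

# Made-up rows: the first row appears three times, the last one twice.
rows = [
    ("CBKG.DE", "2019-01-02", 5.765),
    ("CBKG.DE", "2019-01-02", 5.765),
    ("CBKG.DE", "2019-01-02", 5.765),
    ("CBKG.DE", "2019-01-03", 5.802),
    ("DTEGn.DE", "2019-01-02", 14.9),
    ("DTEGn.DE", "2019-01-02", 14.9),
]

counts = Counter(rows)

# Number of distinct rows that occur more than once (what the groupBy check reports).
duplicated_groups = sum(1 for n in counts.values() if n > 1)

# Number of surplus copies that deduplication would remove.
surplus_rows = sum(n - 1 for n in counts.values() if n > 1)

# Keeping one copy of each distinct row, in first-seen order.
deduplicated = list(counts)

print(duplicated_groups, surplus_rows, len(deduplicated))
```

The number of rows actually removed by deduplication can therefore be larger than the count printed above.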

Now, let us remove these duplicates and verify that all the duplicate records have been dropped.

In [12]:
data_stocks_dropped = data_stocks_typeConverted.dropDuplicates()
data_sectors_dropped = data_sectors_formatted.dropDuplicates()

print(data_stocks_dropped.groupBy(data_stocks_dropped.columns).count().filter("count > 1").count())
print(data_sectors_dropped.groupBy(data_sectors_dropped.columns).count().filter("count > 1").count())

0
0


Now, there are no duplicate records in either the stock or the sector dataset. We can proceed to handle NULL values next.

#### Handle NULL values 

Based on the recommendations from the univariate analysis of the EDA: 

- Drop the records that do not have a date, i.e. where the Date column is NULL.
- After dropping the NULL dates, impute the NULL entries in the numerical columns with the median for each company.

In [13]:
'''
count_nulls() : a function to count the number of NULLs in each column of a dataframe
@param: an input dataframe whose columns are checked for NULLs
@return: an output dataframe that contains the NULL count for each column
'''
def count_nulls(df):
    # use F.sum explicitly so the code does not depend on the star import shadowing Python's built-in sum
    null_counts = df.select([F.sum(col(column).isNull().cast("int")).alias(column) for column in df.columns])
    return null_counts

print("function count_nulls() compiled ...")

function count_nulls() compiled ...


In [14]:
# count the nulls in the deduplicated stocks dataframe
count_nulls(data_stocks_dropped).show()

+----------+----+----------+-----------+----------+------+
|Instrument|Date|Price_High|Price_Close|Price_Open|Volume|
+----------+----+----------+-----------+----------+------+
|         0|  72|       415|        380|       772| 57783|
+----------+----+----------+-----------+----------+------+



In [15]:
# drop all those 72 NULL date records 
data_stocks_cleaned_1 = data_stocks_dropped.filter(col("Date").isNotNull())
count_nulls(data_stocks_cleaned_1).show()

+----------+----+----------+-----------+----------+------+
|Instrument|Date|Price_High|Price_Close|Price_Open|Volume|
+----------+----+----------+-----------+----------+------+
|         0|   0|       343|        308|       700| 57783|
+----------+----+----------+-----------+----------+------+



Before we do the median imputation, let's extract the year and month from the Date column.

In [16]:
# Extract the year and month from the 'Date' column
data_stocks_cleaned_1 = data_stocks_cleaned_1.withColumn("Year", F.year("Date"))
data_stocks_cleaned_1 = data_stocks_cleaned_1.withColumn("Month", F.month("Date"))
# keep only the records dated 2019 or later
data_stocks_cleaned_1 = data_stocks_cleaned_1.filter(col('Year')>2018)
data_stocks_cleaned_1.show(5)

+----------+----------+----------+-----------+----------+--------+----+-----+
|Instrument|      Date|Price_High|Price_Close|Price_Open|  Volume|Year|Month|
+----------+----------+----------+-----------+----------+--------+----+-----+
|   CBKG.DE|2020-03-30|     3.601|      3.338|     3.585|18925755|2020|    3|
|   CBKG.DE|2022-02-21|     9.513|      9.153|       9.4|10191050|2022|    2|
|   CBKG.DE|2022-11-03|     8.162|       8.08|     8.104| 4897012|2022|   11|
|  DTEGn.DE|2020-10-20|    13.935|      13.68|      13.9|11136709|2020|   10|
|  DTEGn.DE|2021-05-21|    17.224|      17.17|    17.004|10933087|2021|    5|
+----------+----------+----------+-----------+----------+--------+----+-----+
only showing top 5 rows



Now, we will impute the NULLs with the median for each instrument (RIC), computed per year and month.

In [19]:
# Define a window specification for the DataFrame
window_spec = Window.partitionBy("Instrument","Year", "Month")
data_stocks_cleaned_1 = data_stocks_cleaned_1.sort(["Instrument", "Year", "Month"], ascending=[True, True, True])

data_stocks_cleaned_1 = data_stocks_cleaned_1.withColumn("Price_Open", F.when(data_stocks_cleaned_1["Price_Open"].isNull(),\
                        F.expr("percentile(Price_Open, 0.5)").over(window_spec)).otherwise(data_stocks_cleaned_1["Price_Open"]))

data_stocks_cleaned_1 = data_stocks_cleaned_1.withColumn("Price_Close", F.when(data_stocks_cleaned_1["Price_Close"].isNull(),\
                        F.expr("percentile(Price_Close, 0.5)").over(window_spec)).otherwise(data_stocks_cleaned_1["Price_Close"]))

data_stocks_cleaned_1 = data_stocks_cleaned_1.withColumn("Price_High", F.when(data_stocks_cleaned_1["Price_High"].isNull(),\
                        F.expr("percentile(Price_High, 0.5)").over(window_spec)).otherwise(data_stocks_cleaned_1["Price_High"]))

# data_stocks_cleaned_1 = data_stocks_cleaned_1.withColumn("Volume", F.when(data_stocks_cleaned_1["Volume"].isNull(),\
#                         F.expr("percentile(Volume, 0.5)").over(window_spec)).otherwise(data_stocks_cleaned_1["Volume"]))

In [20]:
count_nulls(data_stocks_cleaned_1).show()

+----------+----+----------+-----------+----------+------+----+-----+
|Instrument|Date|Price_High|Price_Close|Price_Open|Volume|Year|Month|
+----------+----+----------+-----------+----------+------+----+-----+
|         0|   0|        22|          0|       379| 57783|   0|    0|
+----------+----+----------+-----------+----------+------+----+-----+



As we can see, some NULLs remain even after the imputation, and Volume was not imputed at all because its imputation step is commented out above. These records may be noisy data, so it is better to drop them.
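
One likely reason NULLs survive in the price columns is that a group median only exists when the (Instrument, Year, Month) group has at least one observed value. If every value in a group is NULL, there is nothing to impute from. The sketch below reproduces this in plain Python on made-up rows. `impute_group_median` is a hypothetical helper, not the notebook's Spark code, and it assumes the aggregate skips NULL inputs.

```python
from collections import defaultdict
from statistics import median

# Made-up rows: instrument A has observed prices, instrument B has none.
rows = [
    {"Instrument": "A", "Year": 2020, "Month": 1, "Price_Open": 10.0},
    {"Instrument": "A", "Year": 2020, "Month": 1, "Price_Open": None},
    {"Instrument": "A", "Year": 2020, "Month": 1, "Price_Open": 12.0},
    {"Instrument": "B", "Year": 2020, "Month": 1, "Price_Open": None},
    {"Instrument": "B", "Year": 2020, "Month": 1, "Price_Open": None},
]


def impute_group_median(rows, column, keys=("Instrument", "Year", "Month")):
    """Fill missing values in `column` with the median of the row's group."""
    # Collect the observed values per group, skipping missing entries.
    observed = defaultdict(list)
    for row in rows:
        if row[column] is not None:
            observed[tuple(row[k] for k in keys)].append(row[column])
    medians = {group: median(values) for group, values in observed.items()}

    # Fill each missing value; groups without a median stay None.
    result = []
    for row in rows:
        group = tuple(row[k] for k in keys)
        value = row[column] if row[column] is not None else medians.get(group)
        result.append({**row, column: value})
    return result


imputed = impute_group_median(rows, "Price_Open")
remaining_nulls = sum(1 for r in imputed if r["Price_Open"] is None)
print(imputed[1]["Price_Open"], remaining_nulls)
```

Widening the group, for example to the instrument and year only, would be one way to fill more of these gaps before resorting to dropping records.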

In [21]:
data_stocks_cleaned_2 = data_stocks_cleaned_1.dropna()

In [22]:
count_nulls(data_stocks_cleaned_2).show()

+----------+----+----------+-----------+----------+------+----+-----+
|Instrument|Date|Price_High|Price_Close|Price_Open|Volume|Year|Month|
+----------+----+----------+-----------+----------+------+----+-----+
|         0|   0|         0|          0|         0|     0|   0|    0|
+----------+----+----------+-----------+----------+------+----+-----+



### Save the cleansed dataframes 

In [23]:
data_stocks_cleaned_2.toPandas().to_csv('kaggle_stock_data_cleansed.csv', header=True, index=False, mode='w')

In [24]:
data_sectors_dropped.toPandas().to_csv('kaggle_stock_sector_information_cleansed.csv', header=True, index=False, mode='w')