# Hadoop & Apache Spark Data Pre-Processing

In this notebook I use both Hadoop and Apache Spark for data storage and pre-processing.

Using the two together combines the strengths of each platform: Hadoop for storage and Spark for processing.

Hadoop was chosen for its Hadoop Distributed File System (HDFS), which provides reliable, scalable storage designed to handle very large datasets.

Apache Spark was chosen for its data processing capabilities, which let me complete the pre-processing required before further analysis and modeling.

PySpark, Spark's Python API, is considerably more user-friendly than hand-written MapReduce jobs, which need far more boilerplate code for the same task.

In [1]:
# Return SparkContext, connecting Spark Cluster
sc

In [2]:
# sc master - running locally
sc.master

'local[*]'

**Notes:**

The above steps connect to Apache Spark via the SparkContext (sc), the entry point that initialises my Spark application and sets up the underlying Spark functionality.

The master value 'local[*]' confirms that Spark is running locally with as many worker threads as there are logical cores on my machine, so the application can use all of the parallelism my current hardware offers.
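
To make the meaning of the master string concrete, the sketch below maps the simple local forms ('local', 'local[N]' and 'local[*]') to a worker thread count. The function local_thread_count is a hypothetical helper written for illustration and is not part of PySpark; it also uses Python's os.cpu_count() as a stand-in for the core count that Spark itself reads from the JVM.

```python
import os
import re


def local_thread_count(master: str) -> int:
    """Return the number of worker threads implied by a local Spark master string.

    Hypothetical helper for illustration only; it is not part of PySpark.
    """
    if master == "local":
        return 1  # plain 'local' runs with a single worker thread
    match = re.fullmatch(r"local\[(\*|\d+)\]", master)
    if match is None:
        raise ValueError(f"not a simple local master string: {master!r}")
    value = match.group(1)
    if value == "*":
        # '*' means one worker thread per logical core on this machine
        return os.cpu_count() or 1
    return int(value)


print(local_thread_count("local[*]"))
```

The printed number depends on the machine, which is why no expected output is shown.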

In [22]:
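# Import the PySpark SQL functions used in this notebook
# Note: min, max, sum and round imported here shadow Python's built-ins of the same name
from pyspark.sql.functions import col, count, when, to_date, year, month, dayofmonth, row_number, avg, min, max, mean, sum, round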

**Notes:**

The above functions were imported to allow efficient data analysis, manipulation and pre-processing with PySpark. Because the data lives in a distributed Spark DataFrame, I use PySpark's own functions rather than libraries such as Pandas or NumPy, which work on local in-memory data.

In [23]:
# Read csv file directly from my Hadoop directory
df = spark.read.csv('/user1/GSK_stock_data.csv', header=True, inferSchema=True)

**Notes:**

Having first copied my dataset into my HDFS directory and confirmed that Hadoop is running in my local environment, I can read the file into this Jupyter notebook using PySpark. This means I am using Hadoop for storage and Apache Spark for processing.

In [5]:
# Display df structure
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Dividends: double (nullable = true)
 |-- Stock Splits: double (nullable = true)



In [6]:
# Show dataframe
df.show()

+-------------------+-------------------+-------------------+-------------------+-------------------+------+---------+------------+
|               Date|               Open|               High|                Low|              Close|Volume|Dividends|Stock Splits|
+-------------------+-------------------+-------------------+-------------------+-------------------+------+---------+------------+
|1980-03-28 06:00:00|                0.0|0.14964139573276042| 0.1392012983560562| 0.1392012983560562|  2400|      0.0|         0.0|
|1980-03-31 06:00:00| 0.1392012983560562| 0.1392012983560562| 0.1392012983560562| 0.1392012983560562|     0|      0.0|         0.0|
|1980-04-01 06:00:00| 0.1392012983560562| 0.1392012983560562| 0.1392012983560562| 0.1392012983560562|     0|      0.0|         0.0|
|1980-04-02 06:00:00|                0.0|0.14964139573276042| 0.1392012983560562| 0.1392012983560562|   800|      0.0|         0.0|
|1980-04-03 06:00:00| 0.1392012983560562| 0.1392012983560562| 0.139201298356

In [7]:
# Describe the data
df.describe().show()

24/03/11 19:13:12 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+------------------+-------------------+-------------------+-------------------+------------------+--------------------+--------------------+
|summary|              Open|               High|                Low|              Close|            Volume|           Dividends|        Stock Splits|
+-------+------------------+-------------------+-------------------+-------------------+------------------+--------------------+--------------------+
|  count|             11079|              11079|              11079|              11079|             11079|               11079|               11079|
|   mean| 15.99001955803962| 16.187680846803783| 15.962643930768905| 16.077199139687515|1938348.9213827963|0.005214775701778139|7.244336131419804E-4|
| stddev|11.896278348218647| 11.851240420274959| 11.717892849249225| 11.785981846870502|1832170.7216027784| 0.05279006919486163|0.035723332697144526|
|    min|               0.0|0.11484106630086899|0.11484106630086899|0.11484106630086899|            

In [8]:
# Count rows in the DataFrame (the header line is not included)
print('number of lines in file: %s' % df.count())

number of lines in file: 11079


**Notes:**

The above Exploratory Data Analysis (EDA) steps confirm that my data has loaded successfully into a Spark DataFrame, and show the datatypes, the number of records and general descriptive statistics.

In [9]:
# Check for missing values
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+----+----+----+---+-----+------+---------+------------+
|Date|Open|High|Low|Close|Volume|Dividends|Stock Splits|
+----+----+----+---+-----+------+---------+------------+
|   0|   0|   0|  0|    0|     0|        0|           0|
+----+----+----+---+-----+------+---------+------------+



**Notes:**

The above step checks whether my data has any missing values. It finds no nulls in any column, so I don't need to handle missing values as part of future modeling.
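
The null count only detects null entries. As a complementary check, the plain-Python sketch below (not Spark code) tests whether Open lies within the day's Low to High range, using the first four rows shown by df.show() earlier. The helper name open_outside_range is hypothetical. An Open of 0.0 on a day whose Low is above zero may be a placeholder for a missing opening price, although only the data provider could confirm that.

```python
# Sample rows copied from the df.show() output: (Date, Open, High, Low)
rows = [
    ("1980-03-28", 0.0, 0.14964139573276042, 0.1392012983560562),
    ("1980-03-31", 0.1392012983560562, 0.1392012983560562, 0.1392012983560562),
    ("1980-04-01", 0.1392012983560562, 0.1392012983560562, 0.1392012983560562),
    ("1980-04-02", 0.0, 0.14964139573276042, 0.1392012983560562),
]


def open_outside_range(row):
    """Return True when Open falls outside the day's [Low, High] range."""
    _, open_price, high, low = row
    return not (low <= open_price <= high)


# Dates whose opening price is inconsistent with the recorded Low and High
suspect_dates = [row[0] for row in rows if open_outside_range(row)]
print(suspect_dates)
```

On these four rows the check flags 1980-03-28 and 1980-04-02, the two days with an Open of 0.0.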

In [10]:
# Convert 'Date' column from timestamp to date (the column is already a timestamp, so no format string is needed)
df = df.withColumn('Date', to_date(col('Date')))

df.show(5)

+----------+------------------+-------------------+------------------+------------------+------+---------+------------+
|      Date|              Open|               High|               Low|             Close|Volume|Dividends|Stock Splits|
+----------+------------------+-------------------+------------------+------------------+------+---------+------------+
|1980-03-28|               0.0|0.14964139573276042|0.1392012983560562|0.1392012983560562|  2400|      0.0|         0.0|
|1980-03-31|0.1392012983560562| 0.1392012983560562|0.1392012983560562|0.1392012983560562|     0|      0.0|         0.0|
|1980-04-01|0.1392012983560562| 0.1392012983560562|0.1392012983560562|0.1392012983560562|     0|      0.0|         0.0|
|1980-04-02|               0.0|0.14964139573276042|0.1392012983560562|0.1392012983560562|   800|      0.0|         0.0|
|1980-04-03|0.1392012983560562| 0.1392012983560562|0.1392012983560562|0.1392012983560562|     0|      0.0|         0.0|
+----------+------------------+---------

In [11]:
# Extract different date components for different levels of analysis
df = (df.withColumn('Year', year(col('Date')))
        .withColumn('Month', month(col('Date')))
        .withColumn('Day', dayofmonth(col('Date'))))

df.show(5)

+----------+------------------+-------------------+------------------+------------------+------+---------+------------+----+-----+---+
|      Date|              Open|               High|               Low|             Close|Volume|Dividends|Stock Splits|Year|Month|Day|
+----------+------------------+-------------------+------------------+------------------+------+---------+------------+----+-----+---+
|1980-03-28|               0.0|0.14964139573276042|0.1392012983560562|0.1392012983560562|  2400|      0.0|         0.0|1980|    3| 28|
|1980-03-31|0.1392012983560562| 0.1392012983560562|0.1392012983560562|0.1392012983560562|     0|      0.0|         0.0|1980|    3| 31|
|1980-04-01|0.1392012983560562| 0.1392012983560562|0.1392012983560562|0.1392012983560562|     0|      0.0|         0.0|1980|    4|  1|
|1980-04-02|               0.0|0.14964139573276042|0.1392012983560562|0.1392012983560562|   800|      0.0|         0.0|1980|    4|  2|
|1980-04-03|0.1392012983560562| 0.1392012983560562|0.13

In [12]:
# Create new feature to calculate the daily change in price
df = df.withColumn('DailyChange', col('Close') - col('Open'))

df.show(5)

+----------+------------------+-------------------+------------------+------------------+------+---------+------------+----+-----+---+------------------+
|      Date|              Open|               High|               Low|             Close|Volume|Dividends|Stock Splits|Year|Month|Day|       DailyChange|
+----------+------------------+-------------------+------------------+------------------+------+---------+------------+----+-----+---+------------------+
|1980-03-28|               0.0|0.14964139573276042|0.1392012983560562|0.1392012983560562|  2400|      0.0|         0.0|1980|    3| 28|0.1392012983560562|
|1980-03-31|0.1392012983560562| 0.1392012983560562|0.1392012983560562|0.1392012983560562|     0|      0.0|         0.0|1980|    3| 31|               0.0|
|1980-04-01|0.1392012983560562| 0.1392012983560562|0.1392012983560562|0.1392012983560562|     0|      0.0|         0.0|1980|    4|  1|               0.0|
|1980-04-02|               0.0|0.14964139573276042|0.1392012983560562|0.1392

In [13]:
# Create new feature to indicate the price trend
# ('Up' if Close > Open, otherwise 'Down', so unchanged days are labelled 'Down')
df = df.withColumn('PriceTrend', when(col('Close') > col('Open'), 'Up').otherwise('Down'))

df.show(5)

+----------+------------------+-------------------+------------------+------------------+------+---------+------------+----+-----+---+------------------+----------+
|      Date|              Open|               High|               Low|             Close|Volume|Dividends|Stock Splits|Year|Month|Day|       DailyChange|PriceTrend|
+----------+------------------+-------------------+------------------+------------------+------+---------+------------+----+-----+---+------------------+----------+
|1980-03-28|               0.0|0.14964139573276042|0.1392012983560562|0.1392012983560562|  2400|      0.0|         0.0|1980|    3| 28|0.1392012983560562|        Up|
|1980-03-31|0.1392012983560562| 0.1392012983560562|0.1392012983560562|0.1392012983560562|     0|      0.0|         0.0|1980|    3| 31|               0.0|      Down|
|1980-04-01|0.1392012983560562| 0.1392012983560562|0.1392012983560562|0.1392012983560562|     0|      0.0|         0.0|1980|    4|  1|               0.0|      Down|
|1980-04-0

In [14]:
# Display df structure
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Dividends: double (nullable = true)
 |-- Stock Splits: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- DailyChange: double (nullable = true)
 |-- PriceTrend: string (nullable = false)



**Notes:**

The above steps ensure my datatypes are in a suitable format for analysis. I also created additional features that will help with my future analysis and modeling, and that showcase some of Apache Spark's data transformation functionality.
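
The DailyChange and PriceTrend rules can be checked by hand on the first two rows of the output. The plain-Python sketch below (not Spark code) reproduces the notebook's two-way rule and, as an assumption of mine rather than something the notebook does, adds a hypothetical three-way variant that gives unchanged days their own 'Flat' label.

```python
def price_trend(open_price: float, close_price: float) -> str:
    """Two-way label as used in the notebook: anything not strictly up is 'Down'."""
    return "Up" if close_price > open_price else "Down"


def price_trend_three_way(open_price: float, close_price: float) -> str:
    """Hypothetical variant that gives unchanged days their own 'Flat' label."""
    if close_price > open_price:
        return "Up"
    if close_price < open_price:
        return "Down"
    return "Flat"


# (Open, Close) pairs for 1980-03-28 and 1980-03-31, taken from the output
samples = [(0.0, 0.1392012983560562), (0.1392012983560562, 0.1392012983560562)]

for open_price, close_price in samples:
    daily_change = close_price - open_price  # same formula as the DailyChange column
    print(daily_change,
          price_trend(open_price, close_price),
          price_trend_three_way(open_price, close_price))
```

The second sample has a DailyChange of 0.0, which the two-way rule labels 'Down' (as in the notebook output) and the three-way variant labels 'Flat'. Whether a separate label is worthwhile depends on how the feature is used in later modeling.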

In [15]:
# Create new aggregated df for exploration
df_grouped = df.groupBy('Year', 'Month').agg(max('High').alias('MaxHigh'),
                                             min('Low').alias('MinLow'),
                                             mean('Close').alias('AvgClose'),
                                             sum('Volume').alias('TotalVolume'))

df_grouped.show(5)

+----+-----+-------------------+-------------------+------------------+-----------+
|Year|Month|            MaxHigh|             MinLow|          AvgClose|TotalVolume|
+----+-----+-------------------+-------------------+------------------+-----------+
|1990|    7|  3.857992510157307| 3.4841568830634366|3.6835364954812184|   35596600|
|1997|   11|  14.66772187775155| 12.727858049966924|13.767591526633815|    8155100|
|2022|   10| 31.555332678789046| 28.057563806574937|29.472206115722656|  125076700|
|1980|    8|0.16008152379546053|0.14268136024475098|  0.14715566663515|      42400|
|1987|   10|   3.25360755468237| 1.7594612962321232| 2.561120417985049|   65422000|
+----+-----+-------------------+-------------------+------------------+-----------+
only showing top 5 rows



In [16]:
# Display df_grouped structure
df_grouped.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- MaxHigh: double (nullable = true)
 |-- MinLow: double (nullable = true)
 |-- AvgClose: double (nullable = true)
 |-- TotalVolume: long (nullable = true)



In [17]:
# Create new features rounded to 2 decimal places
df_grouped = df_grouped.withColumn('MaxHighRounded', round(col('MaxHigh'), 2))
df_grouped = df_grouped.withColumn('MinLowRounded', round(col('MinLow'), 2))
df_grouped = df_grouped.withColumn('AvgCloseRounded', round(col('AvgClose'), 2))

df_grouped.show(5)

+----+-----+-------------------+-------------------+------------------+-----------+--------------+-------------+---------------+
|Year|Month|            MaxHigh|             MinLow|          AvgClose|TotalVolume|MaxHighRounded|MinLowRounded|AvgCloseRounded|
+----+-----+-------------------+-------------------+------------------+-----------+--------------+-------------+---------------+
|1990|    7|  3.857992510157307| 3.4841568830634366|3.6835364954812184|   35596600|          3.86|         3.48|           3.68|
|1997|   11|  14.66772187775155| 12.727858049966924|13.767591526633815|    8155100|         14.67|        12.73|          13.77|
|2022|   10| 31.555332678789046| 28.057563806574937|29.472206115722656|  125076700|         31.56|        28.06|          29.47|
|1980|    8|0.16008152379546053|0.14268136024475098|  0.14715566663515|      42400|          0.16|         0.14|           0.15|
|1987|   10|   3.25360755468237| 1.7594612962321232| 2.561120417985049|   65422000|          3.25

In [18]:
# List columns to drop
columns_to_drop = ['MaxHigh', 'MinLow', 'AvgClose']

# Drop columns
df_grouped = df_grouped.drop(*columns_to_drop)

df_grouped.show(5)

+----+-----+-----------+--------------+-------------+---------------+
|Year|Month|TotalVolume|MaxHighRounded|MinLowRounded|AvgCloseRounded|
+----+-----+-----------+--------------+-------------+---------------+
|1990|    7|   35596600|          3.86|         3.48|           3.68|
|1997|   11|    8155100|         14.67|        12.73|          13.77|
|2022|   10|  125076700|         31.56|        28.06|          29.47|
|1980|    8|      42400|          0.16|         0.14|           0.15|
|1987|   10|   65422000|          3.25|         1.76|           2.56|
+----+-----+-----------+--------------+-------------+---------------+
only showing top 5 rows



                                                                                

In [19]:
# Define new column names without the 'Rounded' suffix
new_column_names = {'MaxHighRounded': 'MaxHigh',
                    'MinLowRounded': 'MinLow',
                    'AvgCloseRounded': 'AvgClose'}

# Rename columns
for old_name, new_name in new_column_names.items():
    df_grouped = df_grouped.withColumnRenamed(old_name, new_name)

# Show the first few rows of the DataFrame to verify the column names are renamed
df_grouped.show(5)

+----+-----+-----------+-------+------+--------+
|Year|Month|TotalVolume|MaxHigh|MinLow|AvgClose|
+----+-----+-----------+-------+------+--------+
|1990|    7|   35596600|   3.86|  3.48|    3.68|
|1997|   11|    8155100|  14.67| 12.73|   13.77|
|2022|   10|  125076700|  31.56| 28.06|   29.47|
|1980|    8|      42400|   0.16|  0.14|    0.15|
|1987|   10|   65422000|   3.25|  1.76|    2.56|
+----+-----+-----------+-------+------+--------+
only showing top 5 rows



In [20]:
# Specify output path
df_path = '/user1/pyspark_df.csv'

# Export df to csv
df.write.csv(path=df_path, mode='overwrite', header=True)

                                                                                

In [21]:
# Specify output path
df_grouped_path = '/user1/pyspark_df_grouped.csv'

# Export df_grouped to csv
df_grouped.write.csv(path=df_grouped_path, mode='overwrite', header=True)

**Notes:**

I decided to create two datasets and write each out as CSV. The first is at the lowest level of granularity, where one record represents a single day, which will be essential for my deep learning algorithms. The second is grouped by year and month for analysis and visualisation, which will be completed in a separate notebook.
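
One practical detail for the follow-on notebook: Spark writes each CSV output as a directory of part files rather than a single file, and rows produced by groupBy come back in no guaranteed order (the grouped output above starts with 1990, 1997, 2022). The sketch below shows one way to read such a directory with the standard library and sort it chronologically. It assumes the directory has already been copied from HDFS to the local filesystem, and the function names read_spark_csv_dir and sort_by_year_month are hypothetical helpers, not part of any library.

```python
import csv
import glob
import os


def read_spark_csv_dir(directory: str) -> list:
    """Read every part file in a Spark CSV output directory into a list of dicts.

    Assumes the directory is on the local filesystem and that each part file
    starts with a header row (the notebook writes with header=True).
    """
    rows = []
    for part in sorted(glob.glob(os.path.join(directory, "part-*.csv"))):
        with open(part, newline="") as handle:
            rows.extend(csv.DictReader(handle))
    return rows


def sort_by_year_month(rows: list) -> list:
    """Order grouped rows chronologically; CSV values are read as strings."""
    return sorted(rows, key=lambda r: (int(r["Year"]), int(r["Month"])))
```

For example, after copying the grouped output locally, sort_by_year_month(read_spark_csv_dir('pyspark_df_grouped.csv')) would return the monthly rows from the earliest month to the latest. All values are strings at this point, so numeric columns still need converting before analysis.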

Regardless of the number of records in a dataset, big data principles and techniques can still be applied to gain the storage and processing benefits that Hadoop and Apache Spark provide.