# Processing Big Data - Data Ingestion
© Explore Data Science Academy

## Honour Code


I {**YOUR NAME**, **YOUR SURNAME**}, confirm - by submitting this document - that the solutions in this notebook are a result of my own work and that I abide by the [EDSA honour code](https://drive.google.com/file/d/1QDCjGZJ8-FmJE3bZdIQNwnJyQKPhHZBn/view?usp=sharing).
    Non-compliance with the honour code constitutes a material breach of contract.



## Context 

To work constructively with any dataset, one needs to create an ingestion profile to make sure that the data at the source can be readily consumed. For this section of the predict, as the Data Engineer in the team, you will be required to design and implement the ingestion process. For the purposes of the project the AWS cloud storage service, namely, the S3 bucket service will act as your data source. All the data required can be found [here](https://processing-big-data-predict-stocks-data.s3.eu-west-1.amazonaws.com/stocks.zip).

<div align="center" style="width: 600px; font-size: 80%; text-align: center; margin: 0 auto">
<img src="https://raw.githubusercontent.com/Explore-AI/Pictures/master/data_engineering/transform/predict/DataIngestion.jpg"
     alt="Data Ingestion"
     style="float: center; padding-bottom=0.5em"
     width=40%/>
     <p><em>Figure 1. Data Ingestion</em></p>
</div>

Your manager, Gnissecorp Atadgib, knowing very well that you've recently completed your Data Engineering qualification, asks you to make use of Apache Spark for the ingestion as well as the rest of the project. His rationale being, that stock market data is generated every day and is quite time-sensitive and would require scalability when deploying to a production environment. 

## Dataset - US Nasdaq




<div align="center" style="width: 600px; font-size: 80%; text-align: center; margin: 0 auto">
<img src="https://raw.githubusercontent.com/Explore-AI/Pictures/master/data_engineering/transform/predict/Nasdaq.png"
     alt="Nasdaq"
     style="float: center; padding-bottom=0.5em"
     width=50%/>
     <p><em>Figure 2. Nasdaq</em></p>
</div>

The data that you will be working with is a historical snapshot of market data taken from the Nasdaq electronic market. This dataset contains historical daily prices for all tickers currently trading on Nasdaq. The up-to-date list can be found on their [website](https://www.nasdaq.com/)


The provided data contains price data dating back from 02 January 1962 up until 01 April 2020. The data found in the S3 bucket has been stored in the following structure:

```
     stocks/<Year>/<Month>/<Day>/stocks.csv
```
Each CSV file for every trading day contains the following details:
- **Date** - specifies trading date
- **Open** - opening price
- **High** - maximum price during the day
- **Low** - minimum price during the day
- **Close** - close price adjusted for splits
- **Adj Close** - close price adjusted for both dividends and splits
- **Volume** - the number of shares that changed hands during a given day

## Basic initialisation
To get you started, let's import some basic Python libraries as well as Spark modules and functions.

In [24]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import * 


Remember that we need a `SparkContext` and `SparkSession` to interface with Spark.
We will mostly be using the `SparkContext` to interact with RDDs and the `SparkSession` to interface with Python objects.

> ℹ️ **Instructions** ℹ️
>
>Initialise a new **Spark Context** and **Session** that you will use to interface with Spark.

In [25]:
#TODO: Write your code here 
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate() 

sc=spark.sparkContext


## Investigate dataset schema
At this point, it is enough to read in a single file to ascertain the data structure. You will be required to use the information obtained from the small subset to create a data schema. This data schema will be used when reading the entire dataset using Spark.

> ℹ️ **Instructions** ℹ️
>
>Make use of Pandas to read in a single file and investigate the plausible data types to be used when creating a Spark data schema. 
>
>*You may use as many coding cells as necessary.*

In [26]:
import pandas as pd

# Read in the file using Pandas
df = pd.read_csv("../Downloads/stocks (6)/stocks/1962/01/02/stocks.csv") 

# Use Pandas dtypes to inspect the data types of each column
df_dtypes = df.dtypes 


# Create a Spark data schema based on the Pandas dtypes
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

spark_schema = StructType([
    StructField(col_name, StringType() if col_dtype == 'object' else IntegerType() if col_dtype == 'int64' else FloatType(), True)
    for col_name, col_dtype in df_dtypes.items()
])

# Use the Spark data schema to read in the file as a Spark DataFrame 
#data = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("../Downloads/stocks (6)/stocks/1962/*/stocks.csv")
#data = spark.read("../Downloads/stocks (6)/stocks/1962/*/stocks.csv") 
#data.show()
spark_df = spark.read.csv("../Downloads/stocks (6)/stocks/1962/*/*/stocks.csv", schema=spark_schema, header=True)
#df_dtypes = spark_df.dtypes 
spark_df.show() 



+----------+-----------+-----------+----------+-----------+------------+---------+-----+
|      Date|       Open|       High|       Low|      Close|   Adj Close|   Volume|stock|
+----------+-----------+-----------+----------+-----------+------------+---------+-----+
|1962-02-19|    5.83929|   5.907375|   5.83929|    5.86332|   1.3863293|  29900.0|   AA|
|1962-02-19|   5.481634|   5.528486|  5.481634|  5.5167727|   1.2804527|  32000.0| ARNC|
|1962-02-19|  0.9074074| 0.91563785|0.89917696|  0.9032922|  0.16141544| 619400.0|   BA|
|1962-02-19|  1.6770834|  1.6927084| 1.6614584|  1.6770834|   0.1440587| 170400.0|  CAT|
|1962-02-19|        0.0|   3.578869|      20.0|   3.549107| 0.056501225| 273600.0|  CVX|
|1962-02-19|0.099767394|0.099767394|0.09820853| 0.09820853| 0.037543412| 817400.0|  DIS|
|1962-02-19|        0.0|    29.9375|     29.75|    29.9375|  0.49964145|   1600.0|  DTE|
|1962-02-19|        0.0|   9.921875|  9.890625|   9.921875|  0.22499175|   8800.0|   ED|
|1962-02-19|        0

## Read CSV files

When working with big data, it is often not tenable to keep processing an entire data batch when you are in the process of development - this can be quite time-consuming. If the data is uniform, it is sufficient to work with a smaller subset to create basic functionality. Your manager has identified the year **1962** to perform the initial testing for data ingestion. 

> ℹ️ **Instructions** ℹ️
>
>Read in the data for **1962** using a data schema that purely uses string data types. You will be required to convert to the appropriate data types at a later stage.
>
>*You may use as many coding cells as necessary.*

In [27]:
#TODO: Write your code here 


from pyspark.sql.types import StructType, StructField, StringType

# Define the schema with all columns as StringType
schema = StructType([
    StructField("Date", StringType(), True),
    StructField("Open", StringType(), True),
    StructField("High", StringType(), True),
    StructField("low", StringType(), True),
    StructField("Close", StringType(), True),
    StructField("Adj Close", StringType(), True),
    StructField("Volume", StringType(), True),
    StructField("Stock", StringType(), True),
    # Add as many columns as necessary with StringType
])

# Read in the data using the schema
data_1962 = spark.read.csv("../Downloads/stocks (6)/stocks/1962/*/*/stocks.csv", schema=schema, header=True)  
data_1962.show()


+----------+------------------+------------------+------------------+------------------+--------------------+---------+-----+
|      Date|              Open|              High|               low|             Close|           Adj Close|   Volume|Stock|
+----------+------------------+------------------+------------------+------------------+--------------------+---------+-----+
|1962-02-19| 5.839290142059326| 5.907374858856201| 5.839290142059326| 5.863319873809815|  1.3863292932510376|  29900.0|   AA|
|1962-02-19| 5.481634140014648|5.5284857749938965| 5.481634140014648| 5.516772747039795|  1.2804527282714844|  32000.0| ARNC|
|1962-02-19|0.9074074029922484|0.9156378507614136|0.8991769552230835| 0.903292179107666|  0.1614154428243637| 619400.0|   BA|
|1962-02-19|1.6770833730697632|1.6927083730697632|1.6614583730697632|1.6770833730697632|  0.1440587043762207| 170400.0|  CAT|
|1962-02-19|               0.0|3.5788691043853764|              20.0| 3.549107074737549|  0.0565012246370315| 273600.0

## Update column names
To make the data easier to work with, you will need to make a few changes:
1. Column headers should all be in lowercase; and
2. Whitespaces should be replaced with underscores.


> ℹ️ **Instructions** ℹ️
>
>Make sure that the column headers are all in lowercase and that any whitespaces are replaced with underscores.
>
>*You may use as many coding cells as necessary.*

In [28]:
#TODO: Write your code here 

# Read in the data using the schema, and store the original column names
#data_1962 = spark.read.csv("../Downloads/stocks (6)/stocks/1962/01/02/stocks.csv", schema=schema, header=True)
original_columns = data_1962.columns

# Lowercase the column names and replace any whitespaces with underscores
lowercase_columns = [col.lower().replace(" ", "_") for col in original_columns]

# Create a new DataFrame with the modified column names
data_1962 = data_1962.toDF(*lowercase_columns) 
data_1962.show()

+----------+------------------+------------------+------------------+------------------+--------------------+---------+-----+
|      date|              open|              high|               low|             close|           adj_close|   volume|stock|
+----------+------------------+------------------+------------------+------------------+--------------------+---------+-----+
|1962-02-19| 5.839290142059326| 5.907374858856201| 5.839290142059326| 5.863319873809815|  1.3863292932510376|  29900.0|   AA|
|1962-02-19| 5.481634140014648|5.5284857749938965| 5.481634140014648| 5.516772747039795|  1.2804527282714844|  32000.0| ARNC|
|1962-02-19|0.9074074029922484|0.9156378507614136|0.8991769552230835| 0.903292179107666|  0.1614154428243637| 619400.0|   BA|
|1962-02-19|1.6770833730697632|1.6927083730697632|1.6614583730697632|1.6770833730697632|  0.1440587043762207| 170400.0|  CAT|
|1962-02-19|               0.0|3.5788691043853764|              20.0| 3.549107074737549|  0.0565012246370315| 273600.0

## Null Values
Null values often represent missing pieces of data. It is always good to know where your null values lie - so you can quickly identify and remedy any issues stemming from these.

> ℹ️ **Instructions** ℹ️
>
>Write code to count the number of null values found in each column.
>
>*You may use as many coding cells as necessary.*

In [29]:
from pyspark.sql.functions import col,isnan, when, count
data_1962.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data_1962.columns]
   ).show()








    

+----+----+----+---+-----+---------+------+-----+
|date|open|high|low|close|adj_close|volume|stock|
+----+----+----+---+-----+---------+------+-----+
|   0|   0|   0| 22|    0|        0|    21|    0|
+----+----+----+---+-----+---------+------+-----+



## Data type conversion - The final data schema

Now that we have identified the number of missing values in the data set, we'll move on to convert our data schema to the required data types. 

> ℹ️ **Instructions** ℹ️
>
>Use typecasting to convert the string data types in your current data schema to more appropriate data types.
>
>*You may use as many coding cells as necessary.*

In [30]:
#fix

from pyspark.sql.types import IntegerType, FloatType, DateType
from pyspark.sql.functions import ceil, col,when

# Define the updated schema with appropriate data types
#updated_schema = StructType([
    #StructField("date", StringType(), True),
    #StructField("open", FloatType(), True),
    #StructField("high", FloatType(), True),
    #StructField("low", FloatType(), True),
    #StructField("close", FloatType(), True),
    #StructField("adj_close", FloatType(), True),
    #StructField("volume_stock",StringType(), True),
    # Add as many columns as necessary with appropriate data types
#])

# Read in the data using the updated schema 
data_1962_1 = data_1962.withColumn("date", data_1962["date"].cast(StringType())) 
data_1962_1 = data_1962.withColumn("open", data_1962["open"].cast(FloatType())) 
data_1962_1 = data_1962.withColumn("high", data_1962["high"].cast(FloatType())) 
data_1962_1 = data_1962.withColumn("low", data_1962["low"].cast(FloatType()))
data_1962_1 = data_1962.withColumn("close", data_1962["close"].cast(FloatType()))
data_1962_1 = data_1962.withColumn("adj_close", data_1962["adj_close"].cast(FloatType()))
data_1962_1 = data_1962.withColumn("volume", data_1962["volume"].cast(FloatType())) 
data_1962_1 = data_1962.withColumn("stock", data_1962["stock"].cast(StringType()))
#data_1962_ = spark.read.csv("../Downloads/stocks (6)/stocks/1962/01/02/stocks.csv", schema=updated_schema, header=True)
data_1962_1.show()
print(data_1962_1)

from pyspark.sql.functions import col,isnan, when, count
data_1962_1.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data_1962_1.columns]
   ).show()




+----------+------------------+------------------+------------------+------------------+--------------------+---------+-----+
|      date|              open|              high|               low|             close|           adj_close|   volume|stock|
+----------+------------------+------------------+------------------+------------------+--------------------+---------+-----+
|1962-02-19| 5.839290142059326| 5.907374858856201| 5.839290142059326| 5.863319873809815|  1.3863292932510376|  29900.0|   AA|
|1962-02-19| 5.481634140014648|5.5284857749938965| 5.481634140014648| 5.516772747039795|  1.2804527282714844|  32000.0| ARNC|
|1962-02-19|0.9074074029922484|0.9156378507614136|0.8991769552230835| 0.903292179107666|  0.1614154428243637| 619400.0|   BA|
|1962-02-19|1.6770833730697632|1.6927083730697632|1.6614583730697632|1.6770833730697632|  0.1440587043762207| 170400.0|  CAT|
|1962-02-19|               0.0|3.5788691043853764|              20.0| 3.549107074737549|  0.0565012246370315| 273600.0

## Consolidate missing values
We have to check if the data type conversion above was done correctly.
If the casting was not successful, a null value gets inserted into the dataframe. You can thus check for successful conversion by determining if any null values are included in the resulting dataframe.


> ℹ️ **Instructions** ℹ️
>
>Write code to compare the number of invalid entries (nulls) pre-conversion and post-conversion.
>
>*You may use as many coding cells as necessary.*

In [31]:
#preconverison 
from pyspark.sql.functions import col,isnan, when, count
data_1962.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data_1962.columns]
   ).show() 


#post-conversion 
from pyspark.sql.functions import col,isnan, when, count
data_1962_1.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data_1962_1.columns]
   ).show()



+----+----+----+---+-----+---------+------+-----+
|date|open|high|low|close|adj_close|volume|stock|
+----+----+----+---+-----+---------+------+-----+
|   0|   0|   0| 22|    0|        0|    21|    0|
+----+----+----+---+-----+---------+------+-----+

+----+----+----+---+-----+---------+------+-----+
|date|open|high|low|close|adj_close|volume|stock|
+----+----+----+---+-----+---------+------+-----+
|   0|   0|   0| 22|    0|        0|    21|    0|
+----+----+----+---+-----+---------+------+-----+



Here you should be able to see if any of your casts went wrong. 
Do not attempt to correct any missing values at this point. This will be dealt with in later sections of the predict.

## Generate parquet files
When writing in Spark, we typically use parquet format. This format allows parallel writing using Spark's optimisation while maintaining other useful things like metadata.

When writing, it is good to make sure that the data is sufficiently partitioned. 

Generally, data should be partitioned with one partition for every 200MB of data, but this also depends on the size of your cluster and executors. 


### Check the size of the dataframe before partitioning

In [32]:
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer

In [33]:
rdd = data_1962.rdd._reserialize(AutoBatchedSerializer(PickleSerializer()))
obj = rdd.ctx._jvm.org.apache.spark.mllib.api.python.SerDe.pythonToJava(rdd._jrdd, True)
size = sc._jvm.org.apache.spark.util.SizeEstimator.estimate(obj)
size_MB = size/1000000
partitions = max(int(size_MB/200), 2)
print(f'The dataframe is {size_MB} MB')  
print(partitions, )


The dataframe is 37.215544 MB
2


### Write parquet files to the local directory
> ℹ️ **Instructions** ℹ️
>
> Use the **coalesce** function and the number of **partitions** derived above to write parquet files to your local directory 
>
>*You may use as many coding cells as necessary.*

In [34]:
#from pyspark.sql import SparkSession
print(rdd)
# Create a Spark session
#spark = SparkSession.builder.appName("WriteParquetFiles").getOrCreate()



# Get the number of partitions
#num_partitions = df.rdd.getNumPartitions()

# Use the coalesce function
#df = df.coalesce(1)

# Write the data as parquet files to the local directory
#output_path = "../Downloads/output_file"
#df.write.parquet(output_path, mode="overwrite")

# Stop the Spark session
#spark.stop()
 


#rdd_coalesced = rdd.coalesce(partitions)

#rdd_coalesced.toDF().write.parquet("output_file1")
 
#TODO: Write your code here
save_parquet = data_1962.coalesce(2).write.parquet("processing_big_data_final")
#where i wrote 8 it will be the number of partitions
#and the stocks1962 will be your dataframe 



#number_of_partitions = data_1962.rdd.getNumPartitions()
#data_1962.coalesce(number_of_partitions // 2).write.parquet("/path/to/local/directory/kamza")





PythonRDD[290] at RDD at PythonRDD.scala:53


AnalysisException: path file:/C:/Users/lehumo  MOGOBA/Downloads/processing_big_data_final already exists.

In [46]:


# Import the necessary functions
from pyspark.sql.functions import count

# Count the number of times each stock appears in the DataFrame
stock_counts = data_1962.groupBy('stock').count()

# Count the number of unique stocks in the DataFrame
num_stocks = stock_counts.count()

# Print the results
print(f"Number of unique stocks: {num_stocks}")
print("Stock counts:")
stock_counts.show()



Number of unique stocks: 21
Stock counts:
+-----+-----+
|stock|count|
+-----+-----+
|   AA|  252|
|  XOM|  252|
|  DIS|  252|
|   PG|  252|
|   GT|  252|
|   MO|  252|
|  IBM|  252|
|  JNJ|  252|
|  CVX|  252|
|  DTE|  252|
|   BA|  242|
|   GE|  252|
|  HPQ|  309|
| ARNC|  231|
|  CAT|  252|
|   IP|  252|
|   FL|  252|
|   ED|  271|
|  NAV|  252|
|   KO|  252|
+-----+-----+
only showing top 20 rows

