## 1. Introduction

This notebook shows how to connect a Jupyter notebook to a Spark cluster, read a local CSV file, and store it in Hadoop (HDFS) as partitioned Parquet files.

## 2. Connection to Spark Cluster

To connect to the Spark cluster, create a SparkSession object with the following parameters:

+ **appName:** application name displayed in the [Spark Master Web UI](http://localhost:8080/);
+ **master:** Spark Master URL, the same one used by the Spark Workers;
+ **spark.executor.memory:** must be less than or equal to the `SPARK_WORKER_MEMORY` value in the Docker Compose configuration.

In [None]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.\
        builder.\
        appName("pyspark-notebook").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        getOrCreate()
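
The `spark.executor.memory` constraint listed above can be checked before the session is created. The following is a minimal sketch, not part of Spark: `to_mib` and `executor_fits_worker` are hypothetical helpers, and the sketch assumes both settings use a simple `<number><unit>` form such as `512m` or `1g`.

```python
import re

# Hypothetical helpers (not part of Spark or PySpark).
# Multipliers to convert a size with unit k/m/g/t into mebibytes.
_UNITS_IN_MIB = {"k": 1 / 1024, "m": 1, "g": 1024, "t": 1024 * 1024}


def to_mib(size):
    """Convert a size string such as '512m' or '1G' to mebibytes."""
    match = re.fullmatch(r"(\d+)([kmgt])b?", size.strip().lower())
    if match is None:
        raise ValueError("unsupported size string: {!r}".format(size))
    value, unit = match.groups()
    return int(value) * _UNITS_IN_MIB[unit]


def executor_fits_worker(executor_memory, worker_memory):
    """Return True if the executor memory does not exceed the worker memory."""
    return to_mib(executor_memory) <= to_mib(worker_memory)


# "1g" stands in for the SPARK_WORKER_MEMORY value (assumed for illustration).
print(executor_fits_worker("512m", "1g"))  # → True
```

If the check returns `False`, either lower `spark.executor.memory` or raise `SPARK_WORKER_MEMORY` in the Docker Compose configuration.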

## 3. Load and Store Data
We will now load data from a local CSV file and store it in Hadoop, partitioned by column.
Afterward, you can explore the saved Parquet files in the Hadoop UI at http://localhost:9870 (Utilities -> Browse the file system).

In [None]:
import pandas
from pyspark.sql.types import *
from pyspark.sql import functions as F
import os
import time

# Unix timestamp (seconds) used to give each run a unique output path
epochNow = int(time.time())

In [None]:
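# Walk ./data/ and load every file whose name contains "salesRecord" into a Pandas DataFrame.
# The loop does not stop at the first match: if several files match, the last one found is kept.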
for path, subdirs, files in os.walk('./data/'):
    for name in files:
        if "salesRecord" in name:
            csvName = name
            csvPath = os.path.join(path, name)
            print("Loading data from csv {}".format(csvPath))
            salesDfPandas = pandas.read_csv(csvPath)
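
The loop above keeps walking after a match, so the result depends on traversal order when several files match. One possible alternative, sketched here under the assumption that only the first match is wanted, is a small function that returns as soon as a file is found and fails loudly when none exists. `find_first_csv` is a hypothetical helper name.

```python
import os


def find_first_csv(root, keyword):
    """Return (file name, full path) of the first file under root whose name contains keyword."""
    for path, _subdirs, files in os.walk(root):
        # Sort names so the choice within a directory is deterministic.
        for name in sorted(files):
            if keyword in name:
                return name, os.path.join(path, name)
    raise FileNotFoundError(
        "no file containing {!r} found under {!r}".format(keyword, root)
    )
```

With this helper, the cell could read `csvName, csvPath = find_first_csv('./data/', 'salesRecord')` followed by `pandas.read_csv(csvPath)`.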

In [None]:
# Create a PySpark DataFrame from the Pandas DataFrame
salesDfSpark = spark.createDataFrame(salesDfPandas)

In [None]:
# Replace spaces in column names with underscores
salesDfSpark = salesDfSpark.select([F.col(col).alias(col.replace(' ', '_')) for col in salesDfSpark.columns])
print("Sales Dataframe created with schema:")
salesDfSpark.printSchema()
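
The renaming rule can be tried on plain strings without a Spark session. This is a small sketch of the same `replace(' ', '_')` logic; the column names below are made up for illustration and are not taken from the sales file.

```python
def normalize_column_names(columns):
    """Replace every space in each column name with an underscore."""
    return [name.replace(" ", "_") for name in columns]


# Hypothetical column names, for illustration only.
example_columns = ["Country", "Item Type", "Order ID"]
print(normalize_column_names(example_columns))
# → ['Country', 'Item_Type', 'Order_ID']
```

Names without spaces, such as `Country`, pass through unchanged.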

In [None]:
# Write the DataFrame to HDFS as Parquet files.
# partitionBy("Country") writes one subdirectory per distinct value of the "Country" column.
salesDfSpark.write \
        .partitionBy("Country") \
        .mode("overwrite") \
        .parquet("hdfs://hadoop-namenode:9000/sales/{}_{}.parquet".format(csvName,epochNow))
print("Sales Dataframe stored in Hadoop.")
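
When browsing the result in the Hadoop UI, the output of `partitionBy("Country")` appears as one `Country=<value>` subdirectory per distinct value. The sketch below only builds the expected directory names as strings; `partition_dirs` is a hypothetical helper, the file name, timestamp, and country values are made up, and values containing special characters may be escaped differently by Spark.

```python
def partition_dirs(base_path, column, values):
    """List the expected partition directories, one per distinct value."""
    return ["{}/{}={}".format(base_path, column, value) for value in sorted(set(values))]


# Hypothetical output path and country values, for illustration only.
base = "hdfs://hadoop-namenode:9000/sales/salesRecord.csv_1700000000.parquet"
for directory in partition_dirs(base, "Country", ["Chile", "Brazil", "Chile"]):
    print(directory)
```

Each of these directories holds the Parquet part files for the rows with that `Country` value.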

In [None]:
# Read from HDFS to confirm it was successfully stored
df_load = spark.read.parquet("hdfs://hadoop-namenode:9000/sales/{}_{}.parquet".format(csvName,epochNow))
print("Sales Dataframe read from Hadoop:")
df_load.show()