In [None]:
import os
import pandas as pd
import csv

from pyspark.sql import SparkSession

from helper import spark_rename_column, pandas_rename_column
from static import COLUMN_NAMES_DICT, COLUMN_TYPES_DICT

## Load source files to Spark and generate parquet files

In this first step, we take the CSV files in the `archive/` directory, load them as Spark DataFrames, and then write them out as parquet files inside the `spark_output/` folder.

In order to achieve this, we have the following steps:

* We build a SparkSession object (This is connected to my Dockerized environment: JupyterLab + 1 Master Node + 2 Worker Nodes)
* We get a list of the files in the source folder programmatically, using a list comprehension with methods from the `os` library.
* We also make sure the output directory exists before processing the files, using a simple `if` statement.
* This might not be the tidiest way to do it, but for each source file I defined a nested dictionary describing the column name and type changes we need to perform so that Spark is able to write the parquet files. The dictionaries are in the `static.py` file, and there are additional helper functions in the `helper.py` file as well.
* Finally, we process each file by reading the CSV into a Spark DataFrame, changing the column names/types, and then generating the parquet file.

In [None]:
# Create the Spark session used by every Spark cell below, or reuse the one
# that is already active if this cell is re-run.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("n5-challenge")
    .getOrCreate()
)

In [None]:
# Collect the base names (without the '.csv' suffix) of the source files.
# We match on the actual file suffix rather than a substring so that names
# which merely contain '.csv' (e.g. 'data.csv.bak') are ignored, and we strip
# only the trailing suffix so a '.csv' in the middle of a name is preserved.
CSV_SUFFIX = '.csv'
filenames = sorted(
    entry[:-len(CSV_SUFFIX)]
    for entry in os.listdir('archive')
    if entry.endswith(CSV_SUFFIX)
)

# Make sure the spark_output folder exists so we can store the resulting
# parquets; exist_ok=True keeps the cell safe to re-run.
os.makedirs('spark_output', exist_ok=True)

# Process each source file: read CSV -> rename columns -> adjust types -> write parquet
for f in filenames:
    # Read the CSV into a Spark DataFrame, taking column names from the header row
    sdf = spark.read.csv(f'archive/{f}.csv', header=True)

    # Rename the columns using this file's {old_name: new_name} mapping
    for old_name, new_name in COLUMN_NAMES_DICT[f].items():
        sdf = sdf.withColumnRenamed(old_name, new_name)

    # COLUMN_TYPES_DICT[f] maps each target type to the columns that need it.
    # NOTE(review): despite its name, spark_rename_column receives a column
    # name and a type here; presumably it casts the column - confirm in helper.py.
    for col_type, col_names in COLUMN_TYPES_DICT[f].items():
        for col_name in col_names:
            sdf = spark_rename_column(sdf, col_name, col_type)

    # Write the parquet output, replacing any result from a previous run
    sdf.write.mode('overwrite').parquet(f'spark_output/{f}')

## Spark-generated parquet files validation

In [None]:
# Read every Spark-generated parquet back and inspect a sample plus its schema
for f in filenames:
    parquet_path = 'spark_output/' + f
    sdf = spark.read.parquet(parquet_path)

    # First five rows, followed by the column names/types stored in the file
    sdf.show(5)
    sdf.printSchema()

## Load the source files into `pandas` and generate parquet files

Following the same logic, we want to load these CSV files as pandas DataFrames and then transform them to parquet files inside the `pandas_output/` folder.

In order to achieve this, we have the following steps:

* We use the list of the files in the source folder that we got in the previous step.
* We also make sure the output directory exists before processing the files, using a simple `if` statement.
* Finally, we process each file by reading the CSV into a pandas DataFrame, changing the column names/types, and then generating the parquet file.

In [None]:
# Make sure the pandas_output folder exists so we can store the resulting
# parquets; exist_ok=True avoids the check-then-create pattern and keeps the
# cell safe to re-run.
os.makedirs('pandas_output', exist_ok=True)

# Process each source file: read CSV -> rename columns -> adjust types -> write parquet
for f in filenames:
    # Read the CSV into a pandas DataFrame
    pdf = pd.read_csv(f'archive/{f}.csv')

    # Rename the columns using this file's {old_name: new_name} mapping
    pdf = pdf.rename(columns=COLUMN_NAMES_DICT[f])

    # COLUMN_TYPES_DICT[f] maps each target type to the columns that need it.
    # NOTE(review): despite its name, pandas_rename_column receives a column
    # name and a type and its result replaces the column; presumably it
    # returns the converted column - confirm in helper.py.
    for col_type, col_names in COLUMN_TYPES_DICT[f].items():
        for col_name in col_names:
            pdf[col_name] = pandas_rename_column(pdf, col_name, col_type)

    # Write the parquet file for this source
    pdf.to_parquet(f'pandas_output/{f}.parquet')

## pandas-generated parquet files validation

In [None]:
# Read every pandas-generated parquet back and inspect a sample plus its dtypes
for f in filenames:
    parquet_path = 'pandas_output/' + f + '.parquet'
    pdf = pd.read_parquet(parquet_path)

    # First rows of the frame, followed by the dtype of each column
    print(pdf.head())
    print(pdf.dtypes)