# Exporting CSV as Parquet
**Application**: Csv to Parquet<br>
**Creator**: Leandro da Silva<br>
**Date**: 2021-02-12<br>
**Description**:<br>
  - The application loads a CSV file from the input folder into a Spark DataFrame.<br>
  - The CSV file contains duplicate records, which must be removed using the following business rule:<br>
     - Keep only the most recent record for each Id, based on the Update_Date column.<br>
  - After deduplication, the application uses a JSON config file to map column data types.<br>
  - Finally, the application exports a Parquet file to the output folder.<br>

**Architecture decisions**: <br>
  - Why Parquet as the output format? <br>
    Parquet is one of the most widely used columnar file formats. <br>
    It suits data lakes and integrates easily with cloud services such as AWS Glue and Redshift Spectrum. <br>
  - Which service type was used to deploy the Spark cluster?<br>
    Because the amount of data is small, there was no need to overcomplicate the architecture with options such as an AWS EMR cluster or serverless cloud solutions such as AWS Glue. Instead, I used a local Spark service on macOS, which was more than capable of handling it. <br>
    

In [1]:
# ------------------------ #
# Loading Python Libraries #
# ------------------------ #
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F
import json
import gc
from os import listdir

In [2]:
# -------------------------------- #
# Defining the Spark Configuration #
# -------------------------------- #
config = SparkConf().setAll(
    [
        # Defining the number of cores to use on each executor.
        ("spark.executor.cores", "2"),
        # Defining the maximum number of CPU cores to request for the application from across the cluster.
        ("spark.cores.max", "2"),
        # Defining the amount of memory to use per executor process.
        ("spark.executor.memory", "1g"),
        # Defining the compression codec used when writing Parquet files.
        ("spark.sql.parquet.compression.codec", "gzip"),
    ]
)

# --------------------------------------------------------- #
# Spark Session creation using the configuration definition #
# --------------------------------------------------------- #
spark = SparkSession.builder \
        .master("local[1]") \
        .config(conf=config) \
        .appName('Csv-to-Parquet') \
        .getOrCreate()

In [3]:
# ------------------- #
# CSV Input File Path #
# ------------------- # 
csv_file = './data/input/users/load.csv'

# ----------------------------------------- #
# Loading the CSV file to a Spark DataFrame #
# ----------------------------------------- #
df = spark.read.option("header", True).csv(csv_file)

In [4]:
# -------- #
# Raw Data #
# -------- #
df.show(truncate=False, vertical=True)

-RECORD 0-----------------------------------------------------
 id          | 1                                              
 name        | david.lynch@cognitivo.ai                       
 email       | David Lynch                                    
 phone       | (11) 99999-9997                                
 address     | Mulholland Drive, Los Angeles, CA, US          
 age         | 72                                             
 create_date | 2018-03-03 18:47:01.954752                     
 update_date | 2018-03-03 18:47:01.954752                     
-RECORD 1-----------------------------------------------------
 id          | 1                                              
 name        | david.lynch@cognitivo.ai                       
 email       | David Lynch                                    
 phone       | (11) 99999-9998                                
 address     | Mulholland Drive, Los Angeles, CA, US          
 age         | 72                                      

In [5]:
# --------------- #
# Original Schema #
# --------------- #
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- address: string (nullable = true)
 |-- age: string (nullable = true)
 |-- create_date: string (nullable = true)
 |-- update_date: string (nullable = true)



In [6]:
# --------------------- #
# Deduplication process #
# --------------------- #
# Window specification used by row_number() below.
# Business Rule (Requirements.txt): 
#  1. Partition by the id column.
#  2. Order each partition by update_date, descending.
wf_partitionBy = Window.partitionBy("id").orderBy(df["update_date"].desc())

# Running the deduplication
# Keep only the most recent record per id, using the window specification above.
# Within each id, row_number = 1 is always the record with the latest update_date.
df_dedup = df.select("*", F.row_number().over(wf_partitionBy).alias("row_number")) \
    .where('row_number = 1') \
    .drop('row_number') \
    .orderBy("id")
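The business rule can also be checked outside Spark. The block below is a minimal pure-Python sketch of the same "keep the latest record per id" logic, not the notebook's implementation. The helper name `keep_most_recent` is hypothetical, the rows are trimmed to three columns, and the `update_date` of the id 2 row is an invented placeholder. Note that `update_date` is still a string at this stage, so the ordering is lexicographic; that agrees with chronological order only because the timestamps share a fixed-width, year-first format.

```python
from datetime import datetime

def keep_most_recent(records, key="id", order_by="update_date"):
    """Keep, for each key, the record with the greatest order_by value."""
    latest = {}
    for rec in records:
        current = latest.get(rec[key])
        # String comparison: valid here because the timestamps are fixed-width and year-first.
        if current is None or rec[order_by] > current[order_by]:
            latest[rec[key]] = rec
    return sorted(latest.values(), key=lambda r: r[key])

rows = [
    {"id": "1", "phone": "(11) 99999-9997", "update_date": "2018-03-03 18:47:01.954752"},
    {"id": "1", "phone": "(11) 99999-9999", "update_date": "2018-05-23 10:13:59.594752"},
    # Hypothetical timestamp: the notebook output does not show this value.
    {"id": "2", "phone": "(11) 94815-1623", "update_date": "2018-01-01 00:00:00.000000"},
]

deduped = keep_most_recent(rows)

# Sanity check: the string order matches the parsed datetime order for these values.
fmt = "%Y-%m-%d %H:%M:%S.%f"
string_order_matches = (
    (rows[0]["update_date"] < rows[1]["update_date"])
    == (datetime.strptime(rows[0]["update_date"], fmt) < datetime.strptime(rows[1]["update_date"], fmt))
)
```

If the timestamp format were ever inconsistent, casting `update_date` to a timestamp before the window ordering would likely be the safer choice.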

In [7]:
# ------------------------------ #
# Data after removing duplicates #
# ------------------------------ #
df_dedup.show(truncate=False, vertical=True)

-RECORD 0-----------------------------------------------------
 id          | 1                                              
 name        | david.lynch@cognitivo.ai                       
 email       | David Lynch                                    
 phone       | (11) 99999-9999                                
 address     | Mulholland Drive, Los Angeles, CA, US          
 age         | 72                                             
 create_date | 2018-03-03 18:47:01.954752                     
 update_date | 2018-05-23 10:13:59.594752                     
-RECORD 1-----------------------------------------------------
 id          | 2                                              
 name        | sherlock.holmes@cognitivo.ai                   
 email       | Sherlock Holmes                                
 phone       | (11) 94815-1623                                
 address     | 221B Baker Street, London, UK                  
 age         | 34                                      

In [8]:
# ------------------------------------- #
# Data schema after removing duplicates #
# ------------------------------------- #
df_dedup.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- address: string (nullable = true)
 |-- age: string (nullable = true)
 |-- create_date: string (nullable = true)
 |-- update_date: string (nullable = true)



In [9]:
# -------------------- #
# Type Mapping process #
# -------------------- #
# Function responsible for casting the DataFrame columns to the configured data types
def df_cast_column_type(item):
    # The context manager closes the file even if json.load raises,
    # and errors propagate instead of silently returning None.
    with open("./config/types_mapping.json") as json_file:
        json_object = json.load(json_file)

    # Match column names against the mapping keys case-insensitively.
    types_by_column = {key.lower(): value for key, value in json_object.items()}
    for col in item.columns:
        target_type = types_by_column.get(col.lower())
        if target_type is not None:
            item = item.withColumn(col, F.col(col).cast(target_type))

    return item
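The contents of `./config/types_mapping.json` are not shown in this notebook. As a rough illustration, the sketch below assumes a flat `{column: type}` mapping inferred from the schema printed after the cast (`age` as integer, both dates as timestamp); the real file may differ. The helper `resolve_casts` is hypothetical and only mirrors the case-insensitive matching step, without touching Spark.

```python
import json

# Assumed contents of ./config/types_mapping.json, inferred from the
# schema printed after the cast; the real file may differ.
EXAMPLE_MAPPING_JSON = """
{
    "age": "integer",
    "create_date": "timestamp",
    "update_date": "timestamp"
}
"""

def resolve_casts(columns, mapping):
    """Return {column: target_type} for columns whose name matches a mapping key, ignoring case."""
    types_by_key = {key.lower(): value for key, value in mapping.items()}
    return {col: types_by_key[col.lower()] for col in columns if col.lower() in types_by_key}

mapping = json.loads(EXAMPLE_MAPPING_JSON)
columns = ["id", "name", "email", "phone", "address", "age", "create_date", "update_date"]
casts = resolve_casts(columns, mapping)
print(casts)
```

Columns without a matching key, such as `id` or `phone`, are left out of the result and would keep their original string type.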

In [10]:
# Executing the type mappings using the function "df_cast_column_type"
df_cast = df_cast_column_type(df_dedup)

In [11]:
# ----------------------------- #
# Data after data type mappings #
# ----------------------------- #
df_cast.show(truncate=False, vertical=True)

-RECORD 0-----------------------------------------------------
 id          | 1                                              
 name        | david.lynch@cognitivo.ai                       
 email       | David Lynch                                    
 phone       | (11) 99999-9999                                
 address     | Mulholland Drive, Los Angeles, CA, US          
 age         | 72                                             
 create_date | 2018-03-03 18:47:01.954752                     
 update_date | 2018-05-23 10:13:59.594752                     
-RECORD 1-----------------------------------------------------
 id          | 2                                              
 name        | sherlock.holmes@cognitivo.ai                   
 email       | Sherlock Holmes                                
 phone       | (11) 94815-1623                                
 address     | 221B Baker Street, London, UK                  
 age         | 34                                      

In [12]:
# ------------------------------------ #
# Data schema after data type mappings #
# ------------------------------------ #
df_cast.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- address: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- create_date: timestamp (nullable = true)
 |-- update_date: timestamp (nullable = true)



In [13]:
# -------------------- #
# Exporting to Parquet #
# -------------------- #
output = "./data/output"
df_cast.repartition(1).write.mode("overwrite").parquet(output)

In [14]:
# --------------------- #
# Listing output folder #
# --------------------- #
print(listdir(output))

['part-00000-f87e3b1d-9887-4b3f-93e0-80d98178be6a-c000.gz.parquet', '._SUCCESS.crc', '_SUCCESS', '.part-00000-f87e3b1d-9887-4b3f-93e0-80d98178be6a-c000.gz.parquet.crc']
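The listing mixes the actual data file with Spark's marker and checksum files (`_SUCCESS` and the hidden `.crc` entries). A small sketch, assuming the hypothetical helper name `data_files`, shows one way to pick out only the Parquet part files from such a listing:

```python
def data_files(names):
    """Return the Parquet part files, skipping hidden checksum files and the _SUCCESS marker."""
    return sorted(
        name for name in names
        if name.endswith(".parquet") and not name.startswith(".")
    )

# Listing copied from the output above.
listing = [
    "part-00000-f87e3b1d-9887-4b3f-93e0-80d98178be6a-c000.gz.parquet",
    "._SUCCESS.crc",
    "_SUCCESS",
    ".part-00000-f87e3b1d-9887-4b3f-93e0-80d98178be6a-c000.gz.parquet.crc",
]

parquet_parts = data_files(listing)
print(parquet_parts)
```

With `repartition(1)` a single part file is expected, and its `.gz.parquet` suffix reflects the gzip compression codec set in the Spark configuration.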


In [15]:
del df_cast
del df_dedup
del df
gc.collect()

438