`author:` Prashant Prasad Kanth</br>
`date:` 25/09/2022 'MM/DD/YYY'

In [7]:
from pyspark.sql.functions import col, length, lit
import os
import re
from datetime import datetime

In [29]:
spark.sql("CREATE DATABASE IF NOT EXISTS wdi_curated")
spark.sql("CREATE DATABASE IF NOT EXISTS eea_curated")

DataFrame[]

In [30]:
# spark.sql("DROP DATABASE wdi_curated")
# spark.sql("DROP DATABASE eea_curated")

### World Development Indicators data

In [65]:
wdi_data = ['datalake/raw/world_development_indicators/date=20220922/WDIData.csv',
            'datalake/raw/world_development_indicators/date=20220922/WDICountry.csv',
            'datalake/raw/world_development_indicators/date=20220922/WDISeries.csv']
wdi_write_path = ['datalake/curated/wdi/data', 'datalake/curated/wdi/country', 'datalake/curated/wdi/series']

In [26]:
spark.sql("DESC DATABASE extended eea_curated")

DataFrame[info_name: string, info_value: string]

In [83]:
for readpath, writepath in zip(wdi_data, wdi_write_path):
    filename = readpath.split('/')[-1]
    print(f"{'--'*20}{filename}{'--'*20}")
    
    # read files into dataframe
    df = spark.read.options(header=True, inferSchema=True).csv(readpath)
    print(f"Total rows: {df.count()}")
    
    # Replace spaces in column names with underscores (“_”) for all DataFrames
    column_names = [col_name.replace(' ','_') for col_name in df.columns]
    df = df.toDF(*column_names)
    # null column read with null header
    if filename == "WDIData.csv":
        df = df.drop("_c66")
    
    # Drop records that only consist of null values (records with null values on all columns)
    df = df.na.drop("all")
    print(f"Rows after dropping null records: {df.count()}")
    
    # Drop duplicate records
    df = df.dropDuplicates()
    print(f"Rows after dropping duplicate records: {df.count()}")
    
    # For the WDICountry.csv and WDIData.csv files, drop all records that have a country code
    # (column: Country_Code) with a size other than three
    if filename in ['WDICountry.csv', 'WDIData.csv']:
        cnt = df.filter(length(col('Country_Code')) != 3).count()
        print(f"Number of records that have country code with a size other than 3: {cnt}")
        if cnt > 0:
            df = df.filter(length(col('Country_Code')) == 3)
            print(f"Rows after dropping records with len(country_code) !=3: {df.count()}")
    
    # For WDISeries.csv, drop all records that contain a space character (" ") in the Series_Code column
    if filename == 'WDISeries.csv':
        cnt = df.filter(col('Series_Code').contains(" ")).count()
        print(f"Number of records that contain a space character in Series_Code column: {cnt}")
        if cnt > 0:
            df = df.filter(~col('Series_Code').contains(" "))
            print(f"Rows after dropping records with a space in series_code column: {df.count()}")
    print(f"Rows retained after cleaning: {df.count()}")
    
    # Write data to curated layer
    current_time = datetime.now()
    df.withColumn("year", lit(current_time.year))\
    .withColumn("month", lit(current_time.month))\
    .withColumn("day", lit(current_time.day))\
    .write.partitionBy("year","month","day").parquet(writepath)
    print("Data written in Parquet format at: {}".format(writepath))

----------------------------------------WDIData.csv----------------------------------------
Total rows: 383572
Rows after dropping null records: 383572
Rows after dropping duplicate records: 383572
Number of records that have country code with a size other than 3: 0
Rows retained after cleaning: 383572
Data written in Parquet format at: datalake/curated/wdi/data
----------------------------------------WDICountry.csv----------------------------------------
Total rows: 278
Rows after dropping null records: 278
Rows after dropping duplicate records: 278
Number of records that have country code with a size other than 3: 13
Rows after dropping records with len(country_code) !=3: 265
Rows retained after cleaning: 265
Data written in Parquet format at: datalake/curated/wdi/country
----------------------------------------WDISeries.csv----------------------------------------
Total rows: 4282
Rows after dropping null records: 4282
Rows after dropping duplicate records: 2323
Number of records tha

### CO2 emissions data

In [1]:
co2emission_data_path = 'datalake/raw/co2_passenger_cars_emissions'
co2emission_write_path = 'datalake/curated/co2_emissions'

In [2]:
co2emission_data = []
for year in os.listdir(co2emission_data_path):
    year_data = os.path.join(co2emission_data_path, year)
    for file in os.listdir(year_data):
        co2emission_data.append(os.path.join(year_data, file))
print(co2emission_data)

['datalake/raw/co2_passenger_cars_emissions/year=2019/co2_emissions_passenger_cars_2019.json', 'datalake/raw/co2_passenger_cars_emissions/year=2017/co2_emissions_passenger_cars_2017.json', 'datalake/raw/co2_passenger_cars_emissions/year=2018/co2_emissions_passenger_cars_2018.json']


In [9]:
print(f"{'--'*10}Processing co2emissions data{'--'*10}")
co2_cars_emissions = spark.read.json(co2emission_data)
print(f"Total rows: {co2_cars_emissions.count()}")

# Replace spaces in column names with underscores (“_”) for all DataFrames
column_names = [col_name.replace(' ','_') for col_name in co2_cars_emissions.columns]
# '(', ')' and '/' errors out when saving as parquet, with invalid characters, replacing them
column_names = [col_name.replace('(','').replace(')','').replace('/','per') for col_name in column_names]
co2_cars_emissions = co2_cars_emissions.toDF(*column_names)

# Drop records that only consist of null values (records with null values on all columns)
co2_cars_emissions = co2_cars_emissions.na.drop("all")
print(f"Rows after dropping null records: {co2_cars_emissions.count()}")

# Drop duplicate records
co2_cars_emissions = co2_cars_emissions.dropDuplicates()
print(f"Rows after dropping duplicate records: {co2_cars_emissions.count()}")

# Drop all records that have a member state code size other than two (column: MS) 
# and that contain any character other than uppercase letters in this column
co2_cars_emissions = co2_cars_emissions.filter((length(col('MS'))==2) & (col('MS').rlike("^[A-Z]*$")))
print(f"Rows after processing MS column: {co2_cars_emissions.count()}")
print(f"Rows retained after cleaning: {co2_cars_emissions.count()}")

# Write data to curated layer
co2_cars_emissions.write.partitionBy("year").parquet(co2emission_write_path)
print("Data written in Parquet format at: {}".format(co2emission_write_path))

--------------------Processing co2emissions data--------------------
Total rows: 300000
Rows after dropping null records: 300000
Rows after dropping duplicate records: 300000
Rows after processing MS column: 299996
Rows retained after cleaning: 299996
Data written in Parquet format at: datalake/curated/co2_emissions
