In [0]:
# Import libraries
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Medallion-layer directories, all under the same mount point; one sub-directory per layer.
bronze_path, silver_path, gold_path = (
    f'/mnt/death-cases-covid19/{layer}' for layer in ('Bronze', 'Silver', 'Gold')
)

In [0]:
# Bronze table location, derived from the layer directory defined above so the
# two can never drift apart.
file_path = f'{bronze_path}/death_cases_covid19'

In [0]:
# Read the raw Bronze Delta table and preview its first rows.
raw_df = spark.read.load(file_path, format="delta")
raw_df.show(n=10)

+-----------+-------------------+--------------+---------+--------------------+------------+----------+--------------------+------+----------+
|FECHA_CORTE|FECHA_FALLECIMIENTO|EDAD_DECLARADA|     SEXO|   CLASIFICACION_DEF|DEPARTAMENTO| PROVINCIA|            DISTRITO|UBIGEO|id_persona|
+-----------+-------------------+--------------+---------+--------------------+------------+----------+--------------------+------+----------+
|   20240102|           20210611|            21|MASCULINO|    Criterio SINADEF|  LAMBAYEQUE|  CHICLAYO|            CHICLAYO|140101|  24833991|
|   20240102|           20210317|            45|MASCULINO|Criterio serolÃ³gico|       PIURA|   SULLANA|             SULLANA|200601|  24761117|
|   20240102|           20210602|            62| FEMENINO|Criterio virolÃ³gico|         ICA|     PISCO|        SAN CLEMENTE|110507|  24767070|
|   20240102|           20210703|            75|MASCULINO|Criterio virolÃ³gico|    AREQUIPA|  AREQUIPA|          MIRAFLORES|040110|  24751741|

In [0]:
# Inspect the schema of the Bronze table: column names, data types and nullability.
raw_df.printSchema()

root
 |-- FECHA_CORTE: integer (nullable = true)
 |-- FECHA_FALLECIMIENTO: integer (nullable = true)
 |-- EDAD_DECLARADA: integer (nullable = true)
 |-- SEXO: string (nullable = true)
 |-- CLASIFICACION_DEF: string (nullable = true)
 |-- DEPARTAMENTO: string (nullable = true)
 |-- PROVINCIA: string (nullable = true)
 |-- DISTRITO: string (nullable = true)
 |-- UBIGEO: string (nullable = true)
 |-- id_persona: integer (nullable = true)



In [0]:
# Remove columns not used by the rest of the pipeline.
# Names are passed as plain strings: DataFrame.drop accepts several Column
# objects only from Spark 3.4 on (older versions raise TypeError), whereas
# several string names are accepted on every version.
filter_columns_df = raw_df.drop('FECHA_CORTE', 'CLASIFICACION_DEF', 'id_persona')
filter_columns_df.show(10)

+-------------------+--------------+---------+------------+----------+--------------------+------+
|FECHA_FALLECIMIENTO|EDAD_DECLARADA|     SEXO|DEPARTAMENTO| PROVINCIA|            DISTRITO|UBIGEO|
+-------------------+--------------+---------+------------+----------+--------------------+------+
|           20210611|            21|MASCULINO|  LAMBAYEQUE|  CHICLAYO|            CHICLAYO|140101|
|           20210317|            45|MASCULINO|       PIURA|   SULLANA|             SULLANA|200601|
|           20210602|            62| FEMENINO|         ICA|     PISCO|        SAN CLEMENTE|110507|
|           20210703|            75|MASCULINO|    AREQUIPA|  AREQUIPA|          MIRAFLORES|040110|
|           20210506|            66| FEMENINO|        LIMA|      LIMA|SAN JUAN DE LURIG...|150132|
|           20210321|            39|MASCULINO|        LIMA|      LIMA|SAN JUAN DE LURIG...|150132|
|           20210325|            39|MASCULINO|      ANCASH|   HUARMEY|             HUARMEY|021101|
|         

In [0]:
# Rename columns to English snake_case through an explicit source -> target mapping.
# Selecting by name (instead of positional toDF) makes a reordered Bronze schema
# harmless and a missing source column fail loudly (AnalysisException) instead of
# silently mislabeling data. Only the mapped columns are kept.
rename_map = {
    'FECHA_FALLECIMIENTO': 'death_date',
    'EDAD_DECLARADA': 'age',
    'SEXO': 'sex',
    'DEPARTAMENTO': 'department',
    'PROVINCIA': 'province',
    'DISTRITO': 'district',
    'UBIGEO': 'ubigeo',
}
name_columns = list(rename_map.values())
rename_columns_df = filter_columns_df.select(
    [col(source).alias(target) for source, target in rename_map.items()]
)

rename_columns_df.show(10)

+----------+---+---------+----------+----------+--------------------+------+
|death_date|age|      sex|department|  province|            district|ubigeo|
+----------+---+---------+----------+----------+--------------------+------+
|  20210611| 21|MASCULINO|LAMBAYEQUE|  CHICLAYO|            CHICLAYO|140101|
|  20210317| 45|MASCULINO|     PIURA|   SULLANA|             SULLANA|200601|
|  20210602| 62| FEMENINO|       ICA|     PISCO|        SAN CLEMENTE|110507|
|  20210703| 75|MASCULINO|  AREQUIPA|  AREQUIPA|          MIRAFLORES|040110|
|  20210506| 66| FEMENINO|      LIMA|      LIMA|SAN JUAN DE LURIG...|150132|
|  20210321| 39|MASCULINO|      LIMA|      LIMA|SAN JUAN DE LURIG...|150132|
|  20210325| 39|MASCULINO|    ANCASH|   HUARMEY|             HUARMEY|021101|
|  20210706| 31| FEMENINO|      LIMA|      LIMA|                LIMA|150101|
|  20210530| 75| FEMENINO|  APURIMAC|CHINCHEROS|           COCHARCAS|030603|
|  20210320| 57|MASCULINO|      LIMA|      LIMA|               RIMAC|150128|

In [0]:
# Checking for null columns
# All null counts are computed in a single aggregation (one Spark job) instead of
# one full scan per column. count() skips nulls, so counting a value that is only
# emitted when the column is null yields the number of null rows.
null_counts = rename_columns_df.select(
    [count(when(col(c).isNull(), 1)).alias(c) for c in rename_columns_df.columns]
).first().asDict()

death_date_nulls = null_counts['death_date']
age_nulls = null_counts['age']
sex_nulls = null_counts['sex']
department_nulls = null_counts['department']
province_nulls = null_counts['province']
district_nulls = null_counts['district']
ubigeo_nulls = null_counts['ubigeo']

# Same "<column>:  <count>" lines as before, in schema order.
for column_name, null_total in null_counts.items():
    print(f'{column_name}: ', null_total)

death_date:  0
age:  0
sex:  0
department:  0
province:  5
district:  5
ubigeo:  0


In [0]:
# Replace nulls - column province
# -- calculate the mode (most frequent non-null value)
# Nulls are excluded before grouping: otherwise the null group itself can win the
# count, the "mode" becomes None and fillna rejects it. Ties on count are broken
# alphabetically so the chosen value is deterministic across runs.
# NOTE(review): a single global mode ignores department/ubigeo, so an imputed row
# can get a province outside its department. ubigeo has no nulls here and its
# leading digits presumably encode department/province — consider imputing from it.
province_mode_row = (
    rename_columns_df
    .filter(col('province').isNotNull())
    .groupBy('province')
    .count()
    .orderBy(col('count').desc(), col('province').asc())
    .first()
)
if province_mode_row is None:
    raise ValueError("Column 'province' has no non-null values; cannot compute a mode.")
province_mode = province_mode_row['province']
print('mode: ', province_mode)

# -- replace
replace_null_province_df = rename_columns_df.fillna(province_mode, subset=['province'])

# -- check that no nulls remain
checkin_null = replace_null_province_df.filter(col('province').isNull()).count()
print('count null: ', checkin_null)

mode:  LIMA
count null:  0


In [0]:
# Replace nulls - column district
# -- calculate the mode (most frequent non-null value)
# Nulls are excluded before grouping: otherwise the null group itself can win the
# count, the "mode" becomes None and fillna rejects it. Ties on count are broken
# alphabetically so the chosen value is deterministic across runs.
# NOTE(review): a single global mode ignores province/ubigeo, so an imputed row can
# get a district outside its province — consider imputing from ubigeo instead.
district_mode_row = (
    replace_null_province_df
    .filter(col('district').isNotNull())
    .groupBy('district')
    .count()
    .orderBy(col('count').desc(), col('district').asc())
    .first()
)
if district_mode_row is None:
    raise ValueError("Column 'district' has no non-null values; cannot compute a mode.")
district_mode = district_mode_row['district']
print('mode: ', district_mode)

# -- replace
replace_null_district_df = replace_null_province_df.fillna(district_mode, subset=['district'])

# -- check that no nulls remain
checking_null = replace_null_district_df.filter(col('district').isNull()).count()
print('count null: ', checking_null)

mode:  LIMA
count null:  0


In [0]:
# Checking for null columns (after imputation)
# All null counts are computed in a single aggregation (one Spark job) instead of
# one full scan per column. count() skips nulls, so counting a value that is only
# emitted when the column is null yields the number of null rows.
null_counts = replace_null_district_df.select(
    [count(when(col(c).isNull(), 1)).alias(c) for c in replace_null_district_df.columns]
).first().asDict()

death_date_nulls = null_counts['death_date']
age_nulls = null_counts['age']
sex_nulls = null_counts['sex']
department_nulls = null_counts['department']
province_nulls = null_counts['province']
district_nulls = null_counts['district']
ubigeo_nulls = null_counts['ubigeo']

# Same "<column>:  <count>" lines as before, in schema order.
for column_name, null_total in null_counts.items():
    print(f'{column_name}: ', null_total)

death_date:  0
age:  0
sex:  0
department:  0
province:  0
district:  0
ubigeo:  0


In [0]:
# Final Bronze -> Silver frame, exposed under the name the write step uses.
filter_df = replace_null_district_df
filter_df.show(n=10)

+----------+---+---------+----------+----------+--------------------+------+
|death_date|age|      sex|department|  province|            district|ubigeo|
+----------+---+---------+----------+----------+--------------------+------+
|  20210611| 21|MASCULINO|LAMBAYEQUE|  CHICLAYO|            CHICLAYO|140101|
|  20210317| 45|MASCULINO|     PIURA|   SULLANA|             SULLANA|200601|
|  20210602| 62| FEMENINO|       ICA|     PISCO|        SAN CLEMENTE|110507|
|  20210703| 75|MASCULINO|  AREQUIPA|  AREQUIPA|          MIRAFLORES|040110|
|  20210506| 66| FEMENINO|      LIMA|      LIMA|SAN JUAN DE LURIG...|150132|
|  20210321| 39|MASCULINO|      LIMA|      LIMA|SAN JUAN DE LURIG...|150132|
|  20210325| 39|MASCULINO|    ANCASH|   HUARMEY|             HUARMEY|021101|
|  20210706| 31| FEMENINO|      LIMA|      LIMA|                LIMA|150101|
|  20210530| 75| FEMENINO|  APURIMAC|CHINCHEROS|           COCHARCAS|030603|
|  20210320| 57|MASCULINO|      LIMA|      LIMA|               RIMAC|150128|

#### From the Bronze layer to the Silver layer

In [0]:
# Persist the cleaned frame as the Silver Delta table, replacing any previous run.
filter_df.write.save(f"{silver_path}/death_cases_covid19", format="delta", mode="overwrite")

#### Reading data from the Silver layer

In [0]:
# Silver table location, derived from the same layer directory the write step
# uses, so reading always targets the table that was just written.
file_path_silver = f'{silver_path}/death_cases_covid19'

In [0]:
# Read the cleaned table back from the Silver layer and preview it.
df_silver = spark.read.load(file_path_silver, format="delta")
df_silver.show(n=10)

+----------+---+---------+----------+--------+--------------------+------+
|death_date|age|      sex|department|province|            district|ubigeo|
+----------+---+---------+----------+--------+--------------------+------+
|  20210127| 65| FEMENINO|       ICA| CHINCHA|        CHINCHA ALTA|110201|
|  20200618| 62| FEMENINO|  AREQUIPA|AREQUIPA|   ALTO SELVA ALEGRE|040102|
|  20200522| 73| FEMENINO|      LIMA|    LIMA|SAN MARTIN DE PORRES|150135|
|  20200618| 68|MASCULINO|      LIMA|    LIMA|          CARABAYLLO|150106|
|  20200730| 74|MASCULINO|      LIMA|    LIMA|SAN JUAN DE LURIG...|150132|
|  20200730| 63| FEMENINO|  AREQUIPA|AREQUIPA|            AREQUIPA|040101|
|  20200402| 75| FEMENINO|       ICA|     ICA|                 ICA|110101|
|  20210226| 58|MASCULINO|      LIMA|    LIMA|          LOS OLIVOS|150117|
|  20200802| 50|MASCULINO|  AMAZONAS|   BAGUA|               BAGUA|010201|
|  20210117| 71| FEMENINO|      LIMA|    LIMA|          CHORRILLOS|150108|
+----------+---+---------

In [0]:
# Normalize sex to a one-letter code (case-insensitive match); any other value is kept as-is.
sex_lower = lower(col('sex'))
sex_code = (
    when(sex_lower == 'masculino', 'M')
    .when(sex_lower == 'femenino', 'F')
    .otherwise(col('sex'))
)
filter_sex_columns_df = df_silver.withColumn('sex', sex_code)
filter_sex_columns_df.show()

+----------+---+---+-----------+--------+--------------------+------+
|death_date|age|sex| department|province|            district|ubigeo|
+----------+---+---+-----------+--------+--------------------+------+
|  20210127| 65|  F|        ICA| CHINCHA|        CHINCHA ALTA|110201|
|  20200618| 62|  F|   AREQUIPA|AREQUIPA|   ALTO SELVA ALEGRE|040102|
|  20200522| 73|  F|       LIMA|    LIMA|SAN MARTIN DE PORRES|150135|
|  20200618| 68|  M|       LIMA|    LIMA|          CARABAYLLO|150106|
|  20200730| 74|  M|       LIMA|    LIMA|SAN JUAN DE LURIG...|150132|
|  20200730| 63|  F|   AREQUIPA|AREQUIPA|            AREQUIPA|040101|
|  20200402| 75|  F|        ICA|     ICA|                 ICA|110101|
|  20210226| 58|  M|       LIMA|    LIMA|          LOS OLIVOS|150117|
|  20200802| 50|  M|   AMAZONAS|   BAGUA|               BAGUA|010201|
|  20210117| 71|  F|       LIMA|    LIMA|          CHORRILLOS|150108|
|  20200614| 67|  M|LA LIBERTAD|TRUJILLO|   FLORENCIA DE MORA|130103|
|  20200715| 69|  M|

In [0]:
# Parse the integer yyyyMMdd death date into a DateType column.
filter_date_columns_df = filter_sex_columns_df.withColumn(
    'death_date', to_date(col('death_date').cast('string'), 'yyyyMMdd')
)
filter_date_columns_df.show()

+----------+---+---+-----------+--------+--------------------+------+
|death_date|age|sex| department|province|            district|ubigeo|
+----------+---+---+-----------+--------+--------------------+------+
|2021-01-27| 65|  F|        ICA| CHINCHA|        CHINCHA ALTA|110201|
|2020-06-18| 62|  F|   AREQUIPA|AREQUIPA|   ALTO SELVA ALEGRE|040102|
|2020-05-22| 73|  F|       LIMA|    LIMA|SAN MARTIN DE PORRES|150135|
|2020-06-18| 68|  M|       LIMA|    LIMA|          CARABAYLLO|150106|
|2020-07-30| 74|  M|       LIMA|    LIMA|SAN JUAN DE LURIG...|150132|
|2020-07-30| 63|  F|   AREQUIPA|AREQUIPA|            AREQUIPA|040101|
|2020-04-02| 75|  F|        ICA|     ICA|                 ICA|110101|
|2021-02-26| 58|  M|       LIMA|    LIMA|          LOS OLIVOS|150117|
|2020-08-02| 50|  M|   AMAZONAS|   BAGUA|               BAGUA|010201|
|2021-01-17| 71|  F|       LIMA|    LIMA|          CHORRILLOS|150108|
|2020-06-14| 67|  M|LA LIBERTAD|TRUJILLO|   FLORENCIA DE MORA|130103|
|2020-07-15| 69|  M|

In [0]:
# Derive calendar parts from death_date: numeric year, full month name, full weekday name.
# NOTE(review): month_death is a name string (sorts alphabetically), and death_date is
# dropped in a later cell, so the Gold table cannot be ordered chronologically by month
# or by day of month — consider also keeping a numeric month / the date itself.
death_date_col = col('death_date')
df = (
    filter_date_columns_df
    .withColumn('year_death', year(death_date_col))
    .withColumn('month_death', date_format(death_date_col, 'MMMM'))
    .withColumn('day_death', date_format(death_date_col, 'EEEE'))
)
df.show()

+----------+---+---+-----------+----------------+--------------------+------+----------+-----------+---------+
|death_date|age|sex| department|        province|            district|ubigeo|year_death|month_death|day_death|
+----------+---+---+-----------+----------------+--------------------+------+----------+-----------+---------+
|2021-02-28| 68|  M|       LIMA|            LIMA|                 ATE|150103|      2021|   February|   Sunday|
|2020-09-16| 84|  M|LA LIBERTAD|        TRUJILLO|            TRUJILLO|130101|      2020|  September|Wednesday|
|2020-05-08| 65|  M|     LORETO|          MAYNAS|             IQUITOS|160101|      2020|        May|   Friday|
|2020-12-15| 88|  M|     CALLAO|          CALLAO|          BELLAVISTA|070102|      2020|   December|  Tuesday|
|2021-02-20| 63|  F|   AYACUCHO|        HUAMANGA|            AYACUCHO|050101|      2021|   February| Saturday|
|2020-04-28| 63|  M|LA LIBERTAD|        TRUJILLO|              LAREDO|130106|      2020|      April|  Tuesday|
|

In [0]:
# Drop the full date; only its derived year/month/weekday columns go to Gold.
df = df.drop('death_date')
df.show()

+---+---+-----------+----------------+--------------------+------+----------+-----------+---------+
|age|sex| department|        province|            district|ubigeo|year_death|month_death|day_death|
+---+---+-----------+----------------+--------------------+------+----------+-----------+---------+
| 68|  M|       LIMA|            LIMA|                 ATE|150103|      2021|   February|   Sunday|
| 84|  M|LA LIBERTAD|        TRUJILLO|            TRUJILLO|130101|      2020|  September|Wednesday|
| 65|  M|     LORETO|          MAYNAS|             IQUITOS|160101|      2020|        May|   Friday|
| 88|  M|     CALLAO|          CALLAO|          BELLAVISTA|070102|      2020|   December|  Tuesday|
| 63|  F|   AYACUCHO|        HUAMANGA|            AYACUCHO|050101|      2021|   February| Saturday|
| 63|  M|LA LIBERTAD|        TRUJILLO|              LAREDO|130106|      2020|      April|  Tuesday|
| 82|  M|      PIURA|           PAITA|            VICHAYAL|200507|      2020|        May|   Monday|


In [0]:
# Final Gold column order: calendar parts, then demographics, then geography.
calendar_cols = ['year_death', 'month_death', 'day_death']
demographic_cols = ['age', 'sex']
geography_cols = ['ubigeo', 'department', 'province', 'district']
columns_order = calendar_cols + demographic_cols + geography_cols
df = df.select(columns_order)
df.show()

+----------+-----------+---------+---+---+------+-----------+----------------+--------------------+
|year_death|month_death|day_death|age|sex|ubigeo| department|        province|            district|
+----------+-----------+---------+---+---+------+-----------+----------------+--------------------+
|      2021|   February|   Sunday| 68|  M|150103|       LIMA|            LIMA|                 ATE|
|      2020|  September|Wednesday| 84|  M|130101|LA LIBERTAD|        TRUJILLO|            TRUJILLO|
|      2020|        May|   Friday| 65|  M|160101|     LORETO|          MAYNAS|             IQUITOS|
|      2020|   December|  Tuesday| 88|  M|070102|     CALLAO|          CALLAO|          BELLAVISTA|
|      2021|   February| Saturday| 63|  F|050101|   AYACUCHO|        HUAMANGA|            AYACUCHO|
|      2020|      April|  Tuesday| 63|  M|130106|LA LIBERTAD|        TRUJILLO|              LAREDO|
|      2020|        May|   Monday| 82|  M|200507|      PIURA|           PAITA|            VICHAYAL|


#### From the Silver layer to the Gold layer

In [0]:
# Persist the curated frame as the Gold Delta table, replacing any previous run.
df.write.save(f"{gold_path}/death_cases_covid19", format="delta", mode="overwrite")

In [0]:
# Checking data of the Gold layer
# The path is derived from the same layer directory the write step uses, so the
# check always reads the table that was just written.
path = f'{gold_path}/death_cases_covid19'
gold_df = spark.read.format('delta').load(path)

gold_df.show(10)

+----------+-----------+---------+---+---+------+-----------+---------+--------------------+
|year_death|month_death|day_death|age|sex|ubigeo| department| province|            district|
+----------+-----------+---------+---+---+------+-----------+---------+--------------------+
|      2021|       June| Thursday| 75|  M|130704|LA LIBERTAD|PACASMAYO|           PACASMAYO|
|      2021|       June|  Tuesday| 79|  M|130101|LA LIBERTAD| TRUJILLO|            TRUJILLO|
|      2021|        May| Thursday| 61|  M|140101| LAMBAYEQUE| CHICLAYO|            CHICLAYO|
|      2021|      April|   Friday| 37|  M|110501|        ICA|    PISCO|               PISCO|
|      2021|      March|  Tuesday| 83|  M|140108| LAMBAYEQUE| CHICLAYO|             MONSEFU|
|      2021|   November|   Sunday| 61|  F|150142|       LIMA|     LIMA|   VILLA EL SALVADOR|
|      2021|      April|   Friday| 52|  F|150103|       LIMA|     LIMA|                 ATE|
|      2021|      March| Thursday| 84|  F|150135|       LIMA|     LIMA