# COVID 19 ETL

## 1- Importando funciones y clases

In [118]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType
from pyspark.sql.functions import max, col, expr, when, isnan, lower, explode, dayofmonth, month, year, lpad, lit, coalesce
from pyspark.sql import DataFrame

from pydeequ.checks import Check, CheckLevel, ConstrainableDataTypes
from pydeequ.verification import VerificationResult, VerificationSuite
from pydeequ.analyzers import *

# Kept after the star import above so these module names cannot be shadowed by it.
import json
import pandas as pd
import datetime
import os

## 2- Definiendo la sesión de Spark

<code><b>DEEQU</b></code>: Deequ es una biblioteca construida sobre Apache Spark para definir "pruebas unitarias para datos", que miden la calidad de los datos en grandes conjuntos de datos. (PyDeequ es una interfaz para Deequ en Python)  
<code><b>HUDI</b></code>: Apache Hudi (pronunciado Hoodie) significa Hadoop Upserts Deletes and Incrementals. Hudi administra el almacenamiento de grandes conjuntos de datos analíticos en DFS (almacenes en la nube, HDFS, etc)

In [2]:
# Relative paths of the extra jars passed to Spark through `spark.jars`.
PYDEEQU, HUDI = (
    'jars/deequ-2.0.0-spark-3.1.jar',
    'jars/hudi-spark3.1.2-bundle_2.12-0.10.1.jar',
)

<code>Hudi</code> nos permitirá hacer un manejo más eficiente de los deltas de datos (SCD)

In [3]:
# Create (or reuse) the notebook-wide Spark session with the Deequ and Hudi
# jars attached; every later cell uses this `spark` object.
spark = (
    SparkSession.builder
    .appName("Python Spark basic example")
    .config("spark.jars", ','.join([PYDEEQU, HUDI]))
    .config("spark.driver.memory", "12g")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.driver.maxResultSize", "4g")
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    .getOrCreate()
)

# Only errors are logged from here on.
spark.sparkContext.setLogLevel("ERROR")

23/01/10 20:49:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## 3- Definiendo funciones y clases a utilizar

In [36]:
class RddOperations:
    """Helpers that load delimited text files through the RDD API.

    Both methods use the notebook-level ``spark`` session.
    """

    def __init__(self):
        pass

    def readCsvFileToRDD(self, path, sep=','):
        """Read the text file at ``path`` and split every line on ``sep``.

        The split is a plain ``str.split``: quoted fields that contain the
        separator are not handled.

        Returns:
            An RDD whose elements are lists of strings, one list per line.
        """
        rddFromFile = spark.sparkContext.textFile(path)
        return rddFromFile.map(lambda line: line.split(sep))

    def RDDToDF(self, RDD, skip_head=True):
        """Turn an RDD of string lists into an all-string DataFrame.

        The first element of ``RDD`` supplies the column names; every column
        is created as a nullable ``StringType``.

        Args:
            RDD: RDD of lists of strings (e.g. from ``readCsvFileToRDD``).
            skip_head: when true, drop the leading rows before building the
                DataFrame.

        Returns:
            The resulting DataFrame.
        """
        dfFields = RDD.first()
        dfSchema = StructType([StructField(field_name, StringType(), True) for field_name in dfFields])
        if skip_head:
            # Fix: filter the RDD that was passed in. The original referenced
            # `rddCentralMap`, a name that does not exist in this scope.
            # NOTE(review): `> 2` drops the first THREE rows (indices 0-2), not
            # just the header row; kept as written - confirm it is intended.
            RDD = RDD.zipWithIndex().filter(lambda kv: kv[1] > 2).keys()
        return spark.createDataFrame(RDD, dfSchema)
        

# Intended by the author as a cheaper alternative to the plan produced by .withColumn()
def withColumnOptimized(df: DataFrame, colExpr: "str | Column", name: str) -> DataFrame:
    """Add or replace column ``name`` using a single ``select``.

    Args:
        df: source DataFrame.
        colExpr: either a SQL expression string (parsed with ``expr``) or a
            Column object.
        name: name of the resulting column. If ``df`` already has a column
            with this name it is replaced, and the new column is placed last.

    Returns:
        A DataFrame with the remaining columns of ``df`` followed by ``name``.
    """
    # df.columns hands back a fresh list, so editing it does not touch df.
    columns = df.columns
    if name in columns:
        columns.remove(name)

    # isinstance (rather than an exact type comparison) also accepts str subclasses.
    new_column = expr(colExpr) if isinstance(colExpr, str) else colExpr

    return df.select(*columns, new_column.alias(name))

def applyStructToDF(df: DataFrame, structSchema: StructType) -> DataFrame:
    """Cast the columns named in ``structSchema`` to their declared types.

    For every field of ``structSchema`` the matching column of ``df`` is
    replaced by null where ``isnan`` is true and by the value cast to the
    field's data type otherwise. Every field is expected to exist in ``df``.

    Returns:
        A DataFrame with the columns of ``df`` that are not in the schema
        (original order) followed by the typed columns in schema order.
    """
    typed_names = [field.name for field in structSchema]
    untouched = [name for name in df.columns if name not in typed_names]
    typed = [
        when(isnan(col(field.name)), None)
        .otherwise(col(field.name).cast(field.dataType))
        .alias(field.name)
        for field in structSchema
    ]
    # One projection for all fields instead of one nested projection per field.
    return df.select(*untouched, *typed)

def getStructFromJSON(json_schema: dict) -> StructType:
    """Build a Spark ``StructType`` from its JSON (dict) representation."""
    struct = StructType.fromJson(json_schema)
    return struct

def load_json(path):
    """Parse the JSON file at ``path`` and return the decoded object.

    The file is read as UTF-8 and is closed even if parsing fails.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the content is not valid JSON.
    """
    with open(path, encoding='utf-8') as f:
        return json.load(f)

class DataQualityAssessment:
    """Deequ data-quality checks for the datasets loaded by this notebook."""

    def __init__(self, df: DataFrame, spark: SparkSession):
        """Keep the DataFrame to verify and the session used to run Deequ."""
        self.df = df
        self.spark = spark

    def covid_19_clean_complete(self) -> DataFrame:
        """Run the quality rules for the ``covid_19_clean_complete`` dataset.

        Rules: completeness of every business column, latitude within
        [-90, 90], longitude within [-180, 180] and non-negative counters.

        Returns:
            The Deequ check results as a DataFrame, as produced by
            ``VerificationResult.checkResultsAsDataFrame``.
        """
        check = Check(self.spark, CheckLevel.Error, "Review Check")
        rules = (
            check
            .isComplete("province_state")
            .isComplete("date")  # declared once; the original listed it twice
            .isComplete("who_region")
            .hasMin("lat", lambda x: x >= -90)
            .hasMax("lat", lambda x: x <= 90)
            .isComplete("lat")
            .hasMin("long", lambda x: x >= -180)
            .hasMax("long", lambda x: x <= 180)
            .isComplete("long")
            .isNonNegative("confirmed")
            .isComplete("confirmed")
            .isNonNegative("deaths")
            .isComplete("deaths")
            .isNonNegative("recovered")
            .isComplete("recovered")
            # NOTE(review): isNonNegative("active") was disabled in the original;
            # presumably "active" can legitimately be negative - confirm.
            .isComplete("active")
        )
        checkResult = (
            VerificationSuite(self.spark)
            .onData(self.df)
            .addCheck(rules)
            .run()
        )

        return VerificationResult.checkResultsAsDataFrame(self.spark, checkResult)
    
    
def addDefaultAuditColumns(df: DataFrame, load_user: str = 'ruben') -> DataFrame:
    """Append the audit columns ``load_user``, ``load_time`` and ``update_time``.

    Args:
        df: DataFrame to decorate.
        load_user: value written to ``load_user``; defaults to the user that
            was previously hard-coded.

    Returns:
        ``df`` with the three audit columns placed last. Both timestamps are
        the SQL expression ``now()``.
    """
    df = withColumnOptimized(df, lit(load_user), 'load_user')
    df = withColumnOptimized(df, expr('now()'), 'load_time')
    df = withColumnOptimized(df, expr('now()'), 'update_time')
    return df

## 4-Iniciando la carga de datos en stage

Iniciando la carga desde una capa <code>landing/raw</code> hacia una de <code>stage</code>, en esta capa tendremos los datos con el formato adecuado y validado, con eso podremos construir nuestro modelo en la siguiente capa (<code>analytics</code>)

Leemos la fuente de datos <code>covid_19_clean_complete.csv</code> y estandarizamos el nombre de las columnas

In [18]:
# Read the raw CSV (first line is the header).
covid_19_clean_complete = (
    spark.read
    .options(sep=',', header=True)
    .csv('raw/covid_19_clean_complete.csv')
)

# Standardise the column names: lower case, "/" and spaces become "_".
# Renaming everything in one toDF() call yields a simple physical plan that
# Catalyst optimises easily.
corrected_columns = [
    name.lower().replace("/", "_").replace(' ', '_')
    for name in covid_19_clean_complete.columns
]

covid_19_clean_complete = covid_19_clean_complete.toDF(*corrected_columns)
# covid_19_clean_complete.explain()

Para agregar el tipo de datos correcto a la tabla nos ayudamos del archivo <code>schema_covid_19_clean_complete.json</code> que tiene la estructura correcta de la tabla  
**nota:** al ser un archivo de tipo control podriamos integrarlo dentro de un modelo de tablas de ***control de procesos y parametros***

In [19]:
# Apply the data types declared in the control file to the stage table.
schema = load_json('conf_artifacts/schema_covid_19_clean_complete.json')
covid_19_clean_complete = applyStructToDF(
    covid_19_clean_complete,
    getStructFromJSON(schema),
)

# Audit columns: loading user and the record date expressed as a timestamp.
covid_19_clean_complete = withColumnOptimized(
    covid_19_clean_complete, lit('ruben'), 'load_user'
)
covid_19_clean_complete = withColumnOptimized(
    covid_19_clean_complete, col('date').cast('timestamp'), 'load_date'
)

covid_19_clean_complete.printSchema()

root
 |-- province_state: string (nullable = true)
 |-- country_region: string (nullable = true)
 |-- lat: decimal(8,6) (nullable = true)
 |-- long: decimal(9,6) (nullable = true)
 |-- date: date (nullable = true)
 |-- confirmed: integer (nullable = true)
 |-- deaths: integer (nullable = true)
 |-- recovered: integer (nullable = true)
 |-- active: integer (nullable = true)
 |-- who_region: string (nullable = true)
 |-- load_user: string (nullable = false)
 |-- load_date: timestamp (nullable = true)



En este nivel (raw -> stage) deberíamos validar la calidad de los datos recibidos; esto lo podemos lograr de forma muy eficiente usando Deequ. A continuación, un ejemplo:

In [12]:
# Run the Deequ rules against the typed stage DataFrame and print the
# per-constraint report without truncating the cell contents.
dqa = DataQualityAssessment(df=covid_19_clean_complete, spark=spark)
quality = dqa.covid_19_clean_complete()
quality.show(n=20, truncate=False)

Python Callback server started!


                                                                                

+------------+-----------+------------+----------------------------------------------------------------------------------------------------------------------+-----------------+-------------------------------------------------------------------+
|check       |check_level|check_status|constraint                                                                                                            |constraint_status|constraint_message                                                 |
+------------+-----------+------------+----------------------------------------------------------------------------------------------------------------------+-----------------+-------------------------------------------------------------------+
|Review Check|Error      |Error       |CompletenessConstraint(Completeness(province_state,None))                                                             |Failure          |Value: 0.2988505747126437 does not meet the constraint requirement!|
|Review Check|Error 

Podemos observar que una de nuestras reglas de calidad que definimos falla (<code>isComplete(province_state)</code>)  
**nota:** Usando Deequ podemos realizar pruebas de calidad complejas, ya que el framework las tiene optimizadas, y además podemos automatizar la limpieza de los datos y el rechazo de los mismos  
**nota2:** Esa tabla de reporte de calidad (<code>quality</code>) se puede ir cargando a una tabla (por ejemplo en alguna del modelo de control de procesos) e ir graficando la evolucion de la calidad de los datos recibidos dia a dia (mes a mes, etc)

Finalmente cargamos la tabla ya validada y con sus respectivos tipos de datos en la capa <code>stage</code>  
Particionamos por el campo <code>date</code>

In [None]:
%%time
covid_19_clean_complete.repartition(4).write.parquet(
            'stage/covid_19_clean_complete/',
            mode='overwrite',
            compression='snappy',
            partitionBy='date'
        )

### 4.1 Usando Pandas para cargar datos

Usaremos ahora pandas para cargar datos hacia la capa <code>stage</code> en formato <code>parquet</code> con compresión <code>snappy</code>

In [None]:
# !mkdir stage/worldometer_data_reduced

In [None]:
# Keep only the country -> continent mapping from the worldometer file and
# store it in the stage layer as a snappy-compressed parquet file.
worldometer_data = pd.read_csv('raw/worldometer_data.csv')
worldometer_data_reduced = worldometer_data[['Country/Region', 'Continent']].rename(
    columns={'Country/Region': 'country_names', 'Continent': 'continent_name'}
)
# to_parquet does not create missing parent directories, so a fresh run would
# fail without this (the manual `mkdir` cell above is commented out).
os.makedirs('stage/worldometer_data_reduced', exist_ok=True)
worldometer_data_reduced.to_parquet('stage/worldometer_data_reduced/0001.snappy.parquet', compression='snappy')

## 5- Construcción del modelo DIM-FACT en la capa Analytics

![SNOWFALL](model.png)

### 5.1- Primera carga - 2020-01-22  
Definimos las opciones de conf para hudi

In [75]:
# Hudi write options for the FIRST load (plain inserts).
# The record key field is not set here: each dimension cell adds
# "hoodie.datasource.write.recordkey.field" (and, for composite keys, the
# ComplexKeyGenerator class) right before writing.
# Hive sync options (hoodie.datasource.hive_sync.*) are intentionally not set.
hudiOptions = {
    # table identity and layout
    "hoodie.table.name": "my_hudi_table",
    "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
    "hoodie.parquet.compression.codec": "snappy",

    # write behaviour
    "hoodie.datasource.write.operation": "insert",
    "hoodie.datasource.write.partitionpath.field": "load_user",
    "hoodie.datasource.write.precombine.field": "update_time",

    # Global index: required to upsert a record even if its partition changes.
    "hoodie.index.type": "GLOBAL_BLOOM",
    # Required to write the data into the new partition
    # (defaults to false in 0.8.0, true in 0.9.0).
    "hoodie.bloom.index.update.partition.path": "true",
}

leemos nuestras fuentes de <code>stage</code>

In [22]:
# First load: read only the 2020-01-22 partition. `basePath` keeps the
# partition column `date` in the resulting DataFrame.
covid_19_clean_complete = (
    spark.read
    .option("basePath", "stage/covid_19_clean_complete/")
    .parquet('stage/covid_19_clean_complete/date=2020-01-22/')
)
worldometer_data_reduced = spark.read.format('parquet').load('stage/worldometer_data_reduced/')

#### 5.1.1 DIM_DATE
Tabla que se crea una sola vez

In [57]:
# One row per calendar day from 2018-01-01 to 2050-01-01 (both included).
date_df = (
    spark
    .sql("SELECT sequence(to_date('2018-01-01'), to_date('2050-01-01'), interval 1 day) as date")
    .select(explode(col("date")).alias("date"))
)

# Day and month are zero-padded to two characters; year stays numeric.
dim_date = date_df.select(
    'date',
    lpad(dayofmonth('date'), 2, '0').alias('day'),
    lpad(month('date'), 2, '0').alias('month'),
    year('date').alias('year'),
)
dim_date = addDefaultAuditColumns(dim_date)
dim_date.show(5)

+----------+---+-----+----+---------+--------------------+--------------------+
|      date|day|month|year|load_user|           load_time|         update_time|
+----------+---+-----+----+---------+--------------------+--------------------+
|2018-01-01| 01|   01|2018|    ruben|2023-01-10 22:34:...|2023-01-10 22:34:...|
|2018-01-02| 02|   01|2018|    ruben|2023-01-10 22:34:...|2023-01-10 22:34:...|
|2018-01-03| 03|   01|2018|    ruben|2023-01-10 22:34:...|2023-01-10 22:34:...|
|2018-01-04| 04|   01|2018|    ruben|2023-01-10 22:34:...|2023-01-10 22:34:...|
|2018-01-05| 05|   01|2018|    ruben|2023-01-10 22:34:...|2023-01-10 22:34:...|
+----------+---+-----+----+---------+--------------------+--------------------+
only showing top 5 rows



La guardamos en la capa <code>analytics</code>

In [58]:
# Store the date dimension in the analytics layer (snappy parquet, full overwrite).
(
    dim_date.write
    .mode('overwrite')
    .option('compression', 'snappy')
    .parquet('analytics/dim_date/')
)

#### 5.1.2 DIM_CONTINENT

Igualmente cargamos una única vez la dimensión continent

In [59]:
# Continent dimension: lower-cased name as key, original spelling as description.
# The source has one row per entry of the worldometer file, so without
# de-duplication the same continent would be repeated many times; rows without
# a continent are dropped because they cannot act as a dimension key.
dim_continent = (
    worldometer_data_reduced
    .filter(col('continent_name').isNotNull())
    .select(
        lower('continent_name').alias('continent_name'),
        col('continent_name').alias('description'),
    )
    .dropDuplicates(['continent_name'])
)
dim_continent = addDefaultAuditColumns(dim_continent)
dim_continent.show(5)

+--------------+-------------+---------+--------------------+--------------------+
|continent_name|  description|load_user|           load_time|         update_time|
+--------------+-------------+---------+--------------------+--------------------+
| north america|North America|    ruben|2023-01-10 22:34:...|2023-01-10 22:34:...|
| south america|South America|    ruben|2023-01-10 22:34:...|2023-01-10 22:34:...|
|          asia|         Asia|    ruben|2023-01-10 22:34:...|2023-01-10 22:34:...|
|        europe|       Europe|    ruben|2023-01-10 22:34:...|2023-01-10 22:34:...|
|        africa|       Africa|    ruben|2023-01-10 22:34:...|2023-01-10 22:34:...|
+--------------+-------------+---------+--------------------+--------------------+
only showing top 5 rows



In [78]:
# Store the continent dimension in the analytics layer (snappy parquet, full overwrite).
(
    dim_continent.write
    .mode('overwrite')
    .option('compression', 'snappy')
    .parquet('analytics/dim_continent/')
)

#### 5.1.3 DIM_REGION

In [76]:
# Region dimension: one row per lower-cased WHO region. row_number() keeps,
# for each key, the description that sorts last.
dim_region = (
    covid_19_clean_complete
    .select(
        lower('who_region').alias('region_name'),
        col('who_region').alias('description'),
    )
    .withColumn(
        'seq',
        expr('row_number() over(partition by region_name order by description desc)'),
    )
    .where('seq = 1')
    .drop('seq')
)
dim_region = addDefaultAuditColumns(dim_region)
dim_region.show()

+--------------------+--------------------+---------+--------------------+--------------------+
|         region_name|         description|load_user|           load_time|         update_time|
+--------------------+--------------------+---------+--------------------+--------------------+
|     western pacific|     Western Pacific|    ruben|2023-01-10 23:07:...|2023-01-10 23:07:...|
|              europe|              Europe|    ruben|2023-01-10 23:07:...|2023-01-10 23:07:...|
|eastern mediterra...|Eastern Mediterra...|    ruben|2023-01-10 23:07:...|2023-01-10 23:07:...|
|     south-east asia|     South-East Asia|    ruben|2023-01-10 23:07:...|2023-01-10 23:07:...|
|              africa|              Africa|    ruben|2023-01-10 23:07:...|2023-01-10 23:07:...|
|            americas|            Americas|    ruben|2023-01-10 23:07:...|2023-01-10 23:07:...|
+--------------------+--------------------+---------+--------------------+--------------------+



In [77]:
# Key the Hudi table by region_name and write the initial snapshot.
hudiOptions.update({'hoodie.datasource.write.recordkey.field': 'region_name'})
(
    dim_region.write
    .format("org.apache.hudi")
    .options(**hudiOptions)
    .mode("overwrite")
    .save("/home/jovyan/work/analytics/dim_region")
)

#### 5.1.4 DIM_COUNTRY

In [79]:
# Country dimension: one row per (country_name, region_name). When a country
# has several rows, the one with the highest latitude is kept.
dim_country = (
    covid_19_clean_complete
    .select(
        lower('country_region').alias('country_name'),
        lower('who_region').alias('region_name'),
        col('country_region').alias('description'),
        col('lat').alias('latitude'),
        col('long').alias('longitude'),
    )
    .withColumn(
        'seq',
        expr('row_number() over(partition by country_name, region_name order by latitude desc)'),
    )
    .where('seq = 1')
    .drop('seq')
)

# Enrich with the continent; countries without a match keep a null continent.
dim_country_complete = (
    dim_country.alias('a')
    .join(
        worldometer_data_reduced.alias('b'),
        on=col('a.country_name') == lower('b.country_names'),
        how='left',
    )
    .drop('country_names')
    .select(col('a.*'), lower('continent_name').alias('continent_name'))
)
dim_country_complete = addDefaultAuditColumns(dim_country_complete)
dim_country_complete.show(5)

+------------+---------------+-----------+------------------+------------------+--------------+---------+--------------------+--------------------+
|country_name|    region_name|description|          latitude|         longitude|continent_name|load_user|           load_time|         update_time|
+------------+---------------+-----------+------------------+------------------+--------------+---------+--------------------+--------------------+
|        laos|western pacific|       Laos|19.856270000000002|        102.495496|          asia|    ruben|2023-01-10 23:10:...|2023-01-10 23:10:...|
|      guyana|       americas|     Guyana| 4.860416000000002|-58.93018000000001| south america|    ruben|2023-01-10 23:10:...|2023-01-10 23:10:...|
|       japan|western pacific|      Japan|         36.204824|        138.252924|          asia|    ruben|2023-01-10 23:10:...|2023-01-10 23:10:...|
|    portugal|         europe|   Portugal|           39.3999|           -8.2245|        europe|    ruben|2023-01

In [80]:
# Composite record key, hence the ComplexKeyGenerator; then write the initial snapshot.
hudiOptions.update({
    'hoodie.datasource.write.recordkey.field': 'country_name,region_name',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
})
(
    dim_country_complete.write
    .format("org.apache.hudi")
    .options(**hudiOptions)
    .mode("overwrite")
    .save("/home/jovyan/work/analytics/dim_country")
)

#### 5.1.5 DIM_STATE

In [81]:
# State dimension: one row per (state_name, country_name, region_name); the
# row with the highest latitude wins. Rows without a state are discarded.
dim_state = (
    covid_19_clean_complete
    .select(
        lower('province_state').alias('state_name'),
        lower('country_region').alias('country_name'),
        lower('who_region').alias('region_name'),
        col('province_state').alias('description'),
        col('lat').alias('latitude'),
        col('long').alias('longitude'),
    )
    .withColumn(
        'seq',
        expr('row_number() over(partition by state_name, country_name, region_name order by latitude desc)'),
    )
    .where('seq = 1')
    .drop('seq')
    .where('state_name is not null')
)

# Enrich with the continent; countries without a match keep a null continent.
dim_state_complete = (
    dim_state.alias('a')
    .join(
        worldometer_data_reduced.alias('b'),
        on=col('a.country_name') == lower('b.country_names'),
        how='left',
    )
    .drop('country_names')
    .select(col('a.*'), lower('continent_name').alias('continent_name'))
)
dim_state_complete = addDefaultAuditColumns(dim_state_complete)
dim_state_complete.show(5)

+----------------+------------+---------------+----------------+------------------+---------+--------------+---------+--------------------+--------------------+
|      state_name|country_name|    region_name|     description|          latitude|longitude|continent_name|load_user|           load_time|         update_time|
+----------------+------------+---------------+----------------+------------------+---------+--------------+---------+--------------------+--------------------+
|british columbia|      canada|       americas|British Columbia|           53.7267|-127.6476| north america|    ruben|2023-01-10 23:10:...|2023-01-10 23:10:...|
|           gansu|       china|western pacific|           Gansu|           35.7518| 104.2861|          null|    ruben|2023-01-10 23:10:...|2023-01-10 23:10:...|
|        shandong|       china|western pacific|        Shandong|           36.3427| 118.1498|          null|    ruben|2023-01-10 23:10:...|2023-01-10 23:10:...|
|    heilongjiang|       china|wes

In [82]:
# Composite record key, hence the ComplexKeyGenerator; then write the initial snapshot.
hudiOptions.update({
    'hoodie.datasource.write.recordkey.field': 'state_name,country_name,region_name',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
})
(
    dim_state_complete.write
    .format("org.apache.hudi")
    .options(**hudiOptions)
    .mode("overwrite")
    .save("/home/jovyan/work/analytics/dim_state")
)

#### 5.1.6 FCT_REPORT

In [143]:
# Fact table for the first load.
fct_report = covid_19_clean_complete.select(
    # dimension keys
    lower('province_state').alias('state_name'),
    lower('country_region').alias('country_name'),
    lower('who_region').alias('region_name'),
    col('date'),
    # cumulative counters taken as-is from the source
    col('confirmed'),
    col('deaths'),
    col('recovered'),
    col('active'),
    # daily increments: left null on this first load
    lit(None).cast('integer').alias('new_cases'),
    lit(None).cast('integer').alias('new_deaths'),
    lit(None).cast('integer').alias('new_recoved'),
    # percentage ratios; they show as null when the denominator is 0
    (col('deaths') / col('confirmed') * lit(100)).cast('decimal(6,2)').alias('deaths_cases'),
    (col('recovered') / col('confirmed') * lit(100)).cast('decimal(6,2)').alias('recovered_cases'),
    (col('deaths') / col('recovered') * lit(100)).cast('decimal(6,2)').alias('deaths_recovered'),
)
fct_report = addDefaultAuditColumns(fct_report)
fct_report.show(5)

+----------+--------------+---------------+----------+---------+------+---------+------+---------+----------+-----------+------------+---------------+----------------+---------+--------------------+--------------------+
|state_name|  country_name|    region_name|      date|confirmed|deaths|recovered|active|new_cases|new_deaths|new_recoved|deaths_cases|recovered_cases|deaths_recovered|load_user|           load_time|         update_time|
+----------+--------------+---------------+----------+---------+------+---------+------+---------+----------+-----------+------------+---------------+----------------+---------+--------------------+--------------------+
|      null|   netherlands|         europe|2020-01-22|        0|     0|        0|     0|     null|      null|       null|        null|           null|            null|    ruben|2023-01-11 06:10:...|2023-01-11 06:10:...|
|      null|        belize|       americas|2020-01-22|        0|     0|        0|     0|     null|      null|       null

In [144]:
# Store the fact table in the analytics layer, one directory per `date`.
(
    fct_report.write
    .mode('overwrite')
    .partitionBy('date')
    .option('compression', 'snappy')
    .parquet('analytics/fct_report/')
)

## 5.2- Posteriores cargas  
En las posteriores cargas se va a necesitar identificar los deltas de datos para las tablas DIM (SCD) y las cargas incrementales para la tabla FACT

La carga diaria se puede orquestar usando Apache Airflow o AWS Stepfunctions, pero ese desarrollo esta fuera del alcance del Challenge

In [121]:
# Processing date of the incremental load and the day right before it,
# both as 'YYYY-MM-DD' strings (`date` is the parsed datetime).
date_today = '2020-01-23'
date = datetime.datetime.strptime(date_today, '%Y-%m-%d')
date_before = (date - datetime.timedelta(days=1)).strftime("%Y-%m-%d")

In [88]:
# Hudi write options for the incremental loads (record-level upserts).
# Starts keyed by region_name; cells that write composite-key dimensions
# override the record key before writing.
# Hive sync options (hoodie.datasource.hive_sync.*) and the key generator
# class are intentionally not set here.
hudiOptions = {
    # table identity and layout
    "hoodie.table.name": "my_hudi_table",
    "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
    "hoodie.parquet.compression.codec": "snappy",

    # write behaviour
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.recordkey.field": "region_name",
    "hoodie.datasource.write.partitionpath.field": "load_user",
    "hoodie.datasource.write.precombine.field": "update_time",

    # Global index: required to upsert a record even if its partition changes.
    "hoodie.index.type": "GLOBAL_BLOOM",
    # Required to write the data into the new partition
    # (defaults to false in 0.8.0, true in 0.9.0).
    "hoodie.bloom.index.update.partition.path": "true",
}

### 5.2.1 Leemos el nuevo BATCH

In [95]:
# Read only the partition of the processing date; `basePath` keeps the
# partition column `date` in the resulting DataFrame.
new_batch = (
    spark.read
    .option("basePath", "stage/covid_19_clean_complete/")
    .parquet('stage/covid_19_clean_complete/date={}/'.format(date_today))
)
worldometer_data_reduced = spark.read.format('parquet').load('stage/worldometer_data_reduced/')

#### DIM_REGION DELTA

In [63]:
# Regions present in the new batch, de-duplicated the same way as in the
# first load (one row per lower-cased region name).
dim_region_batch = (
    new_batch
    .select(
        lower('who_region').alias('region_name'),
        col('who_region').alias('description'),
    )
    .withColumn(
        'seq',
        expr('row_number() over(partition by region_name order by description desc)'),
    )
    .where('seq = 1')
    .drop('seq')
)
dim_region_batch = addDefaultAuditColumns(dim_region_batch)
dim_region_batch.show()

+--------------------+--------------------+---------+--------------------+--------------------+
|         region_name|         description|load_user|           load_time|         update_time|
+--------------------+--------------------+---------+--------------------+--------------------+
|     western pacific|     Western Pacific|    ruben|2023-01-10 22:59:...|2023-01-10 22:59:...|
|              europe|              Europe|    ruben|2023-01-10 22:59:...|2023-01-10 22:59:...|
|eastern mediterra...|Eastern Mediterra...|    ruben|2023-01-10 22:59:...|2023-01-10 22:59:...|
|     south-east asia|     South-East Asia|    ruben|2023-01-10 22:59:...|2023-01-10 22:59:...|
|              africa|              Africa|    ruben|2023-01-10 22:59:...|2023-01-10 22:59:...|
|            americas|            Americas|    ruben|2023-01-10 22:59:...|2023-01-10 22:59:...|
+--------------------+--------------------+---------+--------------------+--------------------+



Leemos nuestra dimension para poder actualizar los registros

In [91]:
# Current contents of the region dimension, read back from the Hudi table.
dim_region = spark.read.format('org.apache.hudi').load("/home/jovyan/work/analytics/dim_region/")

In [92]:
# Display the stored dimension, including the _hoodie_* metadata columns.
dim_region.show(n=20, truncate=True)

+-------------------+--------------------+--------------------+----------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------+
|_hoodie_commit_time|_hoodie_commit_seqno|  _hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|         region_name|         description|           load_time|         update_time|load_user|
+-------------------+--------------------+--------------------+----------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------+
|  20230110232430601|20230110232430601...|     western pacific|                 ruben|101cc647-bf96-435...|     western pacific|     Western Pacific|2023-01-10 23:09:...|2023-01-10 23:24:...|    ruben|
|  20230110232430601|20230110232430601...|              europe|                 ruben|101cc647-bf96-435...|              europe|              Europe|2023-01-10 23:09:...|2023-01-10 23:24:...| 

Calculamos el delta

In [89]:
# Rows to upsert: every region of the batch, keeping the original load_time
# when the region already exists and stamping update_time with now().
# NOTE(review): rows that did not change are included too, so their
# update_time is refreshed on every run - confirm this is intended.
dim_region_delta = (
    dim_region_batch.alias('a')
    .join(
        dim_region.alias('b'),
        on=col('a.region_name') == col('b.region_name'),
        how='left',
    )
    .select(
        col('a.region_name'),
        col('a.description'),
        coalesce(col('b.load_time'), col('a.load_time')).alias('load_time'),
        expr('now()').alias('update_time'),
        col('a.load_user'),
    )
)
dim_region_delta.show()

+--------------------+--------------------+--------------------+--------------------+---------+
|         region_name|         description|           load_time|         update_time|load_user|
+--------------------+--------------------+--------------------+--------------------+---------+
|     western pacific|     Western Pacific|2023-01-10 23:09:...|2023-01-10 23:22:...|    ruben|
|              europe|              Europe|2023-01-10 23:09:...|2023-01-10 23:22:...|    ruben|
|eastern mediterra...|Eastern Mediterra...|2023-01-10 23:09:...|2023-01-10 23:22:...|    ruben|
|     south-east asia|     South-East Asia|2023-01-10 23:09:...|2023-01-10 23:22:...|    ruben|
|              africa|              Africa|2023-01-10 23:09:...|2023-01-10 23:22:...|    ruben|
|            americas|            Americas|2023-01-10 23:09:...|2023-01-10 23:22:...|    ruben|
+--------------------+--------------------+--------------------+--------------------+---------+



Nos apoyamos en hudi para hacer el **UPSERT** a nivel de registro

In [90]:
# Append mode plus the "upsert" operation in hudiOptions updates existing keys in place.
(
    dim_region_delta.write
    .format("org.apache.hudi")
    .options(**hudiOptions)
    .mode("append")
    .save("/home/jovyan/work/analytics/dim_region")
)

Podemos ver la metadata generada por Hudi

In [94]:
# List the files Hudi wrote under the `ruben` (load_user) partition directory of dim_region.
!ls -lah analytics/dim_region/ruben/

total 880K
drwxr-xr-x 2 jovyan users 4.0K Jan 10 23:24 .
drwxr-xr-x 4 jovyan users 4.0K Jan 10 23:09 ..
-rw-r--r-- 1 jovyan users 426K Jan 10 23:09 101cc647-bf96-4358-b5e5-aa2d062bb9a1-0_0-201-7627_20230110230913978.parquet
-rw-r--r-- 1 jovyan users 3.4K Jan 10 23:09 .101cc647-bf96-4358-b5e5-aa2d062bb9a1-0_0-201-7627_20230110230913978.parquet.crc
-rw-r--r-- 1 jovyan users 426K Jan 10 23:24 101cc647-bf96-4358-b5e5-aa2d062bb9a1-0_0-313-12528_20230110232430601.parquet
-rw-r--r-- 1 jovyan users 3.4K Jan 10 23:24 .101cc647-bf96-4358-b5e5-aa2d062bb9a1-0_0-313-12528_20230110232430601.parquet.crc
-rw-r--r-- 1 jovyan users   96 Jan 10 23:09 .hoodie_partition_metadata
-rw-r--r-- 1 jovyan users   12 Jan 10 23:09 ..hoodie_partition_metadata.crc


Procedemos igualmente con todas las dimensiones que requieran actualización

#### DIM_COUNTRY DELTA

In [96]:
# Countries present in the new batch: one row per (country_name, region_name),
# keeping the row with the highest latitude.
dim_country_batch = (
    new_batch
    .select(
        lower('country_region').alias('country_name'),
        lower('who_region').alias('region_name'),
        col('country_region').alias('description'),
        col('lat').alias('latitude'),
        col('long').alias('longitude'),
    )
    .withColumn(
        'seq',
        expr('row_number() over(partition by country_name, region_name order by latitude desc)'),
    )
    .where('seq = 1')
    .drop('seq')
)

# Enrich with the continent; countries without a match keep a null continent.
dim_country_batch = (
    dim_country_batch.alias('a')
    .join(
        worldometer_data_reduced.alias('b'),
        on=col('a.country_name') == lower('b.country_names'),
        how='left',
    )
    .drop('country_names')
    .select(col('a.*'), lower('continent_name').alias('continent_name'))
)
dim_country_batch = addDefaultAuditColumns(dim_country_batch)
dim_country_batch.show(5)

+------------+---------------+-----------+------------------+------------------+--------------+---------+--------------------+--------------------+
|country_name|    region_name|description|          latitude|         longitude|continent_name|load_user|           load_time|         update_time|
+------------+---------------+-----------+------------------+------------------+--------------+---------+--------------------+--------------------+
|        laos|western pacific|       Laos|19.856270000000002|        102.495496|          asia|    ruben|2023-01-10 23:33:...|2023-01-10 23:33:...|
|      guyana|       americas|     Guyana| 4.860416000000002|-58.93018000000001| south america|    ruben|2023-01-10 23:33:...|2023-01-10 23:33:...|
|       japan|western pacific|      Japan|         36.204824|        138.252924|          asia|    ruben|2023-01-10 23:33:...|2023-01-10 23:33:...|
|    portugal|         europe|   Portugal|           39.3999|           -8.2245|        europe|    ruben|2023-01

In [97]:
# Current contents of the country dimension, read back from the Hudi table.
dim_country = spark.read.format('org.apache.hudi').load("/home/jovyan/work/analytics/dim_country/")
dim_country.show()

+-------------------+--------------------+--------------------+----------------------+--------------------+-------------------+--------------------+-------------------+------------------+------------------+-----------------+--------------------+--------------------+---------+
|_hoodie_commit_time|_hoodie_commit_seqno|  _hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|       country_name|         region_name|        description|          latitude|         longitude|   continent_name|           load_time|         update_time|load_user|
+-------------------+--------------------+--------------------+----------------------+--------------------+-------------------+--------------------+-------------------+------------------+------------------+-----------------+--------------------+--------------------+---------+
|  20230110231018825|20230110231018825...|country_name:laos...|                 ruben|0c7ae843-5eff-47a...|               laos|     western pacific|               Laos|1

In [102]:
# Build the country delta: every batch row is left-joined to the stored
# dimension on (country_name, region_name). When the row already exists its
# original load_time is kept; update_time is always refreshed to now().
dim_country_delta = (
    dim_country_batch.alias('a')
    .join(
        dim_country.alias('b'),
        on=(col('a.region_name') == col('b.region_name'))
        & (col('a.country_name') == col('b.country_name')),
        how='left',
    )
    .select(
        # Descriptive attributes always come from the incoming batch.
        *[
            col(f'a.{attribute}')
            for attribute in (
                'country_name',
                'region_name',
                'description',
                'latitude',
                'longitude',
                'continent_name',
            )
        ],
        coalesce(col('b.load_time'), col('a.load_time')).alias('load_time'),
        expr('now()').alias('update_time'),
        col('a.load_user'),
    )
)
dim_country_delta.show()

+-------------------+--------------------+-------------------+------------------+------------------+-----------------+--------------------+--------------------+---------+
|       country_name|         region_name|        description|          latitude|         longitude|   continent_name|           load_time|         update_time|load_user|
+-------------------+--------------------+-------------------+------------------+------------------+-----------------+--------------------+--------------------+---------+
|               laos|     western pacific|               Laos|19.856270000000002|        102.495496|             asia|2023-01-10 23:10:...|2023-01-10 23:40:...|    ruben|
|             guyana|            americas|             Guyana| 4.860416000000002|-58.93018000000001|    south america|2023-01-10 23:10:...|2023-01-10 23:40:...|    ruben|
|              japan|     western pacific|              Japan|         36.204824|        138.252924|             asia|2023-01-10 23:10:...|2023-0

In [103]:
# Country records are identified by the (country_name, region_name) pair, so
# a composite record key and the complex key generator are configured on the
# shared hudiOptions before writing.
hudiOptions.update({
    'hoodie.datasource.write.recordkey.field': 'country_name,region_name',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
})
(
    dim_country_delta.write
    .format("org.apache.hudi")
    .options(**hudiOptions)
    .mode("append")
    .save("/home/jovyan/work/analytics/dim_country")
)

#### DIM_STATE DELTA

In [104]:
# Derive the state dimension from the new batch: normalise the key columns to
# lower case, keep one row per (state, country, region) -- the one with the
# highest latitude -- and discard rows that have no state at all.
dim_state_batch = (
    new_batch
    .select(
        lower(col('province_state')).alias('state_name'),
        lower(col('country_region')).alias('country_name'),
        lower(col('who_region')).alias('region_name'),
        col('province_state').alias('description'),
        col('lat').alias('latitude'),
        col('long').alias('longitude'),
    )
    .withColumn(
        'seq',
        expr('row_number() over(partition by state_name, country_name, region_name order by latitude desc)'),
    )
    .where('seq = 1')
    .drop('seq')
    .where('state_name is not null')
)

# Attach the continent through the country name. The left join keeps states
# whose country has no match, leaving continent_name null for them.
dim_state_batch = (
    dim_state_batch.alias('a')
    .join(
        worldometer_data_reduced.alias('b'),
        on=col('a.country_name') == lower(col('b.country_names')),
        how='left',
    )
    .drop('country_names')
    .select(
        col('a.*'),
        lower(col('continent_name')).alias('continent_name'),
    )
)
# Add the audit columns (load_user / load_time / update_time in the preview).
dim_state_batch = addDefaultAuditColumns(dim_state_batch)
dim_state_batch.show(5)

+----------------+------------+---------------+----------------+------------------+---------+--------------+---------+--------------------+--------------------+
|      state_name|country_name|    region_name|     description|          latitude|longitude|continent_name|load_user|           load_time|         update_time|
+----------------+------------+---------------+----------------+------------------+---------+--------------+---------+--------------------+--------------------+
|british columbia|      canada|       americas|British Columbia|           53.7267|-127.6476| north america|    ruben|2023-01-10 23:50:...|2023-01-10 23:50:...|
|           gansu|       china|western pacific|           Gansu|           35.7518| 104.2861|          null|    ruben|2023-01-10 23:50:...|2023-01-10 23:50:...|
|        shandong|       china|western pacific|        Shandong|           36.3427| 118.1498|          null|    ruben|2023-01-10 23:50:...|2023-01-10 23:50:...|
|    heilongjiang|       china|wes

In [105]:
# Current contents of the state dimension as stored in the Hudi table.
dim_state = spark.read.format('org.apache.hudi').load(
    "/home/jovyan/work/analytics/dim_state/"
)
dim_state.show()

+-------------------+--------------------+--------------------+----------------------+--------------------+------------------+--------------+---------------+------------------+------------------+---------+-----------------+--------------------+--------------------+---------+
|_hoodie_commit_time|_hoodie_commit_seqno|  _hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|        state_name|  country_name|    region_name|       description|          latitude|longitude|   continent_name|           load_time|         update_time|load_user|
+-------------------+--------------------+--------------------+----------------------+--------------------+------------------+--------------+---------------+------------------+------------------+---------+-----------------+--------------------+--------------------+---------+
|  20230110231044571|20230110231044571...|state_name:britis...|                 ruben|8107e35d-a067-48f...|  british columbia|        canada|       americas|  British Colum

In [107]:
# Build the state delta: every batch row is left-joined to the stored
# dimension on (state_name, country_name, region_name). When the row already
# exists its original load_time is kept; update_time is always refreshed.
dim_state_delta = (
    dim_state_batch.alias('a')
    .join(
        dim_state.alias('b'),
        on=(col('a.region_name') == col('b.region_name'))
        & (col('a.country_name') == col('b.country_name'))
        & (col('a.state_name') == col('b.state_name')),
        how='left',
    )
    .select(
        # Descriptive attributes always come from the incoming batch.
        *[
            col(f'a.{attribute}')
            for attribute in (
                'state_name',
                'country_name',
                'region_name',
                'description',
                'latitude',
                'longitude',
                'continent_name',
            )
        ],
        coalesce(col('b.load_time'), col('a.load_time')).alias('load_time'),
        expr('now()').alias('update_time'),
        col('a.load_user'),
    )
)
dim_state_delta.show()

+------------------+--------------+---------------+------------------+------------------+---------+-----------------+--------------------+--------------------+---------+
|        state_name|  country_name|    region_name|       description|          latitude|longitude|   continent_name|           load_time|         update_time|load_user|
+------------------+--------------+---------------+------------------+------------------+---------+-----------------+--------------------+--------------------+---------+
|  british columbia|        canada|       americas|  British Columbia|           53.7267|-127.6476|    north america|2023-01-10 23:10:...|2023-01-10 23:55:...|    ruben|
|             gansu|         china|western pacific|             Gansu|           35.7518| 104.2861|             null|2023-01-10 23:10:...|2023-01-10 23:55:...|    ruben|
|          shandong|         china|western pacific|          Shandong|           36.3427| 118.1498|             null|2023-01-10 23:10:...|2023-01-10 2

In [142]:
# State records are identified by (state_name, country_name, region_name), so
# a composite record key and the complex key generator are configured on the
# shared hudiOptions before writing.
hudiOptions.update({
    'hoodie.datasource.write.recordkey.field': 'state_name,country_name,region_name',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
})
(
    dim_state_delta.write
    .format("org.apache.hudi")
    .options(**hudiOptions)
    .mode("append")
    .save("/home/jovyan/work/analytics/dim_state")
)

#### FCT_REPORT INCREMENTAL

In [122]:
# Load a single partition of the fact table: the one selected by date_before
# (presumably the day preceding the new batch -- confirm where it is set).
fct_report = spark.read.parquet(
    f'analytics/fct_report/date={date_before}'
)

In [126]:
# Preview the incoming batch before it is joined with fct_report.
new_batch.show()

+------------------+--------------+------------------+-------------------+---------+------+---------+------+--------------------+----------+
|    province_state|country_region|               lat|               long|confirmed|deaths|recovered|active|          who_region|      date|
+------------------+--------------+------------------+-------------------+---------+------+---------+------+--------------------+----------+
|              null|      Dominica|            15.415|            -61.371|        0|     0|        0|     0|            Americas|2020-01-23|
|          Shanghai|         China|31.201999999999998|           121.4491|       16|     0|        0|    16|     Western Pacific|2020-01-23|
|   New South Wales|     Australia|          -33.8688|           151.2093|        0|     0|        0|     0|     Western Pacific|2020-01-23|
|     French Guiana|        France|            3.9339|           -53.1258|        0|     0|        0|     0|              Europe|2020-01-23|
|            

In [145]:
# Derive the incremental fact rows for the new batch by comparing each
# (state, country, region) with the figures stored in fct_report.
#
# The state key is matched with a null-safe comparison: countries reported
# without a province have a NULL state on both sides, and a plain `==` would
# never match them, so their "new" figures would wrongly equal the cumulative
# totals instead of the day-over-day difference.
incremental_fact = new_batch.alias('a').join(
    fct_report.alias('b'),
    lower(col('a.province_state')).eqNullSafe(col('b.state_name'))
    & (lower(col('a.country_region')) == col('b.country_name'))
    & (lower(col('a.who_region')) == col('b.region_name')),
    'left'
).select(
    lower(col('a.province_state')).alias('state_name'),
    lower(col('a.country_region')).alias('country_name'),
    lower(col('a.who_region')).alias('region_name'),
    col('a.confirmed'),
    col('a.deaths'),
    col('a.recovered'),
    col('a.active'),
    # Day-over-day deltas; keys with no stored row count from zero.
    (col('a.confirmed') - coalesce(col('b.confirmed'), lit(0))).alias('new_cases'),
    (col('a.deaths') - coalesce(col('b.deaths'), lit(0))).alias('new_deaths'),
    # NOTE: the column name 'new_recoved' (sic) is kept as-is so the schema
    # stays compatible with what has already been written.
    (col('a.recovered') - coalesce(col('b.recovered'), lit(0))).alias('new_recoved'),
    # Percentage ratios. A zero denominator yields null rather than an error,
    # as the preview rows with confirmed = 0 show.
    (col('a.deaths')/col('a.confirmed')*100).cast('decimal(6,2)').alias('deaths_cases'),
    (col('a.recovered')/col('a.confirmed')*100).cast('decimal(6,2)').alias('recovered_cases'),
    (col('a.deaths')/col('a.recovered')*100).cast('decimal(6,2)').alias('deaths_recovered'),
    col('a.date')
)
# Add the audit columns (load_user / load_time / update_time in the preview).
incremental_fact = addDefaultAuditColumns(incremental_fact)
incremental_fact.show()

+------------------+--------------+--------------------+---------+------+---------+------+---------+----------+-----------+------------+---------------+----------------+----------+---------+--------------------+--------------------+
|        state_name|  country_name|         region_name|confirmed|deaths|recovered|active|new_cases|new_deaths|new_recoved|deaths_cases|recovered_cases|deaths_recovered|      date|load_user|           load_time|         update_time|
+------------------+--------------+--------------------+---------+------+---------+------+---------+----------+-----------+------------+---------------+----------------+----------+---------+--------------------+--------------------+
|              null|      dominica|            americas|        0|     0|        0|     0|        0|         0|          0|        null|           null|            null|2020-01-23|    ruben|2023-01-11 06:10:...|2023-01-11 06:10:...|
|          shanghai|         china|     western pacific|       16|  

In [146]:
# Persist the incremental facts as snappy-compressed parquet, partitioned by
# date. The session sets spark.sql.sources.partitionOverwriteMode to
# "dynamic", so 'overwrite' replaces only the partitions present in this
# batch.
(
    incremental_fact.write
    .mode('overwrite')
    .partitionBy('date')
    .parquet('analytics/fct_report/', compression='snappy')
)