In [77]:
from pyspark.sql.types import *
from pyspark.sql.functions import concat, col, lit, sum, avg, max, min, count, udf
import pyspark.sql.functions as F

StatementMeta(sparktaxidata, 36, 3, Finished, Available)

In [78]:
from io import StringIO
import pandas as pd
import requests

headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.76 Safari/537.36'}

StatementMeta(sparktaxidata, 36, 4, Finished, Available)

# ACCEDIENDO A LA DATA

In [79]:
# LEER FALLECIDOS POR COVID
url = 'https://files.minsa.gob.pe/s/t9AFqRbXw3F55Ho/download'
s = requests.get(url, headers= headers).text
data = StringIO(s)
pd_df = pd.read_csv(data, sep=";", encoding='utf_8', dtype = 'str')

pd_df.dropna(subset=['DEPARTAMENTO','PROVINCIA', 'UBIGEO', 'DISTRITO', "EDAD_DECLARADA", "FECHA_FALLECIMIENTO"], inplace=True)
pd_df = pd_df.rename(str.lower, axis='columns')

df_fall = spark.createDataFrame(pd_df)
df_fall.show(5)

StatementMeta(sparktaxidata, 36, 5, Finished, Available)



+-----------+-------------------+--------------+---------+--------------------+------------+---------+--------------------+------+----------+
|fecha_corte|fecha_fallecimiento|edad_declarada|     sexo|   clasificacion_def|departamento|provincia|            distrito|ubigeo|id_persona|
+-----------+-------------------+--------------+---------+--------------------+------------+---------+--------------------+------+----------+
|   20231114|           20210611|            21|MASCULINO|    Criterio SINADEF|  LAMBAYEQUE| CHICLAYO|            CHICLAYO|140101|  24833991|
|   20231114|           20210317|            45|MASCULINO|Criterio serolÃ³gico|       PIURA|  SULLANA|             SULLANA|200601|  24761117|
|   20231114|           20210602|            62| FEMENINO|Criterio virolÃ³gico|         ICA|    PISCO|        SAN CLEMENTE|110507|  24767070|
|   20231114|           20210703|            75|MASCULINO|Criterio virolÃ³gico|    AREQUIPA| AREQUIPA|          MIRAFLORES|040110|  24751741|
|   20

In [80]:
# LEER POSITIVOS POR COVID
url = 'https://files.minsa.gob.pe/s/eRqxR35ZCxrzNgr/download'
s = requests.get(url, headers= headers).text
data = StringIO(s)
pd_df = pd.read_csv(data, sep=";", encoding='utf_8', dtype = 'str')

pd_df.dropna(subset=['DEPARTAMENTO','PROVINCIA', 'UBIGEO', 'DISTRITO', "EDAD","FECHA_RESULTADO"], inplace=True)
pd_df = pd_df.rename(str.lower, axis='columns')

df_posi = spark.createDataFrame(pd_df)
df_posi.show(5)

StatementMeta(sparktaxidata, 36, 6, Finished, Available)



+-----------+------------+---------+--------------------+--------+----+---------+---------------+------+----------+
|fecha_corte|departamento|provincia|            distrito|metododx|edad|     sexo|fecha_resultado|ubigeo|id_persona|
+-----------+------------+---------+--------------------+--------+----+---------+---------------+------+----------+
|   20211213|        LIMA|     LIMA|SAN MARTIN DE PORRES|      PR|  25|MASCULINO|       20201217|150135|  24662153|
|   20211213|         ICA|    PISCO|               PISCO|      PR|  20| FEMENINO|       20200822|110501|  24662175|
|   20211213|     HUANUCO|  HUANUCO|             HUANUCO|      PR|  22| FEMENINO|       20200729|100101|  24662197|
|   20211213|      ANCASH|    SANTA|               SANTA|      AG|  18| FEMENINO|       20210630|021808|  24662204|
|   20211213|      ANCASH|    SANTA|      NUEVO CHIMBOTE|      AG|  17|MASCULINO|       20210404|021809|  24662207|
+-----------+------------+---------+--------------------+--------+----+-

In [81]:
# LEER COVID MUNDIAL
url = 'https://covid.ourworldindata.org/data/owid-covid-data.csv'
s = requests.get(url, headers= headers).text
data = StringIO(s)
pd_df = pd.read_csv(data, sep=",", encoding='utf_8')
pd_df = pd_df.rename(str.lower, axis='columns')

df_mund = spark.createDataFrame(pd_df)
display(df_mund.limit(5))

StatementMeta(sparktaxidata, 36, 7, Finished, Available)



SynapseWidget(Synapse.DataFrame, 026b142a-b4d0-499c-bd5e-ec03b9db4da6)

In [82]:
# LEER DEPARTAMENTOS
pd_df = pd.read_excel('abfss://covid@mksynapsedatalake.dfs.core.windows.net/raw_data/departamentos.xlsx')
pd_df = pd_df.rename(str.lower, axis='columns')

df_dep = spark.createDataFrame(pd_df)
df_dep.show(5)

StatementMeta(sparktaxidata, 36, 8, Finished, Available)



+----+--------+------------+----------------+
|pais|paiscode|departamento|departamentocode|
+----+--------+------------+----------------+
|PERU|      PE|    AMAZONAS|          PE-AMA|
|PERU|      PE|      ANCASH|          PE-ANC|
|PERU|      PE|    APURIMAC|          PE-APU|
|PERU|      PE|    AREQUIPA|          PE-ARE|
|PERU|      PE|    AYACUCHO|          PE-AYA|
+----+--------+------------+----------------+
only showing top 5 rows



image.png

# Transformacion

In [83]:
# -----------------------------------------------------------------
# ETL DE FALLECIDOS POR COVID
# -----------------------------------------------------------------
df_fal = df_fall

# CAMBIANDO EL CAMPO EDAD A TIPO INTEGER, DARLE FORMATO AAAA-MM-01
df_fal = df_fal.withColumn("edad_declarada", col("edad_declarada").cast("integer"))
df_fal = df_fal.withColumn("fecha_fallecimiento", concat(col("fecha_fallecimiento").substr(1,4), lit("-"), col("fecha_fallecimiento").substr(5,2), lit("-01")))

# HALLANDO 'etapa_de_vida'
df_fal = df_fal.withColumn("etapa_de_vida", udf(lambda x: 'NIÑO' if x<=9 else ('ADOLESCENTE' if x<=19 else ('JOVEN' if x<=29 else ('ADULTO' if x<=59 else 'MAYOR'))))("edad_declarada"))

# JOIN ENTRE DATAFRAME FALLECIDOS Y DEPARTAMENTOS
# Y CREANDO DATAFRAME FALLECIDOS POR SEXO Y ETAPA DE VIDA Y POR DIA
df_fallecido = df_fal.join(df_dep, df_fal.departamento == df_dep.departamento, "left") \
    .select(df_dep.departamentocode, df_fal.fecha_fallecimiento.alias("fecha"), df_fal.sexo, df_fal.etapa_de_vida) \
    .groupBy(df_dep.departamentocode, "fecha", df_fal.sexo, df_fal.etapa_de_vida,) \
    .agg(count(df_dep.departamentocode).alias("cantidad")) \
    .orderBy(df_dep.departamentocode, "fecha", df_fal.sexo,  ascending = True)
    
df_fallecido.show(5)

StatementMeta(sparktaxidata, 36, 9, Finished, Available)

+----------------+----------+---------+-------------+--------+
|departamentocode|     fecha|     sexo|etapa_de_vida|cantidad|
+----------------+----------+---------+-------------+--------+
|          PE-AMA|2020-04-01| FEMENINO|        MAYOR|       1|
|          PE-AMA|2020-04-01|MASCULINO|  ADOLESCENTE|       1|
|          PE-AMA|2020-05-01| FEMENINO|        MAYOR|       2|
|          PE-AMA|2020-05-01| FEMENINO|       ADULTO|       5|
|          PE-AMA|2020-05-01| FEMENINO|  ADOLESCENTE|       2|
+----------------+----------+---------+-------------+--------+
only showing top 5 rows



In [84]:
# -----------------------------------------------------------------
# ETL DE POSITIVOS POR COVID
# -----------------------------------------------------------------
df_pos = df_posi

# CAMBIANDO EL CAMPO EDAD A TIPO INTEGER, DARLE FORMATO AAAA-MM-01
df_pos = df_pos.withColumn("edad", col("edad").cast("integer"))
df_pos = df_pos.withColumn("fecha_resultado", concat(col("fecha_resultado").substr(1,4), lit("-"), col("fecha_resultado").substr(5,2), lit("-01")))

# HALLANDO 'etapa_de_vida'
df_pos = df_pos.withColumn("etapa_de_vida", udf(lambda x: 'NIÑO' if x<=9 else ('ADOLESCENTE' if x<=19 else ('JOVEN' if x<=29 else ('ADULTO' if x<=59 else 'MAYOR'))))("edad"))

# JOIN ENTRE DATAFRAME FALLECIDOS Y DEPARTAMENTOS
# Y CREANDO DATAFRAME FALLECIDOS POR SEXO Y ETAPA DE VIDA Y POR DIA
df_positivo = df_pos.join(df_dep, df_pos.departamento == df_dep.departamento, "left") \
    .select(df_dep.departamentocode, df_pos.fecha_resultado.alias("fecha"), df_pos.sexo, df_pos.etapa_de_vida) \
    .groupBy(df_dep.departamentocode, "fecha", df_pos.sexo, df_pos.etapa_de_vida,) \
    .agg(count(df_dep.departamentocode).alias("cantidad")) \
    .orderBy(df_dep.departamentocode, "fecha", df_pos.sexo,  ascending = True)
    
df_positivo.show(5)

StatementMeta(sparktaxidata, 36, 10, Finished, Available)

+----------------+----------+--------+-------------+--------+
|departamentocode|     fecha|    sexo|etapa_de_vida|cantidad|
+----------------+----------+--------+-------------+--------+
|          PE-AMA|2020-04-01|FEMENINO|        JOVEN|      14|
|          PE-AMA|2020-04-01|FEMENINO|        MAYOR|       9|
|          PE-AMA|2020-04-01|FEMENINO|         NIÑO|       6|
|          PE-AMA|2020-04-01|FEMENINO|  ADOLESCENTE|       3|
|          PE-AMA|2020-04-01|FEMENINO|       ADULTO|      50|
+----------------+----------+--------+-------------+--------+
only showing top 5 rows



In [85]:
# -----------------------------------------------------------------
# JOIN DE POSITIVOS Y FALLECIDOS POR COVID
# -----------------------------------------------------------------

df_covid = df_positivo.join(df_fallecido, ["departamentocode","fecha", "sexo", "etapa_de_vida"], "left") \
    .select(df_positivo.departamentocode, df_positivo.fecha, df_positivo.sexo, df_positivo.etapa_de_vida, df_positivo.cantidad, df_fallecido.cantidad) \
    .groupBy(df_positivo.departamentocode, df_positivo.fecha, df_positivo.sexo, df_positivo.etapa_de_vida) \
    .agg(sum(df_positivo.cantidad).alias("positivo"), sum(df_fallecido.cantidad).alias("fallecido") ) \
    .orderBy(df_positivo.departamentocode, df_positivo.fecha, df_positivo.sexo,  ascending = True)

df_covid = df_covid.fillna(value=0,subset=["fallecido"])
df_covid.show(5)

StatementMeta(sparktaxidata, 36, 11, Finished, Available)

+----------------+----------+--------+-------------+--------+---------+
|departamentocode|     fecha|    sexo|etapa_de_vida|positivo|fallecido|
+----------------+----------+--------+-------------+--------+---------+
|          PE-AMA|2020-04-01|FEMENINO|        JOVEN|      14|        0|
|          PE-AMA|2020-04-01|FEMENINO|        MAYOR|       9|        1|
|          PE-AMA|2020-04-01|FEMENINO|         NIÑO|       6|        0|
|          PE-AMA|2020-04-01|FEMENINO|  ADOLESCENTE|       3|        0|
|          PE-AMA|2020-04-01|FEMENINO|       ADULTO|      50|        0|
+----------------+----------+--------+-------------+--------+---------+
only showing top 5 rows



In [99]:
# -----------------------------------------------------------------
# ETL MUNDIAL COVID
# -----------------------------------------------------------------
df_mun = df_mund

# CAMBIANDO FECHA DARLE FORMATO AAAA-MM-01
df_mun = df_mun.withColumn("date", concat(col("date").substr(1,7), lit("-01")))

# CAMBIANDO TIPO DATA ENTERO
df_mun = df_mun.withColumn("new_cases", col("new_cases").cast("integer"))
df_mun = df_mun.withColumn("new_deaths", col("new_deaths").cast("integer"))

# RENOMBRANDO CAMPOS PARA FACILIDAD DE LECTURA
df_mun = df_mun.withColumnRenamed("iso_code","paisCode") \
    .withColumnRenamed("location","pais") \
    .withColumnRenamed("continent","continente") \
    .withColumnRenamed("date","fecha")

# AGRUPANDO PARA CREAR TABLE HECHOS COVID MUNDIAL
df_covid_mundial = df_mun.groupBy(df_mun.paisCode, df_mun.fecha) \
    .agg(sum(df_mun.new_cases).alias("positivo"), sum(df_mun.new_deaths).alias("fallecido") ) \
    .orderBy(df_mun.paisCode, df_mun.fecha, ascending = True)

df_covid_mundial.show(5)

# CREANDO DIMENSION PAIS
df_pais = df_mun.groupBy(df_mun.paisCode, df_mun.pais, df_mun.continente) \
    .agg(count(df_mun.paisCode).alias("cantidad")) \
    .orderBy(df_mun.paisCode, ascending = True)
df_pais = df_pais.select(df_pais.paisCode, df_pais.pais, df_pais.continente)

StatementMeta(sparktaxidata, 36, 25, Finished, Available)

+--------+----------+--------+---------+
|paisCode|     fecha|positivo|fallecido|
+--------+----------+--------+---------+
|     ABW|2020-01-01|       0|        0|
|     ABW|2020-02-01|       0|        0|
|     ABW|2020-03-01|      50|        0|
|     ABW|2020-04-01|      50|        2|
|     ABW|2020-05-01|       1|        1|
+--------+----------+--------+---------+
only showing top 5 rows



In [87]:
# -----------------------------------------------------------------
# ETL CREANDO DIMENSION CALENDARIO
# -----------------------------------------------------------------

# HALLANDO FECHA MAXIMA
fec_max1 = df_covid.select(max(df_covid.fecha)).collect()[0]['max(fecha)']
fec_max2 = df_covid_mundial.select(max(df_covid_mundial.fecha)).collect()[0]['max(fecha)']
fec_max =  fec_max1 if fec_max1 > fec_max2 else fec_max2
fec_max = pd.to_datetime(fec_max).date()

# HALLANDO FECHA MINIMA
fec_min1 = df_covid.select(min(df_covid.fecha)).collect()[0]['min(fecha)']
fec_min2 = df_covid_mundial.select(min(df_covid_mundial.fecha)).collect()[0]['min(fecha)']

fec_min =  fec_min1 if fec_min1 > fec_min2 else fec_min2
fec_min = pd.to_datetime(fec_min).date()

mes_dict = {
    i + 1: mes
    for i, mes in enumerate(['Enero', 'Febrero', 'Marzo', 'Abril', 'Mayo', 'Junio', 'Julio', 'Agosto', 'Setiembre', 'Octubre', 'Noviembre', 'Diciembre'])
}

df_calendario = pd.DataFrame({'fecha': pd.date_range(fec_min, fec_max)})
df_calendario['Año'] = df_calendario.fecha.dt.year
df_calendario['Trimestre'] = df_calendario.fecha.dt.quarter.map(lambda x :  'Trim. ' + str(x))
df_calendario['Mes'] = df_calendario.fecha.dt.month.map(lambda x: mes_dict[x])
df_calendario['Dia'] = df_calendario.fecha.dt.day
df_calendario['AñoMes'] = df_calendario.fecha.astype('string').str[:4] + df_calendario.fecha.astype('string').str[5:7]

df_calendario = spark.createDataFrame(df_calendario)
df_calendario.show(5)

StatementMeta(sparktaxidata, 36, 13, Finished, Available)



+-------------------+----+---------+-----+---+------+
|              fecha| Año|Trimestre|  Mes|Dia|AñoMes|
+-------------------+----+---------+-----+---+------+
|2020-03-01 00:00:00|2020|  Trim. 1|Marzo|  1|202003|
|2020-03-02 00:00:00|2020|  Trim. 1|Marzo|  2|202003|
|2020-03-03 00:00:00|2020|  Trim. 1|Marzo|  3|202003|
|2020-03-04 00:00:00|2020|  Trim. 1|Marzo|  4|202003|
|2020-03-05 00:00:00|2020|  Trim. 1|Marzo|  5|202003|
+-------------------+----+---------+-----+---+------+
only showing top 5 rows



In [88]:
# -----------------------------------------------------------------
# ETL CREANDO DIMENSION DEPARTAMENTO
# -----------------------------------------------------------------

# EN POWER BI, EN GRAFICO TIPO MAPAS, PARA QUE IDENTIFICA UN DEPARTAMENTO DE UN PAIS ES 
# NECESARIO CREAR UN CAMPO CON FORMATO >> DEPARTAMENTO (DEPARTAMENTO), (PAIS)
df_dep = df_dep.withColumn("departamentoPais", concat(lit("DEPARTAMENTO "), col("departamento"), lit(", PERU")) )

StatementMeta(sparktaxidata, 36, 14, Finished, Available)

# Load CSV

In [101]:
df_covid.repartition(1).write.mode("overwrite").option("header", "true").csv("abfss://covid@mksynapsedatalake.dfs.core.windows.net/transformed_data/covid")
df_covid_mundial.repartition(1).write.mode("overwrite").option("header", "true").csv("abfss://covid@mksynapsedatalake.dfs.core.windows.net/transformed_data/covid_mundial")
df_pais.repartition(1).write.mode("overwrite").option("header", "true").csv("abfss://covid@mksynapsedatalake.dfs.core.windows.net/transformed_data/dim_pais")
df_dep.repartition(1).write.mode("overwrite").option("header", "true").csv("abfss://covid@mksynapsedatalake.dfs.core.windows.net/transformed_data/dim_departamento")
df_calendario.repartition(1).write.mode("overwrite").option("header", "true").csv("abfss://covid@mksynapsedatalake.dfs.core.windows.net/transformed_data/dim_calendario")

StatementMeta(sparktaxidata, 36, 27, Finished, Available)

# Load Azure SQL DB

In [90]:
#Detalles del connection string
logicalServername = "mkdatafactory.database.windows.net"
databaseName = "covidsql"
userName = "mkadmin@mkdatafactory"
password = "" # Please specify password here
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(logicalServername, 1433, databaseName)
connectionProperties = {
  "user" : userName,
  "password" : password,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

StatementMeta(sparktaxidata, 36, 16, Finished, Available)

In [102]:
#Creando schema, usado para crear el DataFrame
dim_departamento_schema = StructType([
    StructField("pais", StringType()),
    StructField("paiscode", StringType()),
    StructField("departamento", StringType()),
    StructField("departamentocode", StringType()),
    StructField("departamentoPais", StringType())
])

dim_pais_schema = StructType([
    StructField("paisCode", StringType()),
    StructField("pais", StringType()),
    StructField("continente", StringType())
])

dim_calendario_schema = StructType([
    StructField("fecha", DateType()),
    StructField("Año", StringType()),
    StructField("Trimestre", StringType()),
    StructField("Mes", StringType()),
    StructField("Dia", StringType()),
    StructField("AñoMes", StringType())
])

covid_schema = StructType([
    StructField("departamentocode", StringType()),
    StructField("fecha", DateType()),
    StructField("sexo", StringType()),
    StructField("etapa_de_vida", StringType()),
    StructField("positivo", IntegerType()),
    StructField("fallecido", IntegerType())
])

covid_mundial_schema = StructType([
    StructField("paisCode", StringType()),
    StructField("fecha", DateType()),
    StructField("positivo", IntegerType()),
    StructField("fallecido", IntegerType())
])


# Load csv files en un DataFrame. Este Dataframe sera usado para escribir en table Azure SQL DB
dim_departamento= spark.read.format("csv").option("header",True).schema(dim_departamento_schema).load("abfss://covid@mksynapsedatalake.dfs.core.windows.net/transformed_data/dim_departamento/")
dim_pais= spark.read.format("csv").option("header",True).schema(dim_pais_schema).load("abfss://covid@mksynapsedatalake.dfs.core.windows.net/transformed_data/dim_pais/")
dim_calendario= spark.read.format("csv").option("header",True).schema(dim_calendario_schema).load("abfss://covid@mksynapsedatalake.dfs.core.windows.net/transformed_data/dim_calendario/")
covid= spark.read.format("csv").option("header",True).schema(covid_schema).load("abfss://covid@mksynapsedatalake.dfs.core.windows.net/transformed_data/covid/")
covid_mundial= spark.read.format("csv").option("header",True).schema(covid_mundial_schema).load("abfss://covid@mksynapsedatalake.dfs.core.windows.net/transformed_data/covid_mundial/")


StatementMeta(sparktaxidata, 36, 28, Finished, Available)

In [93]:

# Escribir el Dataframe en table Azure SQL DB
dim_departamento.write.jdbc(jdbcUrl,  
                   mode ="overwrite", # overwrite append
                   table="dim_departamento", 
                   properties=connectionProperties)

dim_pais.write.jdbc(jdbcUrl,  
                   mode ="overwrite", # overwrite append
                   table="dim_pais", 
                   properties=connectionProperties)

dim_calendario.write.jdbc(jdbcUrl,  
                   mode ="overwrite", # overwrite append
                   table="dim_calendario", 
                   properties=connectionProperties)

covid.write.jdbc(jdbcUrl,  
                   mode ="overwrite", # overwrite append
                   table="covid", 
                   properties=connectionProperties)

covid_mundial.write.jdbc(jdbcUrl,  
                   mode ="overwrite", # overwrite append
                   table="covid_mundial", 
                   properties=connectionProperties)

StatementMeta(sparktaxidata, 36, 19, Finished, Available)