In [1]:
import findspark
import urllib
import zipfile
findspark.init()

In [2]:
findspark.add_jars('/app/postgresql-42.1.4.jar')

In [3]:
from pyspark.sql.functions import *
import sys
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [4]:
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("pyspark-etl")
    .config("spark.driver.memory", "512m")
    .config("spark.driver.cores", "1")
    .config("spark.executor.memory", "512m")
    .config("spark.executor.cores", "1")
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate()
)

In [7]:
files = ['datos_nomivac_covid19.zip']

In [8]:
def download_file(file):
  with urllib.request.urlopen('https://sisa.msal.gov.ar/datos/descargas/covid-19/files/{f}'.format(f=file)) as response:
    gzipcontent = response.read()
  with open('/tmp/{f}'.format(f=file), 'wb') as f:
    f.write(gzipcontent)

In [9]:
for f in files:
  download_file(f)    

In [10]:
with zipfile.ZipFile('/tmp/datos_nomivac_covid19.zip', "r") as zip_ref:
    zip_ref.extractall('/dataset/')

In [11]:
df = spark.read \
    .option("header", True) \
    .option("inferSchema", True) \
    .csv("/dataset/datos_nomivac_covid19.csv")

In [12]:
df.printSchema()

root
 |-- sexo: string (nullable = true)
 |-- grupo_etario: string (nullable = true)
 |-- jurisdiccion_residencia: string (nullable = true)
 |-- jurisdiccion_residencia_id: integer (nullable = true)
 |-- depto_residencia: string (nullable = true)
 |-- depto_residencia_id: integer (nullable = true)
 |-- jurisdiccion_aplicacion: string (nullable = true)
 |-- jurisdiccion_aplicacion_id: integer (nullable = true)
 |-- depto_aplicacion: string (nullable = true)
 |-- depto_aplicacion_id: integer (nullable = true)
 |-- fecha_aplicacion: timestamp (nullable = true)
 |-- vacuna: string (nullable = true)
 |-- condicion_aplicacion: string (nullable = true)
 |-- orden_dosis: integer (nullable = true)
 |-- lote_vacuna: string (nullable = true)



In [14]:
sorted(df.groupBy('sexo').agg({'sexo':'count'}).collect())

[Row(sexo='F', count(sexo)=37577611),
 Row(sexo='M', count(sexo)=33662224),
 Row(sexo='S.I.', count(sexo)=158007),
 Row(sexo='X', count(sexo)=18)]

In [15]:
df.select("grupo_etario").distinct().show()

+------------+
|grupo_etario|
+------------+
|       70-79|
|       18-29|
|       80-89|
|       50-59|
|       12-17|
|       >=100|
|       40-49|
|       30-39|
|         <12|
|       60-69|
|       90-99|
|        S.I.|
+------------+



In [16]:
df.select("orden_dosis").distinct().show(40)

+-----------+
|orden_dosis|
+-----------+
|          2|
|          4|
|          1|
|          3|
+-----------+



In [17]:
df.select("fecha_aplicacion").show(20)

+-------------------+
|   fecha_aplicacion|
+-------------------+
|2021-03-02 00:00:00|
|2021-08-07 00:00:00|
|2021-03-13 00:00:00|
|2021-09-07 00:00:00|
|2021-07-29 00:00:00|
|2021-09-22 00:00:00|
|2021-07-23 00:00:00|
|2021-06-04 00:00:00|
|2021-09-29 00:00:00|
|2021-07-19 00:00:00|
|2021-06-03 00:00:00|
|2021-06-05 00:00:00|
|2021-09-29 00:00:00|
|2021-07-24 00:00:00|
|2021-09-22 00:00:00|
|2021-09-22 00:00:00|
|2021-06-05 00:00:00|
|2021-06-05 00:00:00|
|2021-09-22 00:00:00|
|2021-10-31 00:00:00|
+-------------------+
only showing top 20 rows



In [18]:
df_vaccine_application = sorted(df.groupBy('fecha_aplicacion','orden_dosis','jurisdiccion_residencia','sexo').agg({'vacuna':'count'}).collect())

In [19]:
df_vaccine_application = spark.createDataFrame(df_vaccine_application)

In [20]:
df_vaccine_application.show(5)

+-------------------+-----------+-----------------------+----+-------------+
|   fecha_aplicacion|orden_dosis|jurisdiccion_residencia|sexo|count(vacuna)|
+-------------------+-----------+-----------------------+----+-------------+
|2020-12-29 00:00:00|          1|           Buenos Aires|   F|         1322|
|2020-12-29 00:00:00|          1|           Buenos Aires|   M|          767|
|2020-12-29 00:00:00|          1|                   CABA|   F|          326|
|2020-12-29 00:00:00|          1|                   CABA|   M|          222|
|2020-12-29 00:00:00|          1|              Catamarca|   F|          164|
+-------------------+-----------+-----------------------+----+-------------+
only showing top 5 rows



In [21]:
df_vaccine_application.count()

46700

In [22]:
df_vaccine_application.printSchema()

root
 |-- fecha_aplicacion: timestamp (nullable = true)
 |-- orden_dosis: long (nullable = true)
 |-- jurisdiccion_residencia: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- count(vacuna): long (nullable = true)



In [23]:
df_vaccine_application.agg({'count(vacuna)':'sum'}).show()

+------------------+
|sum(count(vacuna))|
+------------------+
|          71397860|
+------------------+



In [24]:
df_vaccine_application.printSchema()

root
 |-- fecha_aplicacion: timestamp (nullable = true)
 |-- orden_dosis: long (nullable = true)
 |-- jurisdiccion_residencia: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- count(vacuna): long (nullable = true)



In [25]:
df_vaccine_application = df_vaccine_application \
                         .withColumnRenamed('count(vacuna)',"cantidad_vacunas") \
                         .withColumnRenamed('jurisdiccion_residencia',"provincia") \
                         .withColumn('fecha_aplicacion',to_date("fecha_aplicacion"))

In [26]:
df_vaccine_application = df_vaccine_application.selectExpr(
                            'fecha_aplicacion',
                            'provincia',
                            'sexo',
                            'cantidad_vacunas',
                            'cast(orden_dosis as string) orden_dosis'
)

In [27]:
df_vaccine_application.printSchema()

root
 |-- fecha_aplicacion: date (nullable = true)
 |-- provincia: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- cantidad_vacunas: long (nullable = true)
 |-- orden_dosis: string (nullable = true)



In [28]:
df_vaccine_application.show(5)

+----------------+------------+----+----------------+-----------+
|fecha_aplicacion|   provincia|sexo|cantidad_vacunas|orden_dosis|
+----------------+------------+----+----------------+-----------+
|      2020-12-29|Buenos Aires|   F|            1322|          1|
|      2020-12-29|Buenos Aires|   M|             767|          1|
|      2020-12-29|        CABA|   F|             326|          1|
|      2020-12-29|        CABA|   M|             222|          1|
|      2020-12-29|   Catamarca|   F|             164|          1|
+----------------+------------+----+----------------+-----------+
only showing top 5 rows



In [29]:
output_dir = '/dataset/output.parquet'

In [30]:
df_vaccine_application.write.mode('overwrite').partitionBy("fecha_aplicacion","orden_dosis").parquet(output_dir)

In [31]:
spark.read.parquet(output_dir)

DataFrame[provincia: string, sexo: string, cantidad_vacunas: bigint, fecha_aplicacion: date, orden_dosis: int]

In [32]:
df_vaccine_application \
    .write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://postgres/covid") \
    .option("dbtable", "covid.vaccine") \
    .option("user", "covid") \
    .option("password", "c0v1d") \
    .option("driver", "org.postgresql.Driver") \
    .mode('append') \
    .save()


In [None]:
spark.stop() 