In [4]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
     -------------------------------------- 310.8/310.8 MB 7.4 MB/s eta 0:00:00
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting py4j==0.10.9.7
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
     ------------------------------------- 200.5/200.5 kB 12.7 MB/s eta 0:00:00
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py): started
  Building wheel for pyspark (setup.py): finished with status 'done'
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285411 sha256=d141f46ab0642cf57bea2ae4aeadfc75b897f3a9090dd67ae989c87751ecb60d
  Stored in directory: c:\users\nicol\appdata\local\pip\cache\wheels\2b\9a\39\d8019ffbfb76a39433455e3d5799e94d3e3cae8f41229f6bf8
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.4.1
Collecting 

In [42]:
from pyspark.sql import SparkSession

In [43]:
# Start the Spark session for this notebook, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("Carga_Trusted")
    .getOrCreate()
)

In [44]:
# Glob matching every parquet file two directory levels below the trusted root.
# NOTE(review): the two wildcard levels are presumably partition folders (e.g. year/month) — confirm.
filepath_trusted = '/PastaCompartilhadaHost/desafio-data-engineer/datalake/trusted/*/*/*.parquet'

In [45]:
# Load all trusted parquet files into a single DataFrame, merging the per-file schemas.
# NOTE(review): because the path globs down to individual files, columns encoded only in
# directory names are presumably not discovered; set the "basePath" option if they are needed — confirm.
df = (
    spark.read
    .option("mergeSchema", "true")
    .parquet(filepath_trusted)
)

In [46]:
# Preview the trusted data ordered by date, then by country.
df.orderBy("data", "pais").show()

+-------------------+--------------------+---------+----------+-------------------+----------------------+-----------------+----------------------+
|               pais|              estado| latitude| longitude|               data|quantidade_confirmados|quantidade_mortes|quantidade_recuperados|
+-------------------+--------------------+---------+----------+-------------------+----------------------+-----------------+----------------------+
|        Afghanistan|                  -1| 33.93911| 67.709953|2020-01-22 00:00:00|                     0|                0|                     0|
|            Albania|                  -1|  41.1533|   20.1683|2020-01-22 00:00:00|                     0|                0|                     0|
|            Algeria|                  -1|  28.0339|    1.6596|2020-01-22 00:00:00|                     0|                0|                     0|
|            Andorra|                  -1|  42.5063|    1.5218|2020-01-22 00:00:00|                     0|      

In [47]:
# Importa bibliotecas necessárias
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, to_timestamp, sum, avg 
from pyspark.sql.functions import array, explode, struct, lit, col
from airflow.decorators import task
import os  

In [48]:
filepath_input = '/PastaCompartilhadaHost/desafio-data-engineer/datalake/raw/covid19'
filepath_output = '/PastaCompartilhadaHost/desafio-data-engineer/datalake/trusted'


# Build one raw DataFrame per input CSV (confirmed cases, deaths, recovered).
path_confirmados = os.path.join(filepath_input, 'time_series_covid19_confirmed_global.csv')
df_confirmados = spark.read.csv(path_confirmados, header=True, inferSchema=True)

path_mortes = os.path.join(filepath_input, 'time_series_covid19_deaths_global.csv')
df_mortes = spark.read.csv(path_mortes, header=True, inferSchema=True)

path_recuperados = os.path.join(filepath_input, 'time_series_covid19_recovered_global.csv')
df_recuperados = spark.read.csv(path_recuperados, header=True, inferSchema=True)


def _ajusta_e_preenche(df_bruto, coluna_quantidade):
    """Reshape one raw DataFrame with ``ajusta_df`` and fill nulls in its key columns.

    ``ajusta_df`` is defined outside this cell; per the original author's note it turns
    the date columns into rows (date in ``data_t``, value in ``coluna_quantidade``).

    Null keys are replaced by sentinels ("-1" for Province/State, -1 for Lat/Long) so the
    later joins on these columns can match rows. The author notes that null-safe
    alternatives (eqNullSafe with a full join) were tried without success.
    """
    df_longo = ajusta_df(
        df_bruto,
        ["Province/State", "Country/Region", "Lat", "Long"],
        "data_t",
        coluna_quantidade,
    )
    df_longo = df_longo.na.fill(value="-1", subset=["Province/State"])
    return df_longo.na.fill(value=-1, subset=["Lat", "Long"])


# One adjusted (long-format, null-free keys) DataFrame per metric.
df_confirmados_aj = _ajusta_e_preenche(df_confirmados, "quantidade_confirmados")
df_mortes_aj = _ajusta_e_preenche(df_mortes, "quantidade_mortes")
df_recuperados_aj = _ajusta_e_preenche(df_recuperados, "quantidade_recuperados")



In [49]:
# Summary statistics of the adjusted confirmed-cases DataFrame (sanity check of counts and ranges).
df_confirmados_aj.summary().show()

[Stage 66:>                                                         (0 + 1) / 1]

+-------+--------------+--------------+------------------+------------------+------+----------------------+
|summary|Province/State|Country/Region|               Lat|              Long|data_t|quantidade_confirmados|
+-------+--------------+--------------+------------------+------------------+------+----------------------+
|  count|        131175|        131175|            131175|            131175|131175|                131175|
|   mean|          -1.0|          null|20.378187454548296|22.853393141819883|  null|    178641.85499523536|
| stddev|           0.0|          null| 25.12360910363382| 73.22238037277491|  null|    1223316.1245162636|
|    min|            -1|   Afghanistan|          -51.7963|         -178.1165|1/1/21|                     0|
|    25%|          -1.0|          null|            4.5709|          -19.0208|  null|                    81|
|    50%|          -1.0|          null|         21.521757|         20.902977|  null|                  1291|
|    75%|          -1.0|    

                                                                                

In [50]:
# Summary statistics of the adjusted deaths DataFrame (sanity check of counts and ranges).
df_mortes_aj.summary().show()

[Stage 69:>                                                         (0 + 1) / 1]

+-------+--------------+--------------+------------------+------------------+------+------------------+
|summary|Province/State|Country/Region|               Lat|              Long|data_t| quantidade_mortes|
+-------+--------------+--------------+------------------+------------------+------+------------------+
|  count|        131175|        131175|            131175|            131175|131175|            131175|
|   mean|          -1.0|          null|20.378187454548296|22.853393141819883|  null| 4311.777091671432|
| stddev|           0.0|          null| 25.12360910363382| 73.22238037277491|  null|24884.883496583665|
|    min|            -1|   Afghanistan|          -51.7963|         -178.1165|1/1/21|                 0|
|    25%|          -1.0|          null|            4.5709|          -19.0208|  null|                 1|
|    50%|          -1.0|          null|         21.521757|         20.902977|  null|                20|
|    75%|          -1.0|          null|           41.1129|      

                                                                                

In [51]:
# Summary statistics of the adjusted recovered DataFrame (sanity check of counts and ranges).
df_recuperados_aj.summary().show()

[Stage 72:>                                                         (0 + 1) / 1]

+-------+--------------+--------------+------------------+-----------------+------+----------------------+
|summary|Province/State|Country/Region|               Lat|             Long|data_t|quantidade_recuperados|
+-------+--------------+--------------+------------------+-----------------+------+----------------------+
|  count|        124020|        124020|            124020|           124020|124020|                124020|
|   mean|          -1.0|          null|19.025420719234248|28.38237537693291|  null|    110515.60972423802|
| stddev|           0.0|          null|24.577543028073567| 70.6865522696299|  null|     657530.0087847405|
|    min|            -1|   Afghanistan|          -51.7963|        -178.1165|1/1/21|                     0|
|    25%|          -1.0|          null|            4.5353|        -9.429499|  null|                    29|
|    50%|          -1.0|          null|           19.3133|          23.8813|  null|                   923|
|    75%|          -1.0|          nul

                                                                                

In [52]:
# Expose the recovered data to Spark SQL as view "rec", with the key columns renamed.
(
    df_recuperados_aj
    .withColumnRenamed("Country/Region", "pais")
    .withColumnRenamed("Province/State", "estado")
    .withColumnRenamed("Lat", "latitude")
    .withColumnRenamed("Long", "longitude")
    .createOrReplaceTempView('rec')
)

In [53]:
# Expose the confirmed-cases data to Spark SQL as view "conf", with the key columns renamed.
(
    df_confirmados_aj
    .withColumnRenamed("Country/Region", "pais")
    .withColumnRenamed("Province/State", "estado")
    .withColumnRenamed("Lat", "latitude")
    .withColumnRenamed("Long", "longitude")
    .createOrReplaceTempView('conf')
)

In [54]:
# Count the rows of "rec" that have no counterpart in "conf" on the full key
# (estado, pais, data_t, latitude, longitude): left join, then keep only unmatched rows.
spark.sql(
    "select rec.* from rec left join conf "
    "on rec.estado == conf.estado and rec.pais == conf.pais "
    "and rec.data_t == conf.data_t and rec.latitude == conf.latitude "
    "and rec.longitude == conf.longitude "
    "where conf.pais is null and conf.estado is null "
    "and conf.data_t is null and conf.latitude is null "
    "and conf.longitude is null"
).count()

2862

In [37]:
# Display the rows of "rec" that have no counterpart in "conf" on the full key
# (estado, pais, data_t, latitude, longitude): left join, then keep only unmatched rows.
spark.sql(
    "select rec.* from rec left join conf "
    "on rec.estado == conf.estado and rec.pais == conf.pais "
    "and rec.data_t == conf.data_t and rec.latitude == conf.latitude "
    "and rec.longitude == conf.longitude "
    "where conf.pais is null and conf.estado is null "
    "and conf.data_t is null and conf.latitude is null "
    "and conf.longitude is null"
).show()

+------+------+--------+---------+-------+----------------------+
|estado|  pais|latitude|longitude| data_t|quantidade_recuperados|
+------+------+--------+---------+-------+----------------------+
|    -1|Canada| 56.1304|-106.3468|1/22/20|                     0|
|    -1|Canada| 56.1304|-106.3468|1/23/20|                     0|
|    -1|Canada| 56.1304|-106.3468|1/24/20|                     0|
|    -1|Canada| 56.1304|-106.3468|1/25/20|                     0|
|    -1|Canada| 56.1304|-106.3468|1/26/20|                     0|
|    -1|Canada| 56.1304|-106.3468|1/27/20|                     0|
|    -1|Canada| 56.1304|-106.3468|1/28/20|                     0|
|    -1|Canada| 56.1304|-106.3468|1/29/20|                     0|
|    -1|Canada| 56.1304|-106.3468|1/30/20|                     0|
|    -1|Canada| 56.1304|-106.3468|1/31/20|                     0|
|    -1|Canada| 56.1304|-106.3468| 2/1/20|                     0|
|    -1|Canada| 56.1304|-106.3468| 2/2/20|                     0|
|    -1|Ca

In [55]:
# Look for a country-level Canada row (estado filled with the "-1" sentinel) in the confirmed data.
spark.sql("select * from conf where pais = 'Canada' and estado ='-1' ").show()

+------+----+--------+---------+------+----------------------+
|estado|pais|latitude|longitude|data_t|quantidade_confirmados|
+------+----+--------+---------+------+----------------------+
+------+----+--------+---------+------+----------------------+



In [56]:
# Look for a country-level Canada row (estado filled with the "-1" sentinel) in the recovered data.
spark.sql("select * from rec where pais = 'Canada' and estado ='-1' ").show()

+------+------+--------+---------+-------+----------------------+
|estado|  pais|latitude|longitude| data_t|quantidade_recuperados|
+------+------+--------+---------+-------+----------------------+
|    -1|Canada| 56.1304|-106.3468|1/22/20|                     0|
|    -1|Canada| 56.1304|-106.3468|1/23/20|                     0|
|    -1|Canada| 56.1304|-106.3468|1/24/20|                     0|
|    -1|Canada| 56.1304|-106.3468|1/25/20|                     0|
|    -1|Canada| 56.1304|-106.3468|1/26/20|                     0|
|    -1|Canada| 56.1304|-106.3468|1/27/20|                     0|
|    -1|Canada| 56.1304|-106.3468|1/28/20|                     0|
|    -1|Canada| 56.1304|-106.3468|1/29/20|                     0|
|    -1|Canada| 56.1304|-106.3468|1/30/20|                     0|
|    -1|Canada| 56.1304|-106.3468|1/31/20|                     0|
|    -1|Canada| 56.1304|-106.3468| 2/1/20|                     0|
|    -1|Canada| 56.1304|-106.3468| 2/2/20|                     0|
|    -1|Ca

In [58]:
# Columns that identify one observation across the three adjusted DataFrames.
chaves_join = ["Province/State", "Country/Region", "Lat", "Long", "data_t"]

merged_df = (
    # Combine the three metrics; full joins keep keys present in only some of the inputs.
    df_confirmados_aj
    .join(df_mortes_aj, chaves_join, how="full")
    .join(df_recuperados_aj, chaves_join, how="full")
    # Rename the key columns to the target names.
    .withColumnRenamed("Country/Region", "pais")
    .withColumnRenamed("Province/State", "estado")
    .withColumnRenamed("Lat", "latitude")
    .withColumnRenamed("Long", "longitude")
    # Parse the textual date (M/d/yy) into a timestamp and derive year and month from it.
    .withColumn("data", to_timestamp(col("data_t"), "M/d/yy"))
    .drop("data_t")
    .withColumn("ano", year(col("data")))
    .withColumn("mes", month(col("data")))
    # Fix the final column order and cast the quantity columns to long.
    .selectExpr(
        "pais",
        "estado",
        "latitude",
        "longitude",
        "data",
        "cast(quantidade_confirmados as long) quantidade_confirmados",
        "cast(quantidade_mortes as long) quantidade_mortes",
        "cast(quantidade_recuperados as long) quantidade_recuperados",
        "ano",
        "mes",
    )
)



In [59]:
# Expose the merged DataFrame to Spark SQL as view "merged".
merged_df.createOrReplaceTempView('merged')

In [61]:
# Country-level Canada rows in the merged data (estado holds the "-1" sentinel).
# estado is a string column, so it is compared with the string literal '-1': comparing with
# the integer -1 forces an implicit cast of every estado value (e.g. 'Alberta') to a number.
spark.sql("select * from merged where pais = 'Canada' and estado = '-1'").show()

[Stage 94:>                                                         (0 + 1) / 1]

+------+------+--------+---------+-------------------+----------------------+-----------------+----------------------+----+---+
|  pais|estado|latitude|longitude|               data|quantidade_confirmados|quantidade_mortes|quantidade_recuperados| ano|mes|
+------+------+--------+---------+-------------------+----------------------+-----------------+----------------------+----+---+
|Canada|    -1| 56.1304|-106.3468|2021-01-01 00:00:00|                  null|             null|                494437|2021|  1|
|Canada|    -1| 56.1304|-106.3468|2021-01-10 00:00:00|                  null|             null|                565049|2021|  1|
|Canada|    -1| 56.1304|-106.3468|2021-01-11 00:00:00|                  null|             null|                575152|2021|  1|
|Canada|    -1| 56.1304|-106.3468|2021-01-13 00:00:00|                  null|             null|                591131|2021|  1|
|Canada|    -1| 56.1304|-106.3468|2021-01-14 00:00:00|                  null|             null|         

23/07/23 13:46:16 WARN DAGScheduler: Broadcasting large task binary with size 1146.5 KiB
                                                                                

In [64]:
# Province-level Canada rows in the merged data (estado different from the "-1" sentinel).
spark.sql('select * from merged where pais="Canada" and estado <> "-1"').show()

[Stage 137:>                                                        (0 + 1) / 1]

+------+-------+--------+---------+-------------------+----------------------+-----------------+----------------------+----+---+
|  pais| estado|latitude|longitude|               data|quantidade_confirmados|quantidade_mortes|quantidade_recuperados| ano|mes|
+------+-------+--------+---------+-------------------+----------------------+-----------------+----------------------+----+---+
|Canada|Alberta| 53.9333|-116.5765|2021-01-10 00:00:00|                111452|             1284|                  null|2021|  1|
|Canada|Alberta| 53.9333|-116.5765|2021-01-12 00:00:00|                112743|             1345|                  null|2021|  1|
|Canada|Alberta| 53.9333|-116.5765|2021-01-14 00:00:00|                114585|             1389|                  null|2021|  1|
|Canada|Alberta| 53.9333|-116.5765|2021-01-15 00:00:00|                115370|             1402|                  null|2021|  1|
|Canada|Alberta| 53.9333|-116.5765|2021-01-17 00:00:00|                116837|             1436| 

23/07/23 13:47:16 WARN DAGScheduler: Broadcasting large task binary with size 1146.3 KiB
                                                                                

In [6]:
# Summary statistics of the DataFrame read back from the trusted layer.
df.summary().show()



+-------+-----------+--------+-----------------+-----------------+----------------------+-----------------+----------------------+
|summary|       pais|  estado|         latitude|        longitude|quantidade_confirmados|quantidade_mortes|quantidade_recuperados|
+-------+-----------+--------+-----------------+-----------------+----------------------+-----------------+----------------------+
|  count|     134037|  134037|           134037|           134037|                131175|           131175|                124020|
|   mean|       null|    -1.0|20.42409911031997|23.51284133096057|    178641.85499523536|4311.777091671432|    110515.60972423802|
| stddev|       null|     0.0| 25.1637945552342|73.51623563186946|    1223316.1245162634|24884.88349658362|      657530.008784745|
|    min|Afghanistan|      -1|         -51.7963|        -178.1165|                     0|                0|                     0|
|    25%|       null|    -1.0|           4.5709|         -15.3101|                 

                                                                                

In [109]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, sum, avg, days 

In [110]:
# Aggregate states/provinces into one row per country and date.
# "ano" is derived from "data" instead of being read as a column: the DataFrame loaded from
# the trusted parquet glob does not expose an "ano" column, so col("ano") fails on a fresh run.
# The derived value is the same one the trusted layer computes (year of "data").
df_trusted = (
    df.groupBy(
        col("pais"),
        col("data"),
        year(col("data")).alias("ano"),
    )
    .agg(
        sum(col("quantidade_confirmados")).alias("soma_confirmados"),
        sum(col("quantidade_mortes")).alias("soma_mortes"),
        sum(col("quantidade_recuperados")).alias("soma_recuperados"),
    )
)

In [111]:
# Row-based trailing window per country, ordered by date: the current row plus the 7 rows before it.
# NOTE(review): rowsBetween(-7, 0) spans 8 rows; if a 7-day moving average is intended,
# the lower bound should be -6 — confirm against the requirement.
w = Window.partitionBy(col("pais")).orderBy(col("data")).rowsBetween(-7, 0)

# Replace the daily sums by their moving averages and keep only the output columns.
# This reassigns df_trusted without the soma_* columns, so the aggregation cell must be
# re-run before this cell can be executed again.
df_trusted = (
    df_trusted
    .withColumn("media_movel_confirmados", avg(col("soma_confirmados")).over(w))
    .withColumn("media_movel_mortes", avg(col("soma_mortes")).over(w))
    .withColumn("media_movel_recuperados", avg(col("soma_recuperados")).over(w))
    .select(
        "pais",
        "data",
        "media_movel_confirmados",
        "media_movel_mortes",
        "media_movel_recuperados",
        "ano",
    )
)

In [112]:
# Preview the moving-average result.
df_trusted.show()

+-----------+-------------------+-----------------------+------------------+-----------------------+----+
|       pais|               data|media_movel_confirmados|media_movel_mortes|media_movel_recuperados| ano|
+-----------+-------------------+-----------------------+------------------+-----------------------+----+
|Afghanistan|2020-01-22 00:00:00|                    0.0|               0.0|                    0.0|2020|
|Afghanistan|2020-01-23 00:00:00|                    0.0|               0.0|                    0.0|2020|
|Afghanistan|2020-01-24 00:00:00|                    0.0|               0.0|                    0.0|2020|
|Afghanistan|2020-01-25 00:00:00|                    0.0|               0.0|                    0.0|2020|
|Afghanistan|2020-01-26 00:00:00|                    0.0|               0.0|                    0.0|2020|
|Afghanistan|2020-01-27 00:00:00|                    0.0|               0.0|                    0.0|2020|
|Afghanistan|2020-01-28 00:00:00|             

In [101]:
# Write the moving-average result as CSV with a header row, replacing any previous output.
(
    df_trusted.write
    .mode("overwrite")
    .option("header", True)
    .csv('/opt/airflow/dags/desafio-data-engineer/datalake/refined/teste')
)

In [102]:
# Write the DataFrame read from the trusted layer as CSV with a header row, replacing any previous output.
(
    df.write
    .mode("overwrite")
    .option("header", True)
    .csv('/opt/airflow/dags/desafio-data-engineer/datalake/refined/teste_leitura')
)