In [0]:
import pyspark.sql.functions as F

In [0]:
#Criação de dataframes a partir da eitura dos json gravados a partir do mysql Country e Statistic
#gravados como arquivos json na etapa anterior
stat_jsonDF = spark.read.json('/raw/statistic.json')
country_jsonDF = spark.read.json('/raw/country.json')

In [0]:
stat_jsonDF.show()


In [0]:
#Realizando a join entre os dataframes Country e Statistic
stat_jsonDF = stat_jsonDF.withColumnRenamed('countryCode', 'countryCode_stat')
df_join = country_jsonDF.join(stat_jsonDF, on=country_jsonDF.countryCode==stat_jsonDF.countryCode_stat).drop('countryCode_stat')


In [0]:

df_join.show()

In [0]:
df_join.printSchema()

In [0]:
#Convertendo a coluna date para o tipo date e criacao das colunas mes, dia e ano a partir dela
df_join = df_join.withColumn('date', F.to_date('date')).withColumn('ano', F.year(F.col('date')))\
    .withColumn('mes', F.month(F.col('date'))).withColumn('dia', F.dayofmonth(F.col('date')))


In [0]:
help(df_join.withColumn)

In [0]:
#Renomeando algumas colunas para os nomes em portugues
df_join = df_join.withColumnRenamed('country','pais').withColumnRenamed('countryCode','sigla')\
            .withColumnRenamed('lat','latitude').withColumnRenamed('lon','longitude')\
            .withColumnRenamed('date','data')

In [0]:
df_join.printSchema()

In [0]:
#Temos dados que formam o acumulado diario de mortes, recuperados e confirmados, assim
# fazemos o recalculo diario dos dados subtraindo o valor do dia com o dia anterior.
#Para isso, utilizamos o Windows para particionar, agrupando o pais e ordenando por data e
# criando novas colunas com a informacao do dia anterior.
 
from pyspark.sql.window import Window
from pyspark.sql.functions import lit
 
dfu = df_join.toDF(*df_join.columns)
 
dfu = dfu.withColumn('prev_day_deaths',
                        F.lag(dfu['deaths'])
                                 .over(Window.partitionBy("pais").orderBy(dfu.data)))
dfu = dfu.withColumn('prev_day_confirmed',
                        F.lag(dfu['confirmed'])
                                 .over(Window.partitionBy("pais").orderBy(dfu.data)))
dfu = dfu.withColumn('prev_day_recovered',
                        F.lag(dfu['recovered'])
                                 .over(Window.partitionBy("pais").orderBy(dfu.data)))
 
#Calcula-se o dado diario e removemos as colunas com os dados do dia anterior
result = dfu.withColumn('daily_deaths', 
          (dfu['deaths'] - dfu['prev_day_deaths'])) \
          .withColumn('daily_confirmed', 
          (dfu['confirmed'] - dfu['prev_day_confirmed']))\
          .withColumn('daily_recovered', 
          (dfu['recovered'] - dfu['prev_day_recovered']))\
          .drop('prev_day_deaths','prev_day_confirmed','prev_day_recovered')

In [0]:
#Para o primeiro dia da serie particionada de cada pais, temos o valor nulo agora. Preenchemos com o valor 0. 
result = result.na.fill(0)

In [0]:
#Verificacao dos dados para o dia 02/06/2021 ordenado pela quantidade de mortes diaria decrescente
result.filter(result.data == '2021-06-02').orderBy(result.daily_deaths.desc()).show()

In [0]:
#Verificacao do pico anomalo de registros de mortes no Peru no dia 02/06/2021.
result.filter(result.pais == 'Peru').orderBy(result.data.desc()).show()

In [0]:
help(df_join.toDF) ...

In [0]:
from datetime import datetime
#Leitura do dado de vacina do dia
curr_date = datetime.now().strftime("%Y%m%d")
vacinas_csvDF = spark.read.csv(f"/raw/owid-covid-data_{curr_date}.csv", header='true', \
                      inferSchema='true')
vacinas_csvDF = vacinas_csvDF.withColumn('data_vacina', F.to_date(F.col('date'))).drop('date')


In [0]:
#Leitura do json de country_codes para mapeamento de siglas dos paises
country_codes_DF = spark.read.json("/raw/country_codes_iso3.json")
#Mapeamento para substituir as siglas dos paises de 2 letras para 3
from itertools import chain
import json
 
country_codes_json = json.loads(country_codes_DF.toJSON().first())
map_sigla_expr = F.create_map([F.lit(x) for x in chain(*country_codes_json.items())])

In [0]:
#Criando nova coluna com a sigla de cada pais com 3 letras
df_covid_join = result.withColumn('sigla_iso3', map_sigla_expr[result['sigla']])

In [0]:
df_covid_vacinas = df_covid_join.join(vacinas_csvDF, on=((df_covid_join['sigla_iso3']==vacinas_csvDF['iso_code'])&\
                                                      (df_covid_join['data']==vacinas_csvDF['data_vacina'])), how='left')


In [0]:
df_covid_vacinas.printSchema()

In [0]:
display(df_covid_vacinas)

In [0]:
display(df_covid_vacinas.filter(df_covid_vacinas.pais == 'Brazil').orderBy(df_covid_vacinas.data.desc()))

In [0]:
#Removendo os parquet anteriores para evitar problemas em relacao ao overwrite c ...
help(df_covid_vacinas.write.saveAsTable) ...

In [0]:
df_covid_vacinas.write.saveAsTable('covid_info', format='parquet', mode='overwrite',\
                                   partitionBy=['ano','mes'], path='/cleaned/covidinfo.parquet')

In [0]:
%fs
ls /raw 

In [0]:
%fs
ls /cleaned