In [1]:
# Import findspark 
import findspark

findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()
import pandas as pd
pd.read_csv("../datasources/caso_covid_full.csv")

Unnamed: 0,date,state,city,place_type,confirmed,deaths,order_for_place,is_last,estimated_population_2019,estimated_population,city_ibge_code,confirmed_per_100k_inhabitants,death_rate
0,2020-12-31,AP,,state,68201,925,283,False,845731.0,861773.0,16.0,7914.03305,0.0136
1,2020-12-30,AP,,state,67702,919,282,False,845731.0,861773.0,16.0,7856.12917,0.0136
2,2020-12-29,AP,,state,67405,913,281,False,845731.0,861773.0,16.0,7821.66533,0.0135
3,2020-12-28,AP,,state,67149,907,280,False,845731.0,861773.0,16.0,7791.95914,0.0135
4,2020-12-27,AP,,state,66724,901,279,False,845731.0,861773.0,16.0,7742.64220,0.0135
...,...,...,...,...,...,...,...,...,...,...,...,...,...
1332236,2021-01-08,SP,Óleo,city,6,0,181,False,2496.0,2471.0,3533809.0,242.81667,0.0000
1332237,2021-01-07,SP,Óleo,city,6,0,180,False,2496.0,2471.0,3533809.0,242.81667,0.0000
1332238,2021-01-05,SP,Óleo,city,6,0,179,False,2496.0,2471.0,3533809.0,242.81667,0.0000
1332239,2021-01-04,SP,Óleo,city,6,0,178,False,2496.0,2471.0,3533809.0,242.81667,0.0000


In [2]:
spark.conf.set("spark.sql.shuffle.partitions", 8)

df = spark \
        .read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .option("sep", ",") \
        .csv("../datasources/caso_covid_full.csv")

In [3]:
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- place_type: string (nullable = true)
 |-- confirmed: integer (nullable = true)
 |-- deaths: integer (nullable = true)
 |-- order_for_place: integer (nullable = true)
 |-- is_last: boolean (nullable = true)
 |-- estimated_population_2019: integer (nullable = true)
 |-- estimated_population: integer (nullable = true)
 |-- city_ibge_code: integer (nullable = true)
 |-- confirmed_per_100k_inhabitants: double (nullable = true)
 |-- death_rate: double (nullable = true)



## Data Preprocessing

In [4]:
df = df \
        .filter(df.place_type == "city") \
        .drop("is_last", "estimated_population_2019", "order_for_place")

### Write transformed Data

In [5]:
df \
    .write \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .parquet("../datasources/casos_covid_preprocess.parquet")

## Create grouping

In [6]:
df \
  .filter(df.place_type == "city") \
  .groupBy("state", "city") \
  .agg({"confirmed": "sum", "deaths": "sum", "confirmed_per_100k_inhabitants": "avg"}) \
  .toPandas()

Unnamed: 0,state,city,avg(confirmed_per_100k_inhabitants),sum(confirmed),sum(deaths)
0,AP,Itaubal,4118.844022,59227,0
1,AP,Mazagão,5051.995381,296355,1648
2,AM,Apuí,930.629120,52436,479
3,AM,Beruri,3359.915427,181604,2371
4,AM,Boca do Acre,2820.522984,288182,2926
...,...,...,...,...,...
5584,SP,Timburi,467.603464,2939,167
5585,SP,Torrinha,1271.451301,29535,887
5586,SP,Três Fronteiras,2781.807665,35205,1278
5587,SP,Vera Cruz,758.132048,15590,712


In [7]:
df.toPandas()['place_type'].value_counts()

city    1323917
Name: place_type, dtype: int64

In [8]:
df \
  .where(col("city").isNull()).toPandas()["place_type"].value_counts()

Series([], Name: place_type, dtype: int64)

In [9]:
df \
  .where(col("city").isNotNull()).toPandas()["place_type"].value_counts()

city    1323917
Name: place_type, dtype: int64

In [10]:
df.where(col("place_type") == "city").where(col("city").isNull()).collect()

[]