### Load, transform, and run a few queries with PySpark and Jupyter

In [266]:
import os

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as Func
from pyspark.sql.types import *
from pyspark.sql.functions import *
from sqlalchemy import create_engine

In [267]:
# Start (or reuse) a local Spark session for this ETL notebook.
spark = (
    SparkSession.builder
    .master("local")
    .appName("etl")
    # Key fixed: it was misspelled "spark.executer.memory", which is not a
    # Spark setting, so the memory value was never applied.
    .config("spark.executor.memory", "1g")
    .getOrCreate()
)

# Switch Spark to the legacy date/time parser; the date transformation in a
# later cell parses year_week with the week-based pattern "yyyy-ww".
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

DataFrame[key: string, value: string]

In [268]:
# SQLAlchemy engine for the PostgreSQL "covid" database.
# Set the COVID_DB_URL environment variable to target another server or to
# supply credentials without editing the notebook; when it is unset, the
# original local development URL is used.
engine = create_engine(
    os.environ.get(
        "COVID_DB_URL",
        "postgresql+psycopg2://postgres:root@localhost:5432/covid?client_encoding=utf8",
    )
)

In [269]:
# Pull both source tables from the database into pandas DataFrames.
nwd_query = 'SELECT * FROM national_weekly_data'
cotw_query = 'SELECT * FROM countries_of_the_world'

df_nwd = pd.read_sql(nwd_query, engine)
df_cotw = pd.read_sql(cotw_query, engine)

In [270]:
# Convert the pandas frames into Spark DataFrames (no explicit schema is
# passed, so Spark infers the column types).
countries_of_the_world_spk = spark.createDataFrame(data=df_cotw)
national_weekly_data_spk = spark.createDataFrame(data=df_nwd)

In [273]:
# Inspect the inferred schema of the weekly data before casting columns.
national_weekly_data_spk.schema

StructType(List(StructField(index,LongType,true),StructField(country,StringType,true),StructField(country_code,StringType,true),StructField(continent,StringType,true),StructField(population,LongType,true),StructField(indicator,StringType,true),StructField(weekly_count,DoubleType,true),StructField(year_week,StringType,true),StructField(cumulative_count,LongType,true),StructField(source,StringType,true),StructField(rate_14_day,DoubleType,true),StructField(note,StringType,true)))

In [274]:
# Inspect the inferred schema of the countries table; its column names still
# contain spaces, dots and other punctuation at this point.
countries_of_the_world_spk.schema

StructType(List(StructField(index,LongType,true),StructField(Country,StringType,true),StructField(Region,StringType,true),StructField(Population,LongType,true),StructField(Area (sq. mi.),LongType,true),StructField(Pop. Density (per sq. mi.),StringType,true),StructField(Coastline (coast/area ratio),StringType,true),StructField(Net migration,StringType,true),StructField(Infant mortality (per 1000 births),StringType,true),StructField(GDP ($ per capita),DoubleType,true),StructField(Literacy (%),StringType,true),StructField(Phones (per 1000),StringType,true),StructField(Arable (%),StringType,true),StructField(Crops (%),StringType,true),StructField(Other (%),StringType,true),StructField(Climate,StringType,true),StructField(Birthrate,StringType,true),StructField(Deathrate,StringType,true),StructField(Agriculture,StringType,true),StructField(Industry,StringType,true),StructField(Service,StringType,true)))

In [275]:
# Normalise the numeric column types.
# The counters are kept as 64-bit LongType: the inferred schema already
# reports population and cumulative_count as LongType, and casting them down
# to IntegerType wraps any value above 2,147,483,647 without an error when
# ANSI mode is off. The table also contains continent-level "(total)" rows,
# so large values are plausible.
nwd_casts = {
    "cumulative_count": LongType(),
    "population": LongType(),
    "weekly_count": LongType(),
    "rate_14_day": DoubleType(),
}
for column_name, spark_type in nwd_casts.items():
    national_weekly_data_spk = national_weekly_data_spk.withColumn(
        column_name, Func.col(column_name).cast(spark_type)
    )

# Parse the year_week label with the "yyyy-ww" pattern into a date column.
# NOTE(review): despite its name, day_of_year holds a date, not a day number.
national_weekly_data_spk = national_weekly_data_spk.withColumn(
    "day_of_year", Func.to_date(Func.col("year_week"), "yyyy-ww")
)

In [276]:
# Build cleaned column names: remove all whitespace and drop dots.
# A comprehension keeps the loop variable local. The previous
# `for col in ...` loop left a global `col` behind, shadowing the `col`
# function star-imported from pyspark.sql.functions.
tempList = [
    "".join(raw_name.split()).replace('.', '')
    for raw_name in countries_of_the_world_spk.columns
]

In [277]:
# Display the cleaned column names to verify them before renaming.
tempList

['index',
 'Country',
 'Region',
 'Population',
 'Area(sqmi)',
 'PopDensity(persqmi)',
 'Coastline(coast/arearatio)',
 'Netmigration',
 'Infantmortality(per1000births)',
 'GDP($percapita)',
 'Literacy(%)',
 'Phones(per1000)',
 'Arable(%)',
 'Crops(%)',
 'Other(%)',
 'Climate',
 'Birthrate',
 'Deathrate',
 'Agriculture',
 'Industry',
 'Service']

In [278]:
# Rename all columns positionally to the cleaned names collected in tempList.
countries_of_the_world_spk = countries_of_the_world_spk.toDF(*tempList)

In [280]:
# Preview the renamed table (show() prints the first 20 rows by default).
countries_of_the_world_spk.show()

+-----+------------------+--------------------+----------+----------+-------------------+--------------------------+------------+------------------------------+---------------+-----------+---------------+---------+--------+--------+-------+---------+---------+-----------+--------+-------+
|index|           Country|              Region|Population|Area(sqmi)|PopDensity(persqmi)|Coastline(coast/arearatio)|Netmigration|Infantmortality(per1000births)|GDP($percapita)|Literacy(%)|Phones(per1000)|Arable(%)|Crops(%)|Other(%)|Climate|Birthrate|Deathrate|Agriculture|Industry|Service|
+-----+------------------+--------------------+----------+----------+-------------------+--------------------------+------------+------------------------------+---------------+-----------+---------------+---------+--------+--------+-------+---------+---------+-----------+--------+-------+
|    0|      Afghanistan |ASIA (EX. NEAR EA...|  31056997|    647500|               48,0|                      0,00|       23,06| 

In [281]:
# Keep only the columns used by the queries below, in this order.
nwd_columns = [
    "country",
    "country_code",
    "continent",
    "population",
    "indicator",
    "weekly_count",
    "year_week",
    "rate_14_day",
    "cumulative_count",
    "source",
    "day_of_year",
]
national_weekly_data_spk = national_weekly_data_spk.select(*nwd_columns)

In [282]:
# q1: cumulative "cases" counts for week 2020-31, largest first
# (show() prints the first 20 rows by default).
(
    national_weekly_data_spk
    .select("country", "cumulative_count", "year_week")
    .filter(Func.col("year_week") == "2020-31")
    .filter(Func.col("indicator") == "cases")
    .orderBy(Func.desc("cumulative_count"))
    .show()
)

22/05/20 13:06:52 WARN TaskSetManager: Stage 183 contains a task of very large size (5670 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+--------------------+----------------+---------+
|             country|cumulative_count|year_week|
+--------------------+----------------+---------+
|     America (total)|         9802912|  2020-31|
|United States Of ...|         4667955|  2020-31|
|        Asia (total)|         4155673|  2020-31|
|      Europe (total)|         3233940|  2020-31|
|              Brazil|         2733677|  2020-31|
|               India|         1803695|  2020-31|
|      EU/EEA (total)|         1459173|  2020-31|
|      Africa (total)|          957830|  2020-31|
|              Russia|          850870|  2020-31|
|        South Africa|          511485|  2020-31|
|              Mexico|          488250|  2020-31|
|                Peru|          428850|  2020-31|
|               Chile|          359731|  2020-31|
|            Colombia|          317651|  2020-31|
|               Spain|          309631|  2020-31|
|                Iran|          309437|  2020-31|
|      United Kingdom|          306830|  2020-31|


In [283]:
# q2: cumulative "cases" counts for week 2020-31, smallest first,
# limited to the first 10 rows.
(
    national_weekly_data_spk
    .select("country", "cumulative_count", "year_week")
    .filter(Func.col("year_week") == "2020-31")
    .filter(Func.col("indicator") == "cases")
    .orderBy(Func.asc("cumulative_count"))
    .show(10)
)

22/05/20 13:06:58 WARN TaskSetManager: Stage 184 contains a task of very large size (5670 KiB). The maximum recommended task size is 1000 KiB.


+--------------------+----------------+---------+
|             country|cumulative_count|year_week|
+--------------------+----------------+---------+
|            Anguilla|               3|  2020-31|
|British Virgin Is...|               9|  2020-31|
|Bonaire, Saint Eu...|              11|  2020-31|
|            Holy See|              12|  2020-31|
|Falkland Islands ...|              13|  2020-31|
|          Montserrat|              13|  2020-31|
|           Greenland|              14|  2020-31|
|Saint Kitts And N...|              17|  2020-31|
|            Dominica|              18|  2020-31|
|                Laos|              20|  2020-31|
+--------------------+----------------+---------+
only showing top 10 rows

