## Missing packages installation

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz && rm spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

# To see the commands outputs for debugging purpose:
#   - Remove -qq > /dev/null from apt-get
#   - Remove -q from wget
#   - Remove -q from pip

## Environment Configuration

### Environment Variables

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

## Spark Configuration

In [0]:
import findspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

findspark.init()

spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

## Playground

In [0]:
!mkdir -p Datasets/
!wget -q -O Datasets/dpc-covid19-ita-regioni.csv https://raw.githubusercontent.com/pcm-dpc/COVID-19/master/dati-regioni/dpc-covid19-ita-regioni.csv
!wget -q -O Datasets/popolazione.csv https://raw.githubusercontent.com/vincenzomanzoni/covid19-curated-dataset/master/datasets/support/popolazione.csv
!wget -q -O Datasets/time_series_19-covid-Confirmed.csv https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_19-covid-Confirmed.csv

In [0]:
df_regioni_github = spark.read.csv('Datasets/dpc-covid19-ita-regioni.csv', inferSchema=True, header=True)
df_popolazione = spark.read.csv('Datasets/popolazione.csv', inferSchema=True, header=True, sep='\t')
df_global_confirmed = spark.read.csv('Datasets/time_series_19-covid-Confirmed.csv', inferSchema=True, header=True)

In [0]:
df_italia = df_regioni_github.groupBy("data").agg(
      sum("ricoverati_con_sintomi").alias("ricoverati_con_sintomi"),
      sum("terapia_intensiva").alias("terapia_intensiva"),
      sum("totale_ospedalizzati").alias("totale_ospedalizzati"),
      sum("isolamento_domiciliare").alias("isolamento_domiciliare"),
      sum("totale_attualmente_positivi").alias("totale_attualmente_positivi"),
      sum("nuovi_attualmente_positivi").alias("nuovi_attualmente_positivi"),
      sum("dimessi_guariti").alias("dimessi_guariti"),
      sum("deceduti").alias("deceduti"),
      sum("totale_casi").alias("totale_casi"),
      sum("tamponi").alias("tamponi")).withColumn("denominazione_regione", lit("Italia"))

df_italia = df_italia.select("data", 
                     "denominazione_regione", 
                     "ricoverati_con_sintomi",
                     "terapia_intensiva",
                     "totale_ospedalizzati",
                     "isolamento_domiciliare",
                     "totale_attualmente_positivi",
                     "nuovi_attualmente_positivi",
                     "dimessi_guariti",
                     "deceduti",
                     "totale_casi",
                     "tamponi").orderBy("data")

In [0]:
df_regioni_tranne_trentino_alto_adige = df_regioni_github.select("data", 
                     "denominazione_regione", 
                     "ricoverati_con_sintomi",
                     "terapia_intensiva",
                     "totale_ospedalizzati",
                     "isolamento_domiciliare",
                     "totale_attualmente_positivi",
                     "nuovi_attualmente_positivi",
                     "dimessi_guariti",
                     "deceduti",
                     "totale_casi",
                     "tamponi").where("denominazione_regione NOT IN ('P.A. Bolzano', 'P.A. Trento')")                    

In [0]:
df_regioni_trentino_alto_adige = df_regioni_github.where("denominazione_regione IN ('P.A. Bolzanoni', 'P.A. Trento')")
df_regioni_trentino_alto_adige = df_regioni_trentino_alto_adige.groupBy("data").agg(
      sum("ricoverati_con_sintomi").alias("ricoverati_con_sintomi"),
      sum("terapia_intensiva").alias("terapia_intensiva"),
      sum("totale_ospedalizzati").alias("totale_ospedalizzati"),
      sum("isolamento_domiciliare").alias("isolamento_domiciliare"),
      sum("totale_attualmente_positivi").alias("totale_attualmente_positivi"),
      sum("nuovi_attualmente_positivi").alias("nuovi_attualmente_positivi"),
      sum("dimessi_guariti").alias("dimessi_guariti"),
      sum("deceduti").alias("deceduti"),
      sum("totale_casi").alias("totale_casi"),
      sum("tamponi").alias("tamponi")).withColumn("denominazione_regione", lit("Trentino-Alto Adige"))

df_regioni_trentino_alto_adige = df_regioni_trentino_alto_adige.select("data", 
                     "denominazione_regione", 
                     "ricoverati_con_sintomi",
                     "terapia_intensiva",
                     "totale_ospedalizzati",
                     "isolamento_domiciliare",
                     "totale_attualmente_positivi",
                     "nuovi_attualmente_positivi",
                     "dimessi_guariti",
                     "deceduti",
                     "totale_casi",
                     "tamponi")

In [0]:
df_regioni_italia = df_italia.unionAll(df_regioni_trentino_alto_adige).unionAll(df_regioni_tranne_trentino_alto_adige).orderBy("data", "denominazione_regione")

In [14]:
df_regioni_italia.show()

+-------------------+---------------------+----------------------+-----------------+--------------------+----------------------+---------------------------+--------------------------+---------------+--------+-----------+-------+
|               data|denominazione_regione|ricoverati_con_sintomi|terapia_intensiva|totale_ospedalizzati|isolamento_domiciliare|totale_attualmente_positivi|nuovi_attualmente_positivi|dimessi_guariti|deceduti|totale_casi|tamponi|
+-------------------+---------------------+----------------------+-----------------+--------------------+----------------------+---------------------------+--------------------------+---------------+--------+-----------+-------+
|2020-02-24 18:00:00|              Abruzzo|                     0|                0|                   0|                     0|                          0|                         0|              0|       0|          0|      5|
|2020-02-24 18:00:00|           Basilicata|                     0|                0|

+--------------------+-----------+----------+-------+-------------+---------------+
|                Area|Popolazione|Superficie|Densita|Numero Comuni|Numero Province|
+--------------------+-----------+----------+-------+-------------+---------------+
|           Lombardia|   10060574|     23864|    422|         1506|             12|
|               Lazio|    5879082|     17232|    341|          378|              5|
|            Campania|    5801692|     13671|    424|          550|              5|
|             Sicilia|    4999891|     25832|    194|          390|              9|
|              Veneto|    4905854|     18345|    267|          563|              7|
|      Emilia Romagna|    4459477|     22453|    199|          328|              9|
|            Piemonte|    4356406|     25387|    172|         1181|              8|
|              Puglia|    4029053|     19541|    206|          257|              6|
|             Toscana|    3729641|     22987|    162|          273|         

In [63]:
import pandas as pd
df = df_global_confirmed.select("`Country/Region`", '3/4/20', '3/5/20', '3/6/20') \
  .where("`Country/Region` = 'Argentina'").toPandas()
df

Unnamed: 0,Country/Region,3/4/20,3/5/20,3/6/20
0,Argentina,1,1,2


In [0]:
df1 = pd.melt(df, id_vars=["Country/Region"], value_vars=['3/4/20', '3/5/20', '3/6/20'])

In [69]:
from pyspark import SparkContext, SQLContext
sql = SQLContext(sc)
sql.createDataFrame(df1).show()

+--------------+--------+-----+
|Country/Region|variable|value|
+--------------+--------+-----+
|     Argentina|  3/4/20|    1|
|     Argentina|  3/5/20|    1|
|     Argentina|  3/6/20|    2|
+--------------+--------+-----+

