## Missing packages installation

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz && rm spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

# To see the commands outputs for debugging purpose:
#   - Remove -qq > /dev/null from apt-get
#   - Remove -q from wget
#   - Remove -q from pip

## Environment Configuration

### Environment Variables

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

## Spark Configuration

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

## Playground

In [0]:
!mkdir .p Datasets/
!wget -O  Datasets/dpc-covid19-ita-regioni.csv https://raw.githubusercontent.com/pcm-dpc/COVID-19/master/dati-regioni/dpc-covid19-ita-regioni.csv

In [0]:
df_regioni_github = spark.read.csv('Datasets/dpc-covid19-ita-regioni.csv', inferSchema=True, header =True)

In [0]:
from pyspark.sql.functions import *

df_italia = df_regioni_github.groupBy("data").agg(
      sum("ricoverati_con_sintomi").alias("ricoverati_con_sintomi"),
      sum("terapia_intensiva").alias("terapia_intensiva"),
      sum("totale_ospedalizzati").alias("totale_ospedalizzati"),
      sum("isolamento_domiciliare").alias("isolamento_domiciliare"),
      sum("totale_attualmente_positivi").alias("totale_attualmente_positivi"),
      sum("nuovi_attualmente_positivi").alias("nuovi_attualmente_positivi"),
      sum("dimessi_guariti").alias("dimessi_guariti"),
      sum("deceduti").alias("deceduti"),
      sum("totale_casi").alias("totale_casi"),
      sum("tamponi").alias("tamponi")).withColumn("denominazione_regione", lit("Italia"))

df_italia = df_italia.select("data", 
                     "denominazione_regione", 
                     "ricoverati_con_sintomi",
                     "terapia_intensiva",
                     "totale_ospedalizzati",
                     "isolamento_domiciliare",
                     "totale_attualmente_positivi",
                     "nuovi_attualmente_positivi",
                     "dimessi_guariti",
                     "deceduti",
                     "totale_casi",
                     "tamponi").orderBy("data")

In [0]:
df_regioni_tranne_trentino_alto_adige = df_regioni_github.select("data", 
                     "denominazione_regione", 
                     "ricoverati_con_sintomi",
                     "terapia_intensiva",
                     "totale_ospedalizzati",
                     "isolamento_domiciliare",
                     "totale_attualmente_positivi",
                     "nuovi_attualmente_positivi",
                     "dimessi_guariti",
                     "deceduti",
                     "totale_casi",
                     "tamponi").where("denominazione_regione NOT IN ('P.A. Bolzano', 'P.A. Trento')")                    

In [0]:
df_regioni_trentino_alto_adige = df_regioni_github.where("denominazione_regione IN ('P.A. Bolzanoni', 'P.A. Trento')")
df_regioni_trentino_alto_adige = df_regioni_trentino_alto_adige.groupBy("data").agg(
      sum("ricoverati_con_sintomi").alias("ricoverati_con_sintomi"),
      sum("terapia_intensiva").alias("terapia_intensiva"),
      sum("totale_ospedalizzati").alias("totale_ospedalizzati"),
      sum("isolamento_domiciliare").alias("isolamento_domiciliare"),
      sum("totale_attualmente_positivi").alias("totale_attualmente_positivi"),
      sum("nuovi_attualmente_positivi").alias("nuovi_attualmente_positivi"),
      sum("dimessi_guariti").alias("dimessi_guariti"),
      sum("deceduti").alias("deceduti"),
      sum("totale_casi").alias("totale_casi"),
      sum("tamponi").alias("tamponi")).withColumn("denominazione_regione", lit("Trentino-Alto Adige"))

df_regioni_trentino_alto_adige = df_regioni_trentino_alto_adige.select("data", 
                     "denominazione_regione", 
                     "ricoverati_con_sintomi",
                     "terapia_intensiva",
                     "totale_ospedalizzati",
                     "isolamento_domiciliare",
                     "totale_attualmente_positivi",
                     "nuovi_attualmente_positivi",
                     "dimessi_guariti",
                     "deceduti",
                     "totale_casi",
                     "tamponi")

In [0]:
df_regioni_italia = df_italia.unionAll(df_regioni_trentino_alto_adige).unionAll(df_regioni_tranne_trentino_alto_adige).orderBy("data", "denominazione_regione")

In [0]:
df_regioni_italia.show()