# PDM Big Data course - final exam

## 0. Setup
The following code downloads and prepares the dataset, and puts it into the `data` folder. It is ready to execute. You will need it for the practical part (2.).

Since it takes a couple of minutes to finish, run it now and start answering the preliminary questions in the meantime.

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
     |████████████████████████████████| 204.2MB 75kB/s 
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
     |████████████████████████████████| 204kB 42.9MB/s 
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612242 sha256=6a84babc4557908388986414c30b8fb600dab6db70116f04e9779c0d7fdb9921
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


In [None]:
import pyspark
spark = pyspark.sql.SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [None]:
!wget https://github.com/CSSEGISandData/COVID-19/archive/master.zip

--2021-01-22 16:40:28--  https://github.com/CSSEGISandData/COVID-19/archive/master.zip
Resolving github.com (github.com)... 140.82.114.3
Connecting to github.com (github.com)|140.82.114.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://codeload.github.com/CSSEGISandData/COVID-19/zip/master [following]
--2021-01-22 16:40:28--  https://codeload.github.com/CSSEGISandData/COVID-19/zip/master
Resolving codeload.github.com (codeload.github.com)... 140.82.113.9
Connecting to codeload.github.com (codeload.github.com)|140.82.113.9|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 209500304 (200M) [application/zip]
Saving to: ‘master.zip’


2021-01-22 16:40:30 (108 MB/s) - ‘master.zip’ saved [209500304/209500304]



In [None]:
!unzip -o -q master.zip

In [None]:
!rm -rf data
!mkdir data

In [None]:
import shutil
from pathlib import Path
import pandas as pd
from tqdm.notebook import tqdm

DATA_IN = Path("COVID-19-master/csse_covid_19_data/csse_covid_19_daily_reports")
DATA_OUT = Path("data")

for file_path in tqdm(list(DATA_IN.iterdir()), desc="Preprocessing"):
    # Copy any non-CSV file unchanged.
    if file_path.suffix != '.csv':
        shutil.copy(file_path, DATA_OUT / file_path.name)
        continue
    df = pd.read_csv(file_path)
    # File names look like MM-DD-YYYY.csv; build an ISO date so that string order is chronological.
    month, day, year = file_path.stem.split('.')[0].split('-')
    date = f'{year}-{month}-{day}'
    if date > '2021-01-20':
        continue
    # Harmonize the column names across the daily reports.
    df.rename(columns={'Lat': 'Lat_',
                       'Province/State': 'Province_State',
                       'Country/Region': 'Country_Region',
                       'Last Update': 'Last_Update'}, inplace=True)
    # Strip commas and double quotes so that rows can later be split on ','.
    df = df.replace(',', '', regex=True).replace('"', '', regex=True)
    df = df[['Province_State', 'Country_Region', 'Last_Update',
             'Confirmed', 'Deaths', 'Recovered']].fillna(0)

    df['Date'] = date
    df.to_csv(DATA_OUT / file_path.name, index=False)





## 1. Preliminary questions

---
**Note**

In this part, you are expected to write Spark code. Among all Spark methods, you may only use RDD methods (`filter`, `map`, `reduceByKey`, etc.).

However, if your solution uses other Spark methods but still produces the expected results, you will get half of the points.

---

### 1.1. About _replication_ and _sharding_:
* What problem(s) does each of them try to solve?
* Give one drawback of each.

Sharding is useful when the data cannot fit on a single machine: it is split across multiple machines.

Replicated storage can mitigate data loss and improve availability (if one _replica_ dies, the others still have the data and can handle requests), and it can also reduce response time under high load (if one replica is busy, another one can take the query).

Main drawbacks of replication: slower writes (with strong consistency) or potentially inconsistent data across replicas (with eventual consistency); more storage space used.

Main drawbacks of sharding: inefficient joins (if the join key is different from the sharding key); the master node becomes a single point of failure.
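To make the two ideas concrete, here is a minimal single-process sketch of how a key could be assigned to a shard and then replicated. The node names, the replication factor and the "next node in the ring" placement rule are assumptions made for illustration; real systems use their own placement strategies.

```python
import zlib

NODES = ["node-0", "node-1", "node-2", "node-3"]  # hypothetical cluster
REPLICATION_FACTOR = 2  # assumption: each record is stored on 2 nodes


def shard_of(key, n_shards):
    """Map a key to a shard with a deterministic hash (crc32, not Python's salted hash())."""
    return zlib.crc32(key.encode("utf-8")) % n_shards


def nodes_for(key):
    """Return the primary node for a key, followed by its replicas (the next nodes in the ring)."""
    primary = shard_of(key, len(NODES))
    return [NODES[(primary + i) % len(NODES)] for i in range(REPLICATION_FACTOR)]


# Sharding spreads the keys over the nodes; replication stores each key on several nodes.
placement = {key: nodes_for(key) for key in ["France", "Italy", "Chad", "Vietnam"]}
for key, nodes in placement.items():
    print(key, "->", nodes)
```

In this sketch, a write for one key has to reach `REPLICATION_FACTOR` nodes (the write cost of replication), and combining two keys that land on different shards requires moving data between nodes (the join cost of sharding).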

### 1.2. Among the PySpark methods `filter`, `map`, `count` and `take`, which ones are _lazy_ and which ones are _actions_? Which ones transfer data onto the master, and which ones do not?

* `filter` and `map` (and also `reduceByKey`) are lazy: they do not transfer any data onto the master (in fact, they do nothing until an action is executed on top of them). They all return an RDD.
* `count` and `take` (and also `reduce`) are actions: they trigger the computation and transfer data onto the master. `count` returns an integer, `take` returns a Python list and `reduce` returns a Python object (depending on the aggregate function chosen).
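The difference between a lazy transformation and an action can be illustrated without Spark. The sketch below is only an analogy: it uses Python's built-in `map` and `filter`, which return lazy iterators, and `list(...)` plays the role of the action. The `double` function and the `calls` list are illustrative names.

```python
calls = []


def double(x):
    calls.append(x)  # record every element that is actually processed
    return 2 * x


numbers = range(1, 6)

# "Transformations": the built-in map and filter only build a lazy pipeline.
pipeline = filter(lambda x: x > 4, map(double, numbers))
calls_before_action = len(calls)  # nothing has been processed yet

# "Action": materializing the result forces the whole pipeline to run.
result = list(pipeline)
calls_after_action = len(calls)

print(calls_before_action, calls_after_action, result)
```

Building the pipeline processes no element; only the final materialization does, which mirrors how Spark defers work until an action is called.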

### 1.3. Here are four separate Spark codes (where `sc` is a Spark context). For each of them, state how many times the `numbers.txt` file will actually be read from disk. Please explain each answer in a few words.
(a)
```
sc.textFile("numbers.txt")
```
(b)
```
sc.textFile("numbers.txt").cache().max()
sc.textFile("numbers.txt").cache().count()
```
(c)
```
rdd = sc.textFile("numbers.txt")
rdd.max()
rdd.count()
```
(d)

```
rdd = sc.textFile("numbers.txt")
rdd.map(lambda x: x.lower())
```

* (a) The file will not be read at all: `textFile` is lazy, and no action is executed on the RDD.
* (b) The file will be read twice, since we recreate a new RDD with the second `textFile` call. The following code would do effective caching, reading the data only once (the second `cache()` is useless):
```
rdd = sc.textFile("numbers.txt")
rdd.cache().max()
rdd.cache().count()
```
* (c) The file will be read twice (once for `max` and once for `count`), since the RDD is lazy: `textFile` does not actually store anything in memory.
* (d) The file will not be read at all, because `map` is lazy.
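The counting argument for (b), (c) and (d) can be checked with a toy model. The `ToyRDD` class below is a hypothetical stand-in written for this illustration, not the Spark API: it counts how many times its underlying "file" is scanned, and `cache()` only sets a flag, as in Spark.

```python
class ToyRDD:
    """A toy stand-in for an RDD backed by a 'file' (not the real Spark API)."""

    def __init__(self, lines):
        self._lines = lines
        self._cached = None
        self._use_cache = False
        self.reads = 0  # how many times the "file" was scanned

    def cache(self):
        self._use_cache = True  # only a flag: nothing is read here
        return self

    def _compute(self):
        if self._cached is not None:
            return self._cached  # served from memory, no new scan
        self.reads += 1  # simulate one full scan of the file
        data = list(self._lines)
        if self._use_cache:
            self._cached = data
        return data

    def max(self):
        return max(self._compute())

    def count(self):
        return len(self._compute())


lines = ["3", "10", "7"]  # made-up file content

uncached = ToyRDD(lines)          # like (c): two actions, no cache
uncached.max()
uncached.count()

cached = ToyRDD(lines).cache()    # like the corrected version of (b)
cached.max()
cached.count()

never_used = ToyRDD(lines).cache()  # like (d): no action is ever called

print(uncached.reads, cached.reads, never_used.reads)
```

Each action on the uncached object triggers a new scan, whereas the cached object is scanned once and then served from memory.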

### 1.4. Here are three separate Spark codes (where `rdd` is an RDD of tuples of size 1000). Rank them from the slowest to the fastest to run, and explain in a few words why.
(a)
```
rdd.reduceByKey(lambda a, b: a + b).take(10)
```
(b)
```
rdd.reduceByKey(lambda a, b: a + b).take(100)
```
(c)
```
rdd.reduceByKey(lambda a, b: a + b).collect()
```

The three codes take essentially the same time to run, because `reduceByKey` always needs to go over the whole RDD (even if, in the end, we output only a couple of rows).
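The reasoning can be sketched with a single-machine imitation of `reduceByKey`. The `reduce_by_key` helper below is hypothetical and ignores partitioning and shuffling; it only shows that the merge function combines two values of the same key, and that every input row has to be visited before even the first result is known.

```python
def reduce_by_key(pairs, func):
    """Toy, single-machine version of reduceByKey: merge the values of equal keys with a binary func."""
    merged = {}
    rows_scanned = 0
    for key, value in pairs:
        rows_scanned += 1
        merged[key] = func(merged[key], value) if key in merged else value
    return list(merged.items()), rows_scanned


pairs = [("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5)]  # made-up data
result, rows_scanned = reduce_by_key(pairs, lambda x, y: x + y)

first_two = result[:2]  # analogous to take(2): fewer rows returned, same scan
print(first_two, rows_scanned)
```

Whether two rows or all rows are returned, the number of rows scanned is the same, since the last input row may still change the value of any key.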

## 2. Practical example

### 2.1. Read all CSV files from the `data` folder into a single Spark RDD, and count the total number of rows.

In [None]:
covid_rdd_full = sc.textFile("data/*.csv").map(lambda row: row.split(","))

In [None]:
header = covid_rdd_full.first()
covid_rdd = covid_rdd_full.filter(lambda row: row[0] != "Province_State").cache()

In [None]:
covid_rdd.count()

1150435

In [None]:
index_date = header.index('Date')
index_country = header.index('Country_Region')
index_deaths = header.index('Deaths')
index_cases = header.index('Confirmed')

### 2.2. Find the last (most recent) date in the data set.


In [None]:
covid_rdd.map(lambda row: row[index_date]).reduce(lambda a, b: max(a, b))

'2021-01-20'
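Taking the `max` of strings works here only because the preprocessing step rewrote the dates as `YYYY-MM-DD`, for which lexicographic order and chronological order coincide. A small plain-Python check, on made-up sample dates, illustrates this and shows that the original `MM-DD-YYYY` form would give a wrong answer:

```python
from functools import reduce

dates = ["2020-12-31", "2021-01-20", "2020-02-26", "2021-01-03"]  # made-up sample
latest = reduce(lambda a, b: max(a, b), dates)  # same reduction as in the RDD code

# Counter-example: MM-DD-YYYY strings do not sort chronologically.
us_style = ["12-31-2020", "01-20-2021"]
wrong_latest = max(us_style)

print(latest, wrong_latest)
```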

### 2.3. Compute the number of deaths per day worldwide.

In [None]:
covid_rdd.map(lambda row: (row[index_date], float(row[index_deaths]))).reduceByKey(lambda a, b: a + b).collect()

### 2.4. Compute the mortality rate per country (over the whole time period).
The mortality rate is defined as the number of deaths divided by the number of confirmed cases.

NB: The expected answer only reads the CSV files once. But if you come up with a two-pass solution, you will still get half of the points.

In [None]:
covid_rdd.map(
    lambda row: (row[index_country], (float(row[index_cases]), float(row[index_deaths])))
).reduceByKey(
    lambda a, b: (a[0] + b[0], a[1] + b[1])
).map(
    lambda t: (t[0], t[1][1] / t[1][0] if t[1][0] else None)  # None when a country has no confirmed cases
).collect()
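The single-pass idea is to carry a `(confirmed, deaths)` pair per country and to divide only at the end. The plain-Python sketch below mirrors the three steps (map to pairs, merge pairs per key, divide) on made-up numbers; the guard against a zero denominator is an assumption added here for robustness.

```python
rows = [  # (country, confirmed, deaths): made-up numbers for illustration only
    ("A", 100.0, 2.0),
    ("B", 50.0, 5.0),
    ("A", 300.0, 6.0),
]

# One pass over the rows: accumulate both sums at once, per country.
totals = {}
for country, confirmed, deaths in rows:
    c, d = totals.get(country, (0.0, 0.0))
    totals[country] = (c + confirmed, d + deaths)

# Final step: one division per country, skipped when there are no confirmed cases.
rates = {country: (d / c if c else None) for country, (c, d) in totals.items()}
print(rates)
```

Summing both columns first and dividing once gives the ratio of the totals, which is what the definition asks for, rather than an average of daily ratios.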

## 3. (bonus points) Same as questions 2.1, 2.2, 2.3 and 2.4 but using DataFrames instead of RDDs.

### 3.1. Load the CSV files into a DataFrame and count the number of rows.

In [None]:
df = spark.read.csv("data/*.csv", header=True).cache()

In [None]:
df.count()

1150435

### 3.2. Most recent date.

In [None]:
df.agg({"Date": "max"}).show()

+----------+
| max(Date)|
+----------+
|2021-01-20|
+----------+



### 3.3. Daily number of deaths.

In [None]:
df = df.withColumn("Deaths", df["Deaths"].cast("float"))

In [None]:
df.groupby('Date').sum('Deaths').toPandas()

| | Date | sum(Deaths) |
|---|---|---|
| 0 | 2020-04-13 | 119568.0 |
| 1 | 2020-02-26 | 2770.0 |
| 2 | 2020-06-24 | 482845.0 |
| 3 | 2020-11-12 | 1295182.0 |
| 4 | 2020-09-12 | 920995.0 |
| ... | ... | ... |
| 360 | 2020-04-05 | 69486.0 |
| 361 | 2020-12-28 | 1774686.0 |
| 362 | 2020-10-25 | 1154956.0 |
| 363 | 2020-05-01 | 238691.0 |


### 3.4. Mortality rate per country.

In [None]:
df = df.withColumn("Confirmed", df["Confirmed"].cast("float"))

In [None]:
result = df.groupby("Country_Region").agg({"Confirmed": "sum", "Deaths": "sum"})

In [None]:
result = result.withColumn('MortalityRate', result['sum(Deaths)'] / result['sum(Confirmed)'])

In [None]:
result.toPandas()

| | Country_Region | sum(Deaths) | sum(Confirmed) | MortalityRate |
|---|---|---|---|---|
| 0 | Chad | 21794.0 | 329300.0 | 0.066183 |
| 1 | Paraguay | 218031.0 | 10513560.0 | 0.020738 |
| 2 | Russia | 6179623.0 | 361663086.0 | 0.017087 |
| 3 | Bahamas The | 0.0 | 10.0 | 0.000000 |
| 4 | North Ireland | 0.0 | 1.0 | 0.000000 |
| ... | ... | ... | ... | ... |
| 236 | Vietnam | 5608.0 | 249630.0 | 0.022465 |
| 237 | Mainland China | 77471.0 | 2473987.0 | 0.031314 |
| 238 | Diamond Princess | 3876.0 | 215024.0 | 0.018026 |
| 239 | Mali | 37189.0 | 891775.0 | 0.041702 |
