## Exercise I

The input is a CSV text file containing the daily PM10 value for a set of sensors. Each line of the file has the following format:
```
sensorId,date,PM10 value (μg/m3)
```

Here is an example of the data:
```
s1,2016-01-01,20.5
s2,2016-01-01,30.1
s1,2016-01-02,60.2
s2,2016-01-02,20.4
s1,2016-01-03,55.5
s2,2016-01-03,52.5
```

You are required to use PySpark to load the file, filter the values, and apply the map/reduce approach to produce the output. The output is one line per sensor, printed to standard output.
Each line contains a `sensorId` and the list of `dates` on which that sensor's PM10 value was greater than 50. Example output:
```
(s1, [2016-01-02, 2016-01-03])
(s2, [2016-01-03])
```
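
The map/reduce idea can be illustrated without Spark. The plain-Python sketch below is only an illustration, not the PySpark solution; the helper names `map_phase`, `shuffle` and `reduce_phase` are hypothetical. It runs the map, shuffle and reduce phases on the sample data above and prints the result in the required format:

```python
from collections import defaultdict

# The sample readings from the exercise statement.
SAMPLE = """s1,2016-01-01,20.5
s2,2016-01-01,30.1
s1,2016-01-02,60.2
s2,2016-01-02,20.4
s1,2016-01-03,55.5
s2,2016-01-03,52.5"""

THRESHOLD = 50.0


def map_phase(lines):
    # Emit a (sensorId, date) pair for every reading above the threshold.
    for line in lines:
        sensor_id, date, value = line.split(",")
        if float(value) > THRESHOLD:
            yield sensor_id, date


def shuffle(pairs):
    # Group all values that share the same key, as the framework would.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def reduce_phase(groups):
    # One output record per sensor: its id and the sorted list of critical dates.
    return [(key, sorted(dates)) for key, dates in sorted(groups.items())]


result = reduce_phase(shuffle(map_phase(SAMPLE.splitlines())))
for sensor_id, dates in result:
    print(f"({sensor_id}, [{', '.join(dates)}])")
```

Sensors with no critical day produce no pair in the map phase, so they do not appear in the output at all.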
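
```python
import os

# Point PySpark at a local JDK. The path is machine-specific: adjust it to your
# installation, and set it before the SparkSession (and its JVM) is created.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/jdk-11.0.26-oracle-x64"
```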

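```python
from pyspark.sql import SparkSession

# Start (or reuse) a local Spark session that uses all available cores.
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Render DataFrames as tables automatically when they are evaluated in a notebook cell.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
```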

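```python
from pyspark.sql.types import *

# The file has no header row, so column names and types come from this schema.
labels = [
    ("sensorId", StringType()),
    ("date", DateType()),  # parsed with Spark's default yyyy-MM-dd date format
    ("PM10 value (μg/m3)", FloatType()),
]

schema = StructType([StructField(name, dtype) for name, dtype in labels])
df = spark.read.csv("sensor.csv", sep=",", schema=schema)
```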

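```python
from pyspark.sql.functions import col


def map_pm10(data_frame):
    # Keep only the critical readings, then map each row to a (sensorId, date) pair.
    df_filter = data_frame.filter(col("PM10 value (μg/m3)") > 50)
    return df_filter.rdd.map(lambda row: (row["sensorId"], row["date"].isoformat()))


def reduce_pm10(rdd_mapped):
    # Group the dates by sensorId and materialize each group as a Python list.
    # groupByKey guarantees neither the order of the sensors nor of the dates.
    return rdd_mapped.groupByKey().mapValues(list)


print(reduce_pm10(map_pm10(df)).collect())
```

The run below uses the full `sensor.csv`, which contains more sensors and days than the excerpt shown above:

```
[('s1', ['2016-01-02', '2016-01-03', '2016-01-04', '2016-01-07']), ('s2', ['2016-01-03', '2016-01-07', '2016-01-09']), ('s3', ['2016-01-05', '2016-01-06', '2016-01-08', '2016-01-09']), ('s4', ['2016-01-05', '2016-01-06', '2016-01-10'])]
```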

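The cell above prints the raw Python list returned by `collect()`. To match the required `(s1, [2016-01-02, 2016-01-03])` layout, the collected pairs can be post-processed on the driver. A minimal sketch, assuming the input is a list of `(sensorId, [date, ...])` tuples like the one printed above; `format_sensor_dates` is a hypothetical helper name:

```python
def format_sensor_dates(pairs):
    # Sort the sensors and their dates, since groupByKey guarantees neither order,
    # then render each pair as "(sensorId, [date1, date2, ...])".
    lines = []
    for sensor_id, dates in sorted(pairs):
        lines.append(f"({sensor_id}, [{', '.join(sorted(dates))}])")
    return lines


# Stand-in for the output of reduce_pm10(map_pm10(df)).collect().
collected = [("s2", ["2016-01-03"]), ("s1", ["2016-01-03", "2016-01-02"])]
for line in format_sensor_dates(collected):
    print(line)
```

In the notebook, `collected` would simply be `reduce_pm10(map_pm10(df)).collect()`.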

## Exercise II

Using the same data as Exercise I, produce the list of sensors ordered by their number of critical days. Each line of the output contains the number of days with a PM10 value greater than 50 for a sensor `s`, followed by the `sensorId` of `s`.

Example output:
```
2, s1
1, s2
```
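
Counting critical days is a classic map/reduce pattern: map each critical reading to `(sensorId, 1)`, sum the ones per key, then sort by the count. The plain-Python sketch below simulates these steps on the Exercise I sample data; it is an illustration, not the PySpark code (in PySpark the same steps would typically use `rdd.map`, `rdd.reduceByKey` and `rdd.sortBy`):

```python
# The sample readings from Exercise I as (sensorId, date, PM10) tuples.
SAMPLE = [
    ("s1", "2016-01-01", 20.5),
    ("s2", "2016-01-01", 30.1),
    ("s1", "2016-01-02", 60.2),
    ("s2", "2016-01-02", 20.4),
    ("s1", "2016-01-03", 55.5),
    ("s2", "2016-01-03", 52.5),
]

# Map: one (sensorId, 1) pair per critical reading.
pairs = [(sensor_id, 1) for sensor_id, _date, value in SAMPLE if value > 50]

# Shuffle + reduce: sum the ones for each sensorId.
counts = {}
for sensor_id, one in pairs:
    counts[sensor_id] = counts.get(sensor_id, 0) + one

# Sort by number of critical days, highest first.
ordered = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
for sensor_id, count in ordered:
    print(f"{count}, {sensor_id}")
```

Summing ones per key lets the framework combine partial counts before the shuffle, whereas grouping the full date lists moves every date across the network.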

```python
def map_pm10_ordered(data_frame):
    # Same map step as Exercise I: critical readings -> (sensorId, date) pairs.
    df_filter = data_frame.filter(col("PM10 value (μg/m3)") > 50)
    return df_filter.rdd.map(lambda row: (row["sensorId"], row["date"].isoformat()))


def reduce_pm10_ex2(rdd_mapped):
    # Group the dates per sensor, bring the groups to the driver,
    # and sort them by the number of critical days, highest first.
    grouped = rdd_mapped.groupByKey().mapValues(list).collect()
    return [
        (len(dates), sensor_id)
        for sensor_id, dates in sorted(grouped, key=lambda x: len(x[1]), reverse=True)
    ]


result = reduce_pm10_ex2(map_pm10_ordered(df))
for count, sensor_id in result:
    print(f"{count}, {sensor_id}")
```

```
4, s1
4, s3
3, s2
3, s4
```

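In the run above, `s1` and `s3` both have 4 critical days. Because Python's `sorted` is stable, tied sensors keep whatever order `collect()` happened to return them in. If a reproducible order is wanted, one option (an assumption, not a requirement of the exercise) is to break ties by `sensorId`. The sketch below takes `(sensorId, count)` pairs, such as those returned by `rdd_mapped.groupByKey().mapValues(len).collect()`; `order_by_critical_days` is a hypothetical helper name:

```python
def order_by_critical_days(counts):
    # Highest count first; ties broken alphabetically by sensorId.
    return sorted(((n, sid) for sid, n in counts), key=lambda t: (-t[0], t[1]))


# Counts taken from the output above, in an arbitrary order.
counts = [("s3", 4), ("s2", 3), ("s4", 3), ("s1", 4)]
for n, sensor_id in order_by_critical_days(counts):
    print(f"{n}, {sensor_id}")
```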

## Exercise III

In this exercise, the input is a CSV file containing a list of profiles:

- Header: `name,surname,age`
- Each line of the file contains the information about one user

Example input data:
```
name,surname,age
Paolo,Garza,42
Luca,Boccia,41
Maura,Bianchi,16
```

You are required to use PySpark to load and analyze the data and produce a CSV file containing one line for each profile. The original `age` attribute is replaced by a new attribute called `rangeage`, of type String, computed as follows (`/` denotes integer division):
```
rangeage = "[" + (age/10)*10 + "-" + (age/10)*10+9 + "]"
```
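
A plain-Python sketch of the `rangeage` rule, using `//` so that, for example, `age = 42` gives `[40-49]`. It is illustrative only (`range_age` is a hypothetical helper name): it applies the rule to the example input and writes the transformed CSV with the standard `csv` module, as a small cross-check for the PySpark version:

```python
import csv
import io

# The example input from the exercise statement.
INPUT = """name,surname,age
Paolo,Garza,42
Luca,Boccia,41
Maura,Bianchi,16
"""


def range_age(age):
    # Lower bound of the decade via integer division, e.g. 42 -> 40.
    low = (age // 10) * 10
    return f"[{low}-{low + 9}]"


reader = csv.DictReader(io.StringIO(INPUT))
out = io.StringIO()
writer = csv.writer(out, lineterminator="\n")
writer.writerow(["name", "surname", "rangeage"])
for row in reader:
    writer.writerow([row["name"], row["surname"], range_age(int(row["age"]))])

print(out.getvalue(), end="")
```

The PySpark code uses `floor(col("age") / 10)` for the same effect, because Spark's `/` on integer columns returns a double rather than performing integer division.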

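```python
from pyspark.sql.functions import col, concat, floor, lit
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

labels = [("name", StringType()), ("surname", StringType()), ("age", IntegerType())]
schema = StructType([StructField(name, dtype) for name, dtype in labels])
# This file has a header row; the explicit schema supplies the column types.
df = spark.read.csv("person.csv", header=True, schema=schema)

# floor(age / 10) * 10 is the lower bound of the decade, e.g. 42 -> 40.
df_output = df.withColumn(
    "rangeage",
    concat(
        lit("["),
        floor(col("age") / 10) * 10,
        lit("-"),
        floor(col("age") / 10) * 10 + 9,
        lit("]"),
    ),
)
df_output = df_output.select("name", "surname", "rangeage")
df_output.show()
# Spark writes a directory named output.csv containing part files;
# mode="overwrite" lets the cell be re-run without failing on an existing path.
df_output.write.csv("output.csv", header=True, mode="overwrite")
```

```
+-----+--------+--------+
| name| surname|rangeage|
+-----+--------+--------+
|Paolo|   Garza| [40-49]|
| Luca|  Boccia| [40-49]|
|Maura| Bianchi| [10-19]|
|Alice|   Cochi| [10-19]|
|Laura|  Latini| [20-29]|
|Paula| Zachini| [10-19]|
|Carta|  Cianci| [20-29]|
| Rita|Lisatini| [30-39]|
+-----+--------+--------+
```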

