## Exercise I

The input is a textual CSV file containing the daily PM10 value for a set of sensors. Each line of the file has the following format:
`sensorId,date,PM10 value (μg/m3)\n`

Example data:
```
s1,2016-01-01,20.5
s2,2016-01-01,30.1
s1,2016-01-02,60.2
s2,2016-01-02,20.4
s1,2016-01-03,55.5
s2,2016-01-03,52.5
```

Use PySpark to load the file, filter the values, and apply the map/reduce idea to produce the output: one line per sensor on standard output.
Each line contains a `sensorId` and the list of `dates` on which that sensor's PM10 value was greater than 50. Example output:
```
(s1, [2016-01-02, 2016-01-03])
(s2, [2016-01-03])
```
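
The filter/map/reduce idea in the statement can be sketched without a cluster. The block below is a minimal plain-Python stand-in, not the notebook's solution: `parse`, `merge`, and `THRESHOLD` are hypothetical names introduced for illustration, and the rows are the six sample lines from the statement. In PySpark the same three steps would typically be expressed with the RDD methods `filter`, `map`, and `reduceByKey`.

```python
from functools import reduce

# Sample rows copied from the exercise statement.
lines = [
    "s1,2016-01-01,20.5",
    "s2,2016-01-01,30.1",
    "s1,2016-01-02,60.2",
    "s2,2016-01-02,20.4",
    "s1,2016-01-03,55.5",
    "s2,2016-01-03,52.5",
]

THRESHOLD = 50.0  # PM10 limit from the exercise text (name is illustrative)


def parse(line):
    """Split one CSV line into (sensorId, date, PM10 value)."""
    sensor_id, date, value = line.split(",")
    return sensor_id, date, float(value)


def merge(acc, pair):
    """Reduce step: append the dates of `pair` to the entry of its sensor."""
    sensor_id, dates = pair
    acc.setdefault(sensor_id, []).extend(dates)
    return acc


records = map(parse, lines)
critical = filter(lambda r: r[2] > THRESHOLD, records)   # filter step
pairs = map(lambda r: (r[0], [r[1]]), critical)          # map step: (key, [date])
dates_by_sensor = reduce(merge, pairs, {})               # reduce step, per key

for sensor_id, dates in sorted(dates_by_sensor.items()):
    print(f"({sensor_id}, [{', '.join(dates)}])")
```

On the six sample rows this prints the two lines shown in the example output above.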



In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("PM10Analysis").getOrCreate()

In [5]:
from google.colab import files
files.upload()

Saving person.csv to person.csv
Saving sensor.csv to sensor.csv


{'person.csv': b'name,surname,age\r\nPaolo,Garza,42\r\nLuca,Boccia,41\r\nMaura,Bianchi,16\r\nAlice,Cochi,17\r\nLaura,Latini,28\r\nPaula,Zachini,19\r\nCarta,Cianci,29\r\nRita,Lisatini,31',
 'sensor.csv': b's1,2016-01-01,20.5\r\ns2,2016-01-01,30.1\r\ns1,2016-01-02,60.2\r\ns2,2016-01-02,20.4\r\ns1,2016-01-03,55.5\r\ns2,2016-01-03,52.5\r\ns1,2016-01-04,55.6\r\ns2,2016-01-04,49.7\r\ns3,2016-01-05,50.8\r\ns4,2016-01-05,53.9\r\ns3,2016-01-06,57.10\r\ns4,2016-01-06,54.11\r\ns1,2016-01-07,51.12\r\ns2,2016-01-07,53.13\r\ns3,2016-01-08,62.14\r\ns4,2016-01-08,42.15\r\ns2,2016-01-09,62.16\r\ns3,2016-01-09,55.17\r\ns4,2016-01-10,56.18'}

In [17]:
df1 = spark.read.csv('/content/sensor.csv', inferSchema=True, header=True)
df2 = spark.read.csv('/content/person.csv', inferSchema=True, header=True)

df1.show()
df2.show()

+---+----------+-----+
| s1|2016-01-01| 20.5|
+---+----------+-----+
| s2|2016-01-01| 30.1|
| s1|2016-01-02| 60.2|
| s2|2016-01-02| 20.4|
| s1|2016-01-03| 55.5|
| s2|2016-01-03| 52.5|
| s1|2016-01-04| 55.6|
| s2|2016-01-04| 49.7|
| s3|2016-01-05| 50.8|
| s4|2016-01-05| 53.9|
| s3|2016-01-06| 57.1|
| s4|2016-01-06|54.11|
| s1|2016-01-07|51.12|
| s2|2016-01-07|53.13|
| s3|2016-01-08|62.14|
| s4|2016-01-08|42.15|
| s2|2016-01-09|62.16|
| s3|2016-01-09|55.17|
| s4|2016-01-10|56.18|
+---+----------+-----+

+-----+--------+---+
| name| surname|age|
+-----+--------+---+
|Paolo|   Garza| 42|
| Luca|  Boccia| 41|
|Maura| Bianchi| 16|
|Alice|   Cochi| 17|
|Laura|  Latini| 28|
|Paula| Zachini| 19|
|Carta|  Cianci| 29|
| Rita|Lisatini| 31|
+-----+--------+---+



In [25]:
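# sensor.csv has no header row, so re-read it with header=False to keep the
# first record (s1,2016-01-01,20.5) as data, then name the columns.
df1 = spark.read.csv('/content/sensor.csv', inferSchema=True, header=False)
df1 = df1.toDF("sensorId", "date", "pm10_value")
df1.show()

+--------+----------+----------+
|sensorId|      date|pm10_value|
+--------+----------+----------+
|      s1|2016-01-01|      20.5|
|      s2|2016-01-01|      30.1|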
|      s1|2016-01-02|      60.2|
|      s2|2016-01-02|      20.4|
|      s1|2016-01-03|      55.5|
|      s2|2016-01-03|      52.5|
|      s1|2016-01-04|      55.6|
|      s2|2016-01-04|      49.7|
|      s3|2016-01-05|      50.8|
|      s4|2016-01-05|      53.9|
|      s3|2016-01-06|      57.1|
|      s4|2016-01-06|     54.11|
|      s1|2016-01-07|     51.12|
|      s2|2016-01-07|     53.13|
|      s3|2016-01-08|     62.14|
|      s4|2016-01-08|     42.15|
|      s2|2016-01-09|     62.16|
|      s3|2016-01-09|     55.17|
|      s4|2016-01-10|     56.18|
+--------+----------+----------+



In [28]:
filtered_df1 = df1.filter(df1["pm10_value"] > 50)

# Group by sensorId and collect the dates with PM10 values > 50
result_df1 = (
    filtered_df1
    .groupBy("sensorId")
    .agg(F.collect_list("date").alias("dates_with_high_pm10"))
)

# Show the result for PM10 data
print("PM10 Data Result:")
result_df1.show(truncate=False)

# Individuals with age > 20
filtered_df2 = df2.filter(df2["age"] > 20)

# Show the result for personal information
print("Filtered Personal Information Result:")
filtered_df2.show(truncate=False)

PM10 Data Result:
+--------+------------------------------------------------+
|sensorId|dates_with_high_pm10                            |
+--------+------------------------------------------------+
|s4      |[2016-01-05, 2016-01-06, 2016-01-10]            |
|s2      |[2016-01-03, 2016-01-07, 2016-01-09]            |
|s3      |[2016-01-05, 2016-01-06, 2016-01-08, 2016-01-09]|
|s1      |[2016-01-02, 2016-01-03, 2016-01-04, 2016-01-07]|
+--------+------------------------------------------------+

Filtered Personal Information Result:
+-----+--------+---+
|name |surname |age|
+-----+--------+---+
|Paolo|Garza   |42 |
|Luca |Boccia  |41 |
|Laura|Latini  |28 |
|Carta|Cianci  |29 |
|Rita |Lisatini|31 |
+-----+--------+---+



## Exercise II

Using the same data as in Exercise I, produce the list of sensors ordered by their number of critical days. Each line of the output contains the number of days on which the PM10 value was greater than 50 for a sensor `s`, followed by the `sensorId` of sensor `s`.

Example output:
```
2, s1
1, s2
```
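
As a small worked check of the counting and ordering described above, here is a plain-Python sketch on the six sample rows from Exercise I. It is an illustration, not the notebook's solution; breaking ties by `sensorId` is an assumption, since the statement does not say how equal counts should be ordered.

```python
from collections import Counter

# Sample rows from Exercise I as (sensorId, date, PM10 value) tuples.
rows = [
    ("s1", "2016-01-01", 20.5),
    ("s2", "2016-01-01", 30.1),
    ("s1", "2016-01-02", 60.2),
    ("s2", "2016-01-02", 20.4),
    ("s1", "2016-01-03", 55.5),
    ("s2", "2016-01-03", 52.5),
]

# Count, per sensor, the days whose PM10 value is greater than 50.
critical_days = Counter(sensor_id for sensor_id, _date, pm10 in rows if pm10 > 50)

# Order by count, highest first; ties are broken by sensorId (an assumption).
ordered = sorted(critical_days.items(), key=lambda kv: (-kv[1], kv[0]))

for sensor_id, count in ordered:
    print(f"{count}, {sensor_id}")
```

A sensor with no critical day never enters the counter, so it does not appear in the output at all.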



In [29]:
critical_days_count = (
    filtered_df1
    .groupBy("sensorId")
    .agg(F.count("date").alias("critical_days_count")))
ordered_result = critical_days_count.orderBy(F.desc("critical_days_count"))


# Show the result
ordered_result.show(truncate=False)

+--------+-------------------+
|sensorId|critical_days_count|
+--------+-------------------+
|s1      |4                  |
|s3      |4                  |
|s4      |3                  |
|s2      |3                  |
+--------+-------------------+



## Exercise III

The input is a CSV file containing a list of profiles.

- Header: `name,surname,age`
- Each line of the file contains the information about one user

Example input data:
```
name,surname,age
Paolo,Garza,42
Luca,Boccia,41
Maura,Bianchi,16
```

Use PySpark to load and analyze the data and produce a CSV file containing one line for each profile. The original `age` attribute is replaced with a new attribute called `rangeage` of type String, where `age/10` denotes integer division:
```
rangeage = "[" + (age/10)*10 + "-" + (age/10)*10+9 + "]"
```
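
To make the formula concrete, here is a plain-Python sketch that applies it to the three sample profiles and writes the result as CSV text. It assumes `age/10` in the formula means integer division, so that 42 falls in `[40-49]`; `range_age` is a hypothetical helper name, and the CSV is written to an in-memory buffer rather than a file.

```python
import csv
import io


def range_age(age: int) -> str:
    """Return the decade bucket of `age`, e.g. 42 -> "[40-49]"."""
    low = (age // 10) * 10  # integer division drops the last digit
    return f"[{low}-{low + 9}]"


# Sample profiles from the exercise statement.
profiles = [
    {"name": "Paolo", "surname": "Garza", "age": 42},
    {"name": "Luca", "surname": "Boccia", "age": 41},
    {"name": "Maura", "surname": "Bianchi", "age": 16},
]

# Write one line per profile, with rangeage taking the place of age.
buffer = io.StringIO()
writer = csv.DictWriter(
    buffer, fieldnames=["name", "surname", "rangeage"], lineterminator="\n"
)
writer.writeheader()
for p in profiles:
    writer.writerow(
        {"name": p["name"], "surname": p["surname"], "rangeage": range_age(p["age"])}
    )

csv_text = buffer.getvalue()
print(csv_text)
```

With floating-point division instead of `//`, `(42 / 10) * 10` stays at 42.0 and the bucket would start at the age itself, which is not what the formula intends.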





In [30]:
df2.show()

+-----+--------+---+
| name| surname|age|
+-----+--------+---+
|Paolo|   Garza| 42|
| Luca|  Boccia| 41|
|Maura| Bianchi| 16|
|Alice|   Cochi| 17|
|Laura|  Latini| 28|
|Paula| Zachini| 19|
|Carta|  Cianci| 29|
| Rita|Lisatini| 31|
+-----+--------+---+



In [32]:
# Lower bound of the decade: floor(age / 10) * 10, e.g. 42 -> 40.
# "/" is floating-point division in Spark, so floor() is needed to get
# the integer division that the rangeage formula intends.
decade_start = F.floor(F.col("age") / 10) * 10
df2 = df2.withColumn("rangeage", F.concat(
    F.lit("["),
    decade_start.cast("string"),
    F.lit("-"),
    (decade_start + 9).cast("string"),
    F.lit("]")
))

# Result for personal information with the new column
print("Personal Information Result with rangeage:")
df2.show(truncate=False)


Personal Information Result with rangeage:
+-----+--------+---+--------+
|name |surname |age|rangeage|
+-----+--------+---+--------+
|Paolo|Garza   |42 |[40-49] |
|Luca |Boccia  |41 |[40-49] |
|Maura|Bianchi |16 |[10-19] |
|Alice|Cochi   |17 |[10-19] |
|Laura|Latini  |28 |[20-29] |
|Paula|Zachini |19 |[10-19] |
|Carta|Cianci  |29 |[20-29] |
|Rita |Lisatini|31 |[30-39] |
+-----+--------+---+--------+

