In [5]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Exercise I

The input is a text CSV file containing the daily PM10 value for a set of sensors. Each line of the file has the following format:
```sensorId,date,PM10 value (μg/m3)\n```

Example data:
```
s1,2016-01-01,20.5
s2,2016-01-01,30.1
s1,2016-01-02,60.2
s2,2016-01-02,20.4
s1,2016-01-03,55.5
s2,2016-01-03,52.5
```
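As a minimal sketch of how one line in this format splits into typed fields, the snippet below parses the sample rows in plain Python, so it can be checked without a Spark session. `Reading` and `parse_reading` are hypothetical names introduced here for illustration; they are not part of the exercise.

```python
from typing import NamedTuple


class Reading(NamedTuple):
    """One parsed line: sensorId, date, PM10 value (hypothetical helper type)."""
    sensor_id: str
    date: str
    pm10: float


def parse_reading(line: str) -> Reading:
    # Drop the trailing newline, then split into the three comma-separated fields.
    sensor_id, date, value = line.strip().split(",")
    return Reading(sensor_id, date, float(value))


sample_lines = [
    "s1,2016-01-01,20.5\n",
    "s2,2016-01-01,30.1\n",
    "s1,2016-01-02,60.2\n",
    "s2,2016-01-02,20.4\n",
    "s1,2016-01-03,55.5\n",
    "s2,2016-01-03,52.5\n",
]

readings = [parse_reading(line) for line in sample_lines]
print(readings[2])
```

Converting the third field to `float` at parse time means the later threshold comparison works on numbers rather than strings.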

Use PySpark to load the file, filter the values, and apply the map/reduce pattern to produce the output: one line per sensor on standard output.
Each line contains a `sensorId` and the list of `dates` on which that sensor recorded a PM10 value greater than 50. Example output:
```
(s1, [2016-01-02, 2016-01-03])
(s2, [2016-01-03])
```



In [2]:
!pip install pyspark



In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col

# Create a SparkSession
spark = SparkSession.builder.appName("PM10Analysis").getOrCreate()

In [21]:
# Assuming your file is in 'My Drive' in Google Drive:
file_path = "/content/drive/My Drive/sensor.csv"  # Update with the correct path

sensor_rdd = spark.sparkContext.textFile(file_path)
#sensor_rdd.collect()

In [23]:
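# Split each line into [sensorId, date, value], keep readings above 50,
# and emit one (sensorId, [date]) pair per critical reading.
# Note: the pairs are not yet merged per sensor at this point.
sensor_rdd_larger_than_50 = sensor_rdd.map(lambda line: line.split(",")) \
                       .filter(lambda x: float(x[2]) > 50) \
                       .map(lambda x: (x[0], [x[1]]))
for item in sensor_rdd_larger_than_50.collect():
    print(item)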

('s1', ['2016-01-02'])
('s1', ['2016-01-03'])
('s2', ['2016-01-03'])
('s1', ['2016-01-04'])
('s3', ['2016-01-05'])
('s4', ['2016-01-05'])
('s3', ['2016-01-06'])
('s4', ['2016-01-06'])
('s1', ['2016-01-07'])
('s2', ['2016-01-07'])
('s3', ['2016-01-08'])
('s2', ['2016-01-09'])
('s3', ['2016-01-09'])
('s4', ['2016-01-10'])
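The pairs printed above are still one per reading, whereas the exercise asks for one line per sensor. In Spark this merge would typically be a `reduceByKey(lambda a, b: a + b)` applied to the `(sensorId, [date])` pairs. The sketch below emulates that merge in plain Python on the critical readings from the six-row example, so the logic can be checked without a cluster. `merge_by_key` is a hypothetical helper, and the sorting by sensor is an assumption added only to make the printed order stable.

```python
from collections import defaultdict

# (sensorId, [date]) pairs for the critical readings in the six-row example.
pairs = [
    ("s1", ["2016-01-02"]),
    ("s1", ["2016-01-03"]),
    ("s2", ["2016-01-03"]),
]


def merge_by_key(pairs):
    """Concatenate the date lists that share a key, like reduceByKey with a + b."""
    merged = defaultdict(list)
    for key, dates in pairs:
        merged[key] = merged[key] + dates
    return dict(merged)


dates_per_sensor = merge_by_key(pairs)
for sensor_id in sorted(dates_per_sensor):
    print(f"({sensor_id}, [{', '.join(dates_per_sensor[sensor_id])}])")
```

On these three pairs the loop prints the two lines given as the example output of the exercise.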


## Exercise II

Using the same data as Exercise I, produce the sensors ordered by their number of critical days. Each line of the output contains the number of days on which sensor `s` recorded a PM10 value greater than 50, followed by the `sensorId` of sensor `s`.

Example output:
```
2, s1
1, s2
```



In [25]:
sensor_rdd.collect()

['s1,2016-01-01,20.5',
 's2,2016-01-01,30.1',
 's1,2016-01-02,60.2',
 's2,2016-01-02,20.4',
 's1,2016-01-03,55.5',
 's2,2016-01-03,52.5',
 's1,2016-01-04,55.6',
 's2,2016-01-04,49.7',
 's3,2016-01-05,50.8',
 's4,2016-01-05,53.9',
 's3,2016-01-06,57.10',
 's4,2016-01-06,54.11',
 's1,2016-01-07,51.12',
 's2,2016-01-07,53.13',
 's3,2016-01-08,62.14',
 's4,2016-01-08,42.15',
 's2,2016-01-09,62.16',
 's3,2016-01-09,55.17',
 's4,2016-01-10,56.18']

In [28]:
# Count critical readings per sensor, then sort by count in descending order.
critical_days = sensor_rdd.map(lambda line: line.split(",")) \
                       .filter(lambda x: float(x[2]) > 50) \
                       .map(lambda x: (x[0], 1)) \
                       .reduceByKey(lambda x, y: x + y) \
                       .sortBy(lambda x: x[1], ascending=False)

for item in critical_days.collect():
    print(item)

('s1', 4)
('s3', 4)
('s2', 3)
('s4', 3)
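The result above has ties (two sensors with 4 days, two with 3), and it prints `(sensorId, count)` tuples rather than the `count, sensorId` lines the exercise shows. The sketch below, in plain Python, shows one way to break ties and format the lines. The tie-break by `sensorId` is an assumption, since the exercise does not specify one. In PySpark, the same composite key could be passed to `sortBy`, for example `sortBy(lambda x: (-x[1], x[0]))`.

```python
# (sensorId, count) pairs as produced by the reduce step, in arbitrary order.
counts = [("s2", 3), ("s1", 4), ("s4", 3), ("s3", 4)]

# Sort by count descending, then by sensorId ascending to break ties.
ordered = sorted(counts, key=lambda kv: (-kv[1], kv[0]))

# Format each pair as "count, sensorId", matching the example output.
lines = [f"{n}, {sensor_id}" for sensor_id, n in ordered]
print("\n".join(lines))
```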


## Exercise III

The input is a CSV file containing a list of user profiles:

- Header: `name,surname,age`
- Each line of the file contains the information about one user

Example input data:
```
name,surname,age
Paolo,Garza,42
Luca,Boccia,41
Maura,Bianchi,16
```

Use PySpark to load and analyze the data and produce the output: a CSV file containing one line for each profile, in which the original `age` attribute is replaced by a new attribute called `rangeage` of type String. It is defined as follows, where `age/10` denotes integer division:
```
rangeage = "[" + (age/10)*10 + "-" + (age/10)*10+9 + "]"
```
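To make the formula concrete, here is a minimal plain-Python sketch of it. `range_age` is a hypothetical helper name, and `//` plays the role of the integer division `age/10` in the formula.

```python
def range_age(age: int) -> str:
    """Return the decade bucket of an age as a string such as "[40-49]"."""
    low = (age // 10) * 10  # lower bound of the decade, e.g. 42 -> 40
    return f"[{low}-{low + 9}]"


for age in (42, 41, 16):
    print(age, range_age(age))
```

For the three sample ages this gives `[40-49]`, `[40-49]`, and `[10-19]`.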





In [30]:
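from pyspark.sql import SparkSession

# Despite its name, spark_df is a SparkSession, not a DataFrame.
# getOrCreate() reuses the session created earlier if it is still active.
spark_df = SparkSession.builder.appName("MySparkApp").getOrCreate()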

In [34]:
person_df = spark_df.read.csv("/content/drive/My Drive/person.csv", header=True, inferSchema=True)
person_df.show()

+-----+--------+---+
| name| surname|age|
+-----+--------+---+
|Paolo|   Garza| 42|
| Luca|  Boccia| 41|
|Maura| Bianchi| 16|
|Alice|   Cochi| 17|
|Laura|  Latini| 28|
|Paula| Zachini| 19|
|Carta|  Cianci| 29|
| Rita|Lisatini| 31|
+-----+--------+---+



In [45]:
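from pyspark.sql.functions import concat, lit, floor, col

# Build "[low-high]", where low = floor(age / 10) * 10 and high = low + 9.
# .show() only displays the result; `age` is still present next to `rangeage`.
person_df.withColumn(
    "rangeage",
    concat(
        lit("["),
        floor(col("age") / 10) * 10,
        lit("-"),
        floor(col("age") / 10) * 10 + 9,
        lit("]")
    )
).show()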


+-----+--------+---+--------+
| name| surname|age|rangeage|
+-----+--------+---+--------+
|Paolo|   Garza| 42| [40-49]|
| Luca|  Boccia| 41| [40-49]|
|Maura| Bianchi| 16| [10-19]|
|Alice|   Cochi| 17| [10-19]|
|Laura|  Latini| 28| [20-29]|
|Paula| Zachini| 19| [10-19]|
|Carta|  Cianci| 29| [20-29]|
| Rita|Lisatini| 31| [30-39]|
+-----+--------+---+--------+
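The table above still contains `age` and is only displayed, while the exercise asks for a CSV file in which `rangeage` replaces `age`. In PySpark this would presumably involve `DataFrame.drop("age")` followed by `DataFrame.write.csv(...)` with `header=True`. The sketch below shows the intended file content in plain Python using the standard `csv` module on the three example rows. `replace_age_with_range` is a hypothetical helper, and writing to an in-memory string instead of a file is an assumption made to keep the example self-contained.

```python
import csv
import io

source = "name,surname,age\nPaolo,Garza,42\nLuca,Boccia,41\nMaura,Bianchi,16\n"


def replace_age_with_range(csv_text: str) -> str:
    """Return CSV text where the age column is replaced by rangeage."""
    reader = csv.DictReader(io.StringIO(csv_text))
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    writer.writerow(["name", "surname", "rangeage"])
    for row in reader:
        low = (int(row["age"]) // 10) * 10  # lower bound of the decade
        writer.writerow([row["name"], row["surname"], f"[{low}-{low + 9}]"])
    return out.getvalue()


print(replace_age_with_range(source))
```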

