### Question 2 - Top 3 Cities

  For each country, compute the top 3 cities with best air quality and the top 3 cities with poorest air quality, updated weekly, i.e., averaged over a week (7 days).

 **Requirement**: Solve this question using Spark Core, Spark Dataframes and Spark SQL.

In [1]:
#@title Mount Google Drive
from google.colab import drive
drive.mount('/content/drive') # Faz o mount da drive

Mounted at /content/drive


In [2]:
#@title Install Pyspark
!pip install --quiet pyspark # Faz a instalação do Pyspark

[K     |████████████████████████████████| 281.4 MB 42 kB/s 
[K     |████████████████████████████████| 199 kB 52.6 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [4]:
#@title Dataset
!head -10000 /content/drive/MyDrive/projeto_spbd/sds011-2020-01-0*.csv > files
!head -10000 /content/drive/MyDrive/projeto_spbd/sds011-2020-01-1*.csv >> files
!head -10000 /content/drive/MyDrive/projeto_spbd/sds011-2020-01-2*.csv >> files
!head -10000 /content/drive/MyDrive/projeto_spbd/sds011-2020-01-3*.csv >> files
!head -10000 /content/drive/MyDrive/projeto_spbd/sds011-2020-02-0*.csv >> files
!head -10000 /content/drive/MyDrive/projeto_spbd/sds011-2020-02-1*.csv >> files
!head -10000 /content/drive/MyDrive/projeto_spbd/sds011-2020-02-2*.csv >> files
!head -10000 /content/drive/MyDrive/projeto_spbd/sds011-2020-03-0*.csv >> files
!head -10000 /content/drive/MyDrive/projeto_spbd/sds011-2020-03-1*.csv >> files
!head -10000 /content/drive/MyDrive/projeto_spbd/sds011-2020-03-2*.csv >> files
!head -10000 /content/drive/MyDrive/projeto_spbd/sds011-2020-03-3*.csv >> files

In [8]:
#@title Resolution using Spark DataFrame
import math
import pyspark
import json
import string
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import datetime
from operator import *
from pyspark import pandas

spark = SparkSession.builder.master('local[*]').appName('/content/files').getOrCreate() # É iniciada a SparkSession, em local mode
sc = spark.sparkContext

try :
    custom_schema = StructType([StructField("sen_id", StringType(), True),StructField("file_id", StringType(), True),StructField("lat", FloatType(), True),StructField("lon", FloatType(), True),StructField("timestamp", TimestampType(), True),StructField("p1", FloatType(), True),StructField("p2", FloatType(), True),])
    # É criado um schema para o DataFrame
    main_data = spark.read.schema(custom_schema).load('/content/files', sep=';', header=False, format="csv") # O dataset principal é atribuído ao schema criado
    aux_data=spark.read.json('/content/drive/MyDrive/projeto_spbd/sensors_all.json') # Carregamento do dataset secundário (JSON)
    df_joint=main_data.join(aux_data, main_data.sen_id == aux_data.sensor_id,"inner") \
                      .withColumn('week', weekofyear(main_data.timestamp)) \
                      .withColumn('sum_p1_p2', col("p1") + col("p2")) \
                      .select(col("sen_id"), col("timestamp"), col("sum_p1_p2"), col("country"), col("city"), col("week")) \
                      .groupBy(col("country"), col("city"), col("week")) \
                      .agg(avg(col("sum_p1_p2")).alias("metric"))
    # df_joint corresponde ao INNER JOINT entre os dois DataFrames, pelo sensor_id;
    # É criada a coluna week, que corresponde à semana do ano, e a coluna sum_p1_p2, que corresponde à soma dos valores de p1 e p2;
    # É feito o Select para as colunas sen_id, timestamp, sump_p1_p2, country, city e week;
    # É feito o groupBy pelo country, city e week;
    # É feito o AGG, onde é feita a média dos valores sum_p1_p2, e onde lhe é atribuída um nome, metric.
    partition1 = Window.partitionBy("country", "week").orderBy(col('metric').desc()) # É feita a Window PARTITION BY por country e week, por ordem decrescente dos valores de metric
    partition2 = Window.partitionBy("country", "week").orderBy(col('metric')) # É feita a Window PARTITION BY por country e week, por ordem crescente dos valores de metric
    print("---------- TOP 3 CITIES WITH POOREST AIR QUALITY ----------")
    df_joint.withColumn("Rank",rank().over(partition1)).filter(col('rank') <= 3).show()
    # É criada uma coluna com um ranking de valores de metric, tendo em conta o country e week. É feito um filtro dos 3 primeiros valores, neste caso são os três maiores valores para cada country e week.
    print("---------- TOP 3 CITIES WITH BEST AIR QUALITY ----------")
    df_joint.withColumn("Rank",rank().over(partition2)).filter(col('rank') <= 3).show()
    # É criada uma coluna com um ranking de valores de metric, tendo em conta o country e week. É feito um filtro dos 3 primeiros valores, neste caso são os três menores valores para cada country e week.
                    
except Exception as err:
    print(err)
    sc.stop()

---------- TOP 3 CITIES WITH POOREST AIR QUALITY ----------
+-------+-----------------+----+------------------+----+
|country|             city|week|            metric|Rank|
+-------+-----------------+----+------------------+----+
| France|           Luitré|   1|2097.0089213053384|   1|
| France|        Gallardon|   1| 886.9899520874023|   2|
| France|          Tullins|   1|340.68750071525574|   3|
| France|           Luitré|   2| 2999.800048828125|   1|
| France|        Fonsorbes|   2|160.16167163848877|   2|
| France|          Crusnes|   2| 144.1007126399449|   3|
| France|           Luitré|   3| 2999.800048828125|   1|
| France|         Péchabou|   3|238.21727440573952|   2|
| France|           Anglet|   3|103.99766685962678|   3|
| France|           Luitré|   4|  759.272006225586|   1|
| France|Serémange-Erzange|   4|276.13205500210034|   2|
| France|            Passy|   4|180.86461580716647|   3|
| France|        Tinténiac|   5| 339.0488944186105|   1|
| France|    Bois-Colombes| 