# Transformation du dataset No2 Paris par station de mesure

![](p_paris.png)

# I - Transformer les données sans faire le melt

   ## ******* 1.1 - Charger les packages nécessaires et créer une session Spark

In [None]:
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf

# Create (or reuse) the Hive-enabled Spark session shared by the notebook.
spark = SparkSession.builder \
    .appName(" Analyse de la pollution NO2 Paris") \
    .enableHiveSupport() \
    .getOrCreate()

## ******* 1.2 - Charger le dataset No2

    ### Créer un schéma (en cas de besoin)

# Optional explicit schema for the NO2 export: a date, an hour field and one
# nullable integer column per measuring station (same order as before).
# NOTE(review): this schema is not passed to the CSV reader in this file, and
# its second field is named "heure" while later cells read a column called
# "hour" -- confirm which name is intended before using it.
no2_schema = StructType(
    [
        StructField("date", DateType(), True),
        StructField("heure", IntegerType(), True),
    ]
    + [
        StructField(station_code, IntegerType(), True)
        for station_code in (
            "EVRY", "PA01H", "LOGNES", "VILLEM", "GEN", "MELUN", "ARG",
            "GON", "DEF", "A1", "PA13", "VERS", "PA15L", "RUR-SE",
            "PA18", "CHAMP", "STDEN", "RN6", "PA04C", "BOB", "MANT",
            "OPERA", "RN2", "HAUS", "BONAP", "AUT", "VITRY", "RUR-SO",
            "RN20", "BASCH", "PA12", "NEUIL", "MONTG", "SOULT", "CELES",
            "ELYS", "EIFF3", "TREMB", "AUB", "IVRY", "BP_EST", "PA07",
        )
    ]
)

In [None]:
# Load the raw NO2 export; the first CSV row supplies the column names.
no2_DF = (
    spark.read
    .option("sep", ",")
    .option("header", True)
    .csv("/Data/20180101_20201212-NO2_auto.csv")
)
no2_DF.show(5)

In [None]:
no2_DF.printSchema()

# ******* 1.3 - Supprimer les colonnes des stations qui sont en dehors de Paris

In [None]:
# Drop every station located outside Paris (based on the sensor addresses
# scraped from airparif.fr).
# The original list read `"PA04C"` immediately followed by `"AUB"` with no
# comma, so Python concatenated them into the non-existent column "PA04CAUB":
# AUB was never dropped, and PA04C was kept only by accident. PA04C is cast
# and selected by the following cells, so it is intentionally NOT in this
# list, and AUB is now dropped as intended.
no2_DF_Paris = no2_DF.drop(
    "GON", "A1", "MANT", "MELUN", "TREMB", "VERS", "RUR-SE", "RUR-SO",
    "RN2", "RN20", "RN6", "ARG",
    "AUB", "BOB", "CHAMP", "DEF", "EVRY", "GEN", "LOGNES", "MONTG",
    "NEUIL", "STDEN", "VILLEM", "VITRY", "IVRY",
)

no2_DF_Paris.show()

In [None]:
# Convert the column types: the date stays a string, the hour and every
# station measurement become floats (needed for the group-by / mean
# computations).
no2_DF_Paris = no2_DF_Paris.withColumn("date", col("date").cast(StringType()))

for _float_column in (
    "hour", "PA01H", "PA13", "PA15L", "PA18", "PA04C", "OPERA", "HAUS",
    "BONAP", "AUT", "BASCH", "PA12", "SOULT", "CELES", "ELYS", "EIFF3",
    "BP_EST", "PA07",
):
    no2_DF_Paris = no2_DF_Paris.withColumn(
        _float_column, col(_float_column).cast("Float")
    )


In [None]:
# Afficher le schéma du dataframe
no2_DF_Paris.printSchema()

In [None]:
no2_DF_Paris.show(10)

In [None]:
# Convertir la colonne date au format datetime

from datetime import datetime
import pyspark.sql.types as T
import pyspark.sql.functions as F


def user_defined_timestamp(date_col):
    """Convert a ``DD/MM/YYYY`` date string to ISO ``YYYY-MM-DD``.

    Args:
        date_col: Date string formatted as ``%d/%m/%Y``, or ``None``.

    Returns:
        The date formatted as ``%Y-%m-%d``, or ``None`` when the input is
        ``None`` or empty, so that a null cell stays null instead of failing
        the whole Spark job when this function runs as a UDF.

    Raises:
        ValueError: If a non-empty value does not match ``%d/%m/%Y``.
    """
    if not date_col:
        return None
    return datetime.strptime(date_col, '%d/%m/%Y').strftime('%Y-%m-%d')

user_defined_timestamp_udf = F.udf(user_defined_timestamp, T.StringType())

In [None]:
no2_DF_Paris = no2_DF_Paris.withColumn('date', user_defined_timestamp_udf('date'))
no2_DF_Paris.show()

In [None]:
# Ajouter le jour de la semaine
no2_DF_Paris = no2_DF_Paris.withColumn('week_day', date_format('date', 'EEEE'))
no2_DF_Paris.show()

### fonction pour avoir week_day

def get_weekday(date):
    """Return the weekday name (e.g. ``"Monday"``) of a dash-separated date.

    Both ``YYYY-MM-DD`` (the format the ``date`` column has after the
    conversion UDF) and the originally supported ``DD-MM-YYYY`` are accepted;
    the layout is detected from the length of the first component.

    Args:
        date: Date string, or ``None``.

    Returns:
        The weekday name from ``calendar.day_name``, or ``None`` when the
        input is ``None`` or empty.

    Raises:
        ValueError: If the string does not describe a valid calendar date.
    """
    # Imported locally so the function stays self-contained when it is
    # shipped to executors as a registered UDF.
    import datetime
    import calendar

    if not date:
        return None

    parts = date.split('-')
    if len(parts[0]) == 4:
        # ISO layout: year first.
        year, month, day = (int(x) for x in parts)
    else:
        day, month, year = (int(x) for x in parts)
    return calendar.day_name[datetime.date(year, month, day).weekday()]

spark.udf.register('get_weekday', get_weekday)

In [None]:
# Reorder the columns: time information first, then the Paris stations.
no2_DF_Paris = no2_DF_Paris.select(
    "date", "hour", "week_day",
    "PA01H", "PA13", "PA15L", "PA18", "PA04C", "OPERA", "HAUS", "BONAP",
    "AUT", "BASCH", "PA12", "SOULT", "CELES", "ELYS", "EIFF3", "BP_EST",
    "PA07",
)
no2_DF_Paris.show()

In [None]:
# Format the hour as a zero-padded "HH:00" label (1.0 -> "01:00",
# 13.0 -> "13:00", 24.0 -> "00:00").
#
# This replaces a chain of 37 regexp_replace() calls. Those patterns used an
# unescaped "." (any character) and were not anchored, so e.g. "1.0" also
# matched inside "11.0" and produced "101:00", which a second series of
# replacements then had to repair. Computing the label arithmetically gives
# the same result for hours 1..24 in a single projection:
#   - cast to int and take modulo 24 so that hour 24 becomes 0,
#   - left-pad to two digits and append ":00".
# A null hour stays null (lpad/concat propagate nulls), as it did before.
no2_DF_Paris_t = no2_DF_Paris.withColumn(
    'hour',
    concat(
        lpad((col('hour').cast('int') % 24).cast('string'), 2, '0'),
        lit(':00'),
    ),
)
no2_DF_Paris_t.show(25)
 

In [None]:
aff = no2_DF_Paris_t.drop("PA04C", "PA07")
aff.show()

In [None]:
no2_DF_Paris_t.printSchema()

In [None]:
no2_DF_Paris_t.select([count(when(col(c).isNull(), c)).alias(c) for c in no2_DF_Paris.columns]).show()

In [None]:
no2_DF_Paris_t.count()

## *****************************************************************************************************************

# *

# *

# II - Transformer et pivoter les données 

# 2.1 - Pivoter le dataframe

In [None]:
# Unpivot the wide table: every station column (everything after date, hour
# and week_day) becomes one (station_code, taux_no2) row.
columns_no2 = no2_DF_Paris_t.columns[3:]

no2_DF_Paris_pivot = (
    no2_DF_Paris_t
    .selectExpr(
        'date',
        'hour',
        'week_day',
        # stack(n, 'NAME1', NAME1, 'NAME2', NAME2, ...) emits col0 / col1.
        "stack({}, {})".format(
            len(columns_no2),
            ', '.join("'{0}', {0}".format(name) for name in columns_no2),
        ),
    )
    .withColumnRenamed("col0", "station_code")
    .withColumnRenamed("col1", "taux_no2")
)
display(no2_DF_Paris_pivot)
no2_DF_Paris_pivot.show()

In [None]:
no2_DF_Paris_pivot.printSchema()

# 2.2 - Charger le fichier qui contient les adresses des stations de mesure No2
### Ce fichier est le résultat du scraping du site Airparif
#


In [None]:
adress_stations = spark.read.csv("/Data/stationsNO2_paris.csv", sep=",", header = True)               
adress_stations.show(20)


In [None]:
adress_stations.printSchema()

In [None]:
# Attach the station metadata to every measurement (inner join on the
# station code).
joined_no2_adress = no2_DF_Paris_pivot.join(
    adress_stations,
    on='station_code',
    how="inner",
)
joined_no2_adress.show()

# Display-only copy without the join key.
pour_affich = joined_no2_adress.drop("station_code")
pour_affich.show()

In [None]:
no2_DF_Paris_pivot.count()

In [None]:
joined_no2_adress.count()

In [None]:
from pyspark.sql.functions import isnan, when, count, col
joined_no2_adress.select([count(when(col(c).isNull(), c)).alias(c) for c in joined_no2_adress.columns]).show()

In [None]:
# Export the joined result as a single CSV part file (coalesce(1)).
# The original referenced `join_no2_adress`, a name that is never defined
# (NameError); the joined DataFrame is `joined_no2_adress`.
(
    joined_no2_adress
    .coalesce(1)
    .write
    .option("header", True)
    .csv("../resultat_merge_spark.csv")
)

In [None]:
## coalesce(1) ou repartition(1) --> exporter les resultats en 1 seul fichier csv
'''
join_no2_adress.coalesce(1).write \
.option("header",True) \
.csv("../resultat_merge_spark.csv")


join_no2_adress_h.coalesce(1).write \
.option("header",True) \
.csv("../resultat_merge_spark_h.csv")

join_no2_adress_j.coalesce(1).write \
.option("header",True) \
.csv("../resultat_merge_spark_j.csv")

'''

# III. Ingestion vers Cassandra

In [None]:
from cassandra.cluster import Cluster, NoHostAvailable
from cassandra.auth import PlainTextAuthProvider

# Connection settings for the Cassandra node.
CASSANDRA_HOST = ['localhost']
CASSANDRA_PORT = 9042
CASSANDRA_DB = "no2_Paris"
CASSANDRA_TABLE = "stationsno2_paris"

# NOTE(review): these look like the default Cassandra credentials; confirm
# they are only used against a local development node.
auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')

try:
    cluster = Cluster(
        protocol_version=3,
        auth_provider=auth_provider,
        contact_points=CASSANDRA_HOST,
        load_balancing_policy=None,
        port=CASSANDRA_PORT,
    )
    session = cluster.connect()
except (NoHostAvailable, ValueError):
    # An unreachable node raises NoHostAvailable, not ValueError, so the
    # original handler never fired for the failure it was written for.
    # Re-raise after the message: without a session every following cell
    # would fail with a confusing NameError.
    print("Oops! échec de connexion cluster. Try again...")
    raise

In [None]:
#creation du key space
session.execute("CREATE KEYSPACE IF NOT EXISTS No2_Paris WITH REPLICATION={'class':'SimpleStrategy','replication_factor':3};")

In [None]:
# Create the target table. IF NOT EXISTS makes the cell safe to re-run, in
# line with the keyspace creation statement.
session.execute("CREATE TABLE IF NOT EXISTS No2_Paris.stationsNO2_Paris(col_id Text, station_code Text, date Text, hour Text, week_day Text, taux_no2 Text, station_name Text, station_adress Text, gps_lat Text, gps_long Text, zip_code Text,primary key (col_id));")

In [None]:
session = cluster.connect('no2_paris')

In [None]:
# Collect the joined Spark DataFrame into pandas.
no2_pd_dataframe = joined_no2_adress.toPandas()

# Missing measurements are replaced by the 999 placeholder.
no2_pd_dataframe["taux_no2"] = no2_pd_dataframe["taux_no2"].fillna(999)

# The pandas row index doubles as the row identifier.
no2_pd_dataframe["col_id"] = no2_pd_dataframe.index

# Keep the columns in the order expected by the Cassandra table.
no2_pd_dataframe = no2_pd_dataframe[[
    "col_id", "station_code", "date", "hour", "week_day", "taux_no2",
    "station_name", "station_adress", "gps_lat", "gps_long", "zip_code",
]]
no2_pd_dataframe.head()

In [None]:
no2_pd_dataframe["col_id"] = no2_pd_dataframe["col_id"].astype(str)
no2_pd_dataframe["taux_no2"] = no2_pd_dataframe["taux_no2"].astype(str)
no2_pd_dataframe["zip_code"] = no2_pd_dataframe["zip_code"].astype(str)

In [None]:
no2_pd_dataframe.dtypes

In [None]:
query_insert="INSERT INTO no2_Paris.stationsno2_paris (col_id, station_code, date, hour, week_day, taux_no2, station_name, station_adress, gps_lat, gps_long, zip_code) VALUES ($${}$$, '{}','{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}', '{}');"

In [None]:
# Insert every row through a prepared statement with bound parameters.
# The original built each CQL statement with str.format(), so any apostrophe
# in a station name or address terminated the quoted literal and broke the
# statement. Bound parameters need no quoting.
# Every table column is Text, so non-null values are sent as strings.
# The per-row print of the generated CQL is dropped: there is no CQL string
# any more, and it flooded the output.
insert_no2_stmt = session.prepare(
    "INSERT INTO no2_Paris.stationsno2_paris "
    "(col_id, station_code, date, hour, week_day, taux_no2, station_name, "
    "station_adress, gps_lat, gps_long, zip_code) "
    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"
)
no2_insert_columns = [
    "col_id", "station_code", "date", "hour", "week_day", "taux_no2",
    "station_name", "station_adress", "gps_lat", "gps_long", "zip_code",
]
for row in no2_pd_dataframe[no2_insert_columns].itertuples(index=False, name=None):
    session.execute(
        insert_no2_stmt,
        [None if value is None else str(value) for value in row],
    )

In [None]:
r = session.execute('SELECT * FROM no2_paris.stationsno2_paris;')

In [None]:
#df=spark.read.format("org.apache.spark.sql.cassandra").options(table="stationsno2_paris", keyspace="no2_paris").load()
#no2_DF_Paris_pivot.write.format("org.apache.spark.sql.cassandra").mode('append').option("confirm.truncate","true").options(table="stationsno2_paris", keyspace="no2_paris").save()

# Transformation du dataset Traffic

In [None]:
# Load the traffic dataset; the first CSV row supplies the column names.
Traffic_DF = (
    spark.read
    .option("sep", ",")
    .option("header", True)
    .csv("../traffic_transf_pandas.csv")
)
Traffic_DF.show(2)

# Preview of the main columns.
Traffic_DF.select(
    "sensor_id", "sensor_name", "site_id", "trafic_per_hour",
    "latitude", "longitude", "hour", "date", "zip_code",
).show()

In [None]:
Traffic_DF.printSchema()

In [None]:
Traffic_DF = Traffic_DF.withColumn("day_of_week", date_format('date', 'EEEE'))

In [None]:
Traffic_DF.coalesce(1).write \
.option("header",True) \
.csv("../Traffic.csv")

In [None]:
Traffic_DF.printSchema()

In [None]:
# Reorder the traffic columns: sensor / site description, location,
# measurement, time fields, then installation metadata.
Traffic_DF = Traffic_DF.select(
    "sensor_id", "clean_sensor_name", "sensor_name", "site_id", "site_name",
    "street_name", "gps", "latitude", "longitude", "zip_code",
    "trafic_per_hour", "recording_date", "date", "hour", "day_of_week",
    "sensor_installation_date", "orientation", "picture_link",
)

In [None]:
Traffic_DF = Traffic_DF.withColumn("trafic_per_hour"  , col("trafic_per_hour").cast("float"))
Traffic_DF.printSchema()

In [None]:
# Average the hourly traffic per zip code, date and hour.
Traffic_DF_gpd = (
    Traffic_DF
    .groupBy('zip_code', "date", "hour")
    .agg(avg("trafic_per_hour"))
)
Traffic_DF_gpd.show()


In [None]:
Traffic_DF_gpd = Traffic_DF_gpd.withColumnRenamed("avg(trafic_per_hour)", "trafic_per_hour")

In [None]:
Traffic_DF_gpd.show()

In [None]:
from pyspark.sql.functions import monotonically_increasing_id

# Add a generated identifier column to every aggregated row.
Traffic_DF_gpd = Traffic_DF_gpd.withColumn("col_id", monotonically_increasing_id())

In [None]:
Traffic_DF_gpd.show()

In [None]:
# Build "recording_date" as "<date> <hour>".
# concat() interprets every string argument as a column name, so the original
# concat(" ", "date", "hour") looked for a column literally named " ".
# concat_ws() takes the separator as its first argument, which is what was
# intended.
Traffic_DF_gpd = Traffic_DF_gpd.withColumn('recording_date', concat_ws(" ", "date", "hour"))
Traffic_DF_gpd.show()

In [None]:
 ## convert spark dataframe to pandas

# Collect the aggregated traffic into pandas and keep the columns in the
# order expected by the Cassandra table.
trafic_pd_dataframe = Traffic_DF_gpd.toPandas()
trafic_pd_dataframe = trafic_pd_dataframe[[
    "col_id", "date", "hour", "trafic_per_hour", "zip_code", "recording_date",
]]
trafic_pd_dataframe.head()

In [None]:
trafic_pd_dataframe.dtypes

In [None]:
# The Cassandra table stores every field as Text: convert the non-string
# columns to str.
trafic_pd_dataframe["col_id"] = trafic_pd_dataframe["col_id"].astype(str)
# The original assigned the converted values to a new "avg(trafic_per_hour)"
# column (the pre-rename name), leaving "trafic_per_hour" itself as float.
trafic_pd_dataframe["trafic_per_hour"] = trafic_pd_dataframe["trafic_per_hour"].astype(str)

In [None]:
trafic_pd_dataframe.dtypes

In [None]:
#creation du key space
session.execute("CREATE KEYSPACE IF NOT EXISTS trafic_Paris WITH REPLICATION={'class':'SimpleStrategy','replication_factor':3};")

In [None]:
# Create the traffic table. IF NOT EXISTS makes the cell safe to re-run, in
# line with the keyspace creation statement.
session.execute("CREATE TABLE IF NOT EXISTS trafic_Paris.trafic_velo_paris(col_id Text, date Text, hour Text, trafic_per_hour Text, zip_code Text, recording_date Text, primary key (col_id));")

In [None]:
session = cluster.connect('trafic_paris')

In [None]:
# INSERT template for the traffic table. The table is created in the
# trafic_Paris keyspace; the original template pointed at no2_paris, where
# no trafic_velo_paris table is created in this file.
query_insert_trafic = "INSERT INTO trafic_paris.trafic_velo_paris(col_id, date, hour, trafic_per_hour, zip_code, recording_date) VALUES ($${}$$, '{}','{}', '{}', '{}', '{}');"


In [None]:
trafic_pd_dataframe.dtypes

In [None]:
# Insert every traffic row through a prepared statement with bound
# parameters, instead of formatting values into the CQL text (which breaks on
# any value containing a quote). The statement targets the trafic_paris
# keyspace, where the trafic_velo_paris table is created.
# Every table column is Text, so non-null values are sent as strings.
# The per-row print of the generated CQL is dropped: there is no CQL string
# any more, and it flooded the output.
insert_trafic_stmt = session.prepare(
    "INSERT INTO trafic_paris.trafic_velo_paris "
    "(col_id, date, hour, trafic_per_hour, zip_code, recording_date) "
    "VALUES (?, ?, ?, ?, ?, ?);"
)
trafic_insert_columns = [
    "col_id", "date", "hour", "trafic_per_hour", "zip_code", "recording_date",
]
for row in trafic_pd_dataframe[trafic_insert_columns].itertuples(index=False, name=None):
    session.execute(
        insert_trafic_stmt,
        [None if value is None else str(value) for value in row],
    )

# ***************   Test ingestion cassandra ********************

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession

In [None]:
import os
# Submit arguments for PySpark: UI port and the Spark-Cassandra connector
# package.
# NOTE(review): a Spark session is already created earlier in this file;
# these arguments presumably only take effect if they are set before the
# first session starts (e.g. after a kernel restart) -- confirm.
os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join([
    '--conf spark.ui.port=4040',
    '--packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.1',
    'pyspark-shell',
])

In [None]:
# Spark session configured with the Cassandra connection settings
# (host, port, credentials), running locally on two threads.
spark = (
    SparkSession.builder
    .appName('Ingestion NO2 data')
    .config('spark.cassandra.connection.host', 'localhost')
    .config('spark.cassandra.connection.port', '9042')
    .config("spark.cassandra.auth.username", "cassandra")
    .config("spark.cassandra.auth.password", "cassandra")
    .master('local[2]')
    .getOrCreate()
)

In [None]:
Traffic_DF_gpd.show(5)

In [None]:
# Project the aggregated traffic onto the columns written to Cassandra.
data_ingest = Traffic_DF_gpd.select(
    "zip_code",
    "date",
    "hour",
    "trafic_per_hour",
    "col_id",
    "recording_date",
)

In [None]:
# Cast every column to string, one withColumn() per field.
for _field in ("zip_code", "date", "hour", "trafic_per_hour", "col_id", "recording_date"):
    data_ingest = data_ingest.withColumn(_field, col(_field).cast(StringType()))
data_ingest.printSchema()


In [None]:
data_ingest.show()

In [None]:
#import org.apache.spark.sql.cassandra
data_ingest.write.format("org.apache.spark.sql.cassandra").mode('append').option("confirm.truncate","true").options(table="trafic_velo_paris", keyspace="trafic_paris").save()