In [1]:
# Configuratins related to Cassandra connector & Cluster
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.2 --conf spark.cassandra.connection.host=127.0.0.1 pyspark-shell'

In [2]:
from pyspark import SparkContext
sc = SparkContext()

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession(sc)

# Import des fichiers grippe et météo depuis HDFS

## Import du fichier grippe

In [4]:
# File location and type
file_location = "hdfs://172.17.0.2:8020/user/cloudera/Projet_Grippe/incidence-REG-3.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","


# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

df.show()

+------+---------+----+-------+------+------+----------+---------+---------+--------------------+
|  week|indicator| inc|inc_low|inc_up|inc100|inc100_low|inc100_up|geo_insee|            geo_name|
+------+---------+----+-------+------+------+----------+---------+---------+--------------------+
|201847|        3| 706|      0|  1619|    37|         0|       85|       42|              ALSACE|
|201847|        3| 469|      0|  1220|    14|         0|       36|       72|           AQUITAINE|
|201847|        3|  79|      0|   234|     6|         0|       17|       83|            AUVERGNE|
|201847|        3| 420|      0|   942|    28|         0|       62|       25|     BASSE-NORMANDIE|
|201847|        3|   0|      0|     0|     0|         0|        0|       26|           BOURGOGNE|
|201847|        3| 863|      0|  1877|    25|         0|       55|       53|            BRETAGNE|
|201847|        3| 698|      1|  1395|    26|         0|       52|       24|              CENTRE|
|201847|        3| 2

In [5]:
permanent_table_name = "incidence_REG"

df.write.format("parquet").saveAsTable(permanent_table_name)

## Import données météo

In [6]:
# File location and type
file_location = "hdfs://172.17.0.2:8020/user/cloudera/Projet_Grippe/donnees-synop-essentielles-espaces.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ";"


# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

df = df.withColumnRenamed("ID OMM station", "id_station")

df.show()

+----------+--------------------+--------+---------------------+--------------------------+--------------+------------+-----------+-----------+--------+----------------------+-------------+--------------------+------------+-----------------+---------------------------------+-----------------------------------+---------------------------+-----------------------+---------------------------+----------------+-------------------+------------+----------------------+-------------------+-------------------+-------------------+-------------------+-----------------------+----------------------------------------------+-------------------------------+-----------------+---------------+---------------------+--------+--------------------+---------------------+----------------------------+-----------------------------+-------------------------------+-------------------------------+--------------------------------+--------------------------------+-----------------+-----------------+-----------------+--

In [7]:
permanent_table_name = "meteo"

df.write.format("parquet").saveAsTable(permanent_table_name)

In [8]:
print(spark.catalog.listTables())

[Table(name='incidence_reg', database='default', description=None, tableType='MANAGED', isTemporary=False), Table(name='meteo', database='default', description=None, tableType='MANAGED', isTemporary=False)]


# Requêtes sur les tables 

In [9]:
df_meteo = spark.sql("SELECT id_station, Date, pression, vitesse_vent, temperature, humidite FROM meteo WHERE id_station='07149'")
df_meteo.show()

+----------+--------------------+--------+------------+-----------+--------+
|id_station|                Date|pression|vitesse_vent|temperature|humidite|
+----------+--------------------+--------+------------+-----------+--------+
|      7149|2015-02-03 15:00:...|  100540|         3.6|     275.75|      53|
|      7149|2017-10-30 12:00:...|  102900|         4.8|     285.15|      57|
|      7149|2017-11-05 21:00:...|  102040|         1.9|     278.25|      92|
|      7149|2018-01-18 18:00:...|  100770|         4.0|     281.45|      82|
|      7149|2018-01-19 21:00:...|  101540|         5.9|     276.95|      84|
|      7149|2018-01-24 15:00:...|  101590|         9.7|     285.95|      68|
|      7149|2018-01-21 18:00:...|  101370|         1.6|     280.75|      98|
|      7149|2010-11-22 15:00:...|  100550|         4.1|     277.95|      78|
|      7149|2016-10-02 12:00:...|  101750|         4.8|     290.15|      57|
|      7149|2018-01-05 12:00:...|  100060|         5.1|     283.15|      82|

In [10]:
#Formatage date du df_meteo
import pyspark.sql.functions as f
#from pyspark.sql.functions import to_timestamp


#Création colonne date et time
split_col_1 = f.split(df_meteo['Date'], 'T')
df_meteo = df_meteo.withColumn('f_Date', split_col_1.getItem(0))

split_col_2 = f.split(df_meteo['f_Date'], ' ')
df_meteo = df_meteo.withColumn('ff_Date', split_col_2.getItem(0))
df_meteo = df_meteo.withColumn('time', split_col_2.getItem(1))


#Selection des colonnes
df_meteo=df_meteo.select(df_meteo.ff_Date.alias("date"), "time" , "temperature", 'humidite', 'pression' , 'vitesse_vent')

df_meteo.show()

+----------+--------+-----------+--------+--------+------------+
|      date|    time|temperature|humidite|pression|vitesse_vent|
+----------+--------+-----------+--------+--------+------------+
|2015-02-03|15:00:00|     275.75|      53|  100540|         3.6|
|2017-10-30|12:00:00|     285.15|      57|  102900|         4.8|
|2017-11-05|21:00:00|     278.25|      92|  102040|         1.9|
|2018-01-18|18:00:00|     281.45|      82|  100770|         4.0|
|2018-01-19|21:00:00|     276.95|      84|  101540|         5.9|
|2018-01-24|15:00:00|     285.95|      68|  101590|         9.7|
|2018-01-21|18:00:00|     280.75|      98|  101370|         1.6|
|2010-11-22|15:00:00|     277.95|      78|  100550|         4.1|
|2016-10-02|12:00:00|     290.15|      57|  101750|         4.8|
|2018-01-05|12:00:00|     283.15|      82|  100060|         5.1|
|2018-01-05|15:00:00|     283.45|      79|  100060|         5.5|
|2013-04-27|18:00:00|     282.85|      50|  101190|         4.6|
|2013-04-26|00:00:00|    

In [11]:
df_meteo.count()

25681

In [12]:
import datetime
from pyspark.sql.types import *

#Creation colonne year, month et day_month
split_col_3 = f.split(df_meteo['date'], '-')
df_meteo=df_meteo.withColumn("year_m", split_col_3.getItem(0))
df_meteo=df_meteo.withColumn("month", split_col_3.getItem(1))
df_meteo=df_meteo.withColumn("day_month", split_col_3.getItem(2))

#Cast en integer
df_meteo = df_meteo.withColumn("year_m", df_meteo["year_m"].cast("integer"))
df_meteo = df_meteo.withColumn("month", df_meteo["month"].cast("integer"))
df_meteo = df_meteo.withColumn("day_month", df_meteo["day_month"].cast("integer"))

#Creation colonne isoweek
def date_to_isoweek(iso_year, iso_week, iso_day):
    res = datetime.date(iso_year,iso_week,iso_day).isocalendar()[1]
    return res  

udf_isoweek = f.udf(date_to_isoweek, IntegerType())
df_meteo = df_meteo.withColumn('isoweek_m', udf_isoweek(df_meteo.year_m,df_meteo.month,df_meteo.day_month))

df_meteo.show()

+----------+--------+-----------+--------+--------+------------+------+-----+---------+---------+
|      date|    time|temperature|humidite|pression|vitesse_vent|year_m|month|day_month|isoweek_m|
+----------+--------+-----------+--------+--------+------------+------+-----+---------+---------+
|2015-02-03|15:00:00|     275.75|      53|  100540|         3.6|  2015|    2|        3|        6|
|2017-10-30|12:00:00|     285.15|      57|  102900|         4.8|  2017|   10|       30|       44|
|2017-11-05|21:00:00|     278.25|      92|  102040|         1.9|  2017|   11|        5|       44|
|2018-01-18|18:00:00|     281.45|      82|  100770|         4.0|  2018|    1|       18|        3|
|2018-01-19|21:00:00|     276.95|      84|  101540|         5.9|  2018|    1|       19|        3|
|2018-01-24|15:00:00|     285.95|      68|  101590|         9.7|  2018|    1|       24|        4|
|2018-01-21|18:00:00|     280.75|      98|  101370|         1.6|  2018|    1|       21|        3|
|2010-11-22|15:00:00

In [13]:
df_meteo=df_meteo.select("year_m", "isoweek_m", "time" , "temperature", 'humidite', 'pression' , 'vitesse_vent')
df_meteo.count()

25681

In [14]:
df_meteo.show()

+------+---------+--------+-----------+--------+--------+------------+
|year_m|isoweek_m|    time|temperature|humidite|pression|vitesse_vent|
+------+---------+--------+-----------+--------+--------+------------+
|  2015|        6|15:00:00|     275.75|      53|  100540|         3.6|
|  2017|       44|12:00:00|     285.15|      57|  102900|         4.8|
|  2017|       44|21:00:00|     278.25|      92|  102040|         1.9|
|  2018|        3|18:00:00|     281.45|      82|  100770|         4.0|
|  2018|        3|21:00:00|     276.95|      84|  101540|         5.9|
|  2018|        4|15:00:00|     285.95|      68|  101590|         9.7|
|  2018|        3|18:00:00|     280.75|      98|  101370|         1.6|
|  2010|       47|15:00:00|     277.95|      78|  100550|         4.1|
|  2016|       39|12:00:00|     290.15|      57|  101750|         4.8|
|  2018|        1|12:00:00|     283.15|      82|  100060|         5.1|
|  2018|        1|15:00:00|     283.45|      79|  100060|         5.5|
|  201

In [15]:
#Formatage temperature et pression
df_meteo = df_meteo.withColumn('temperature', f.round(df_meteo.temperature - float(273.15),0))
df_meteo = df_meteo.withColumn("pression", df_meteo["pression"].cast("integer"))
df_meteo.count()

25681

In [16]:
#Filtrage des horaires. 12H00 retenu uniquement (hypothèse)
df_meteo = df_meteo.filter("Time == '12:00:00' ")

In [17]:
df_meteo.count()

3212

In [18]:
#GroupBy(isoweek et year), moyenne des temperature, humidité, pression et vitesse_vent
df_meteo_final = df_meteo.groupBy("year_m", "isoweek_m").mean()
df_meteo_final = df_meteo_final.select("year_m", "isoweek_m", "avg(temperature)" , "avg(humidite)", "avg(pression)", "avg(vitesse_vent)")

#Renommer les colonnes
df_meteo_final = df_meteo_final.withColumnRenamed("avg(temperature)", "avg_temperature")
df_meteo_final = df_meteo_final.withColumnRenamed("avg(humidite)", "avg_humidite")
df_meteo_final = df_meteo_final.withColumnRenamed("avg(pression)", "avg_pression")
df_meteo_final = df_meteo_final.withColumnRenamed("avg(vitesse_vent)", "avg_vitesse_vent")

df_meteo_final.show()

+------+---------+------------------+------------------+------------------+------------------+
|year_m|isoweek_m|   avg_temperature|      avg_humidite|      avg_pression|  avg_vitesse_vent|
+------+---------+------------------+------------------+------------------+------------------+
|  2018|       22|22.714285714285715| 62.57142857142857|101665.71428571429| 2.828571428571429|
|  2012|       39|              16.0|63.142857142857146|100922.85714285714| 5.871428571428572|
|  2012|       24|17.571428571428573| 66.28571428571429|101374.28571428571|               4.8|
|  2015|       19|16.857142857142858| 58.57142857142857|101592.85714285714|               5.1|
|  2011|       42|12.714285714285714| 65.57142857142857|102084.28571428571| 3.757142857142857|
|  2014|       23|20.714285714285715|60.142857142857146|101388.57142857143|3.0142857142857147|
|  2016|       24|17.714285714285715|              69.0|100925.71428571429| 3.942857142857143|
|  2012|       10| 9.428571428571429| 66.142857142

In [19]:
#Cast to integer
df_meteo_final = df_meteo_final.withColumn("avg_temperature", df_meteo_final["avg_temperature"].cast("integer"))
df_meteo_final = df_meteo_final.withColumn("avg_humidite", df_meteo_final["avg_humidite"].cast("integer"))
df_meteo_final = df_meteo_final.withColumn("avg_pression", df_meteo_final["avg_pression"].cast("integer"))
df_meteo_final = df_meteo_final.withColumn("avg_vitesse_vent", df_meteo_final["avg_vitesse_vent"].cast("integer"))

df_meteo_final.show()

+------+---------+---------------+------------+------------+----------------+
|year_m|isoweek_m|avg_temperature|avg_humidite|avg_pression|avg_vitesse_vent|
+------+---------+---------------+------------+------------+----------------+
|  2018|       22|             22|          62|      101665|               2|
|  2012|       39|             16|          63|      100922|               5|
|  2012|       24|             17|          66|      101374|               4|
|  2015|       19|             16|          58|      101592|               5|
|  2011|       42|             12|          65|      102084|               3|
|  2014|       23|             20|          60|      101388|               3|
|  2016|       24|             17|          69|      100925|               3|
|  2012|       10|              9|          66|      103107|               4|
|  2018|       38|             22|          57|      101751|               6|
|  2017|       20|             21|          48|      102042|    

# Import des données Grippe

In [20]:
df_grippe = spark.sql("SELECT * FROM incidence_REG")
df_grippe.show()

+------+---------+----+-------+------+------+----------+---------+---------+--------------------+
|  week|indicator| inc|inc_low|inc_up|inc100|inc100_low|inc100_up|geo_insee|            geo_name|
+------+---------+----+-------+------+------+----------+---------+---------+--------------------+
|201847|        3| 706|      0|  1619|    37|         0|       85|       42|              ALSACE|
|201847|        3| 469|      0|  1220|    14|         0|       36|       72|           AQUITAINE|
|201847|        3|  79|      0|   234|     6|         0|       17|       83|            AUVERGNE|
|201847|        3| 420|      0|   942|    28|         0|       62|       25|     BASSE-NORMANDIE|
|201847|        3|   0|      0|     0|     0|         0|        0|       26|           BOURGOGNE|
|201847|        3| 863|      0|  1877|    25|         0|       55|       53|            BRETAGNE|
|201847|        3| 698|      1|  1395|    26|         0|       52|       24|              CENTRE|
|201847|        3| 2

In [21]:
#Filtre ILE-DE-FRANCE
df_grippe = df_grippe.filter("geo_name == 'ILE-DE-FRANCE' ")
df_grippe.show()

+------+---------+----+-------+------+------+----------+---------+---------+-------------+
|  week|indicator| inc|inc_low|inc_up|inc100|inc100_low|inc100_up|geo_insee|     geo_name|
+------+---------+----+-------+------+------+----------+---------+---------+-------------+
|201847|        3|3333|    891|  5775|    27|         7|       47|       11|ILE-DE-FRANCE|
|201846|        3|1320|      0|  2759|    11|         0|       23|       11|ILE-DE-FRANCE|
|201845|        3| 830|      0|  1826|     7|         0|       15|       11|ILE-DE-FRANCE|
|201844|        3| 710|      0|  1694|     6|         0|       14|       11|ILE-DE-FRANCE|
|201843|        3| 583|      0|  1322|     5|         0|       11|       11|ILE-DE-FRANCE|
|201842|        3| 665|      0|  1430|     5|         0|       11|       11|ILE-DE-FRANCE|
|201841|        3|1695|     30|  3360|    14|         0|       28|       11|ILE-DE-FRANCE|
|201840|        3|1221|      0|  2592|    10|         0|       21|       11|ILE-DE-FRANCE|

In [22]:
#Cast "week" en string
df_grippe = df_grippe.withColumn("week", df_grippe["week"].cast("string"))

#Split Annee et n° Semaine
df_grippe = df_grippe.withColumn("year_g", df_grippe.week.substr(1, 4))
df_grippe = df_grippe.withColumn("isoweek_g", df_grippe.week.substr(5, 7))

df_grippe.show()

+------+---------+----+-------+------+------+----------+---------+---------+-------------+------+---------+
|  week|indicator| inc|inc_low|inc_up|inc100|inc100_low|inc100_up|geo_insee|     geo_name|year_g|isoweek_g|
+------+---------+----+-------+------+------+----------+---------+---------+-------------+------+---------+
|201847|        3|3333|    891|  5775|    27|         7|       47|       11|ILE-DE-FRANCE|  2018|       47|
|201846|        3|1320|      0|  2759|    11|         0|       23|       11|ILE-DE-FRANCE|  2018|       46|
|201845|        3| 830|      0|  1826|     7|         0|       15|       11|ILE-DE-FRANCE|  2018|       45|
|201844|        3| 710|      0|  1694|     6|         0|       14|       11|ILE-DE-FRANCE|  2018|       44|
|201843|        3| 583|      0|  1322|     5|         0|       11|       11|ILE-DE-FRANCE|  2018|       43|
|201842|        3| 665|      0|  1430|     5|         0|       11|       11|ILE-DE-FRANCE|  2018|       42|
|201841|        3|1695|     

In [23]:
import datetime #installer le package via le terminal
from pyspark.sql.types import *
from pyspark.sql.functions import lit

#Cast "week" en string
df_grippe = df_grippe.withColumn("year_g", df_grippe["year_g"].cast("integer"))
df_grippe = df_grippe.withColumn("isoweek_g", df_grippe["isoweek_g"].cast("integer"))


#isoweek to date (en considérant le dimanche)
def iso_to_gregorian(iso_year, iso_week, iso_day):
    jan4 = datetime.date(iso_year, 1, 4)
    start = jan4 - datetime.timedelta(days=jan4.isoweekday()-1)
    return str(start + datetime.timedelta(weeks=iso_week-1, days=iso_day-1))

udf_fct = f.udf(iso_to_gregorian, StringType())
df_grippe = df_grippe.withColumn('date', udf_fct(df_grippe.year_g,df_grippe.isoweek_g,lit(7)))

In [24]:
df_grippe_final=df_grippe.select("year_g", "isoweek_g", "date", "inc", "inc_low", "inc_up", "inc100", "inc100_low", "inc100_up")
df_grippe_final.show()

+------+---------+----------+----+-------+------+------+----------+---------+
|year_g|isoweek_g|      date| inc|inc_low|inc_up|inc100|inc100_low|inc100_up|
+------+---------+----------+----+-------+------+------+----------+---------+
|  2018|       47|2018-11-25|3333|    891|  5775|    27|         7|       47|
|  2018|       46|2018-11-18|1320|      0|  2759|    11|         0|       23|
|  2018|       45|2018-11-11| 830|      0|  1826|     7|         0|       15|
|  2018|       44|2018-11-04| 710|      0|  1694|     6|         0|       14|
|  2018|       43|2018-10-28| 583|      0|  1322|     5|         0|       11|
|  2018|       42|2018-10-21| 665|      0|  1430|     5|         0|       11|
|  2018|       41|2018-10-14|1695|     30|  3360|    14|         0|       28|
|  2018|       40|2018-10-07|1221|      0|  2592|    10|         0|       21|
|  2018|       39|2018-09-30| 223|      0|   901|     2|         0|        8|
|  2018|       38|2018-09-23| 744|      0|  1753|     6|        

In [25]:
df_grippe_final.printSchema()

root
 |-- year_g: integer (nullable = true)
 |-- isoweek_g: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- inc: integer (nullable = true)
 |-- inc_low: integer (nullable = true)
 |-- inc_up: integer (nullable = true)
 |-- inc100: integer (nullable = true)
 |-- inc100_low: integer (nullable = true)
 |-- inc100_up: integer (nullable = true)



# Import des vacances scolaires

In [26]:
# File location and type
file_location = "hdfs://172.17.0.2:8020/user/cloudera/Projet_Grippe/vacances_scolaire.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","


# The applied options are for CSV files. For other file types, these will be ignored.
df_vac = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

# Permanent table creation

permanent_table_name = "vacances"
df_vac.write.format("parquet").saveAsTable(permanent_table_name)
df_vac.printSchema()

#Suppression des colonnes inutiles

df_vac = df_vac.drop('vacances_zone_a', 'vacances_zone_b', 'nom_vacances')

#Cast de la date en string

df_vac = df_vac.withColumn("date", df_vac["date"].cast("string"))

#Recuperation de la date et suppression des heures

split_col_date = f.split(df_vac['date'], ' ')
df_vac=df_vac.withColumn("f_date", split_col_date.getItem(0))
df_vac = df_vac.drop('date')
df_vac=df_vac.withColumnRenamed("f_date", 'date_v')
df_vac = df_vac.select('date_v','vacances_zone_c')

df_vac.show()



root
 |-- date: timestamp (nullable = true)
 |-- vacances_zone_a: boolean (nullable = true)
 |-- vacances_zone_b: boolean (nullable = true)
 |-- vacances_zone_c: boolean (nullable = true)
 |-- nom_vacances: string (nullable = true)

+----------+---------------+
|    date_v|vacances_zone_c|
+----------+---------------+
|2008-01-01|          false|
|2008-01-02|          false|
|2008-01-03|          false|
|2008-01-04|          false|
|2008-01-05|          false|
|2008-01-06|          false|
|2008-01-07|          false|
|2008-01-08|          false|
|2008-01-09|          false|
|2008-01-10|          false|
|2008-01-11|          false|
|2008-01-12|          false|
|2008-01-13|          false|
|2008-01-14|          false|
|2008-01-15|          false|
|2008-01-16|          false|
|2008-01-17|          false|
|2008-01-18|          false|
|2008-01-19|          false|
|2008-01-20|          false|
+----------+---------------+
only showing top 20 rows



# Croisement des données

In [27]:
#Jointure meteo - grippe
df_final = df_meteo_final.join(df_grippe_final, (df_meteo_final.year_m== df_grippe_final.year_g) & (df_meteo_final.isoweek_m== df_grippe_final.isoweek_g))

df_final.show()

+------+---------+---------------+------------+------------+----------------+------+---------+----------+-----+-------+------+------+----------+---------+
|year_m|isoweek_m|avg_temperature|avg_humidite|avg_pression|avg_vitesse_vent|year_g|isoweek_g|      date|  inc|inc_low|inc_up|inc100|inc100_low|inc100_up|
+------+---------+---------------+------------+------------+----------------+------+---------+----------+-----+-------+------+------+----------+---------+
|  2018|       22|             22|          62|      101665|               2|  2018|       22|2018-06-03|  323|      0|   738|     3|         0|        6|
|  2012|       39|             16|          63|      100922|               5|  2012|       39|2012-09-30|  644|      0|  1304|     5|         0|       11|
|  2012|       24|             17|          66|      101374|               4|  2012|       24|2012-06-17|   54|      0|   312|     0|         0|        2|
|  2015|       19|             16|          58|      101592|          

In [28]:
#Création colonne id et cast en integer
from pyspark.sql.functions import concat, col
df_final = df_final.withColumn("id", concat(col("year_m"), col("isoweek_m")) )
df_final = df_final.withColumn("id", df_final["id"].cast("integer"))


df_final=df_final.select("id", "year_m", "isoweek_m","date" ,"avg_temperature", "avg_humidite", "avg_pression", "avg_vitesse_vent", "inc", "inc_low", "inc_up", "inc100", "inc100_low", "inc100_up" )

df_final.show()

+------+------+---------+----------+---------------+------------+------------+----------------+-----+-------+------+------+----------+---------+
|    id|year_m|isoweek_m|      date|avg_temperature|avg_humidite|avg_pression|avg_vitesse_vent|  inc|inc_low|inc_up|inc100|inc100_low|inc100_up|
+------+------+---------+----------+---------------+------------+------------+----------------+-----+-------+------+------+----------+---------+
|201822|  2018|       22|2018-06-03|             22|          62|      101665|               2|  323|      0|   738|     3|         0|        6|
|201239|  2012|       39|2012-09-30|             16|          63|      100922|               5|  644|      0|  1304|     5|         0|       11|
|201224|  2012|       24|2012-06-17|             17|          66|      101374|               4|   54|      0|   312|     0|         0|        2|
|201519|  2015|       19|2015-05-10|             16|          58|      101592|               5| 2258|      0|  4820|    19|       

In [29]:
#Jointure vacances scolaires avec meteo_grippe

df_grippe_meteo_vac = df_final.join(df_vac, df_final.date == df_vac.date_v)
df_grippe_meteo_vac.show()

+------+------+---------+----------+---------------+------------+------------+----------------+-----+-------+------+------+----------+---------+----------+---------------+
|    id|year_m|isoweek_m|      date|avg_temperature|avg_humidite|avg_pression|avg_vitesse_vent|  inc|inc_low|inc_up|inc100|inc100_low|inc100_up|    date_v|vacances_zone_c|
+------+------+---------+----------+---------------+------------+------------+----------------+-----+-------+------+------+----------+---------+----------+---------------+
|201822|  2018|       22|2018-06-03|             22|          62|      101665|               2|  323|      0|   738|     3|         0|        6|2018-06-03|          false|
|201239|  2012|       39|2012-09-30|             16|          63|      100922|               5|  644|      0|  1304|     5|         0|       11|2012-09-30|          false|
|201224|  2012|       24|2012-06-17|             17|          66|      101374|               4|   54|      0|   312|     0|         0|      

In [30]:
df_grippe_meteo_vac = df_grippe_meteo_vac.drop('date_v')
df_grippe_meteo_vac.printSchema()

root
 |-- id: integer (nullable = true)
 |-- year_m: integer (nullable = true)
 |-- isoweek_m: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- avg_temperature: integer (nullable = true)
 |-- avg_humidite: integer (nullable = true)
 |-- avg_pression: integer (nullable = true)
 |-- avg_vitesse_vent: integer (nullable = true)
 |-- inc: integer (nullable = true)
 |-- inc_low: integer (nullable = true)
 |-- inc_up: integer (nullable = true)
 |-- inc100: integer (nullable = true)
 |-- inc100_low: integer (nullable = true)
 |-- inc100_up: integer (nullable = true)
 |-- vacances_zone_c: boolean (nullable = true)



# Export vers ElasticSearch

In [None]:
import pandas as pd
pandas_grippe_meteo = df_grippe_meteo_vac.toPandas()
pandas_grippe_meteo.to_json(orient='records',path_or_buf = "grippe_meteo_vac.json")
pandas_grippe_meteo.to_csv(path_or_buf = "bokeh/grippe_meteo_vac.csv")

In [None]:
import elasticsearch
from elasticsearch import helpers
es = elasticsearch.Elasticsearch(['172.17.0.4'],http_auth=('elastic', 'secret'), scheme="http", port=9200)

In [None]:
# import data from pandas dataframe into elastic

import json
tmp = pandas_grippe_meteo.to_json(orient='records')
df_json= json.loads(tmp)
for doc in df_json:
    es.index(index="test", doc_type="grippe_meteo",body=doc)

# Stockage sur Cassandra

In [31]:
 df_grippe_meteo_vac.write\
    .format("org.apache.spark.sql.cassandra")\
    .mode('append')\
    .options(table="grippe_meteo_vac", keyspace="smart_data")\
    .save()