In [1]:
import os
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
spark

# Initialise données

In [3]:
from pyspark.sql.functions import *
from pyspark.sql.types     import StructType, \
     StructField, FloatType, \
     IntegerType, StringType

spark = SparkSession.builder\
          .config("spark.jars.packages",
                         "io.delta:delta-core_2.12:0.8.0") \
          .config("spark.sql.extensions",
                         "io.delta.sql.DeltaSparkSessionExtension")\
          .getOrCreate()

meteoDataFrame  = spark.read.format('csv')\
    .option('sep',';')\
    .option('header','true')\
    .option('nullValue','mq')\
    .option('inferSchema', 'true')\
    .load('donnees/meteo')\
    .cache()

meteoDataFrame.columns
meteoDataFrame.printSchema()

schema = StructType([
        StructField('Id'           , StringType() , True),
        StructField('ville'        , StringType() , True),
        StructField('latitude'     , FloatType() , True),
        StructField('longitude'    , FloatType() , True),
        StructField('altitude'     , IntegerType() , True)])

villes  = spark.read.format('csv')   \
      .option('sep',';')                \
      .option('mergeSchema', 'true')    \
      .option('header','true')          \
      .schema(schema)                   \
      .load('/user/spark/donnees/postesSynop.csv')  \
      .cache()

@udf("string")
def formatVille(ville):
    if ville in ['CLERMONT-FD','MONT-DE-MARSAN',
                                   'ST-PIERRE','ST-BARTHELEMY METEO'] :
        return ville.title()
    else :
        if ville.find('-') != -1 :
            return ville[0:ville.find('-')].title()
        else:
            return ville.title()

villesT  = villes.select(
                col('Id').alias('id'),
                formatVille('ville').alias('ville'),
               'latitude',
               'longitude',
               'altitude')


meteo = meteoDataFrame.select(
                 col('numer_sta'),
                 to_timestamp(col('date').cast('string'),'yyyyMMddHHmmss'),
                 col('date')[0:4].cast('int') ,
                 col('date')[5:2].cast('int'),
                 col('date')[7:2].cast('int'),
                 col('date')[5:4],
                 round(col('t') - 273.15,2),
                 col('u') / 100 ,
                 col('vv') / 1000 ,
                 col('pres') / 1000,
                 coalesce( col('rr3'),
                           col('rr24')/8,
                           col('rr12')/4,
                           col('rr6')/2,
                           col('rr1')*3  ) )\
             .toDF('id','date','annee','mois','jour','mois_jour','temperature',
                   'humidite','visibilite','pression','precipitations')\
             .cache()

meteo.select('annee','mois','jour','temperature','humidite',
             'visibilite','pression').show(3)

meteoFance = meteo.where('id < 8000')\
             .join(villesT,'id')\
             .select(initcap(regexp_replace('ville','-',' ')).alias('ville'),
                     'annee','mois','jour','temperature',
                     'humidite','visibilite','pression','precipitations')

meteoFance.write\
       .mode('overwrite')\
       .format('parquet')\
       .partitionBy('annee')\
       .option('path', '/user/spark/donnees/meteoFrance')\
       .save()

meteo.join(villesT,'id')\
     .select(initcap(regexp_replace('ville','-',' ')).alias('ville'),
                     'annee','mois','jour','temperature',
                     'humidite','visibilite','pression','precipitations')\
     .write\
     .mode('overwrite')\
     .format('parquet')\
     .partitionBy('annee')\
     .option('path', '/user/spark/donnees/meteoGlobal')\
     .save()

data = [('Ajaccio'     ,'dfa' ),
                  ('Angers'      ,'dfa' ),
                  ('Angoulème'   ,'dfa' ),
                  ('Besançon'    ,'dfa' ),
                  ('Biarritz'    ,'dfa' ),
                  ('Bordeaux'    ,'dfa' ),
                  ('Brest'       ,'dfa' ),
                  ('Caen'        ,'dfa' ),
                  ('Clermont-Fd' ,'dfa' ),
                  ('Dijon'       ,'dfa' ),
                  ('Embrun'      ,'dfa' ),
                  ('Grenoble'    ,'dfa' ),
                  ('Lille'       ,'dfa' ),
                  ('Limoges'     ,'dfa' ),
                  ('Lyon'        ,'dfa' ),
                  ('Marseille'   ,'dfa' ),
                  ('Montpellier' ,'dfa' ),
                  ('Nancy'       ,'dfa' ),
                  ('Nantes'      ,'dfa' ),
                  ('Nice'        ,'dfa' ),
                  ('Nîmes'       ,'dfa' ),
                  ('Orléans'     ,'dfa' ),
                  ('Paris'       ,'dfa' )]

dfa = spark.sparkContext.parallelize(data).toDF(['ville','valeur'])

data = [ ('Nancy'       ,'dfb' ),
          ('Nantes'      ,'dfb' ),
          ('Nice'        ,'dfb' ),
          ('Nîmes'       ,'dfb' ),
          ('Orléans'     ,'dfb' ),
          ('Paris'       ,'dfb' ),
          ('Perpignan'   ,'dfb' ),
          ('Poitiers'    ,'dfb' ),
          ('Reims'       ,'dfb' ),
          ('Rennes'      ,'dfb' ),
          ('Rouen'       ,'dfb' ),
          ('St-Quentin'  ,'dfb' ),
          ('Strasbourg'  ,'dfb' ),
          ('Toulon'      ,'dfb' ),
          ('Toulouse'    ,'dfb' ),
          ('Tours'       ,'dfb' ),
          ('Vichy'       ,'dfb' )]

dfb = spark.sparkContext.parallelize(data).toDF(['ville','valeur'])

root
 |-- numer_sta: integer (nullable = true)
 |-- date: long (nullable = true)
 |-- pmer: integer (nullable = true)
 |-- tend: integer (nullable = true)
 |-- cod_tend: integer (nullable = true)
 |-- dd: integer (nullable = true)
 |-- ff: double (nullable = true)
 |-- t: double (nullable = true)
 |-- td: double (nullable = true)
 |-- u: integer (nullable = true)
 |-- vv: integer (nullable = true)
 |-- ww: integer (nullable = true)
 |-- w1: integer (nullable = true)
 |-- w2: integer (nullable = true)
 |-- n: integer (nullable = true)
 |-- nbas: integer (nullable = true)
 |-- hbas: integer (nullable = true)
 |-- cl: integer (nullable = true)
 |-- cm: integer (nullable = true)
 |-- ch: integer (nullable = true)
 |-- pres: integer (nullable = true)
 |-- niv_bar: integer (nullable = true)
 |-- geop: integer (nullable = true)
 |-- tend24: integer (nullable = true)
 |-- tn12: double (nullable = true)
 |-- tn24: double (nullable = true)
 |-- tx12: double (nullable = true)
 |-- tx24: double (n

In [25]:
from pyspark.sql.functions import *
from pyspark.sql import Window

meteoFance = meteo.where('id < 8000')\
             .join(villes.withColumnRenamed('Id', 'id'),'id')\
             .select(initcap(regexp_replace('ville','-',' ')).alias('ville'),
                     'annee','mois','jour','temperature',
                     'humidite','visibilite','pression','precipitations')

meteoFance.count()
meteoFance.selectExpr('ville','annee','mois','jour','temperature as t',
                      'humidite as h','visibilite as v',
                      'pression as p','precipitations as e').show()

meteoMM = meteoFance.where("ville = 'Mont De Marsan' and \
                                annee = 2019")\
                     .select('mois','jour','temperature','humidite','visibilite',
                             'pression','precipitations')
meteoMM.show()

+-------------------+-----+----+----+---+----+-----+------+----+
|              ville|annee|mois|jour|  t|   h|    v|     p|   e|
+-------------------+-----+----+----+---+----+-----+------+----+
|          Abbeville| 2019|  12|   1|3.7|0.79| 20.0|100.86| 0.0|
|      Lille Lesquin| 2019|  12|   1|2.8|0.87|12.23|101.38| 0.0|
|    Pte De La Hague| 2019|  12|   1|8.7|0.75| 10.0|101.39| 0.0|
|     Caen Carpiquet| 2019|  12|   1|4.9| 0.8|30.18|100.62| 0.0|
|         Rouen Boos| 2019|  12|   1|3.5|0.84|39.54| 99.68| 0.0|
|       Reims Prunay| 2019|  12|   1|1.7|0.89| 20.0|100.53| 0.0|
|     Brest Guipavas| 2019|  12|   1|7.1|0.91| 30.3|100.09| 0.0|
|        Ploumanac'h| 2019|  12|   1|8.0|0.95| null|100.61| 2.0|
|  Rennes St Jacques| 2019|  12|   1|6.2|0.92|18.06| 100.7| 2.0|
|            Alencon| 2019|  12|   1|4.3|0.89|13.52| 99.52|-0.1|
|               Orly| 2019|  12|   1|4.7|0.77|17.23| 100.4| 0.0|
|    Troyes Barberey| 2019|  12|   1|3.9|0.83| 20.0| 100.1| 0.0|
|        Nancy Ochey| 201

# row_number

In [26]:
jourPOby = Window.partitionBy('mois').orderBy('jour')
jourOby  = Window.orderBy('mois','jour')
meteoMM.where("annee = 2019")\
       .groupBy('mois','jour')\
       .agg( round(sum('precipitations'),2).alias('prec'))\
       .select('mois','jour',
          col('prec').alias('prec'),
          round(sum('prec').over(jourPOby),2).alias('s1'),
          row_number().over(jourPOby).alias('rn1'),
          round(sum('prec').over(jourOby),2).alias('s2'),
          row_number().over(jourOby).alias('rn2'))\
       .show(35)

+----+----+----+----+---+-----+---+
|mois|jour|prec|  s1|rn1|   s2|rn2|
+----+----+----+----+---+-----+---+
|   1|   1|-0.3|-0.3|  1| -0.3|  1|
|   1|   2|-0.1|-0.4|  2| -0.4|  2|
|   1|   3| 0.0|-0.4|  3| -0.4|  3|
|   1|   4| 0.0|-0.4|  4| -0.4|  4|
|   1|   5| 0.0|-0.4|  5| -0.4|  5|
|   1|   6| 0.0|-0.4|  6| -0.4|  6|
|   1|   7| 0.0|-0.4|  7| -0.4|  7|
|   1|   8| 0.3|-0.1|  8| -0.1|  8|
|   1|   9| 0.1| 0.0|  9|  0.0|  9|
|   1|  10| 0.0| 0.0| 10|  0.0| 10|
|   1|  11| 0.0| 0.0| 11|  0.0| 11|
|   1|  12|-0.3|-0.3| 12| -0.3| 12|
|   1|  13| 2.2| 1.9| 13|  1.9| 13|
|   1|  14| 0.8| 2.7| 14|  2.7| 14|
|   1|  15| 0.0| 2.7| 15|  2.7| 15|
|   1|  16| 0.0| 2.7| 16|  2.7| 16|
|   1|  17| 2.8| 5.5| 17|  5.5| 17|
|   1|  18|-0.3| 5.2| 18|  5.2| 18|
|   1|  19|-0.3| 4.9| 19|  4.9| 19|
|   1|  20| 0.6| 5.5| 20|  5.5| 20|
|   1|  21|-0.1| 5.4| 21|  5.4| 21|
|   1|  22| 7.1|12.5| 22| 12.5| 22|
|   1|  23| 9.4|21.9| 23| 21.9| 23|
|   1|  24| 0.0|21.9| 24| 21.9| 24|
|   1|  25|-0.2|21.7| 25| 21

# rank

In [27]:
jourPOby = Window.partitionBy('mois').orderBy('jour')
palmaresM  = Window.partitionBy('mois').orderBy(desc('prec'))
palmaresA  = Window.orderBy(desc('prec'))
meteoMM.where("annee = 2019")\
       .groupBy('mois','jour')\
       .agg( round(sum('precipitations'),2).alias('prec'))\
       .select('mois','jour',
          col('prec').alias('prec'),
          round(sum('prec').over(jourPOby),2).alias('s1'),
          row_number().over(jourPOby).alias('rn1'),
          rank().over(palmaresM).alias('rk1'),
          rank().over(palmaresA).alias('rk2'))\
       .orderBy(desc('prec'))\
       .show(35)

+----+----+----+-----+---+---+---+
|mois|jour|prec|   s1|rn1|rk1|rk2|
+----+----+----+-----+---+---+---+
|  11|   5|52.5|133.2|  5|  1|  1|
|   1|  31|44.2| 98.0| 31|  1|  2|
|   6|   5|42.4| 60.9|  5|  1|  3|
|   7|   9|41.4| 41.9|  9|  1|  4|
|  11|  16|39.3|298.0| 16|  2|  5|
|   4|   6|34.7| 57.7|  6|  1|  6|
|  11|   4|31.5| 80.7|  4|  3|  7|
|   4|  26|28.9|152.8| 26|  2|  8|
|   6|  21|28.9|105.5| 21|  2|  8|
|   5|  17|26.2| 45.8| 17|  1| 10|
|  11|   8|26.1|202.5|  8|  4| 11|
|  12|  12|25.0| 43.0| 12|  1| 12|
|  12|  13|25.0| 68.0| 13|  1| 12|
|  11|   7|22.1|176.4|  7|  5| 14|
|   5|  18|21.8| 67.6| 18|  2| 15|
|  11|   6|21.1|154.3|  6|  6| 16|
|   4|  15|20.4| 89.5| 15|  3| 17|
|   8|  11|19.7| 48.3| 11|  1| 18|
|  11|  17|19.3|317.3| 17|  7| 19|
|  11|  23|18.8|339.6| 23|  8| 20|
|   6|   4|18.4| 18.5|  4|  3| 21|
|  11|  14|18.2|242.1| 14|  9| 22|
|  11|   2|18.1| 32.7|  2| 10| 23|
|  11|  15|16.6|258.7| 15| 11| 24|
|  11|   3|16.5| 49.2|  3| 12| 25|
|   2|   1|15.2| 15.

# lag lead 

In [28]:
palmaresA  = Window.orderBy("mois")
meteoMM.where("annee = 2019")\
       .groupBy('mois')\
       .agg( round(sum('precipitations'),2).alias('prec'))\
       .select('mois','prec',
          round(sum('prec').over(palmaresA),2).alias('s1'),
          ntile(4).over(palmaresA).alias('ntile'),
          lag('prec',1).over(palmaresA).alias('lag'),
          lead('prec',1).over(palmaresA).alias('lead'))\
       .show(35)

+----+-----+------+-----+-----+-----+
|mois| prec|    s1|ntile|  lag| lead|
+----+-----+------+-----+-----+-----+
|   1| 98.0|  98.0|    1| null| 30.9|
|   2| 30.9| 128.9|    1| 98.0| 38.1|
|   3| 38.1| 167.0|    1| 30.9|155.4|
|   4|155.4| 322.4|    2| 38.1| 92.6|
|   5| 92.6| 415.0|    2|155.4|105.5|
|   6|105.5| 520.5|    2| 92.6| 61.3|
|   7| 61.3| 581.8|    3|105.5| 56.9|
|   8| 56.9| 638.7|    3| 61.3| 31.4|
|   9| 31.4| 670.1|    3| 56.9| 94.4|
|  10| 94.4| 764.5|    4| 31.4|365.3|
|  11|365.3|1129.8|    4| 94.4| 98.7|
|  12| 98.7|1228.5|    4|365.3| null|
+----+-----+------+-----+-----+-----+

