<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Jointures" data-toc-modified-id="Jointures-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Jointures</a></span><ul class="toc-item"><li><span><a href="#inner" data-toc-modified-id="inner-1.1"><span class="toc-item-num">1.1&nbsp;&nbsp;</span>inner</a></span></li><li><span><a href="#outer" data-toc-modified-id="outer-1.2"><span class="toc-item-num">1.2&nbsp;&nbsp;</span>outer</a></span></li></ul></li></ul></div>

In [1]:
import os
from distutils.sysconfig import get_python_lib,get_python_inc,get_python_version

In [2]:
import sys

# Point Spark at the pip-installed PySpark distribution inside site-packages.
# NOTE(review): get_python_lib comes from distutils, which is deprecated and
# removed in Python 3.12 - plan a move to sysconfig when upgrading.
os.environ['SPARK_HOME'] = os.path.join(get_python_lib(), 'pyspark')

# Spark workers must use the same interpreter as this kernel. sys.executable is
# exact on every OS and inside virtual environments; rebuilding the path by
# splitting site-packages on 'Lib' and appending 'python.exe' only held for a
# plain Windows install whose path contains no earlier "Lib" fragment.
os.environ['PYSPARK_PYTHON'] = sys.executable

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types     import StructType, \
     StructField, FloatType, \
     IntegerType, StringType

In [4]:
# Warehouse directory for Spark SQL: a 'warehouse' folder under the current
# working directory, created if missing and reused otherwise.
referentiel = os.path.join(
    os.getcwd(),
    'warehouse',
)
os.makedirs(name=referentiel, exist_ok=True)

In [5]:
# Display the resolved warehouse path (absolute, since it is built from os.getcwd()).
referentiel

'F:\\PythonFormation\\Spark-DataFrames\\warehouse'

In [6]:
# Create the session used by every following cell, or reuse one that already
# exists (getOrCreate). Hive support is enabled and the SQL warehouse is
# pointed at the `referentiel` directory.
spark = (
    SparkSession.builder
    .appName("PresentationSpark")
    .config("spark.sql.warehouse.dir", referentiel)
    .config("spark.executor.cores", 8)
    .config("spark.executor.memory", '24g')
    .enableHiveSupport()
    .getOrCreate()
)

In [7]:
# Display the session object as the cell output.
spark

In [8]:
# Echo the two executor settings requested when the session was built,
# one "key = value" line each.
print("\n".join(
    f"{cle} = {spark.conf.get(cle)}"
    for cle in ('spark.executor.cores', 'spark.executor.memory')
))

spark.executor.cores = 8
spark.executor.memory = 24g


In [9]:
from pyspark.sql.functions import *
from pyspark.sql.types     import StructType, \
     StructField, FloatType, \
     IntegerType, StringType

# Load every weather CSV matched by ../donnees/meteo/*/ into one DataFrame.
# The files are ';'-separated with a header row, the token 'mq' is read as
# null, and column types are inferred by Spark.
# Optional: append .cache() if the frame is reused heavily.
meteoDataFrame = (
    spark.read
    .format('csv')
    .option('sep', ';')
    .option('header', 'true')
    .option('nullValue', 'mq')
    .option('inferSchema', 'true')
    .load('../donnees/meteo/*/')
)

# Explicit schema for the station reference file; every field is nullable.
schema = StructType([
    StructField(nom, type_spark, True)
    for nom, type_spark in [
        ('Id',        StringType()),
        ('ville',     StringType()),
        ('latitude',  FloatType()),
        ('longitude', FloatType()),
        ('altitude',  IntegerType()),
    ]
])

# Station reference table, read with the explicit `schema` rather than inference.
# Optional: append .cache() if the frame is reused heavily.
villes = (
    spark.read
    .format('csv')
    .option('sep', ';')
    # NOTE(review): mergeSchema is documented for Parquet/ORC sources; it
    # presumably has no effect on a CSV read with an explicit schema - confirm.
    .option('mergeSchema', 'true')
    .option('header', 'true')
    .schema(schema)
    .load('../donnees/postesSynop.csv')
)

@udf("string")
def formatVille(ville):
    """Turn a raw station name into a title-cased city label.

    A fixed set of compound names is kept whole (e.g. 'CLERMONT-FD' ->
    'Clermont-Fd'). Any other name is cut at its first '-' before being
    title-cased (e.g. 'LILLE-LESQUIN' -> 'Lille'); names without '-' are
    simply title-cased.

    Args:
        ville: raw station name, or None when the source value is null.

    Returns:
        The formatted name, or None for a null input.
    """
    # The column is nullable and Spark hands nulls to Python UDFs as None;
    # without this guard `.find` / `.title` would raise AttributeError.
    if ville is None:
        return None

    noms_composes = ('CLERMONT-FD', 'MONT-DE-MARSAN',
                     'ST-PIERRE', 'ST-BARTHELEMY METEO')
    if ville in noms_composes:
        return ville.title()

    # split('-', 1)[0] is the text before the first '-', or the whole name
    # when there is no '-', so a single expression covers both cases.
    return ville.split('-', 1)[0].title()

# Station table with a lower-case `id` key and display-friendly city names;
# the geographic columns are passed through unchanged.
villesT = villes.select(
    col('Id').alias('id'),
    formatVille(col('ville')).alias('ville'),
    col('latitude'),
    col('longitude'),
    col('altitude'),
)


# Project the raw observations onto the analysis columns.
#  - date parts are cut from the yyyyMMddHHmmss value with substr(start, length),
#    which is what the slice form col[start:length] expands to;
#  - t is shifted by -273.15 (presumably Kelvin -> Celsius) and rounded to 2 decimals;
#  - u, vv and pres are rescaled by 100, 1000 and 1000;
#  - precipitations takes the first non-null of rr3, rr24/8, rr12/4, rr6/2, rr1*3.
# Optional: append .cache() if the frame is reused heavily.
meteo = meteoDataFrame.select(
    col('numer_sta').alias('id'),
    to_timestamp(col('date').cast('string'), 'yyyyMMddHHmmss').alias('date'),
    col('date').substr(0, 4).cast('int').alias('annee'),
    col('date').substr(5, 2).cast('int').alias('mois'),
    col('date').substr(7, 2).cast('int').alias('jour'),
    col('date').substr(5, 4).alias('mois_jour'),
    round(col('t') - 273.15, 2).alias('temperature'),
    (col('u') / 100).alias('humidite'),
    (col('vv') / 1000).alias('visibilite'),
    (col('pres') / 1000).alias('pression'),
    coalesce(
        col('rr3'),
        col('rr24') / 8,
        col('rr12') / 4,
        col('rr6') / 2,
        col('rr1') * 3,
    ).alias('precipitations'),
)

# Preview three rows. limit() runs before toPandas() so only the previewed
# rows are brought to the driver; toPandas() on the unrestricted frame would
# load the whole weather dataset into driver memory just to show three rows.
(
    meteo
    .select('annee', 'mois', 'jour', 'temperature', 'humidite',
            'visibilite', 'pression')
    .limit(3)
    .toPandas()
)

Unnamed: 0,annee,mois,jour,temperature,humidite,visibilite,pression
0,2023,8,1,15.4,0.97,20.0,99.46
1,2023,8,1,15.6,0.95,51.23,99.63
2,2023,8,1,16.3,0.92,12.0,100.43


In [10]:
# Preview three stations. Limiting before toPandas() avoids collecting every
# row, and running the Python UDF over it, just to display three.
villesT.limit(3).toPandas()

Unnamed: 0,id,ville,latitude,longitude,altitude
0,7005,Abbeville,50.136002,1.834,69
1,7015,Lille,50.57,3.0975,47
2,7020,Pte De La Hague,49.725166,-1.939833,6


In [11]:
# First demo frame: one (ville, 'dfa') row per city, from Ajaccio to Paris.
data = [(nom, 'dfa') for nom in (
    'Ajaccio', 'Angers', 'Angoulème', 'Besançon', 'Biarritz', 'Bordeaux',
    'Brest', 'Caen', 'Clermont-Fd', 'Dijon', 'Embrun', 'Grenoble',
    'Lille', 'Limoges', 'Lyon', 'Marseille', 'Montpellier', 'Nancy',
    'Nantes', 'Nice', 'Nîmes', 'Orléans', 'Paris',
)]

dfa = spark.sparkContext.parallelize(data).toDF(['ville', 'valeur'])

# Second demo frame: one (ville, 'dfb') row per city, from Nancy to Vichy.
# `data` is rebound here, so after this point it holds the dfb rows.
data = [(nom, 'dfb') for nom in (
    'Nancy', 'Nantes', 'Nice', 'Nîmes', 'Orléans', 'Paris',
    'Perpignan', 'Poitiers', 'Reims', 'Rennes', 'Rouen', 'St-Quentin',
    'Strasbourg', 'Toulon', 'Toulouse', 'Tours', 'Vichy',
)]

dfb = spark.sparkContext.parallelize(data).toDF(['ville', 'valeur'])

# Jointures

<img src="https://raw.githubusercontent.com/rbizoi/AnalyserLesDonneesAvecSpark/main/DataFrameSpark/images/M06-07.png" width="400">   

## inner

In [12]:
# Inner join (the default join type) written as an explicit equality
# condition between the two `id` columns.
# limit(10) runs before toPandas() so that only the ten displayed rows reach
# the driver instead of the full join result. No ordering is specified, so
# which ten rows appear is not guaranteed.
(
    meteo
    .join(villesT, meteo.id == villesT.id)
    .select('ville', 'annee', 'mois_jour', 'temperature', 'precipitations')
    .limit(10)
    .toPandas()
)

Unnamed: 0,ville,annee,mois_jour,temperature,precipitations
0,Abbeville,2023,801,15.4,0.4
1,Lille,2023,801,15.6,0.6
2,Pte De La Hague,2023,801,16.3,0.0
3,Caen,2023,801,15.9,2.0
4,Rouen,2023,801,15.6,13.4
5,Reims,2023,801,18.6,2.4
6,Brest,2023,801,15.0,-0.1
7,Ploumanac'H,2023,801,15.9,0.2
8,Rennes,2023,801,16.8,0.6
9,Alencon,2023,801,18.0,3.0


In [13]:
# Same inner join against the raw `villes` table, using a null-safe equality
# test (eqNullSafe) on the key columns.
# limit(10) runs before toPandas() so that only the ten displayed rows reach
# the driver instead of the full join result. No ordering is specified, so
# which ten rows appear is not guaranteed.
(
    meteo
    .join(villes, meteo['id'].eqNullSafe(villes['Id']))
    .select('ville', 'annee', 'mois_jour', 'temperature', 'precipitations')
    .limit(10)
    .toPandas()
)

Unnamed: 0,ville,annee,mois_jour,temperature,precipitations
0,ABBEVILLE,2023,801,15.4,0.4
1,LILLE-LESQUIN,2023,801,15.6,0.6
2,PTE DE LA HAGUE,2023,801,16.3,0.0
3,CAEN-CARPIQUET,2023,801,15.9,2.0
4,ROUEN-BOOS,2023,801,15.6,13.4
5,REIMS-PRUNAY,2023,801,18.6,2.4
6,BREST-GUIPAVAS,2023,801,15.0,-0.1
7,PLOUMANAC'H,2023,801,15.9,0.2
8,RENNES-ST JACQUES,2023,801,16.8,0.6
9,ALENCON,2023,801,18.0,3.0


In [14]:
# Inner join by column name: `Id` is renamed to `id` first so that both
# sides share the key name passed to join().
# limit(10) runs before toPandas() so that only the ten displayed rows reach
# the driver instead of the full join result. No ordering is specified, so
# which ten rows appear is not guaranteed.
(
    meteo
    .join(villes.withColumnRenamed('Id', 'id'), 'id')
    .select('ville', 'annee', 'mois_jour', 'temperature', 'precipitations')
    .limit(10)
    .toPandas()
)

Unnamed: 0,ville,annee,mois_jour,temperature,precipitations
0,ABBEVILLE,2023,801,15.4,0.4
1,LILLE-LESQUIN,2023,801,15.6,0.6
2,PTE DE LA HAGUE,2023,801,16.3,0.0
3,CAEN-CARPIQUET,2023,801,15.9,2.0
4,ROUEN-BOOS,2023,801,15.6,13.4
5,REIMS-PRUNAY,2023,801,18.6,2.4
6,BREST-GUIPAVAS,2023,801,15.0,-0.1
7,PLOUMANAC'H,2023,801,15.9,0.2
8,RENNES-ST JACQUES,2023,801,16.8,0.6
9,ALENCON,2023,801,18.0,3.0


In [15]:
# Join on the shared column name: the result keeps a single `ville` column
# followed by the `valeur` column of each side.
dfa.join(dfb, on='ville').toPandas()

Unnamed: 0,ville,valeur,valeur.1
0,Nancy,dfa,dfb
1,Nantes,dfa,dfb
2,Nice,dfa,dfb
3,Nîmes,dfa,dfb
4,Orléans,dfa,dfb
5,Paris,dfa,dfb


In [16]:
# Explicit inner join on a condition: both `ville` columns are kept in the result.
dfa.join(dfb, on=(dfa.ville == dfb.ville), how='inner').toPandas().head(50)

Unnamed: 0,ville,valeur,ville.1,valeur.1
0,Nancy,dfa,Nancy,dfb
1,Nantes,dfa,Nantes,dfb
2,Nice,dfa,Nice,dfb
3,Nîmes,dfa,Nîmes,dfb
4,Orléans,dfa,Orléans,dfb
5,Paris,dfa,Paris,dfb


## outer 

In [17]:
# Outer join: rows from both sides are kept, with nulls where the other side
# has no matching city.
dfa.join(dfb, on=(dfa.ville == dfb.ville), how='outer').toPandas().head(50)

Unnamed: 0,ville,valeur,ville.1,valeur.1
0,Ajaccio,dfa,,
1,Angers,dfa,,
2,Angoulème,dfa,,
3,Besançon,dfa,,
4,Biarritz,dfa,,
5,Bordeaux,dfa,,
6,Brest,dfa,,
7,Caen,dfa,,
8,Clermont-Fd,dfa,,
9,Dijon,dfa,,


In [18]:
# 'full' join type, shown next to 'outer' for comparison.
dfa.join(dfb, on=(dfa.ville == dfb.ville), how='full').toPandas().head(50)

Unnamed: 0,ville,valeur,ville.1,valeur.1
0,Ajaccio,dfa,,
1,Angers,dfa,,
2,Angoulème,dfa,,
3,Besançon,dfa,,
4,Biarritz,dfa,,
5,Bordeaux,dfa,,
6,Brest,dfa,,
7,Caen,dfa,,
8,Clermont-Fd,dfa,,
9,Dijon,dfa,,


In [19]:
# Left join: every dfa row is kept; the dfb columns are null when there is no match.
dfa.join(dfb, on=(dfa.ville == dfb.ville), how='left').toPandas().head(50)

Unnamed: 0,ville,valeur,ville.1,valeur.1
0,Angers,dfa,,
1,Ajaccio,dfa,,
2,Angoulème,dfa,,
3,Bordeaux,dfa,,
4,Biarritz,dfa,,
5,Besançon,dfa,,
6,Caen,dfa,,
7,Brest,dfa,,
8,Dijon,dfa,,
9,Grenoble,dfa,,


In [20]:
# Right join: every dfb row is kept; the dfa columns are null when there is no match.
dfa.join(dfb, on=(dfa.ville == dfb.ville), how='right').toPandas().head(50)

Unnamed: 0,ville,valeur,ville.1,valeur.1
0,Nantes,dfa,Nantes,dfb
1,Nancy,dfa,Nancy,dfb
2,Nice,dfa,Nice,dfb
3,Nîmes,dfa,Nîmes,dfb
4,Paris,dfa,Paris,dfb
5,Orléans,dfa,Orléans,dfb
6,,,Poitiers,dfb
7,,,Perpignan,dfb
8,,,Reims,dfb
9,,,Rennes,dfb
