In [1]:
# import all necessary package

from pyspark.sql import SparkSession
from pyspark.sql import functions as sf
import pandas as pd

In [2]:
# create session
spark = SparkSession.builder.appName('ExoPySpark').getOrCreate()

In [61]:
# create a temporary table for each csv file
dir = 'Exo_PySpark'

data_path_dict = {
    'traffic':f'{dir}/fichier1_trafic.csv',
    'topologie':f'{dir}/fichier2_topologie.csv',
    'geographique':f'{dir}/fichier3_Donnees_geographique.csv',
    'isis':f'{dir}/fichier4_couts_ISIS.csv'
}

for table_name , path in data_path_dict.items():
    spark.read.option('header',True).csv(path).createOrReplaceTempView(table_name)


In [62]:
# check if all tables are included
spark.catalog.listTables()

[Table(name='geographique', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='isis', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='topologie', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='traffic', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [134]:
# Filter on file 2
topologie = spark.table('topologie')
isis =  spark.table('isis')
geographique = spark.table('geographique')
traffic = spark.table('traffic')

# Issues :
# there is row with admin_state_port = Connexion KO
# Is the filter expression the best way to do the task


sub_topologie = topologie.filter("equipement LIKE 'PTN%' AND client LIKE '%PTN%' AND admin_state_port!='down' AND vlan=='NA' AND type_port_ou_mda == '10G'")\
    .withColumn('id_lien', sf.concat(topologie.equipement, sf.lit('_'), topologie.client))\
    .select('equipement', 'client', 'port', 'portlldp', 'interface_netwk', 'port_lag_netwk', 'type_port_ou_mda')

In [135]:
# step to follow
# function which add a string value to the column name of a spark dataframe
def add_to_col_name(df, value):
    return [column_name + value for column_name in df.columns]

# 1. merge topology table filtered result with isis using inner join by (nom_equipement, port, interface_netwk)

isis_renamed = isis.withColumnRenamed("nom_equipement","equipement")
merged_result = sub_topologie.join(isis_renamed, on=['equipement', 'port', 'interface_netwk'], how='inner')

# 2. merge result by client = geographique.nom_equipement to give column with _A

geographique_renamed_column_A = geographique.toDF(*add_to_col_name(geographique, '_A'))
merged_result = merged_result.join(geographique_renamed_column_A, merged_result.equipement == geographique_renamed_column_A.nom_equipement_A)

# 3. merge result by client = geographique.nom_equipement to give column with _B

geographique_renamed_column_B = geographique.toDF(*add_to_col_name(geographique, '_B'))
merged_result = merged_result.join(geographique_renamed_column_B, merged_result.equipement == geographique_renamed_column_B.nom_equipement_B)


# 4.

In [136]:
# check the merge result
merged_result.toPandas()

Unnamed: 0,equipement,port,interface_netwk,client,portlldp,port_lag_netwk,type_port_ou_mda,isis_level,isis_metric,nom_equipement_A,role_A,configuration_A,region_exploitation_A,nom_equipement_B,role_B,configuration_B,region_exploitation_B
0,PTN00122,1/1/2,TO_PTN00022_LAG22_WDM_FTTA,PTN00022,8/1/2,lag-22,10G,1,10,PTN00122,B,7750-SR7,IDF,PTN00122,B,7750-SR7,IDF
1,PTN00122,2/1/3,TO_PTN00022_LAG22_WDM_FTTA,PTN00022,8/1/4,lag-22,10G,1,10,PTN00122,B,7750-SR7,IDF,PTN00122,B,7750-SR7,IDF
2,PTN00820,2/1/3,TO_PTN00821_LAG_21,PTN00821,1/1/3,lag-21,10G,1,10,PTN00820,B,7750-SR7,WST,PTN00820,B,7750-SR7,WST
3,PTN00820,2/1/4,TO_PTN00821_LAG_21,PTN00821,2/1/1,lag-21,10G,1,10,PTN00820,B,7750-SR7,WST,PTN00820,B,7750-SR7,WST
4,PTN00651,4/1/2,TO_PTN00042_LAG22,PTN00042,2/1/2,lag-22,10G,1,10,PTN00651,B,7750-SR7,MED,PTN00651,B,7750-SR7,MED
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2825,PTN01404,1/1/2,TO_PTN01731_LAG-20,PTN01731,1/1/2,lag-20,10G,1,10,PTN01404,B,7750-SR7,WST,PTN01404,B,7750-SR7,WST
2826,PTN01404,1/1/3,TO_PTN01731_LAG-20,PTN01731,1/1/3,lag-20,10G,1,10,PTN01404,B,7750-SR7,WST,PTN01404,B,7750-SR7,WST
2827,PTN01404,2/1/2,TO_PTN00258_LAG-27,PTN00258,No LLDP,lag-27,10G,1,10,PTN01404,B,7750-SR7,WST,PTN01404,B,7750-SR7,WST
2828,PTN01404,2/1/3,TO_PTN00258_LAG-27,PTN00258,No LLDP,lag-27,10G,1,10,PTN01404,B,7750-SR7,WST,PTN01404,B,7750-SR7,WST


In [137]:
# Partie II

Part II

In [150]:
# change traffic_out and traffic_in type
traffic=traffic.withColumn('trafic_in', sf.regexp_replace('trafic_in', ',', '.'))
traffic=traffic.withColumn('trafic_out', sf.regexp_replace('trafic_out', ',', '.'))

traffic = traffic.withColumn('trafic_in', traffic.trafic_in.cast('float'))
traffic = traffic.withColumn('trafic_out', traffic.trafic_out.cast('float'))

traffic = traffic.filter("interface_equipement LIKE 'lag%'").withColumn('traffic_max',sf.greatest(traffic.trafic_in, traffic.trafic_out))


traffic_result = traffic.groupby(['equipement', 'interface_equipement', 'date']).max('traffic_max', 'trafic_out', 'trafic_in').toDF(*['equipement', 'interface_equipement','date', 'trafic_max', 'trafic_out', 'trafic_in',])

In [151]:
traffic_result.toPandas()

Unnamed: 0,equipement,interface_equipement,date,trafic_max,trafic_out,trafic_in
0,PTN00247,lag-20,2022-08-01,1443.899902,1443.899902,415.637573
1,PTN00026,lag-27,2022-08-01,6594.279785,6594.279785,591.415649
2,PTN00110,lag-103,2022-08-01,0.166656,0.166656,0.001432
3,PTN00255,lag-100,2022-08-01,23145.210938,3070.613037,23145.210938
4,PTN01446,lag-22,2022-08-01,6685.588379,6685.588379,849.665283
...,...,...,...,...,...,...
1436,PTN00319,lag-24,2022-08-01,2462.129883,365.048248,2462.129883
1437,PTN00095,lag-21,2022-08-01,4674.581055,4674.581055,441.838348
1438,PTN00075,lag-139,2022-08-01,18.686552,18.686552,0.012232
1439,PTN00239,lag-1,2022-08-01,2.469640,2.469640,0.042856


In [152]:
# select unique of concerned topologie by 'equipement', 'client', 'port_lag_netwk'
sub_top_unique = sub_topologie.dropDuplicates(['equipement', 'client', 'port_lag_netwk']).select('equipement', 'client', 'port_lag_netwk', 'type_port_ou_mda', 'interface_netwk')


In [153]:
# join subtract of topologie with traffic result computed earlier
result = sub_top_unique.join(traffic_result, (traffic_result.equipement == sub_top_unique.equipement) & (traffic_result.interface_equipement == sub_top_unique.port_lag_netwk)).drop(traffic_result.equipement)

In [154]:
result.toPandas()

Unnamed: 0,equipement,client,port_lag_netwk,type_port_ou_mda,interface_netwk,interface_equipement,date,trafic_max,trafic_out,trafic_in
0,PTN00247,PTN00246,lag-20,10,TO_PTN00246_LAG_20,lag-20,2022-08-01,1443.899902,1443.899902,415.637573
1,PTN00026,PTN00507,lag-27,10,TO_PTN00507_LAG27,lag-27,2022-08-01,6594.279785,6594.279785,591.415649
2,PTN01446,PTN02406,lag-22,10,TO_PTN02406_LAG22,lag-22,2022-08-01,6685.588379,6685.588379,849.665283
3,PTN00105,PTN00100,lag-21,10,TO_PTN00100_LAG21,lag-21,2022-08-01,2527.676270,34.930023,2527.676270
4,PTN00092,PTN00073,lag-192,10,TO_PTN00073_LAG-192,lag-192,2022-08-01,505.761993,0.252048,505.761993
...,...,...,...,...,...,...,...,...,...,...
887,PTN00480,PTN00027,lag-22,10,TO_PTN00027_LAG22,lag-22,2022-08-01,6746.111328,1576.010742,6746.111328
888,PTN00488,PTN00486,lag-21,10,TO_PTN00486_LAG21,lag-21,2022-08-01,18644.550781,18644.550781,2266.364746
889,PTN00319,PTN00217,lag-24,10,TO_PTN00217_LAG24,lag-24,2022-08-01,2462.129883,365.048248,2462.129883
890,PTN00095,PTN00104,lag-21,10,TO_PTN00104_LAG21,lag-21,2022-08-01,4674.581055,4674.581055,441.838348


In [155]:
# check if result correspond to given example
result.filter("equipement IN ('PTN00216','PTN00218') AND client IN ('PTN00216','PTN00218')").toPandas()

Unnamed: 0,equipement,client,port_lag_netwk,type_port_ou_mda,interface_netwk,interface_equipement,date,trafic_max,trafic_out,trafic_in
0,PTN00216,PTN00218,lag-21,10,TO_PTN00218_LAG21,lag-21,2022-08-01,7023.918457,7023.918457,869.327148
1,PTN00218,PTN00216,lag-21,10,TO_PTN00216_LAG21,lag-21,2022-08-01,7034.388184,867.169983,7034.388184


In [156]:
# add geographique information for equipement and client

geographique_renamed_column_A = geographique.toDF(*add_to_col_name(geographique, '_A'))
merged_result = result.join(geographique_renamed_column_A, result.equipement == geographique_renamed_column_A.nom_equipement_A)

geographique_renamed_column_B = geographique.toDF(*add_to_col_name(geographique, '_B'))
merged_result = merged_result.join(geographique_renamed_column_B, merged_result.equipement == geographique_renamed_column_B.nom_equipement_B)


In [157]:
merged_result = merged_result.drop(*['nom_equipement_A', 'nom_equipement_B'])

In [158]:
# get capacite_lien
sub_topologie=sub_topologie.withColumn('type_port_ou_mda', sf.regexp_replace('type_port_ou_mda', 'G', ''))
sub_topologie=sub_topologie.withColumn('type_port_ou_mda',sub_topologie.type_port_ou_mda.cast('int'))
capacite_lien = sub_topologie.groupby(['equipement', 'client']).sum('type_port_ou_mda').withColumnRenamed('sum(type_port_ou_mda)', 'capacite_lien')

In [159]:
# merge result with capacite_lien
res = merged_result.join(capacite_lien, on=['equipement', 'client'])

In [161]:
res.toPandas()

Unnamed: 0,equipement,client,port_lag_netwk,type_port_ou_mda,interface_netwk,interface_equipement,date,trafic_max,trafic_out,trafic_in,role_A,configuration_A,region_exploitation_A,role_B,configuration_B,region_exploitation_B,capacite_lien
0,PTN00247,PTN00246,lag-20,10,TO_PTN00246_LAG_20,lag-20,2022-08-01,1443.899902,1443.899902,415.637573,B,7750-SR7,WST,B,7750-SR7,WST,40
1,PTN00026,PTN00507,lag-27,10,TO_PTN00507_LAG27,lag-27,2022-08-01,6594.279785,6594.279785,591.415649,A,7750-SR12,WST,A,7750-SR12,WST,10
2,PTN01446,PTN02406,lag-22,10,TO_PTN02406_LAG22,lag-22,2022-08-01,6685.588379,6685.588379,849.665283,B,7750-SR7,CTA,B,7750-SR7,CTA,40
3,PTN00105,PTN00100,lag-21,10,TO_PTN00100_LAG21,lag-21,2022-08-01,2527.676270,34.930023,2527.676270,B,7750-SR7,MED,B,7750-SR7,MED,20
4,PTN00092,PTN00073,lag-192,10,TO_PTN00073_LAG-192,lag-192,2022-08-01,505.761993,0.252048,505.761993,B,7750-SR7,IDF,B,7750-SR7,IDF,50
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
887,PTN00480,PTN00027,lag-22,10,TO_PTN00027_LAG22,lag-22,2022-08-01,6746.111328,1576.010742,6746.111328,B,7750-SR7,WST,B,7750-SR7,WST,20
888,PTN00488,PTN00486,lag-21,10,TO_PTN00486_LAG21,lag-21,2022-08-01,18644.550781,18644.550781,2266.364746,B,7750-SR7,WST,B,7750-SR7,WST,30
889,PTN00319,PTN00217,lag-24,10,TO_PTN00217_LAG24,lag-24,2022-08-01,2462.129883,365.048248,2462.129883,B,7750-SR7,NOE,B,7750-SR7,NOE,30
890,PTN00095,PTN00104,lag-21,10,TO_PTN00104_LAG21,lag-21,2022-08-01,4674.581055,4674.581055,441.838348,A,7750-SR12,MED,A,7750-SR12,MED,20


In [162]:
# create columns tdc
res = res.withColumn('tdc', ((res.trafic_max)/1000)/res.capacite_lien * 100)

In [166]:
merged_result = res.join(isis, (res.equipement == isis.nom_equipement) & (res.interface_netwk == isis.interface_netwk)).drop(isis.interface_netwk).drop(res.interface_netwk).drop(*['region_exploitation_A', 'nom_equipement', 'port', 'type_port_ou_mda']).withColumnRenamed('region_Exploitation_B', 'region_Exploitation').withColumnRenamed('port_lag_netwk', 'interface_client')


In [167]:
# MB to giga

col_list = ['trafic_max', 'trafic_in', 'trafic_out']
for col in col_list:
    merged_result = merged_result.withColumn(col, merged_result[col] / 1000)

In [177]:
# add id lien column
merged_result = merged_result.withColumn("id_lien", sf.concat(merged_result.equipement, sf.lit('_'), merged_result.client))

In [178]:
from pyspark.sql.types import StringType
def get_intervalle_charge(value):
    if value <=70:
        return 'Inf 70%'
    else:
        if value<=90:
            return 'Sup 70% Inf 90%'
        else:
            if value<100:
                return 'Sup 90%'
            else:
                return 'Erreur'





In [179]:
# issues : this can't work
# int_charge = sf.udf(lambda z:get_intervalle_charge(z), StringType())
# merged_result.withColumn("intervalle_charge", int_charge(merged_result.tdc))

# little hack for intervalle charge
df_pandas_result = merged_result.toPandas()
df_pandas_result["intervalle_charge"] = df_pandas_result.tdc.map(lambda z: get_intervalle_charge(z))

In [180]:
df_pandas_result

Unnamed: 0,equipement,client,interface_client,interface_equipement,date,trafic_max,trafic_out,trafic_in,role_A,configuration_A,role_B,configuration_B,region_Exploitation,capacite_lien,tdc,isis_level,isis_metric,id_lien,intervalle_charge
0,PTN00247,PTN00246,lag-20,lag-20,2022-08-01,1.443900,1.443900,0.415638,B,7750-SR7,B,7750-SR7,WST,40,3.609750,1,10,PTN00247_PTN00246,Inf 70%
1,PTN00247,PTN00246,lag-20,lag-20,2022-08-01,1.443900,1.443900,0.415638,B,7750-SR7,B,7750-SR7,WST,40,3.609750,1,10,PTN00247_PTN00246,Inf 70%
2,PTN00247,PTN00246,lag-20,lag-20,2022-08-01,1.443900,1.443900,0.415638,B,7750-SR7,B,7750-SR7,WST,40,3.609750,1,10,PTN00247_PTN00246,Inf 70%
3,PTN00247,PTN00246,lag-20,lag-20,2022-08-01,1.443900,1.443900,0.415638,B,7750-SR7,B,7750-SR7,WST,40,3.609750,1,10,PTN00247_PTN00246,Inf 70%
4,PTN00026,PTN00507,lag-27,lag-27,2022-08-01,6.594280,6.594280,0.591416,A,7750-SR12,A,7750-SR12,WST,10,65.942798,1,20,PTN00026_PTN00507,Inf 70%
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2529,PTN00319,PTN00217,lag-24,lag-24,2022-08-01,2.462130,0.365048,2.462130,B,7750-SR7,B,7750-SR7,NOE,30,8.207100,1,10,PTN00319_PTN00217,Inf 70%
2530,PTN00095,PTN00104,lag-21,lag-21,2022-08-01,4.674581,4.674581,0.441838,A,7750-SR12,A,7750-SR12,MED,20,23.372905,1,10,PTN00095_PTN00104,Inf 70%
2531,PTN00095,PTN00104,lag-21,lag-21,2022-08-01,4.674581,4.674581,0.441838,A,7750-SR12,A,7750-SR12,MED,20,23.372905,1,10,PTN00095_PTN00104,Inf 70%
2532,PTN00100,PTN00104,lag-20,lag-20,2022-08-01,4.283467,0.385071,4.283467,B,7750-SR7,B,7750-SR7,MED,20,21.417336,1,10,PTN00100_PTN00104,Inf 70%


In [182]:
df_pandas_result.to_csv('traffic_result.csv')