# import

In [28]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import pyspark
from pyspark.sql import SparkSession
from statistics import mean
from difflib import SequenceMatcher

In [29]:
# create SparkSession
spark = SparkSession.builder.master("local[*]") \
                    .appName('SparkSNCF') \
                    .getOrCreate()

# extract SparkContext
sc = spark.sparkContext

print('Spark is ready with CPU usage :', sc.defaultParallelism)

Spark is ready with CPU usage : 2


# import data

In [30]:
table_prix = sc.textFile('./tarifs-tgv-par-od.csv')
table_prix

./tarifs-tgv-par-od.csv MapPartitionsRDD[60] at textFile at NativeMethodAccessorImpl.java:0

In [31]:
table_prix_head = table_prix.take(1)
table_prix_no_header = table_prix.filter(lambda row: row != table_prix_head[0])

In [32]:
new_table_prix = table_prix_no_header.map(lambda line: tuple(line.split(';')[0:4:3])) \
                                     .map(lambda x: tuple(x[0].split('-', 1) + [float(x[1])])) \
                                     .map(lambda x : (x[0].strip(), x[1].strip(), x[2]))
columns = ["origine","destination","prix_1"]
df = spark.createDataFrame(data=new_table_prix, schema = columns)

df.show()

+--------------------+--------------------+------+
|             origine|         destination|prix_1|
+--------------------+--------------------+------+
|               NIMES|        LILLE EUROPE| 184.0|
|MOUTIERS SALINS B...|             QUIMPER| 190.0|
|              LANDRY|             QUIMPER| 190.0|
|  TGV HAUTE PICARDIE|                AGDE| 188.0|
|  TGV HAUTE PICARDIE|                SETE| 188.0|
|  PARIS GARE DE LYON|              TOULON| 166.0|
|  PARIS GARE DE LYON|         MONTBELIARD| 119.0|
| LIMOGES BENEDICTINS|AEROPORT CDG 2 TG...| 111.0|
|         CHATEAUROUX|AEROPORT CDG 2 TG...|  86.0|
|CHAMBERY CHALLES ...|           MASSY TGV| 136.0|
|           MASSY TGV|AIX LES BAINS LE ...| 136.0|
|AEROPORT CDG 2 TG...|AIX LES BAINS LE ...| 136.0|
|          NICE VILLE|    CHALON SUR SAONE| 151.0|
|             BELFORT|MONTBELIARD TGV-L...|  53.0|
|MARSEILLE ST CHARLES|     LONS LE SAUNIER| 119.0|
|     BESANCON VIOTTE|        LILLE EUROPE| 155.0|
|      ANGERS ST LAUD|         

In [33]:
def itineraryPrice(station1, station2):
    station1, station2 = station1.strip().upper(), station2.strip().upper()
    tuple_prix = df.where(f"(origine = '{station1}' AND destination = '{station2}' OR origine = '{station2}' AND destination = '{station1}')").collect()
    if tuple_prix == []:
        return None
    elif sum([row[2] in ('', 'nan', None) for row in tuple_prix]) > 0:
        return None
    else:
        return mean([row[2] for row in tuple_prix])
print(itineraryPrice("nimes", "LILLE EUROPE"))
print(itineraryPrice("nimes", "PALITA"))
print(itineraryPrice("TOURS", "LILLE FLANDRES"))
print(itineraryPrice('CULOZ', 'LIBOURNE'))

184.0
None
134.0
None


In [34]:
df_gare_loc_sql = spark.read.option("delimiter", ";").option("header",True).csv("./referentiel-gares-voyageurs.csv")
df_gare_loc_sql.createOrReplaceTempView("sncf_tgv_loc")

from geopy.distance import geodesic
df_gare_loc = spark.sql("SELECT `Intitulé gare`, Latitude, Longitude FROM sncf_tgv_loc")
df_gare_loc_pd = df_gare_loc.toPandas()
df_gare_loc_pd.dropna(inplace=True)
pds = df_gare_loc_pd.apply(lambda x: (x[0], (float(x[1]), float(x[2]))), axis=1, result_type='expand')
pds.columns = ['gare', 'coordonne']
pds

Unnamed: 0,gare,coordonne
0,Osséja,"(42.4199665, 1.9771166)"
1,Font-Romeu-Odeillo-Via,"(42.4912317, 2.0383917)"
2,Planès,"(42.5009185, 2.1369269)"
3,Sauto,"(42.5063705, 2.1600389)"
4,Thuès Carença,"(42.5229486, 2.2227279)"
...,...,...
2862,La Barasse,"(43.2858384, 5.4845101)"
2863,Rosa Parks,"(48.89602, 2.37397)"
2864,Nice Pont Michel,"(43.72254, 7.29142)"
2865,Irigny Yvours,"(45.69002, 4.83229)"


In [35]:
def similarWord(str):
  for interest in self.interests:
    for keyword in keywords:
       s = SequenceMatcher(None,interest,keyword)
       match_freq = s.ratio()
       if match_freq >= self.limit:
            #print interest, keyword, match_freq
            final_score += 1
            break 

In [40]:
def assimilate(nomDeGare):
  pds['similarity'] = pds['gare'].map(lambda x: SequenceMatcher(None, x, nomDeGare).ratio())
  return  pds.iloc[pds['similarity'].idxmax()]['coordonne']

In [43]:
df['coordonne origin'] = df['origine'].map(lambda x: assimilate(x))
df['coordonne destinations'] = df['destination'].map(lambda x: assimilate(x))

IndexError: ignored

In [42]:
df = df.toPandas()

In [44]:
pds

Unnamed: 0,gare,coordonne,similarity
0,Osséja,"(42.4199665, 1.9771166)",0.080000
1,Font-Romeu-Odeillo-Via,"(42.4912317, 2.0383917)",0.048780
2,Planès,"(42.5009185, 2.1369269)",0.000000
3,Sauto,"(42.5063705, 2.1600389)",0.083333
4,Thuès Carença,"(42.5229486, 2.2227279)",0.062500
...,...,...,...
2862,La Barasse,"(43.2858384, 5.4845101)",0.206897
2863,Rosa Parks,"(48.89602, 2.37397)",0.068966
2864,Nice Pont Michel,"(43.72254, 7.29142)",0.057143
2865,Irigny Yvours,"(45.69002, 4.83229)",0.125000
