In [29]:
!pip install pyspark



# Clustering KMeans des stations de vélos (projet Bristol-City-bike)

In [31]:
import configparser
from pathlib import Path

import folium
import pandas as pd
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [32]:
def get_column(data, name):
    """Return the values of one column of a Spark DataFrame as a Python list.

    Args:
        data: Spark DataFrame to read from.
        name: Column selector handed to ``data.select``.

    Returns:
        list: The collected values, in the order returned by ``collect()``.
    """
    selected = data.select(name)
    # flatMap with the identity function unpacks every selected row into its field values.
    flattened = selected.rdd.flatMap(lambda record: record)
    return flattened.collect()

def add_city(row, target_map=None):
    """Add one marker to a folium map, colored according to the row's group.

    Args:
        row: Mapping-like record (for example a pandas Series) providing the keys
            'grp' (group id), 'lat', 'lon' and 'add' (popup text).
        target_map: Object the marker is attached to. When omitted, the
            notebook-level ``maps`` variable is used, so existing ``add_city(row)``
            calls keep working.

    Returns:
        None. The marker is attached to the map as a side effect.
    """
    # Group 0 is green, group 1 is red, every other group is blue.
    group_colors = {0: 'green', 1: 'red'}
    col = group_colors.get(row['grp'], 'blue')

    # Resolve the global only at call time so the function can be defined before
    # the map exists and can also be pointed at any other map.
    if target_map is None:
        target_map = maps

    folium.Marker(
        location=[row['lat'], row['lon']],
        popup=folium.Popup(row['add']),
        icon=folium.Icon(color=col),
    ).add_to(target_map)

In [33]:
# Create the Spark session used by the rest of the notebook (or reuse an existing one).
spark = (
    SparkSession.builder
    .master('local')
    .appName('Bristol-City-bike')
    .getOrCreate()
)

In [34]:
# Load the notebook settings (input/output paths, number of clusters) from the properties file.
config = configparser.ConfigParser()
loaded_files = config.read('properties.conf')
# ConfigParser.read() silently skips files it cannot open, so stop here with a clear
# message instead of hitting a confusing KeyError when the settings are looked up later.
if not loaded_files:
    raise FileNotFoundError("Configuration file 'properties.conf' could not be read")
# Display the list of files that were actually parsed.
loaded_files

['properties.conf']

In [None]:
# Read the job settings from the 'Bristol-City-bike' section of the loaded configuration.
bike_settings = config['Bristol-City-bike']
path_to_input_data = bike_settings['Input-data']
path_to_output_data = bike_settings['Output-data']
# The number of clusters is stored as text in the file, so convert it to an integer.
num_partition_kmeans = int(bike_settings['Kmeans-level'])

In [35]:
# Load the JSON input file into a Spark DataFrame.
bristol = spark.read.json(path_to_input_data)

In [None]:
# Print a preview of the loaded DataFrame.
bristol.show()

+--------------------+----------+----------+--------------------+------+
|             address|  latitude| longitude|                name|number|
+--------------------+----------+----------+--------------------+------+
|Lower River Tce /...|-27.482279|153.028723|122 - LOWER RIVER...|   122|
|Main St / Darragh St| -27.47059|153.036046|91 - MAIN ST / DA...|    91|
|Sydney St Ferry T...|-27.474531|153.042728|88 - SYDNEY ST FE...|    88|
|Browne St / James St|-27.461881|153.046986|75 - BROWNE ST / ...|    75|
|Kurilpa Point / M...|-27.469658|153.016696|98 - KURILPA POIN...|    98|
|Montague Rd / Ski...| -27.48172| 153.00436|109 - MONTAGUE RD...|   109|
|Macquarie St / Gu...|-27.493626|153.001482|149 - MACQUARIE S...|   149|
|Bi-centennial Bik...|-27.476076|153.002459|139 - BI-CENTENNI...|   139|
|Sir William McGre...|-27.493963|153.011938|24 - SIR WILLIAM ...|    24|
|Vulture St / Trib...|-27.482197|153.020894|117 - VULTURE ST ...|   117|
|Lamington St / Re...|-27.465226|153.050864|73 - LA

 Création d'un nouveau data frame kmeans_df contenant seulement les variables :

*   longitude
*   latitude








In [36]:
# Keep only the two coordinate columns used for the clustering, then preview them.
kmeans_df = bristol.select(["latitude", "longitude"])
kmeans_df.show()

+----------+----------+
|  latitude| longitude|
+----------+----------+
|-27.482279|153.028723|
| -27.47059|153.036046|
|-27.474531|153.042728|
|-27.461881|153.046986|
|-27.469658|153.016696|
| -27.48172| 153.00436|
|-27.493626|153.001482|
|-27.476076|153.002459|
|-27.493963|153.011938|
|-27.482197|153.020894|
|-27.465226|153.050864|
|-27.468447|153.024662|
|-27.473021|153.025988|
|-27.457825|153.036866|
| -27.48148| 153.02368|
|-27.467464|153.022094|
|-27.499963|153.017633|
|-27.490776|152.994747|
|-27.458199|153.041688|
|-27.481808|153.025477|
+----------+----------+
only showing top 20 rows



In [37]:
# Cluster the stations with KMeans on their coordinates.
features = ('longitude','latitude')
# The number of clusters comes from the configuration; the seed is fixed to 1.
kmeans = KMeans(k=num_partition_kmeans, seed=1)
assembler = VectorAssembler(inputCols=features, outputCol="features")
# Combine the coordinate columns into the single "features" vector column.
dataset = assembler.transform(kmeans_df)
model = kmeans.fit(dataset)
# Apply the fitted model back to the assembled data.
fitted = model.transform(dataset)

In [38]:
# List the column names of the fitted DataFrame.
fitted.columns

['latitude', 'longitude', 'features', 'prediction']

On vérifie bien que les colonnes sont latitude, longitude, features et prediction.


Détermination des longitudes et latitudes moyennes pour chaque groupe en utilisant spark DSL et SQL

*   Méthode DSL




In [None]:
# DSL approach: average latitude and longitude of the rows in each predicted group.
meanByGroup = (
    fitted
    .groupBy('prediction')
    .agg(
        mean('latitude').alias('LatMoyenne'),
        mean('longitude').alias('LongMoyenne'),
    )
    .orderBy('prediction')
)
meanByGroup.show()

+----------+-------------------+------------------+
|prediction|         LatMoyenne|       LongMoyenne|
+----------+-------------------+------------------+
|         0|-27.481218536585374|153.00572882926832|
|         1|-27.460240636363633|153.04186302272726|
|         2| -27.47255990624999|   153.02594553125|
+----------+-------------------+------------------+





*   Méthode SQL




In [39]:
# SQL approach: expose `fitted` as the temporary view "fittedSQL", then compute the
# per-group mean latitude and longitude with a SQL query and print the result.
fitted.createOrReplaceTempView("fittedSQL")
spark.sql("""
    select prediction,
           mean(latitude) as LatMoyenne,
           mean(longitude) as LongMoyenne
    from fittedSQL
    group by prediction
    order by prediction
""").show()


+----------+-------------------+------------------+
|prediction|         LatMoyenne|       LongMoyenne|
+----------+-------------------+------------------+
|         0|-27.481218536585374|153.00572882926832|
|         1|-27.460240636363633|153.04186302272726|
|         2| -27.47255990624999|   153.02594553125|
+----------+-------------------+------------------+



Comparaison des résultats
 

*   Nous trouvons les mêmes résultats avec les méthodes DSL et SQL.



In [40]:
# Build the pandas DataFrame needed for the map.
# All four columns are read from ONE Spark DataFrame in ONE collect, so each address is
# paired with its own coordinates and group. Collecting `fitted` and `bristol` in
# separate jobs and matching the lists by position relies on every job returning the
# rows in the same order.
station_rows = (
    model
    .transform(assembler.transform(bristol))
    .select('address', 'latitude', 'longitude', 'prediction')
    .collect()
)

# Keep the individual lists available under their original names.
groups = [station['prediction'] for station in station_rows]
latitudes = [station['latitude'] for station in station_rows]
longitudes = [station['longitude'] for station in station_rows]
addresses = [station['address'] for station in station_rows]

df = pd.DataFrame({ 'add' : addresses,
                    'lat' : latitudes,
                    'lon' : longitudes,
                    'grp' : groups})

In [41]:
# Start from an empty map centred on the mean station position (city of Brisbane).
g = [df[axis].mean() for axis in ('lat', 'lon')]
maps = folium.Map(location=g, zoom_start=13)

In [42]:
# Add one marker per row of df to the map.
for station_idx, station in df.iterrows():
    add_city(station)

In [43]:
# Add one beige marker at the mean position of each group.
gData = meanByGroup.toPandas()

for center_idx, center in gData.iterrows():
    center_marker = folium.Marker(
        location=[center['LatMoyenne'], center['LongMoyenne']],
        popup=folium.Popup(f"Center of group {int(center['prediction'])}"),
        icon=folium.Icon(color='beige'),
    )
    center_marker.add_to(maps)

In [44]:
# Display the map (the bare expression is rendered as the cell output).
maps
  

In [None]:
# Save the map as a standalone HTML file.
map_output_path = Path("output/Bristol-City-bike.html")
# Create the target folder first: writing into a directory that does not exist fails.
map_output_path.parent.mkdir(parents=True, exist_ok=True)
maps.save(str(map_output_path))

In [None]:
# Export the fitted DataFrame, without its 'features' column, as CSV with a header row
# into the location given by path_to_output_data, replacing any previous export.
(
    fitted
    .drop('features')
    .write
    .format("csv")
    .option("header", 'true')
    .mode("overwrite")
    .save(path_to_output_data)
)

In [None]:
# Stop the PySpark session.
spark.stop()