### Importer les packages nécessaires

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
import pandas as pd
import configparser
import folium

### Définir les fonctions nécessaires

In [2]:
# Cette fonction retourne une colonne sous forme d'une liste d'une dataframe spark
def get_column(data,name):
    return data.select(name).rdd.flatMap(lambda x: x).collect()

# Ajout d'un marquer sur la map selon le groupe
def add_city(row):
    if row['grp']==0:
        col='green'
    elif row['grp']==1:
        col='red'
    else:
        col='blue'
    folium.Marker(
        location= [row['lat'],row['lon']],
        popup=folium.Popup(row['add']),
        icon=folium.Icon(color=col),
    ).add_to(maps)

### 1- Instancier le client Spark Session

In [3]:
spark = SparkSession.builder \
    .master('local') \
    .appName('Bristol-City-bike') \
    .getOrCreate()

### 2- Utiliser le fichier de configuration pour récupérer les path

In [4]:
config = configparser.ConfigParser()
config.read('properties.conf')

['properties.conf']

#### Créer des variables à partir du fichier properties.conf

In [5]:
path_to_input_data = config['Bristol-City-bike']['Input-data']
path_to_output_data = config['Bristol-City-bike']['Output-data']
num_partition_kmeans = int(config['Bristol-City-bike']['Kmeans-level'])

### 3- Importer le json avec spark 

In [6]:
bristol = spark\
            .read\
            .json(path_to_input_data)

#### Afficher les 3 premières lignes

In [7]:
bristol.show(3)

+--------------------+----------+----------+--------------------+------+
|             address|  latitude| longitude|                name|number|
+--------------------+----------+----------+--------------------+------+
|Lower River Tce /...|-27.482279|153.028723|122 - LOWER RIVER...|   122|
|Main St / Darragh St| -27.47059|153.036046|91 - MAIN ST / DA...|    91|
|Sydney St Ferry T...|-27.474531|153.042728|88 - SYDNEY ST FE...|    88|
+--------------------+----------+----------+--------------------+------+
only showing top 3 rows



### 4- Création d'un nouveau data frame Kmeans-df contenant seulement les variables:
* latitude 
* longitude

In [8]:
kmeans_df = bristol.select("latitude","longitude")
kmeans_df.show(3)

+----------+----------+
|  latitude| longitude|
+----------+----------+
|-27.482279|153.028723|
| -27.47059|153.036046|
|-27.474531|153.042728|
+----------+----------+
only showing top 3 rows



### 5- KMeans

In [9]:
features = ('longitude','latitude')
kmeans = KMeans()\
            .setK(num_partition_kmeans)\
            .setSeed(1)
assembler = VectorAssembler(inputCols = features, outputCol = "features")
dataset = assembler\
            .transform(kmeans_df)
model = kmeans\
            .fit(dataset)
fitted = model\
            .transform(dataset)

### 6- Les noms des colonnes de fitted ? 

In [10]:
fitted.columns

['latitude', 'longitude', 'features', 'prediction']

#### ==> On a bine vérifié qu’il s’agit de longitude, latitude, features, predictions.

### 7- Détermination des longitudes et latitudes moyennes pour chaque groupe en utilisant spark DSL et SQL

* Méthode DSL

In [11]:
meanByGroup = fitted\
                .groupBy(fitted.prediction)\
                .agg(mean('latitude').alias('LatMoyenne'),
                     mean('longitude').alias('LongMoyenne')
                     )\
                .orderBy('prediction')
meanByGroup.show()

+----------+-------------------+------------------+
|prediction|         LatMoyenne|       LongMoyenne|
+----------+-------------------+------------------+
|         0|-27.481218536585374|153.00572882926832|
|         1|-27.460240636363633|153.04186302272726|
|         2| -27.47255990624999|   153.02594553125|
+----------+-------------------+------------------+



* Méthode SQL

In [12]:
fitted.createOrReplaceTempView("fittedSQL")
spark.sql("""
    select prediction,
           mean(latitude) as LatMoyenne,
           mean(longitude) as LongMoyenne
    from fittedSQL
    group by prediction
    order by prediction
""").show()

+----------+-------------------+------------------+
|prediction|         LatMoyenne|       LongMoyenne|
+----------+-------------------+------------------+
|         0|-27.481218536585374|153.00572882926832|
|         1|-27.460240636363633|153.04186302272726|
|         2| -27.47255990624999|   153.02594553125|
+----------+-------------------+------------------+



### Comparison des résultats

####       ==> Nous trouvons les mêmes résultats avec les méthodes DSL et SQL.

### 8- Visualisation dans une carte avec le package folium
#### NB: La ville est en Australie et s’appelle BRISBANE et non pas BRISTOLE

#### Création de la dataframe nécessaire pour la carte

In [13]:
groups = get_column(fitted, 'prediction')
latitudes = get_column(bristol,'latitude')
longitudes = get_column(bristol, 'longitude')
addresses = get_column(bristol, 'address')

df = pd.DataFrame({ 'add' : addresses,
                    'lat' : latitudes,
                    'lon' : longitudes,
                    'grp' : groups})

#### Initiation d'une carte vide dans la ville de BRISBANE

In [14]:
g = [df['lat'].mean(), df['lon'].mean()]
maps = folium\
        .Map(location=g,
               zoom_start=13)

#### Ajout des marqueurs

In [15]:
for index, row in df.iterrows():
    add_city(row)

#### Ajout des centres de chaque groupe (des marqueurs beiges)

In [16]:
gData = meanByGroup.toPandas()

for index, row in gData.iterrows():
    folium\
        .Marker(
            location= [row['LatMoyenne'],row['LongMoyenne']],
            popup=folium.Popup(f"Center of group {int(row['prediction'])}"),
            icon=folium.Icon(color='beige'),
        ).add_to(maps)

### Affichage de la carte

In [17]:
maps

In [21]:
maps.save("output/Bristol-City-bike.html")

### 9- Exportation de la data frame fitted après élimination de la colonne  features, dans le répertoire  associé  à la vairiable path_to_output_data

In [18]:
fitted\
    .drop('features')\
    .write\
    .format("csv")\
    .mode("overwrite")\
    .save(path_to_output_data, header = 'true')

### Arrêt de la session spark

In [19]:
spark.stop()