In [1]:
# Import libraries
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

import matplotlib.pyplot as plt
import logging

logging.getLogger().setLevel(logging.INFO)

# Set plot parameters
plt.rcParams["figure.figsize"] = (20, 13)
%matplotlib inline
%config InlineBackend.figure_format = "retina"

ModuleNotFoundError: No module named 'pyspark'

# Preprocessing Train

## Analyse des données Train Customers

In [None]:
# Raw training customers table.
df_users_training = pd.read_csv(filepath_or_buffer='./data/train_customers.csv')

In [None]:
# Clean the training customers table.
# Exploration helpers (uncomment when needed):
# df_users_training.describe(include="all")
# df_users_training.info(memory_usage="deep")
df_users_training = (
    df_users_training
    .rename(columns={'akeed_customer_id': 'customer_id'})
    .assign(
        # 'created_at' is dropped below, so only 'updated_at' needs parsing.
        updated_at=lambda d: pd.to_datetime(d['updated_at']),
        # Collapse gender to two values: a match of [Ff].* becomes 'F', a match of [Mm? ].*
        # becomes 'M'. Inside [...] a '|' is a literal character, so the previous
        # `[F|f]` / `[M|m|?| ]` classes also matched '|'.
        # The result is assigned instead of `df.gender.replace(..., inplace=True)`: chained
        # in-place calls are deprecated and stop updating the frame under copy-on-write.
        gender=lambda d: d['gender'].replace(
            to_replace=[r'[Ff].*', r'[Mm? ].*'], value=['F', 'M'], regex=True
        ),
    )
    # Drop useless columns
    .drop(columns=['language', 'dob', 'created_at'])
    # Keep only verified users, then drop the now-constant 'verified' flag and 'status'.
    .loc[lambda d: d['verified'] == 1]
    .drop(columns=['verified', 'status'])
)

## Analyse des données Train Locations

In [None]:
df_users_loc_training = pd.read_csv('./data/train_locations.csv')
# df_users_loc_training.describe(include="all")
# df_users_loc_training.info(memory_usage="deep")

# Fill missing location_type values with 'Other'. Assign back rather than calling
# `df.location_type.fillna(..., inplace=True)`: that chained in-place call is deprecated
# and no longer updates the DataFrame under pandas copy-on-write.
df_users_loc_training['location_type'] = df_users_loc_training['location_type'].fillna('Other')

## Analyse des données Train Orders

In [None]:
df_orders = pd.read_csv('./data/orders.csv')
# df_orders.describe(include='all')
# df_orders.info(memory_usage="deep")

# Orders are later joined to customers on customer_id, location_number and location_type.
# Open question: is the promo code itself important, or only whether one was used?
# Only its presence is kept. promo_code_discount_percentage can legitimately be 0.
df_orders = (
    df_orders
    # Drop rows without an order id. The previous
    # `df_orders.akeed_order_id = df_orders.akeed_order_id.dropna()` was a no-op: assigning
    # a Series back re-aligns it on the index, re-inserting NaN for the dropped rows.
    .dropna(subset=['akeed_order_id'])
    .drop(columns=['delivery_time'])
    .rename(columns={'LOCATION_TYPE': 'location_type', 'LOCATION_NUMBER': 'location_number'})
    .assign(
        location_type=lambda d: d['location_type'].fillna('Other'),
        promo_code_discount_percentage=lambda d: d['promo_code_discount_percentage'].fillna(0),
        # 1 if a non-empty promo code was used, else 0. The previous
        # `promo_code = promo_code.fillna('', inplace=True)` stored fillna's return value
        # (None) in the whole column, so every order ended up with promo_code == 0.
        promo_code=lambda d: d['promo_code'].fillna('').astype(bool).astype(int),
        # 1/0 flag from the truthiness of the value; missing -> 0.
        # NOTE(review): the literal 'No' is mapped to 0 explicitly (as for is_rated below);
        # otherwise astype(bool) turns the non-empty string 'No' into 1. Confirm raw values.
        is_favorite=lambda d: d['is_favorite'].replace('No', '').fillna('').astype(bool).astype(int),
        is_rated=lambda d: d['is_rated'].replace({'No': 0, 'Yes': 1}),
    )
)

In [None]:
# Attach locations to each training customer (left join), then discard any row that has a
# missing value in any column.
df_users_training = (
    df_users_training
    .merge(df_users_loc_training, how='left', on='customer_id')
    .dropna()
)

In [None]:
# Inner join (the merge default) of customer locations with their orders on the shared keys.
df_merge_training = df_users_training.merge(
    df_orders,
    on=['customer_id', 'location_number', 'location_type'],
)

In [None]:
def distance_to_rating(distance, min_distance, max_distance, min_rating, max_rating):
    """Linearly map a distance in [min_distance, max_distance] onto [min_rating, max_rating].

    min_distance maps to min_rating and max_distance maps to max_rating; passing
    min_rating > max_rating therefore gives an inverted scale (closer -> higher rating).

    Args:
        distance: scalar, NumPy array or pandas Series of distances (element-wise).
        min_distance, max_distance: scalar bounds of the distance range.
        min_rating, max_rating: ratings assigned to min_distance and max_distance.

    Returns:
        Rating(s) with the same shape/type as ``distance``. If the distance range is empty
        (max_distance == min_distance) every value maps to ``min_rating`` instead of
        producing NaN (arrays/Series) or raising ZeroDivisionError (Python floats).
    """
    span = max_distance - min_distance
    if span == 0:
        # Degenerate range: normalised distance is 0 everywhere (NaN inputs stay NaN).
        normalized_distance = (distance - min_distance) * 0.0
    else:
        # Normalise the distance to [0, 1].
        normalized_distance = (distance - min_distance) / span

    # Convert the normalised distance into a rating.
    return normalized_distance * (max_rating - min_rating) + min_rating

In [None]:
# Example values (kept for reference; not used by the rating computation in this cell).
distances = np.array([1.0, 2.0, 3.0])
min_distance, max_distance = 1.0, 3.0

# Inverted scale: the shortest delivery distance gets rating 5, the longest gets rating 1.
min_rating, max_rating = 5.0, 1.0

# Add a 'rating' column to df_merge_training derived from its delivery distance.
df_merge_training['rating'] = distance_to_rating(
    df_merge_training.deliverydistance,
    df_merge_training.deliverydistance.min(),
    df_merge_training.deliverydistance.max(),
    min_rating,
    max_rating,
)

In [None]:
# Training rating matrix: one row per "customer X location X vendor" key with its vendor and
# rating; only the first row of each duplicated key is kept.
df_matrix_train = (
    df_merge_training[['customer_id', 'location_number', 'vendor_id', 'rating']]
    .assign(C=lambda frame: (
        frame['customer_id'].astype(str)
        + ' X ' + frame['location_number'].astype(str)
        + ' X ' + frame['vendor_id'].astype(str)
    ))
    .rename(columns={'C': 'CID X LOC_NUM X VENDOR', 'vendor_id': 'vendor'})
    .drop_duplicates(subset=['CID X LOC_NUM X VENDOR'])
    .loc[:, ['CID X LOC_NUM X VENDOR', 'vendor', 'rating']]
)
df_matrix_train

## Conversion de df_matrix_train en DataFrame Spark (df_training) pour l'ALS

In [None]:
# Start (or reuse) the Spark session and hand the pandas training matrix over to Spark.
spark = (
    SparkSession.builder
    .appName("ALSMatrixFactorisation")
    .getOrCreate()
)
df_training = spark.createDataFrame(df_matrix_train)
df_training.show()

In [None]:
# Encode every column except 'rating' into a numeric '<column>_index' column for ALS.
# Columns are sorted so the stage order (and therefore the order of the appended output
# columns) is identical on every run; iterating a raw set depends on string hash seeding.
index_columns = sorted(set(df_training.columns) - {'rating'})
indexer = [StringIndexer(inputCol=column, outputCol=column + "_index") for column in index_columns]
pipeline = Pipeline(stages=indexer)
df_training = pipeline.fit(df_training).transform(df_training)
df_training.show()

# Preprocessing Test

## Analyse des données Test Customers

In [None]:
df_users_test = pd.read_csv('./data/test_customers.csv')
# Exploration helpers (uncomment when needed). The bare describe() call had no visible
# effect: its result was neither displayed nor stored.
# df_users_test.describe(include="all")
# df_users_test.info(memory_usage="deep")

df_users_test = (
    df_users_test
    .rename(columns={'akeed_customer_id': 'customer_id'})
    .assign(
        # 'created_at' is dropped below, so only 'updated_at' needs parsing.
        updated_at=lambda d: pd.to_datetime(d['updated_at']),
        # Collapse gender to two values: a match of [Ff].* becomes 'F', a match of [Mm? ].*
        # becomes 'M'. Inside [...] a '|' is literal, so the previous `[F|f]` / `[M|m|?| ]`
        # classes also matched '|'. Assigned instead of a chained `replace(..., inplace=True)`,
        # which is deprecated and stops updating the frame under copy-on-write.
        gender=lambda d: d['gender'].replace(
            to_replace=[r'[Ff].*', r'[Mm? ].*'], value=['F', 'M'], regex=True
        ),
    )
    # Drop useless columns
    .drop(columns=['language', 'dob', 'created_at'])
    # Keep only verified users.
    .loc[lambda d: d['verified'] == 1]
)

# Unlike the training customers, 'verified' and 'status' are currently kept here:
# df_users_test = df_users_test.drop(columns=['verified', 'status'])
#df_users_test.drop(['status'], axis=1, inplace=True)

## Analyse des données Test Locations

In [None]:
df_users_loc_test = pd.read_csv('./data/test_locations.csv')
# Exploration helpers (uncomment when needed). The bare describe() call had no visible
# effect: its result was neither displayed nor stored.
# df_users_loc_test.describe(include="all")
# df_users_loc_test.info(memory_usage="deep")

# Fill missing location_type values with 'Other'. Assign back rather than calling
# `df.location_type.fillna(..., inplace=True)`: that chained in-place call is deprecated
# and no longer updates the DataFrame under pandas copy-on-write.
df_users_loc_test['location_type'] = df_users_loc_test['location_type'].fillna('Other')

In [None]:
# Attach locations to each test customer (left join), then suffix the user coordinates so
# they stay distinct from the vendor coordinates after the cross join.
df_users_test = df_users_test.merge(df_users_loc_test, how='left', on='customer_id')
df_users_test = df_users_test.rename(
    columns={
        # NOTE(review): presumably absent from this pandas frame ('*_index' columns are only
        # created on the Spark frames); kept so the rename mapping is unchanged.
        'customer_id_index': 'user_id',
        'latitude': 'latitude_user',
        'longitude': 'longitude_user',
    }
)

## Analyse des données Vendors et produit cartésien avec les clients Test

In [None]:
# Vendor coordinates, renamed so they do not collide with the user coordinates.
# Built as a chain: renaming in place on a column-slice copy can raise SettingWithCopyWarning.
df_client = (
    pd.read_csv('./data/vendors.csv')
    .loc[:, ['id', 'latitude', 'longitude']]
    .rename(columns={'id': 'vendor_id', 'latitude': 'latitude_vendor', 'longitude': 'longitude_vendor'})
)

# Pair every test user location with every vendor. The cross join has
# len(df_users_test) * len(df_client) rows, so only the user columns that the
# distance/rating cell reads (keys and coordinates) are carried into it.
user_columns = ['customer_id', 'location_number', 'latitude_user', 'longitude_user']
df_matrix_test = pd.merge(df_users_test[user_columns], df_client, how='cross')

# Show the first 20 rows. display() is needed because only a cell's last expression is rendered.
display(df_matrix_test.head(20))
df_matrix_test.info(memory_usage="deep")

In [None]:
# Great-circle distance helper used to derive test delivery distances.
def distance(lat1, lon1, lat2, lon2):
    """Return the haversine (great-circle) distance in kilometres between two points.

    Coordinates are in degrees; scalars, NumPy arrays and pandas Series are handled
    element-wise.
    """
    earth_radius_km = 6371

    phi1, lam1, phi2, lam2 = (np.radians(v) for v in (lat1, lon1, lat2, lon2))

    half_dphi = (phi2 - phi1) / 2
    half_dlam = (lam2 - lam1) / 2
    hav = np.sin(half_dphi)**2 + np.cos(phi1) * np.cos(phi2) * np.sin(half_dlam)**2

    central_angle = 2 * np.arcsin(np.sqrt(hav))
    return earth_radius_km * central_angle

In [None]:
# Haversine distance (km) between each test user location and each vendor.
df_matrix_test['deliverydistance'] = distance(
    df_matrix_test.latitude_user, df_matrix_test.longitude_user,
    df_matrix_test.latitude_vendor, df_matrix_test.longitude_vendor,
)

# Distance -> rating: the shortest distance gets min_rating, the longest gets max_rating.
# NOTE(review): the range used here is the test distances' own min/max, not the training
# range used for df_merge_training, so one distance can map to different ratings in train
# and test — confirm this is intended.
df_matrix_test['rating'] = distance_to_rating(
    df_matrix_test.deliverydistance,
    df_matrix_test.deliverydistance.min(),
    df_matrix_test.deliverydistance.max(),
    min_rating,
    max_rating,
)

# Test rating matrix with the same layout as the training matrix.
df_matrix_test = (
    df_matrix_test[['customer_id', 'location_number', 'vendor_id', 'rating']]
    .assign(C=lambda frame: (
        frame['customer_id'].astype(str)
        + ' X ' + frame['location_number'].astype(str)
        + ' X ' + frame['vendor_id'].astype(str)
    ))
    .rename(columns={'C': 'CID X LOC_NUM X VENDOR', 'vendor_id': 'vendor'})
    .drop_duplicates(subset=['CID X LOC_NUM X VENDOR'])
    .loc[:, ['CID X LOC_NUM X VENDOR', 'vendor', 'rating']]
)
df_matrix_test

## Conversion de df_matrix_test en DataFrame Spark (df_test) pour l'ALS

In [None]:
# Hand the pandas test matrix over to Spark so it can be indexed and scored by ALS.
df_test = spark.createDataFrame(data=df_matrix_test)
df_test.show()

In [None]:
# Index the test frame with the label vocabularies learned on the *training* data, so an
# index refers to the same key/vendor in both frames. Fitting a fresh StringIndexer on
# df_test (as before) assigned unrelated indices, so ALS looked up the wrong factors.
# Only the input columns are selected from df_training because it already holds the
# '*_index' outputs, and StringIndexer refuses to fit when its output column exists.
# handleInvalid="keep" maps labels never seen in training to one extra index; ALS
# (coldStartStrategy="drop") then drops those rows instead of scoring them.
# NOTE(review): test customers come from a separate file (test_customers.csv); if they do
# not also appear in training, their composite keys are all unseen and nothing is scored.
# Confirm the evaluation design.
test_index_columns = sorted(set(df_test.columns) - {'rating'})
indexer = [
    StringIndexer(inputCol=column, outputCol=column + "_index", handleInvalid="keep")
    for column in test_index_columns
]
pipeline = Pipeline(stages=indexer)
df_test = pipeline.fit(df_training.select(*test_index_columns)).transform(df_test)
df_test.show()

# Création du modèle ALS

In [None]:
# Alternating Least Squares factorisation of the indexed training ratings.
# Users are the composite 'CID X LOC_NUM X VENDOR' keys and items are vendors.
# NOTE(review): the user key already contains the vendor and keys were de-duplicated, so
# each "user" has a single rated item; ALS cannot learn cross-item preferences from that.
als = ALS(
    maxIter=5,
    regParam=0.09,
    rank=25,
    userCol="CID X LOC_NUM X VENDOR_index",
    itemCol="vendor_index",
    ratingCol="rating",
    # Drop predictions for users/items without learned factors instead of returning NaN.
    coldStartStrategy="drop",
    nonnegative=True,
    # Fixed seed so the random factor initialisation is reproducible across runs.
    seed=42,
)

model = als.fit(df_training)

# Evaluation du modèle

In [None]:
# Score the test rows and report the root-mean-squared error of the predicted ratings.
evaluator = RegressionEvaluator(labelCol="rating", predictionCol="prediction", metricName="rmse")

predictions = model.transform(df_test)
rmse = evaluator.evaluate(predictions)

print("RMSE=" + str(rmse))
predictions.show()

# Recommandation pour tous les utilisateurs

In [None]:
# Top-20 vendor recommendations for every user known to the model.
# Keep the DataFrame itself: `.show()` only prints and returns None, so chaining it onto
# the assignment stored None in user_recs.
user_recs = model.recommendForAllUsers(20)
user_recs.show(10)