Le bloc de code suivant, tiré de ce notebook, sert à installer Apache Spark :
https://colab.research.google.com/github/groda/big_data/blob/master/Run_Spark_on_Google_Colab.ipynb

In [None]:
import requests
import subprocess
import os
import re
import socket
import shutil
import time
import sys

def run(cmd):
    """Run a shell command and return the last line of its output.

    stderr is merged into stdout, so the returned line may come from either
    stream. The line is also printed, prefixed with a check mark.

    :param cmd: command passed to the shell (executed with ``shell=True``).
    :return: the last line of the stripped output, ``""`` when the command
             printed nothing, or ``None`` when it exited with a non-zero
             return code (the failure is printed, not raised).
    """
    try:
        # Run the command and capture stdout (stderr is redirected to stdout)
        completed = subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    except subprocess.CalledProcessError as e:
        # Report the failure instead of raising so the caller can carry on
        print(f"Command failed with return code {e.returncode}")
        print("stdout:", e.stdout)
        return None

    # A silent command yields no lines; indexing [-1] would raise IndexError
    output_lines = completed.stdout.strip().splitlines()
    stdout_result = output_lines[-1] if output_lines else ""
    print(f'✅ {stdout_result}')
    return stdout_result

def is_java_installed():
    """Return the path of the ``java`` executable found on PATH, or None."""
    java_executable = shutil.which("java")
    return java_executable

def install_java():
    """Install a headless OpenJDK JRE with apt and export JAVA_HOME.

    The failure of the apt command is printed, not raised.
    """
    # Uncomment and modify the desired version
    # java_version= 'openjdk-11-jre-headless'
    # java_version= 'default-jre'
    # java_version= 'openjdk-17-jre-headless'
    # java_version= 'openjdk-18-jre-headless'
    java_version = 'openjdk-19-jre-headless'
    # No leading space here: the value is used verbatim as a directory path
    os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-19-openjdk-amd64'
    print(f"Java not found. Installing {java_version} ... (this might take a while)")
    try:
        cmd = f"apt install -y {java_version}"
        # Output is captured only so it can be shown if the command fails
        subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
        print(f'✅ Done installing Java {java_version}')
    except subprocess.CalledProcessError as e:
        # Handle the error if the command returns a non-zero exit code
        print(f"Command failed with return code {e.returncode}")
        print("stdout:", e.stdout)

# Step 0: make sure a Java runtime is available before setting up Spark
print("\n0️⃣   Install Java if not available")
if not is_java_installed():
    install_java()
else:
    print("✅ Java is already installed.")

print("\n1️⃣   Download and install Hadoop and Spark")
# URL of the Spark distribution (pre-built for Hadoop 3)
SPARK_VERSION = "3.5.1"
HADOOP_SPARK_URL = "https://dlcdn.apache.org/spark/spark-" + SPARK_VERSION + \
                   "/spark-" + SPARK_VERSION + "-bin-hadoop3.tgz"
# A HEAD request checks that the archive exists without downloading it.
# The timeout keeps the cell from hanging if the server never answers, and a
# network error is reported like a missing file instead of aborting the cell.
try:
    r = requests.head(HADOOP_SPARK_URL, timeout=30)
    url_found = 200 <= r.status_code < 400
except requests.RequestException:
    url_found = False
if url_found:
    print(f'✅ {HADOOP_SPARK_URL} was found')
else:
    SPARK_CDN = "https://dlcdn.apache.org/spark/"
    print(f'⚠️ {HADOOP_SPARK_URL} was NOT found. \nCheck for available Spark versions in {SPARK_CDN}')

# set some environment variables
# SPARK_HOME is the archive name without its extension, under the current directory
os.environ['SPARK_HOME'] = os.path.join(os.getcwd(), os.path.splitext(os.path.basename(HADOOP_SPARK_URL))[0])
os.environ['PATH'] = ':'.join([os.path.join(os.environ['SPARK_HOME'], 'bin'), os.environ['PATH']])
os.environ['PATH'] = ':'.join([os.path.join(os.environ['SPARK_HOME'], 'sbin'), os.environ['PATH']])

# download Spark
# using --no-clobber option will prevent wget from downloading file if already present
# shell command: wget --no-clobber $HADOOP_SPARK_URL
cmd = f"wget --no-clobber {HADOOP_SPARK_URL}"
run(cmd)

# uncompress (skipped when the target folder already exists)
try:
    # Raw string: the backslash in the sed expression must reach the shell as-is
    cmd = r"([ -d $(basename {0}|sed 's/\.[^.]*$//') ] && echo -n 'Folder already exists') || (tar xzf $(basename {0}) && echo 'Uncompressed Spark distribution')"
    subprocess_output = subprocess.run(cmd.format(HADOOP_SPARK_URL), shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    # Access stdout (stderr redirected to stdout)
    stdout_result = subprocess_output.stdout
    print(f'✅ {stdout_result}')

except subprocess.CalledProcessError as e:
    # Handle the error if the command returns a non-zero exit code
    print(f"Command failed with return code {e.returncode}")
    print("stdout:", e.stdout)


print("\n2️⃣   Start Spark engine")
# (re)start the master: stop it first in case it's already running
# shell command: $SPARK_HOME/sbin/stop-master.sh
# shell command: $SPARK_HOME/sbin/start-master.sh
cmd = os.path.join(os.environ['SPARK_HOME'], 'sbin', 'stop-master.sh')
run(cmd)
cmd = os.path.join(os.environ['SPARK_HOME'], 'sbin', 'start-master.sh')
# keep the last output line of start-master.sh for later use
out = run(cmd)

# start one worker (first stop it in case it's already running)
# shell command: $SPARK_HOME/sbin/start-worker.sh spark://${HOSTNAME}:7077
# cmd is a plain string like every other call: run() executes it with shell=True
cmd = os.path.join(os.environ['SPARK_HOME'], 'sbin', 'stop-worker.sh')
run(cmd)
cmd = os.path.join(os.environ['SPARK_HOME'], 'sbin', 'start-worker.sh') + ' ' + 'spark://'+socket.gethostname()+':7077'
run(cmd)

print("\n3️⃣   Start Master Web UI")
# get master UI's port number
# the subprocess that's starting the master with start-master.sh
# might still not be ready with assigning the port number at this point
# therefore we check the logfile a few times (attempts=10) to see if the port
# has been assigned. This might take 1-2 seconds.

# `out` is None when start-master.sh failed; then there is no log to read
master_log = out.partition("logging to")[2].strip() if out else ""
print("Search for port number in log file {}".format(master_log))
attempts = 10
search_pattern = r"Successfully started service 'MasterUI' on port (\d+)"
webUIport = None
if master_log:
    for attempt in range(attempts):
        try:
            with open(master_log) as log:
                found = re.search(search_pattern, log.read())
        except OSError:
            # the log file may not have been created yet
            found = None
        if found:
            webUIport = found.group(1)
            print(f"✅ Master UI is available at localhost:{webUIport} (attempt nr. {attempt})")
            break
        time.sleep(2)  # need to try until port information is found in the logfile
if webUIport is None:
    print("Could not find port for Master Web UI\n")

IN_COLAB = 'google.colab' in sys.modules
if IN_COLAB:
    from google.colab import output
    # serve the Web UI on Colab, but only if its port is actually known
    if webUIport is not None:
        print("Click on the link below to open the Spark Web UI 🚀")
        output.serve_kernel_port_as_window(webUIport)

print("\n4️⃣   Start history server")
# start history server
# shell command: mkdir -p /tmp/spark-events
# shell command: $SPARK_HOME/sbin/start-history-server.sh
spark_events_dir = os.path.join('/tmp', 'spark-events')
# exist_ok avoids the race between checking for the directory and creating it
os.makedirs(spark_events_dir, exist_ok=True)
# stop a possibly running instance first, then start a fresh one
cmd = os.path.join(os.environ['SPARK_HOME'], 'sbin', 'stop-history-server.sh')
run(cmd)
cmd = os.path.join(os.environ['SPARK_HOME'], 'sbin', 'start-history-server.sh')
run(cmd)

if IN_COLAB:
    # imported here so this step does not rely on an earlier conditional import
    from google.colab import output
    # serve the History Server
    print("Click on the link below to open the Spark History Server Web UI 🚀")
    output.serve_kernel_port_as_window(18080)



0️⃣   Install Java if not available
✅ Java is already installed.

1️⃣   Download and install Hadoop and Spark
✅ https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz was found
✅ File ‘spark-3.5.1-bin-hadoop3.tgz’ already there; not retrieving.
✅ Folder already exists

2️⃣   Start Spark engine
✅ stopping org.apache.spark.deploy.master.Master
✅ starting org.apache.spark.deploy.master.Master, logging to /content/spark-3.5.1-bin-hadoop3/logs/spark--org.apache.spark.deploy.master.Master-1-05f026f3848f.out
✅ stopping org.apache.spark.deploy.worker.Worker
✅ starting org.apache.spark.deploy.worker.Worker, logging to /content/spark-3.5.1-bin-hadoop3/logs/spark--org.apache.spark.deploy.worker.Worker-1-05f026f3848f.out

3️⃣   Start Master Web UI
Search for port number in log file /content/spark-3.5.1-bin-hadoop3/logs/spark--org.apache.spark.deploy.master.Master-1-05f026f3848f.out
✅ Master UI is available at localhost:8081 (attempt nr. 2)
Click on the link below to open the Spark

<IPython.core.display.Javascript object>


4️⃣   Start history server
✅ stopping org.apache.spark.deploy.history.HistoryServer
✅ starting org.apache.spark.deploy.history.HistoryServer, logging to /content/spark-3.5.1-bin-hadoop3/logs/spark--org.apache.spark.deploy.history.HistoryServer-1-05f026f3848f.out
Click on the link below to open the Spark History Server Web UI 🚀


<IPython.core.display.Javascript object>

Installation de PySpark, l'API Python pour Spark

In [None]:
# Install the pyspark package with pip (IPython `!` shell escape)
!pip install pyspark



Importation de tous les packages nécessaires

In [None]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType, StringType
from pyspark.ml.feature import StringIndexer
import random
import json
import gzip
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
import itertools

Connexion à la session Google Drive

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive so the dataset path used below resolves
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Démarrage de la session PySpark

In [None]:
# Get (or create) a Spark session named "Colab" with master "local";
# the Spark UI port is configured as 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

Spécification du chemin pour le jeu de données

In [None]:
# Gzip-compressed file with one JSON review object per line, stored on the mounted Drive
file_path = "/content/drive/MyDrive/RECsys/RECsys_Datafiles/Industrial_and_Scientific_5.json.gz"

Création de la fonction pour lire et décompresser le fichier de données


In [None]:
def readJSON(file_path, inChunks, percentage):
    """
    Decompress a .gz file holding one JSON object per line and parse it.

    :file_path: The path of the file to be read.
    :inChunks:  If True, only the leading part of the file is read, as set by
                the percentage parameter.
                If False, the entire file is read and percentage is ignored.
    :percentage: Fraction (0 to 1) of the lines to read when inChunks is True.
    :return: a list of the parsed JSON objects. Lines that are not valid JSON
             are reported with a message and skipped.
    """

    json_objects = []

    if inChunks:
        # First pass: count the lines to turn the fraction into a line count
        with gzip.open(file_path, 'rt') as f:
            total_lines = sum(1 for _ in f)
        line_limit = round(total_lines * percentage)
    else:
        # Whole file requested: no limit, and no counting pass needed
        line_limit = None

    with gzip.open(file_path, 'rt') as f:
        for i, line in enumerate(f):
            # `>=` so that exactly line_limit lines are read
            if line_limit is not None and i >= line_limit:
                break
            try:
                json_objects.append(json.loads(line))
            except json.JSONDecodeError:
                print(f"Error parsing line: {line}")

    return json_objects

Lecture des fichiers : nous n'avons pas utilisé le mode "inChunks" car nous n'avons pas utilisé de grands ensembles de données, mais la fonction sera utile pour les cas d'utilisation futurs.

In [None]:
# Load the dataset as a list of JSON objects (inChunks disabled)
read_json = readJSON(file_path, inChunks=False, percentage=1)

Conversion du tableau d'objets JSON en un dataframe pandas.

Il a fallu passer d'abord par un dataframe pandas : Spark ne parvenait pas à lire correctement les données, car certaines valeurs contiennent des virgules.

In [None]:
# Build a pandas dataframe from the list of JSON records
df1 = pd.DataFrame.from_records(read_json)

Ici, nous supprimons l'ancien tableau JSON, juste pour libérer plus d'espace mémoire. Cela n'aura pas beaucoup d'importance compte tenu de la taille de l'ensemble de données actuellement utilisé.

In [None]:
# Drop the reference to the raw record list now that df1 has been built from it
del read_json

In [None]:
# Preview the first rows of the pandas dataframe
df1.head()

Unnamed: 0,overall,verified,reviewTime,reviewerID,asin,style,reviewerName,reviewText,summary,unixReviewTime,vote,image
0,5.0,True,"11 27, 2017",A1JB7HFWHRYHT7,B0000223SI,{'Size:': ' 1-(Pack)'},Alex W.,This worked really well for what I used it for...,Couldn't have been happier with it's performance,1511740800,,
1,5.0,True,"11 4, 2017",A2FCLJG5GV8SD6,B0000223SI,{'Size:': ' 1-(Pack)'},Randall Harris,Fast cutting and good adheasive.,Good paper.,1509753600,,
2,5.0,False,"10 27, 2017",A3IT9B33NWYQSL,B0000223SI,{'Size:': ' 1-(Pack)'},A. C.,Worked great for my lapping bench. I would li...,Handy!,1509062400,,
3,4.0,True,"01 13, 2018",AUL5LCV4TT73P,B0000223SK,{'Size:': ' 1-Pack'},TnT,As advertised,As advertised,1515801600,,
4,5.0,True,"10 7, 2017",A1V3I3L5JKO7TM,B0000223SK,{'Size:': ' 1-Pack'},John Jones,seems like a pretty good value as opposed to b...,seems like a pretty good value as opposed to b...,1507334400,,


Suppression des colonnes inutiles

In [None]:
# Drop, in place, the listed review columns from df1
df1.drop(
    columns=[
        'verified',
        'reviewTime',
        'style',
        'reviewerName',
        'reviewText',
        'summary',
        'unixReviewTime',
        'vote',
        'image',
    ],
    inplace=True,
)

In [None]:
# Show the dtype of each column of df1
df1.dtypes

overall       float64
reviewerID     object
asin           object
dtype: object

Nous définissons ici le schéma pour le dataframe de Spark, puis nous en créons un.

In [None]:
# Explicit Spark schema: one nullable field per remaining pandas column
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("overall", DoubleType()),
        ("reviewerID", StringType()),
        ("asin", StringType()),
    ]
])

# Convert the pandas dataframe into a Spark dataframe using that schema
data = spark.createDataFrame(df1, schema=schema)

Voici une partie des données

In [None]:
# Display the first rows of the Spark dataframe
data.show()

+-------+--------------+----------+
|overall|    reviewerID|      asin|
+-------+--------------+----------+
|    5.0|A1JB7HFWHRYHT7|B0000223SI|
|    5.0|A2FCLJG5GV8SD6|B0000223SI|
|    5.0|A3IT9B33NWYQSL|B0000223SI|
|    4.0| AUL5LCV4TT73P|B0000223SK|
|    5.0|A1V3I3L5JKO7TM|B0000223SK|
|    5.0|A20X7NCNZ7T5ZK|B0000223SK|
|    5.0|A3OBWQ8DTRLW2Q|B0000223SI|
|    5.0|A398INYG0ZBUZB|B0000223SK|
|    5.0| AEBM08OO8Y9BJ|B0000223SK|
|    4.0|A358U1JEA514P6|B0000223SI|
|    5.0|A1Z584ZH824BU1|B0000223SI|
|    5.0|A3OBWQ8DTRLW2Q|B0000223SK|
|    5.0|A2B40VHCBDLC43|B0000223SK|
|    5.0|A34O4UAC27ECL6|B0000223SI|
|    5.0| ACDP5UBE4ZW3T|B0000223SI|
|    3.0|A11MN6521EQ9QD|B0000223SK|
|    4.0|A358U1JEA514P6|B0000223SK|
|    4.0| AAEPD6U1H2X37|B0000223SK|
|    5.0|A1ICJY2HU1QLS1|B0000223SK|
|    4.0|A1XUBWNW0UFITU|B0000223SK|
+-------+--------------+----------+
only showing top 20 rows



Nous allons essayer d'imprimer le schéma ici

In [None]:
# Print the column names, types and nullability of the Spark dataframe
data.printSchema()

root
 |-- overall: double (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- asin: string (nullable = true)



Le bloc de code suivant renomme les colonnes avec des noms plus parlants

In [None]:
# Rename the columns: overall -> rating, reviewerID -> userID, asin -> productID
data = (
    data
    .withColumnRenamed("overall", "rating")
    .withColumnRenamed("reviewerID", "userID")
    .withColumnRenamed("asin", "productID")
)

In [None]:
# Display the first rows with the renamed columns
data.show()

+------+--------------+----------+
|rating|        userID| productID|
+------+--------------+----------+
|   5.0|A1JB7HFWHRYHT7|B0000223SI|
|   5.0|A2FCLJG5GV8SD6|B0000223SI|
|   5.0|A3IT9B33NWYQSL|B0000223SI|
|   4.0| AUL5LCV4TT73P|B0000223SK|
|   5.0|A1V3I3L5JKO7TM|B0000223SK|
|   5.0|A20X7NCNZ7T5ZK|B0000223SK|
|   5.0|A3OBWQ8DTRLW2Q|B0000223SI|
|   5.0|A398INYG0ZBUZB|B0000223SK|
|   5.0| AEBM08OO8Y9BJ|B0000223SK|
|   4.0|A358U1JEA514P6|B0000223SI|
|   5.0|A1Z584ZH824BU1|B0000223SI|
|   5.0|A3OBWQ8DTRLW2Q|B0000223SK|
|   5.0|A2B40VHCBDLC43|B0000223SK|
|   5.0|A34O4UAC27ECL6|B0000223SI|
|   5.0| ACDP5UBE4ZW3T|B0000223SI|
|   3.0|A11MN6521EQ9QD|B0000223SK|
|   4.0|A358U1JEA514P6|B0000223SK|
|   4.0| AAEPD6U1H2X37|B0000223SK|
|   5.0|A1ICJY2HU1QLS1|B0000223SK|
|   4.0|A1XUBWNW0UFITU|B0000223SK|
+------+--------------+----------+
only showing top 20 rows



Les deux cellules suivantes sont indispensables : les modèles ALS n'acceptent pas d'identifiants d'utilisateurs et d'articles non numériques, nous convertissons donc ces identifiants en index numériques.

In [None]:
# Fit a StringIndexer on userID and add its output as the userIDIndex column
indexer = StringIndexer(inputCol="userID", outputCol="userIDIndex")
data = indexer.fit(data).transform(data)

In [None]:
# Same for productID, whose index goes into productIDIndex (the `indexer` name is reused)
indexer = StringIndexer(inputCol="productID", outputCol="productIDIndex")
data = indexer.fit(data).transform(data)

In [None]:
# Display the first rows, now including the two index columns
data.show()

+------+--------------+----------+-----------+--------------+
|rating|        userID| productID|userIDIndex|productIDIndex|
+------+--------------+----------+-----------+--------------+
|   5.0|A1JB7HFWHRYHT7|B0000223SI|     1078.0|        1465.0|
|   5.0|A2FCLJG5GV8SD6|B0000223SI|     8222.0|        1465.0|
|   5.0|A3IT9B33NWYQSL|B0000223SI|     5689.0|        1465.0|
|   4.0| AUL5LCV4TT73P|B0000223SK|     1371.0|         720.0|
|   5.0|A1V3I3L5JKO7TM|B0000223SK|     7535.0|         720.0|
|   5.0|A20X7NCNZ7T5ZK|B0000223SK|     7742.0|         720.0|
|   5.0|A3OBWQ8DTRLW2Q|B0000223SI|     5800.0|        1465.0|
|   5.0|A398INYG0ZBUZB|B0000223SK|      153.0|         720.0|
|   5.0| AEBM08OO8Y9BJ|B0000223SK|     1814.0|         720.0|
|   4.0|A358U1JEA514P6|B0000223SI|     3527.0|        1465.0|
|   5.0|A1Z584ZH824BU1|B0000223SI|     7669.0|        1465.0|
|   5.0|A3OBWQ8DTRLW2Q|B0000223SK|     5800.0|         720.0|
|   5.0|A2B40VHCBDLC43|B0000223SK|     4949.0|         720.0|
|   5.0|

Imprimons une ligne spécifique

In [None]:
# Collect a single row to the driver; indexing [0] assumes the dataframe is not empty
specific_line = data.limit(1).collect()[0]

In [None]:
# Display the collected Row
specific_line

Row(rating=5.0, userID='A1JB7HFWHRYHT7', productID='B0000223SI', userIDIndex=1078.0, productIDIndex=1465.0)

Le nombre total de lignes disponibles

In [None]:
# Number of rows before cleaning
num_rows = data.count()
num_rows

77071

Ensuite, nous allons supprimer les valeurs nulles et les doublons

In [None]:
# Remove rows containing null values, then exact duplicate rows
data = data.dropna().drop_duplicates()

Le nombre de points de données restants

In [None]:
# Recount the rows of the current dataframe
num_rows = data.count()
num_rows

72131

Nous allons maintenant diviser l'ensemble de données en une répartition train/test

In [None]:
# 70/30 train/test split. A fixed seed makes the split, and therefore the
# metrics computed below, repeatable across runs.
# NOTE(review): repeatability also assumes the partitioning of `data` is stable.
(train, test) = data.randomSplit([0.7, 0.3], seed=42)

La cellule suivante contient le code nécessaire pour l'appel initial d'ALS

In [None]:
# Baseline ALS estimator working on the indexed user/product columns
als = ALS(
    maxIter=11,
    regParam=0.01,
    userCol="userIDIndex",
    itemCol="productIDIndex",
    ratingCol="rating",
    coldStartStrategy="drop",
)

Ajustement du modèle sur l'ensemble d'entraînement


In [None]:
# Fit the ALS estimator on the training split
model = als.fit(train)

Prédictions sur l'ensemble de test

In [None]:
# Apply the fitted model to the test split
predictions = model.transform(test)

Évaluation des prédictions initiales

In [None]:
# Compare the `prediction` column against the `rating` column using RMSE
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

Root-mean-square error = 4.751013689962461


Recherche par grille (grid search) des meilleurs hyperparamètres pour notre modèle ALS

In [None]:
# Define the hyper-parameter grid to search
param_grid = {
    "maxIter": [7, 12, 17],  # maximum number of iterations
    "regParam": [0.01, 0.05, 0.1],  # regularization parameter
    "rank": [5, 10, 15]  # rank of the latent factors
}

# Define the evaluation metrics (one evaluator per metric)
metrics = ['rmse', 'mae', 'r2']
evaluators = {metric: RegressionEvaluator(metricName=metric, labelCol="rating", predictionCol="prediction") for metric in metrics}

# Best model, parameters and score found so far for each metric.
# rmse and mae are minimized, r2 is maximized.
best_models = {metric: None for metric in metrics}
best_params = {metric: None for metric in metrics}
best_scores = {metric: float("inf") if metric != 'r2' else float("-inf") for metric in metrics}

# NOTE(review): candidates are compared on `test`, so the best scores are
# optimistic; a separate validation split would be needed for an unbiased one.

# Try every combination of parameters
for params in itertools.product(*param_grid.values()):
    param_dict = dict(zip(param_grid.keys(), params))
    als = ALS(maxIter=param_dict["maxIter"], regParam=param_dict["regParam"], rank=param_dict["rank"],
              userCol="userIDIndex", itemCol="productIDIndex", ratingCol="rating", coldStartStrategy="drop")

    model = als.fit(train)  # train on the training split
    # The predictions are evaluated once per metric: cache them so they are
    # not recomputed each time
    predictions = model.transform(test).cache()

    # Score the model with every metric and keep it when it improves the best
    for metric, metric_evaluator in evaluators.items():
        score = metric_evaluator.evaluate(predictions)
        if metric == 'r2':
            improved = score > best_scores[metric]
        else:
            improved = score < best_scores[metric]
        if improved:
            best_scores[metric] = score
            best_models[metric] = model
            best_params[metric] = param_dict

    predictions.unpersist()

# Report the best parameters and score for each metric. The parameters come
# from the grid itself rather than from the model's private Java object.
for metric in metrics:
    print(f"Meilleurs paramètres pour {metric}: {best_params[metric]}")
    print(f"Meilleur {metric}: {best_scores[metric]}")


Sauvegarder le modèle pour le déploiement

In [None]:
# Persist the model with the best RMSE. A plain save() fails when the target
# path already exists; overwrite() lets this cell be re-run.
best_models['rmse'].write().overwrite().save("/content/drive/MyDrive/RECsys/RECsys_Datafiles/als_model")