In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, when, col, round, abs
spark = SparkSession.builder \
    .appName("OpenFoodFactsAnalysis") \
    .getOrCreate()


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
!ls "/content/drive/My Drive/Colab Notebooks/"


fr.openfoodfacts.org.products.csv  Ofoodfacts.ipynb  Untitled0.ipynb


In [None]:
df = spark.read.csv("/content/drive/My Drive/Colab Notebooks/fr.openfoodfacts.org.products.csv", sep='\t', header=True, inferSchema=True)

In [None]:
df_clean.printSchema()

root
 |-- code: double (nullable = true)
 |-- url: string (nullable = true)
 |-- creator: string (nullable = true)
 |-- created_t: integer (nullable = true)
 |-- created_datetime: timestamp (nullable = true)
 |-- last_modified_t: integer (nullable = true)
 |-- last_modified_datetime: timestamp (nullable = true)
 |-- last_modified_by: string (nullable = true)
 |-- last_updated_t: integer (nullable = true)
 |-- last_updated_datetime: timestamp (nullable = true)
 |-- product_name: string (nullable = true)
 |-- abbreviated_product_name: string (nullable = true)
 |-- generic_name: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- packaging: string (nullable = true)
 |-- packaging_tags: string (nullable = true)
 |-- packaging_fr: string (nullable = true)
 |-- packaging_text: string (nullable = true)
 |-- brands: string (nullable = true)
 |-- brands_tags: string (nullable = false)
 |-- brands_fr: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- categ

<small>

**Signification de quelques colonnes de cette dataset OPEN FOOD FACTS**

**code** : Code-barres du produit (unique).  
**product_name** : Nom du produit.  
**brands** : Marques du produit (ex : Danone, Coca-Cola).  

**categories** : Catégories alimentaires (ex : "Boissons", "Snacks", "Céréales du petit-déjeuner").  

**countries** : Pays où le produit est vendu.  
**ingredients_text** : Liste des ingrédients (texte brut).  
**allergens** : Allergènes présents (ex : gluten, lait).

**traces** : Traces possibles d'allergènes (ex : "Peut contenir des traces de noix").  

**nutrition_grade_fr** : Score nutritionnel de A à E (A = meilleur).  
**energy_100g** : Énergie pour 100 g (en kJ).  
**fat_100g** : Lipides (gras) pour 100 g.  

**saturated-fat_100g** : Gras saturés pour 100 g. Trop de graisses saturées est mauvais pour la santé (ex: maladies cardio-vasculaires).

**carbohydrates_100g** : Glucides pour 100 g. Important pour les régimes ou le contrôle glycémique.

**sugars_100g** : Sucres pour 100 g. Détection des produits "riches en sucres" (attention au diabète !).  

**fiber_100g** : Fibres alimentaires pour 100 g. Important pour la digestion.

**proteins_100g** : Protéines pour 100 g.

**salt_100g** : Sel pour 100 g. Trop de sel = hypertension artérielle possible.

**sodium_100g** : Sodium pour 100 g.

**additives_tags** : Additifs alimentaires (codes comme E330, E100, etc.).  

**nova_group** : Degré de transformation alimentaire (de 1 = peu transformé à 4 = ultra-transformé).  

**packaging** : Type d'emballage (ex : boîte, bouteille plastique). Utile pour les analyses environnementales (ex : plastique vs carton).

**url** : Lien vers la fiche du produit sur Open Food Facts.

</small>


In [None]:
total_rows = df.count()
print("le nombre total des lignes de ce data set est :", total_rows)

le nombre total des lignes de ce data set est : 3809945


In [None]:
print("le nombre des valeures nulls dans chaque colonne d'un dataset: ")
null_count= df.select([count(when(col(c).isNull(),c).cast("int")).alias(c) for c in df.columns])
null_count.show()

In [None]:
# nonnull_percentage_df = null_count.withColumn(
#     "nonnull_percentage",
#     round((1 - (col("null_count") / total_rows)) * 100, 2)
# )
# nonnull_percentage_df.show()

In [None]:
print("Calculer le nombre des lignes dubliquées : ")
duplicated_rows = df.groupBy(df.columns).count().filter("count > 1")
duplicated_rows.show()

<small>

**ça veut dire on a aucunne ligne dubliquée dans ce jeu de données Open food facts**

</small>

In [None]:
print("En cherchant les valeurs incoherentes de l'energie sur 100g :")
df.filter(col("energy_100g") < 0).show(truncate=False)


<small>

**ça veut dire il y a aucune valeure incoherente de l'energie sur 100g** (C'est impossible d'avoir une energie negative)

</small>

In [None]:
df_energy_check = df.withColumn("kcal_theorique", col("energy_100g") / 4.184)\
                    .withColumn("diff_incoherente", abs(col("kcal_theorique") - col("energy-kcal_100g")) > 1)
df_incoherents = df_energy_check.filter(col("diff_incoherente") == True)
df_incoherents.select("energy_100g", "energy-kcal_100g", "kcal_theorique").show(truncate=False)

print("Le nombre de ligne qui ne verifie pas la regle de la conversion entre le kj et le kcal est : ",total_rows - df_incoherents.count())

+-----------+----------------+------------------+
|energy_100g|energy-kcal_100g|kcal_theorique    |
+-----------+----------------+------------------+
|392.0      |141.0           |93.69024856596558 |
|2401.0     |100.0           |573.8527724665391 |
|1620.0     |376.0           |387.189292543021  |
|1510.0     |358.0           |360.89866156787764|
|1852.0     |428.0           |442.6386233269598 |
|1917.0     |32.0            |458.1739961759082 |
|1940.0     |286.0           |463.67112810707454|
|3770.0     |900.0           |901.0516252390057 |
|1460.0     |350.0           |348.9483747609942 |
|1217.0     |289.0           |290.8699808795411 |
|1880.0     |448.0           |449.33078393881453|
|1056.0     |249.0           |252.39005736137668|
|1139.0     |110.0           |272.22753346080304|
|1600.0     |384.0           |382.40917782026764|
|1253.0     |381.0           |299.47418738049714|
|1867.0     |443.0           |446.22370936902485|
|1465.0     |371.0           |350.1434034416826 |


In [None]:
df = df.withColumn("kcal_theorique", round(col("energy_100g") / 4.184, 4))
df = df.drop("energy-kcal_100g").withColumnRenamed("kcal_theorique", "energy-kcal_100g")
df.select("energy_100g", "energy-kcal_100g").show(truncate=False)



+-----------+----------------+
|energy_100g|energy-kcal_100g|
+-----------+----------------+
|NULL       |NULL            |
|1389.0     |331.979         |
|2415.0     |577.1989        |
|NULL       |NULL            |
|NULL       |NULL            |
|392.0      |93.6902         |
|NULL       |NULL            |
|2401.0     |573.8528        |
|929.0      |222.0363        |
|1620.0     |387.1893        |
|962.0      |229.9235        |
|NULL       |NULL            |
|4.0        |0.956           |
|1510.0     |360.8987        |
|293.0      |70.0287         |
|NULL       |NULL            |
|1852.0     |442.6386        |
|NULL       |NULL            |
|NULL       |NULL            |
|188.0      |44.9331         |
+-----------+----------------+
only showing top 20 rows



<small>

**Alors On a 3630171 conversions incoherentes de kj à kcal ~~ 95.28% de la dataset**
</small>

In [None]:
print("En venant de comparer la quantité des glucides sur 100g avec la quantité de sucres en 100g : ")
carbohydrate_sucre_incoherent = df.filter(col("sugars_100g") > col("carbohydrates_100g"))
carbohydrate_sucre_incoherent.show()

<small>

**Pourquoi une valeure de sugars_100g > carbohydrates_100g sont des valeurs incoherentes car :** Carbohydrates regroupent  les Sucres , les Amidons, les Fibres alimentaires dans 100g d' un produit alimentaire

Sachant que l'unité avec quelle on calcule les carbohydrates et le sucre est en grammes (g)

</small>

In [None]:
print("le nombre des valeurs incoherente au niveau des carbohydrates et les sucres dans chaque 100g : ", carbohydrate_sucre_incoherent.count())

In [None]:
fatsaturated_fat_incoherent=df.filter(col("saturated-fat_100g") > col("fat_100g"))
fatsaturated_fat_incoherent.select("saturated-fat_100g", "fat_100g")
print(fatsaturated_fat_incoherent.count())

In [None]:
df.limit(5).show()

In [None]:
df.select(col("url")).show(truncate=False)

In [None]:
#Delete the url column
df_clean = df.drop("url")

<small>

**Durant mes consultation des 20 premieres lignes url de la colonne "url" dans  cette dataset j'ai trouvé que quelques urls me derige à des pages introuvables  sinon je trouve qu'il existe une forte perturbation au niveau des données affichées sur la page. Donc je prends la decision de delete la colonne "url"**
</small>

In [None]:
# Je veux etudier la force de ce code :
total_code_unique = df.select("code").distinct().count()
print("afficher le nombre des produits alimentaires avec un code unique : ", total_code_unique)
print("afficher le nombre des produits alimentaires avec un code non unique : ", total_rows - total_code_unique)

<small>

**On a constaté que le code n'est pas une valeur unique pour chaque produit, car on a trouvé des code dubliqués donc on doit consulter les lignes dubliquées pour bien comprendre la situation !!**
</small>

In [None]:
# Grouper par code et compter combien de fois chaque code apparaît
code_counts = df.groupBy("code", "creator").agg(count("*").alias("count"))

# Garder seulement ceux qui apparaissent plus d'une fois
duplicate_codes = code_counts.filter(col("count") > 1)

duplicated_rows = df.join(duplicate_codes, on="code", how="inner")

# Afficher les lignes concernées
duplicated_rows.limit(30).show(truncate=False)

In [None]:
#Cela supprimera toutes les lignes ayant les mêmes valeurs pour code et product_name
#Et gardera la première occurrence
df_clean = df.dropDuplicates(["code", "product_name"])

In [None]:
df_clean = df.filter(col("product_name").isNotNull())

In [None]:
df.select("product_name").show(truncate=False)

In [None]:
new_total_rows_after_cleaning = df_clean.count()
print(new_total_rows_after_cleaning)

3564020


In [None]:
df_clean = df.drop(
    "created_t",
    "created_datetime",
    "last_modified_t",
    "last_modified_datetime",
    "last_modified_by",
    "last_updated_t",
    "last_updated_datetime"
)

Données produit (noms, marques, catégories...)

In [None]:
df.filter(col("generic_name").isNotNull()).show()

+------------+--------------------+--------------------+----------+-------------------+---------------+----------------------+----------------+--------------+---------------------+--------------------+------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------+-----------------+-------------+--------------------+-------------------------+--------------------+--------------------+--------------------+---------+--------------+------------------------+------+-----------+--------------------+--------------------+-------------------+--------------------+----------------+--------------------+--------------------+-------------------------+--------------------+------------+--------------------+--------------------+--------------------+--------------+----------------

In [None]:
print(df.filter(col("generic_name").isNotNull()).count())

173995


<small>

**->Total de lignes : 3 561 284**
**->Valeurs non nulles dans generic_name : 172 468**
**->Taux de complétion : environ 4.8%**
</small>

In [None]:
df_clean = df.drop("generic_name")

In [None]:
print(df.filter(col("brands").isNotNull()).count())

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/socket.py", line 718, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
df_clean= df.fillna({"brands": "brand name unknown"})
df_clean.select("brands").show(10, truncate=False)

+------------------+
|brands            |
+------------------+
|brand name unknown|
|Gut & Gunstig     |
|Jeff de Bruges    |
|breizh cola       |
|AdvoCare          |
|SoLo              |
|YI Nutrition      |
|Oreos             |
|brand name unknown|
|Dukes             |
+------------------+
only showing top 10 rows



In [None]:
df.select("brands","brands_tags").show(truncate=False)

In [None]:
df_clean= df.fillna({"brands_tags": "xx:brand_name_unknown"})
#to make sure que j ai le nombre complet des lignes non null :
print(df_clean.filter(col("brands_tags").isNotNull()).count())

3809945


In [None]:
df.select("categories", "main_category","categories_tags", "categories_fr", "main_category_fr" ).show(truncate=False)

In [None]:
print("le nombre de valeur null dans la colonne categories:",new_total_rows_after_cleaning - df_clean.filter(col("categories").isNotNull()).count())

In [None]:
#Le but c'est de comparer est ce que le nombre des nulls dans la colonne categories est le meme sur la colonne main_category

print("le nombre de valeur null dans la colonne main_category:",new_total_rows_after_cleaning - df_clean.filter(col("main_category").isNotNull()).count())

<small>

**Je compte m'appuyer fortement sur les catégories pour matcher avec les ingrédients des recettes donc je viens de supprimer les lignes où categories est null**

**Simulation:Si tu as une recette avec le mot "chocolat", tu veux retrouver les produits de catégorie "desserts, chocolats, confiseries". Si la colonne categories est vide, ce match ne sera jamais possible.**
</small>

In [None]:
df_clean=df.filter(col("categories").isNotNull())
# just to make sure
print("le nombre de valeur null dans la colonne categories apres suppression:",new_total_rows_after_cleaning - df.filter(col("categories").isNotNull()).count())

In [None]:
categories=df.select(col("categories")).distinct()
categories.show(truncate=False)
print("le nombre des categories distinct : ", categories.count())

<small>

**->Counter of distinct main_category est 65644**
**->Counter of distinct main_category est 269195**

</small>

In [None]:
packaging_type = df.select("packaging").distinct()
packaging_type.show()
print(" le nombre des differents types de packaging : ", packaging_type.count())

In [None]:
packaging_text= df.select("packaging_text")
packaging_text.show("packaging_text")

In [None]:
df.filter(col("packaging_text").isNotNull()).show()

In [None]:
packaging_text_notnull = df.filter(col("packaging_text").isNotNull()).count()
print("Pourcentage de valeurs non nulles dans cette colonne :", (packaging_text_notnull / new_total_rows_after_cleaning) * 100, "%")


<small>

**~~ 0.797% de valeurs NotNull dans la colonne "packaging_text", alors on prend la decision de supprimer la colonne!!**

In [None]:
df = df.drop("packaging_text")

In [None]:
df.filter(
    col("first_packaging_code_geo").isNotNull() &
    col("emb_codes").isNotNull() &
    col("emb_codes_tags").isNotNull()
).show()


<small>

Ce code "emb_codes" se trouve sur les produits alimentaires européens, il peut te permettre de :
1. Identifier l’entreprise responsable du conditionnement.
2. Repérer les produits provenant du même emballeur.
3. Analyser la traçabilité géographique et industrielle (souvent, une partie du code indique un département/région).
4. Détecter des patterns industriels (ex : un emballeur travaillant pour plusieurs marques).

Je vais pas supprimer cette colonne car j aurai besoin d'elle durant la anlyse et lextraction des insights
</small>

In [None]:
#remplire les valeurs nulls
df= df.fillna({"emb_codes": "emb code unknown"})
df= df.fillna({"emb_codes_tags": "emb_code_unknown"})

In [None]:

first_packaging_code_geo = df.filter(col("first_packaging_code_geo").isNotNull()) \
  .select("first_packaging_code_geo")
first_packaging_code_geo.show(20, truncate=False)
print(first_packaging_code_geo.count())

<small>

**Je vais pas supprimer cette colonne, peut etre j'aurai besoin d'elle**
</small>

In [None]:
origins=df.filter(col("origins").isNotNull()).select("origins").show(truncate=False)

In [None]:
# Liste des colonnes souhaitées
selected_columns = [
    "code", "product_name", "brands", "main_category", "countries", "stores",
    "ingredients_text", "quantity", "allergens", "traces",
    "energy_100g", "categories", "fat_100g", "saturated-fat_100g", "trans-fat_100g",
    "cholesterol_100g", "carbohydrates_100g", "sugars_100g", "added-sugars_100g",
    "sucrose_100g", "glucose_100g", "fructose_100g", "galactose_100g", "lactose_100g",
    "maltose_100g", "fiber_100g", "salt_100g", "added-salt_100g", "sodium_100g", "alcohol_100g"
]

# Création de la DataFrame filtrée
df_nutrition = df_clean.select(*selected_columns)

# Affichage pour vérifier
df_nutrition.show(5)


+-----+--------------------+--------------+--------------------+--------------------+------+--------------------+------------------+---------+------+-----------+--------------------+--------+------------------+--------------+----------------+------------------+-----------+-----------------+------------+------------+-------------+--------------+------------+------------+----------+---------+---------------+-----------+------------+
| code|        product_name|        brands|       main_category|           countries|stores|    ingredients_text|          quantity|allergens|traces|energy_100g|          categories|fat_100g|saturated-fat_100g|trans-fat_100g|cholesterol_100g|carbohydrates_100g|sugars_100g|added-sugars_100g|sucrose_100g|glucose_100g|fructose_100g|galactose_100g|lactose_100g|maltose_100g|fiber_100g|salt_100g|added-salt_100g|sodium_100g|alcohol_100g|
+-----+--------------------+--------------+--------------------+--------------------+------+--------------------+-----------------

In [None]:
# 1. Exporter dans un dossier temporaire (Spark oblige)
df_nutrition.coalesce(1).write.mode("overwrite").option("header", True).csv("/content/temp_nutrition")

# 2. Trouver le fichier CSV généré et le déplacer/renommer
import os
import shutil

# Dossier Spark temporaire
temp_folder = "/content/temp_nutrition"
# Chemin de destination
final_path = "/content/drive/My Drive/Colab Notebooks/dataset_nutrition.csv"

# Recherche du fichier généré par Spark
for file_name in os.listdir(temp_folder):
    if file_name.startswith("part-") and file_name.endswith(".csv"):
        temp_file_path = os.path.join(temp_folder, file_name)
        shutil.move(temp_file_path, final_path)
        print(f"✅ Fichier enregistré avec succès à : {final_path}")
        break
else:
    print("❌ Fichier CSV non trouvé dans le dossier temporaire.")


NameError: name 'df' is not defined