<a href="https://colab.research.google.com/github/youssef-ASSOIL/OpenFoodFacts_spark/blob/main/OpenFoodFacts_spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [8]:
!pip install pyspark



In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, isnan, desc

In [7]:
# Download the OpenFoodFacts CSV export and save it locally as products.csv.
# NOTE(review): the recorded run reports a ~10G transfer, and no visible cell
# reads products.csv afterwards — confirm how `df` is meant to be loaded.
!wget https://static.openfoodfacts.org/data/en.openfoodfacts.org.products.csv -O products.csv

--2025-06-14 19:48:28--  https://static.openfoodfacts.org/data/en.openfoodfacts.org.products.csv
Resolving static.openfoodfacts.org (static.openfoodfacts.org)... 213.36.253.214
Connecting to static.openfoodfacts.org (static.openfoodfacts.org)|213.36.253.214|:443... connected.
HTTP request sent, awaiting response... 302 Moved Temporarily
Location: https://openfoodfacts-ds.s3.eu-west-3.amazonaws.com/en.openfoodfacts.org.products.csv [following]
--2025-06-14 19:48:28--  https://openfoodfacts-ds.s3.eu-west-3.amazonaws.com/en.openfoodfacts.org.products.csv
Resolving openfoodfacts-ds.s3.eu-west-3.amazonaws.com (openfoodfacts-ds.s3.eu-west-3.amazonaws.com)... 3.5.205.248, 3.5.226.145
Connecting to openfoodfacts-ds.s3.eu-west-3.amazonaws.com (openfoodfacts-ds.s3.eu-west-3.amazonaws.com)|3.5.205.248|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 11220064070 (10G) [text/csv]
Saving to: ‘products.csv’


2025-06-14 19:51:05 (68.5 MB/s) - ‘products.csv’ saved [11220064070

In [3]:
from pyspark.sql import SparkSession

# Get or create the Spark session used by the rest of the notebook.
spark = (
    SparkSession.builder
    .appName("Data Profiling OpenFoodFacts")
    .getOrCreate()
)


In [7]:
# Run this cell after the products have been fetched through the API.
# NOTE(review): `products` is not defined by any visible cell — it must exist
# in the kernel (a list of JSON-like product records, presumably) before this runs.
import pandas as pd
import numpy as np

# Flatten the nested JSON records into a tabular pandas DataFrame.
df_pd = pd.json_normalize(products)


def is_scalar(val):
    """Return True when `val` is a str, int, float, bool or None."""
    return isinstance(val, (str, int, float, bool, type(None)))


# Keep only the columns whose every value is a scalar.
# The loop variables are deliberately NOT named `col`: a module-level `for col in ...`
# would rebind `col` and shadow pyspark.sql.functions.col for later cells.
scalar_cols = [name for name in df_pd.columns if df_pd[name].map(is_scalar).all()]
df_pd_clean = df_pd[scalar_cols].copy()

# Cast every numeric column to float64; drop the ones that cannot be converted.
# Iterate over a snapshot of the column list because columns may be dropped.
for column_name in list(df_pd_clean.columns):
    if pd.api.types.is_numeric_dtype(df_pd_clean[column_name]):
        try:
            df_pd_clean[column_name] = df_pd_clean[column_name].astype(np.float64)
        except Exception:
            # Best-effort conversion: unlike a bare `except:`, this still lets
            # KeyboardInterrupt / SystemExit propagate.
            df_pd_clean = df_pd_clean.drop(columns=[column_name])

# Build the Spark DataFrame from the cleaned pandas frame.
df = spark.createDataFrame(df_pd_clean)


NameError: name 'products' is not defined

In [1]:
# Persist `df` twice — once as JSON, once as Parquet — replacing any previous output.
(
    df.write
    .mode("overwrite")
    .json("products_json")
)
(
    df.write
    .mode("overwrite")
    .parquet("products_parquet")
)

NameError: name 'df' is not defined

In [12]:
import time

# Time how long it takes to open each stored copy of the dataset.
# time.perf_counter() is used instead of time.time(): it is monotonic and meant
# for measuring elapsed intervals, so system clock adjustments cannot skew the result.
# NOTE(review): these timings cover only the `spark.read` calls, with no action
# on the resulting DataFrames; the two readers may not do comparable work at
# this point (e.g. schema inference) — confirm before drawing conclusions.
start = time.perf_counter()
df_json = spark.read.json("products_json")
print("JSON load time:", time.perf_counter() - start)

start = time.perf_counter()
df_parquet = spark.read.parquet("products_parquet")
print("Parquet load time:", time.perf_counter() - start)

JSON load time: 113.68371391296387
Parquet load time: 0.40471339225769043


In [13]:
# Print the name, type and nullability of every column in `df`.
df.printSchema()

root
 |-- code: double (nullable = true)
 |-- url: string (nullable = true)
 |-- creator: string (nullable = true)
 |-- created_t: integer (nullable = true)
 |-- created_datetime: timestamp (nullable = true)
 |-- last_modified_t: integer (nullable = true)
 |-- last_modified_datetime: timestamp (nullable = true)
 |-- last_modified_by: string (nullable = true)
 |-- last_updated_t: integer (nullable = true)
 |-- last_updated_datetime: timestamp (nullable = true)
 |-- product_name: string (nullable = true)
 |-- abbreviated_product_name: string (nullable = true)
 |-- generic_name: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- packaging: string (nullable = true)
 |-- packaging_tags: string (nullable = true)
 |-- packaging_en: string (nullable = true)
 |-- packaging_text: string (nullable = true)
 |-- brands: string (nullable = true)
 |-- brands_tags: string (nullable = true)
 |-- brands_en: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- catego

In [9]:
from pyspark.sql.functions import col, when, count, isnan
from pyspark.sql.types import DoubleType, FloatType


def _missing_count_expr(field):
    """Build the aggregate that counts missing values for one schema field.

    Double/float fields count rows that are NULL or NaN; every other type
    counts NULL rows only. The resulting column is aliased to the field name.
    """
    name = field.name
    is_missing = col(name).isNull()
    if isinstance(field.dataType, (DoubleType, FloatType)):
        is_missing = is_missing | isnan(col(name))
    return count(when(is_missing, name)).alias(name)


# One aggregate expression per column of `df`.
missing_counts = [_missing_count_expr(field) for field in df.schema.fields]

# Single-row DataFrame: each column holds the missing-value count of the same-named column.
df_missing = df.select(missing_counts)
df_missing.show(n=1, vertical=True, truncate=False)


NameError: name 'df' is not defined

In [2]:
# Columns whose share of missing values exceeds this ratio are listed for removal.
MISSING_RATIO_THRESHOLD = 0.9

total_rows = df.count()
# Per-column missing counts, taken from the single row of `df_missing`.
missing_info = df_missing.first().asDict()
# `total_rows > 0` guards against ZeroDivisionError on an empty DataFrame.
# The loop variable is not named `col`, so it cannot be confused with
# pyspark.sql.functions.col.
cols_to_drop = [
    name
    for name, miss in missing_info.items()
    if total_rows > 0 and miss / total_rows > MISSING_RATIO_THRESHOLD
]
print("Colonnes à supprimer (>90% vides):", cols_to_drop)

NameError: name 'df' is not defined

In [None]:
# Ten most frequent values of `countries_tags`, most common first.
(
    df.groupBy("countries_tags")
    .count()
    .orderBy(desc("count"))
    .show(10)
)

# Summary statistics for the energy and fat columns.
(
    df.select("energy_100g", "fat_100g")
    .describe()
    .show()
)


In [None]:
import matplotlib.pyplot as plt

# Aggregate in Spark and bring only the ten largest groups back to pandas.
top_countries = (
    df.groupBy("countries_tags")
    .count()
    .orderBy(desc("count"))
    .limit(10)
)
top_pd = top_countries.toPandas()

# Bar chart of the ten most frequent `countries_tags` values.
top_pd.plot(x='countries_tags', y='count', kind='bar', legend=False)
plt.xlabel("Pays")
plt.ylabel("Nombre de produits")
plt.title("Top 10 des pays représentés")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()


In [None]:
# Normalise column names: trim whitespace, lowercase, spaces -> underscores.
# The transformation is idempotent, so re-running this cell is harmless.
df = df.toDF(*[c.strip().lower().replace(" ", "_") for c in df.columns])
print(df.columns)

# Ten most frequent values of `countries_tags`, most common first.
df.groupBy("countries_tags").count().orderBy(desc("count")).show(10)

# List the country-related columns. The search term is "countr" because
# "country" is not a substring of "countries" and would therefore miss
# columns such as `countries_tags`.
[c for c in df.columns if "countr" in c.lower()]


In [None]:
import matplotlib.pyplot as plt
from pyspark.sql.functions import col, when, count, isnan
from pyspark.sql.types import DoubleType, FloatType, StringType

# Ten most frequent values of `countries_tags`, shown without truncation.
df.groupBy("countries_tags").count().orderBy(desc("count")).show(10, truncate=False)


def _is_missing(field):
    """Return the boolean Column marking a missing value for one schema field.

    NULL always counts as missing. NaN is checked only on double/float fields,
    because `isnan` is rejected at analysis time for types such as timestamp.
    The empty string is checked only on string fields.
    """
    name = field.name
    condition = col(name).isNull()
    if isinstance(field.dataType, (DoubleType, FloatType)):
        condition = condition | isnan(col(name))
    elif isinstance(field.dataType, StringType):
        condition = condition | (col(name) == "")
    return condition


# Single-row DataFrame with the number of missing values per column.
missing_df = df.select([
    count(when(_is_missing(f), f.name)).alias(f.name)
    for f in df.schema.fields
])
missing_df.show(n=1, vertical=True, truncate=False)

# Duplicate check: compare the total row count with the count of distinct rows.
total_rows = df.count()
distinct_rows = df.dropDuplicates().count()
print(f"Total: {total_rows}, Uniques: {distinct_rows}, Doublons: {total_rows - distinct_rows}")

# Summary statistics for the fat column.
df.select("fat_100g").describe().show()

# Bar chart of the ten most frequent `countries_tags` values.
top_countries = df.groupBy("countries_tags").count().orderBy(desc("count")).limit(10)
top_countries_pd = top_countries.toPandas()

plt.figure(figsize=(10,6))
plt.bar(top_countries_pd["countries_tags"], top_countries_pd["count"])
plt.title("Top 10 des pays")
plt.ylabel("Nombre de produits")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()