In [8]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnull, when, count, col, lit

In [9]:
# Create (or reuse) the notebook's Spark session on the `local[*]` master.
# NOTE(review): with a local master the executors presumably share the driver
# JVM, so "spark.executor.memory" likely has no effect here - confirm.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("openfood-analytics")
    .config("spark.driver.cores", "2")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "10g")
    .getOrCreate()
)

In [10]:
# Load the tab-separated products CSV, taking column names from its first row,
# then print the resulting schema.
df = spark.read.csv(
    "../data/en.openfoodfacts.org.products.csv",
    sep="\t",
    header=True,
)
df.printSchema()

root
 |-- code: string (nullable = true)
 |-- url: string (nullable = true)
 |-- creator: string (nullable = true)
 |-- created_t: string (nullable = true)
 |-- created_datetime: string (nullable = true)
 |-- last_modified_t: string (nullable = true)
 |-- last_modified_datetime: string (nullable = true)
 |-- last_modified_by: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- abbreviated_product_name: string (nullable = true)
 |-- generic_name: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- packaging: string (nullable = true)
 |-- packaging_tags: string (nullable = true)
 |-- packaging_en: string (nullable = true)
 |-- packaging_text: string (nullable = true)
 |-- brands: string (nullable = true)
 |-- brands_tags: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- categories_tags: string (nullable = true)
 |-- categories_en: string (nullable = true)
 |-- origins: string (nullable = true)
 |-- origins_tags: string (nulla

In [11]:
# Candidate numeric feature columns: every name here ends in `_100g`.
# Each listed column is cast to float when the frame is narrowed later in this cell.
num_cols = [
 'energy-kj_100g',
 'energy-kcal_100g',
 'energy_100g',
 'energy-from-fat_100g',
 'fat_100g',
 'saturated-fat_100g',
 'butyric-acid_100g',
 'caproic-acid_100g',
 'caprylic-acid_100g',
 'capric-acid_100g',
 'lauric-acid_100g',
 'myristic-acid_100g',
 'palmitic-acid_100g',
 'stearic-acid_100g',
 'arachidic-acid_100g',
 'behenic-acid_100g',
 'lignoceric-acid_100g',
 'cerotic-acid_100g',
 'montanic-acid_100g',
 'melissic-acid_100g',
 'unsaturated-fat_100g',
 'monounsaturated-fat_100g',
 'polyunsaturated-fat_100g',
 'omega-3-fat_100g',
 'alpha-linolenic-acid_100g',
 'eicosapentaenoic-acid_100g',
 'docosahexaenoic-acid_100g',
 'omega-6-fat_100g',
 'linoleic-acid_100g',
 'arachidonic-acid_100g',
 'gamma-linolenic-acid_100g',
 'dihomo-gamma-linolenic-acid_100g',
 'omega-9-fat_100g',
 'oleic-acid_100g',
 'elaidic-acid_100g',
 'gondoic-acid_100g',
 'mead-acid_100g',
 'erucic-acid_100g',
 'nervonic-acid_100g',
 'trans-fat_100g',
 'cholesterol_100g',
 'carbohydrates_100g',
 'sugars_100g',
 'added-sugars_100g',
 'sucrose_100g',
 'glucose_100g',
 'fructose_100g',
 'lactose_100g',
 'maltose_100g',
 'maltodextrins_100g',
 'starch_100g',
 'polyols_100g',
 'erythritol_100g',
 'fiber_100g',
 'soluble-fiber_100g',
 'insoluble-fiber_100g',
 'proteins_100g',
 'casein_100g',
 'serum-proteins_100g',
 'nucleotides_100g',
 'salt_100g',
 'added-salt_100g',
 'sodium_100g',
 'alcohol_100g',
 'vitamin-a_100g',
 'beta-carotene_100g',
 'vitamin-d_100g',
 'vitamin-e_100g',
 'vitamin-k_100g',
 'vitamin-c_100g',
 'vitamin-b1_100g',
 'vitamin-b2_100g',
 'vitamin-pp_100g',
 'vitamin-b6_100g',
 'vitamin-b9_100g',
 'folates_100g',
 'vitamin-b12_100g',
 'biotin_100g',
 'pantothenic-acid_100g',
 'silica_100g',
 'bicarbonate_100g',
 'potassium_100g',
 'chloride_100g',
 'calcium_100g',
 'phosphorus_100g',
 'iron_100g',
 'magnesium_100g',
 'zinc_100g',
 'copper_100g',
 'manganese_100g',
 'fluoride_100g',
 'selenium_100g',
 'chromium_100g',
 'molybdenum_100g',
 'iodine_100g',
 'caffeine_100g',
 'taurine_100g',
 'ph_100g',
 'fruits-vegetables-nuts_100g',
 'fruits-vegetables-nuts-dried_100g',
 'fruits-vegetables-nuts-estimate_100g',
 'fruits-vegetables-nuts-estimate-from-ingredients_100g',
 'collagen-meat-protein-ratio_100g',
 'cocoa_100g',
 'chlorophyl_100g',
 'carbon-footprint_100g',
 'carbon-footprint-from-meat-or-fish_100g',
 'nutrition-score-fr_100g',
 'nutrition-score-uk_100g',
 'glycemic-index_100g',
 'water-hardness_100g',
 'choline_100g',
 'phylloquinone_100g',
 'beta-glucan_100g',
 'inositol_100g',
 'carnitine_100g'
]

# Narrow the frame to product_name and main_category (left as-is) followed by
# every candidate feature column cast to float.
cols_to_keep = [col("product_name"), col("main_category")]
cols_to_keep.extend(col(name).cast("float") for name in num_cols)
df = df.select(*cols_to_keep)

In [12]:
# Preview the first five rows of the narrowed frame.
df.show(5)

+----------------+----------------+--------------+----------------+-----------+--------------------+--------+------------------+-----------------+-----------------+------------------+----------------+----------------+------------------+------------------+-----------------+-------------------+-----------------+--------------------+-----------------+------------------+------------------+--------------------+------------------------+------------------------+----------------+-------------------------+--------------------------+-------------------------+----------------+------------------+---------------------+-------------------------+--------------------------------+----------------+---------------+-----------------+-----------------+--------------+----------------+------------------+--------------+----------------+------------------+-----------+-----------------+------------+------------+-------------+------------+------------+------------------+-----------+------------+---------------+-

In [13]:
# Measure the percentage of null values in every column and keep only the
# columns whose null share is below MAX_NULL_PERCENT (the sparser ones are dropped).
MAX_NULL_PERCENT = 40
nan_percentage = (
    df.select([
        # when() without otherwise() yields null for rows where the column is
        # populated, so count() tallies only the rows where column `c` is null.
        (lit(100) * count(when(isnull(c), lit(1))) / count("*")).alias(c)
        for c in df.columns
    ])
    .collect()[0]
    .asDict()
)
non_nulls_cols = [k for k, v in nan_percentage.items() if v < MAX_NULL_PERCENT]

In [14]:
# Keep only the well-populated columns, then replace the remaining nulls:
# first with 0.0, then with "unk" (each fill applies to columns of a matching type).
df = df.select(*non_nulls_cols).fillna(0.0).fillna("unk")

In [15]:
# Sanity check: after filling, every remaining column should report 0% nulls.
df.select(
    [(lit(100) * count(when(isnull(name), name)) / count("*")).alias(name) for name in df.columns]
).show()

+------------+----------------+-----------+--------+------------------+------------------+-----------+-------------+---------+-----------+
|product_name|energy-kcal_100g|energy_100g|fat_100g|saturated-fat_100g|carbohydrates_100g|sugars_100g|proteins_100g|salt_100g|sodium_100g|
+------------+----------------+-----------+--------+------------------+------------------+-----------+-------------+---------+-----------+
|         0.0|             0.0|        0.0|     0.0|               0.0|               0.0|        0.0|          0.0|      0.0|        0.0|
+------------+----------------+-----------+--------+------------------+------------------+-----------+-------------+---------+-----------+



In [16]:
# Feature columns = the numeric candidates that survived the null filter.
# Filter num_cols in its declared order rather than intersecting sets: the
# iteration order of a set of strings can differ between Python processes, which
# would make the layout of the assembled vector change from one run to the next.
input_features = [name for name in num_cols if name in non_nulls_cols]
# Pack the selected columns into a single vector column named "raw_features".
df = VectorAssembler(inputCols=input_features, outputCol="raw_features").setHandleInvalid("error").transform(df)

In [17]:
# Fit a MinMaxScaler on "raw_features" and write the rescaled vectors to "features".
scaler = MinMaxScaler(inputCol="raw_features", outputCol="features")
scaler_model = scaler.fit(df)
df = scaler_model.transform(df)

In [18]:
# Grid search over the number of clusters and the iteration cap. Every model is
# fitted on df and scored on the same data with a default-configured
# ClusteringEvaluator; results collects (k, max_iter, score) tuples.
metric = ClusteringEvaluator()
results = []
for k in range(2, 21, 3):
    for max_iter in (5, 10, 20, 50):
        kmeans = KMeans(k=k, seed=1, maxIter=max_iter)
        cluster_model = kmeans.fit(df)
        predictions = cluster_model.transform(df)
        performance_measure = metric.evaluate(predictions)
        results.append((k, max_iter, performance_measure))

In [25]:
# Pick the grid entry with the highest score (the last element of each tuple).
best_params = max(results, key=lambda entry: entry[-1])
print(f"Best params for k-means (k, max_iter, clustering_score): {best_params}")

Best params for k-means (k, max_iter, clustering_score): (2, 5, 0.9999969973987277)


In [26]:
# Refit with the winning (k, max_iter) from the grid search instead of repeating
# the numbers by hand, so this cell cannot drift from the search result.
kmeans = KMeans().setK(best_params[0]).setSeed(1).setMaxIter(best_params[1])
cluster_model = kmeans.fit(df)
# write().overwrite() keeps the notebook re-runnable: a plain save() fails when
# the target path already exists from a previous run.
# NOTE(review): only the k-means model is persisted here; the fitted scaler and
# the feature column order are not - confirm whether scoring code needs them.
cluster_model.write().overwrite().save("../data/openfood_kmeans.model")