In [104]:
# Create the SparkSession used by every cell below (reuses an existing one if present).
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('explore data')
    .getOrCreate()
)

In [105]:
# Load the Open Food Facts dump: tab-separated values with a header row.
# The separator is given explicitly so that fields are not split on commas.
df = spark.read.options(sep=r'\t', header=True).csv("openfoodfacts.tsv")

In [106]:
# Dataset size: number of columns (features) first, then number of rows.
for size in (len(df.columns), df.count()):
    print(size)

# The original dataset has 163 columns and 356027 rows (~1 GB of data).

163
356027


In [107]:
# List every column name in the raw dataset
df.schema.names

['code',
 'url',
 'creator',
 'created_t',
 'created_datetime',
 'last_modified_t',
 'last_modified_datetime',
 'product_name',
 'generic_name',
 'quantity',
 'packaging',
 'packaging_tags',
 'brands',
 'brands_tags',
 'categories',
 'categories_tags',
 'categories_en',
 'origins',
 'origins_tags',
 'manufacturing_places',
 'manufacturing_places_tags',
 'labels',
 'labels_tags',
 'labels_en',
 'emb_codes',
 'emb_codes_tags',
 'first_packaging_code_geo',
 'cities',
 'cities_tags',
 'purchase_places',
 'stores',
 'countries',
 'countries_tags',
 'countries_en',
 'ingredients_text',
 'allergens',
 'allergens_en',
 'traces',
 'traces_tags',
 'traces_en',
 'serving_size',
 'no_nutriments',
 'additives_n',
 'additives',
 'additives_tags',
 'additives_en',
 'ingredients_from_palm_oil_n',
 'ingredients_from_palm_oil',
 'ingredients_from_palm_oil_tags',
 'ingredients_that_may_be_from_palm_oil_n',
 'ingredients_that_may_be_from_palm_oil',
 'ingredients_that_may_be_from_palm_oil_tags',
 'nutritio

In [108]:
# Load the names of the columns we want to get rid of from a previously created .txt file.
# Each line holds one name; single quotes and commas around it are stripped, and blank
# lines are skipped so no empty names end up in the list.
# The `with` block guarantees the file handle is closed (the original never closed it).
column_list = []
with open("dropped columns.txt", encoding="utf-8") as fileobj:
    for line in fileobj:
        # strip again after removing quotes/commas so stray inner spaces don't survive
        name = line.strip().replace("'", "").replace(",", "").strip()
        if name:
            column_list.append(name)


In [109]:
# Dropping a total of 48 columns: these features are either redundant or irrelevant to our analysis.
# column_list was populated from "dropped columns.txt" in the previous cell.
df = df.drop(*column_list)

# Displaying updated schema (all columns are still strings at this point)
df.printSchema()

root
 |-- last_modified_datetime: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- packaging: string (nullable = true)
 |-- brands: string (nullable = true)
 |-- categories_en: string (nullable = true)
 |-- labels_en: string (nullable = true)
 |-- countries_en: string (nullable = true)
 |-- ingredients_text: string (nullable = true)
 |-- allergens_en: string (nullable = true)
 |-- traces_en: string (nullable = true)
 |-- serving_size: string (nullable = true)
 |-- additives_n: string (nullable = true)
 |-- additives_en: string (nullable = true)
 |-- ingredients_from_palm_oil_n: string (nullable = true)
 |-- ingredients_that_may_be_from_palm_oil_n: string (nullable = true)
 |-- energy_100g: string (nullable = true)
 |-- fat_100g: string (nullable = true)
 |-- saturated-fat_100g: string (nullable = true)
 |-- -butyric-acid_100g: string (nullable = true)
 |-- -caproic-acid_100g: string (nullable = true)
 |-- -caprylic-acid_1

In [110]:
# Import functions 
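# NOTE: originally contained only specific ones, but as our pool expanded, chose to add all.
# CAUTION: this star import shadows Python builtins such as round, sum, min, max and abs;
# later cells (e.g. round(c, 2)) rely on pyspark's round, not the builtin.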
from pyspark.sql.functions import *

In [111]:
# Count rows where BOTH ingredients_text and categories_en are null
# -> 55558 records, 15.6% of the dataset.
# (The original OR-ed this predicate with an identical copy of itself; one copy is enough.)
df.filter(col("ingredients_text").isNull() & col("categories_en").isNull()).count()

55558

In [112]:
# Records with neither ingredients nor categories say too little about the product,
# so keep only rows where at least one of the two is present.
df = df.where(~(col("ingredients_text").isNull() & col("categories_en").isNull()))
df.count()      # 300469 records remain

300469

In [113]:
# Fraction of missing (null or NaN) values for every remaining column,
# rounded to two decimal places, computed in a single aggregation.
nan_percent = df.select([
    round(count(when(isnan(c) | col(c).isNull(), c)) / count(lit(1)), 2).alias(c)
    for c in df.columns
])

nan_percent.show()

# Rule of thumb for dimensionality reduction: columns with more than ~25-30% missing
# values are candidates for removal. The *_100g nutrient columns that are not
# completely empty may still be worth keeping, since they can carry useful signal.

+----------------------+------------+--------+---------+------+-------------+---------+------------+----------------+------------+---------+------------+-----------+------------+---------------------------+---------------------------------------+-----------+--------+------------------+------------------+------------------+-------------------+-----------------+-----------------+-------------------+-------------------+------------------+--------------------+------------------+---------------------+------------------+-------------------+-------------------+------------------------+------------------------+----------------+--------------------------+---------------------------+--------------------------+----------------+-------------------+----------------------+--------------------------+---------------------------------+----------------+----------------+------------------+------------------+---------------+-----------------+-------------------+--------------+----------------+------------

In [114]:
# With this many columns, order them by missing-value fraction (highest first)
# and export the result to nan.csv for closer inspection.
pandas_df = nan_percent.toPandas()     # one row: the missing fraction of each column

row_index = 0
first_row = pandas_df.iloc[row_index]
sorted_cols = list(first_row.sort_values(ascending=False).index)
pandas_df = pandas_df.loc[:, sorted_cols]

pandas_df.to_csv("nan.csv")

  def _to_corrected_pandas_type(dt):


In [115]:
# Remove columns that are (practically) entirely empty.
# nan_percent was rounded to two decimals, so a value of 1 here means ">= 99.5% null",
# not strictly "every row is null".
# A separate name is used so the sorted pandas_df from the previous cell is not clobbered.
all_null_mask = (pandas_df == 1).any()
all_null_cols = all_null_mask[all_null_mask].index.tolist()
df = df.drop(*all_null_cols)

# Printed explicitly: a bare expression mid-cell is never displayed.
print(len(df.columns))      # 51 features remain in the recorded run

# Based on nan.csv, traces_en, labels_en and packaging have ~0.7 missing values and are
# not critical to the analysis; quantity is dropped as well
# (NOTE(review): record why quantity is dropped). This leaves 47 features.
df = df.drop('quantity', 'traces_en', 'labels_en', 'packaging')

In [116]:
# nutri-score will be the target column for XGBoost, so count how many rows lack it.
# (isnull(c) and col(c).isNull() are the same test; the original OR-ed them together.)
clm1 = 'nutrition-score-fr_100g'
df.select(count(when(col(clm1).isNull(), clm1)).alias("Null Nutri-score FR")).show()

+-------------------+
|Null Nutri-score FR|
+-------------------+
|              63016|
+-------------------+



In [117]:
# We also have 'nutrition-score-uk_100g'. Check whether it could fill gaps in the FR score,
# i.e. rows where the FR score is null but the UK score is not.
uk_clm = "nutrition-score-uk_100g"

# print() is required: only a cell's last expression is displayed, so bare .count()
# calls here would be silently discarded.
print(df.filter(col(clm1).isNull() & col(uk_clm).isNotNull()).count())   # 0 in the recorded run
# Rows where both scores are present but differ (null comparisons are filtered out).
print(df.filter(col(clm1) != col(uk_clm)).count())                       # 13063 in the recorded run

# Differing FR/UK scores are expected because the two are computed under different rules.
# NOTE(review): the original note attributed this to the 5-colour Nutri-Score vs the
# UK 3-colour Traffic Light label. Confirm before citing.
# For this research we drop the UK score column and rename clm1 to "nutri_score".
df = df.drop(uk_clm)
df = df.withColumnRenamed(clm1, "nutri_score")
df.show()                                         # now we are down to 46 features


+----------------------+--------------------+--------------------+-------------+-------------+--------------------+---------------+-----------+------------------+---------------------------+---------------------------------------+-----------+--------+------------------+------------------------+------------------------+--------------+----------------+------------------+-----------+----------+-------------+---------+-----------+------------+--------------+--------------+--------------+---------------+---------------+---------------+---------------+---------------+------------+----------------+---------------------+--------------+------------+---------------+---------+--------------+---------+-----------+--------------+---------------------------+-----------+
|last_modified_datetime|        product_name|              brands|categories_en| countries_en|    ingredients_text|   serving_size|additives_n|      additives_en|ingredients_from_palm_oil_n|ingredients_that_may_be_from_palm_oil_n|ene

In [118]:
# Let's take a look at the updated schema: every column is still a String datatype
# (the CSV was read without any type conversion); the following cells cast them.
df.printSchema()

root
 |-- last_modified_datetime: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- brands: string (nullable = true)
 |-- categories_en: string (nullable = true)
 |-- countries_en: string (nullable = true)
 |-- ingredients_text: string (nullable = true)
 |-- serving_size: string (nullable = true)
 |-- additives_n: string (nullable = true)
 |-- additives_en: string (nullable = true)
 |-- ingredients_from_palm_oil_n: string (nullable = true)
 |-- ingredients_that_may_be_from_palm_oil_n: string (nullable = true)
 |-- energy_100g: string (nullable = true)
 |-- fat_100g: string (nullable = true)
 |-- saturated-fat_100g: string (nullable = true)
 |-- monounsaturated-fat_100g: string (nullable = true)
 |-- polyunsaturated-fat_100g: string (nullable = true)
 |-- trans-fat_100g: string (nullable = true)
 |-- cholesterol_100g: string (nullable = true)
 |-- carbohydrates_100g: string (nullable = true)
 |-- sugars_100g: string (nullable = true)
 |-- fiber_100g: string (null

In [119]:
# Adjust datatypes: every column ending in _100g becomes a double, with nulls replaced
# by 0, then rounded to 2 decimals. All columns are rebuilt in one select (same order,
# same names) instead of one withColumn projection per column.
double_cast = df.select(df.colRegex("`.+100g$`")).columns
df = df.select([
    round(when(col(c).isNull(), 0).otherwise(col(c).cast("double")), 2).alias(c)
    if c in double_cast else col(c)
    for c in df.columns
])

In [120]:
# Columns ending in _n (counts) become ints, with nulls replaced by 0.
# Rebuilt in a single select that keeps the column order and names.
int_cast = df.select(df.colRegex("`.*_n$`")).columns
df = df.select([
    when(col(c).isNull(), 0).otherwise(col(c).cast("int")).alias(c)
    if c in int_cast else col(c)
    for c in df.columns
])


In [121]:
# Parse last_modified_datetime as a timestamp, keep only its year, and rename the column
# to "year" (it stays in the same position as the original column).
ts_format = "yyyy-MM-dd'T'HH:mm:ss'Z'"
df = (
    df
    .withColumn("last_modified_datetime",
                year(to_timestamp(col("last_modified_datetime"), ts_format)))
    .withColumnRenamed("last_modified_datetime", "year")
)
df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- brands: string (nullable = true)
 |-- categories_en: string (nullable = true)
 |-- countries_en: string (nullable = true)
 |-- ingredients_text: string (nullable = true)
 |-- serving_size: string (nullable = true)
 |-- additives_n: integer (nullable = true)
 |-- additives_en: string (nullable = true)
 |-- ingredients_from_palm_oil_n: integer (nullable = true)
 |-- ingredients_that_may_be_from_palm_oil_n: integer (nullable = true)
 |-- energy_100g: double (nullable = true)
 |-- fat_100g: double (nullable = true)
 |-- saturated-fat_100g: double (nullable = true)
 |-- monounsaturated-fat_100g: double (nullable = true)
 |-- polyunsaturated-fat_100g: double (nullable = true)
 |-- trans-fat_100g: double (nullable = true)
 |-- cholesterol_100g: double (nullable = true)
 |-- carbohydrates_100g: double (nullable = true)
 |-- sugars_100g: double (nullable = true)
 |-- fiber_100g: double (nullable = true)
 

In [122]:
# A product without a name is unusable: discard rows whose product_name is null
df = df.dropna(subset=['product_name'])

In [123]:
# Every remaining column is still a string, so its nulls become "not specified".
# Left untouched: the numeric casts from earlier cells, "year" (keeps its int type),
# and "nutri_score" (its nulls are needed later for the train/test split and evaluation).
keep_as_is = set(double_cast) | set(int_cast) | {"year", "nutri_score"}
df = df.select([
    col(c) if c in keep_as_is
    else when(col(c).isNull(), "not specified").otherwise(col(c)).alias(c)
    for c in df.columns
])

df.show()

+----+--------------------+--------------------+-------------+-------------+--------------------+---------------+-----------+------------------+---------------------------+---------------------------------------+-----------+--------+------------------+------------------------+------------------------+--------------+----------------+------------------+-----------+----------+-------------+---------+-----------+------------+--------------+--------------+--------------+---------------+---------------+---------------+---------------+---------------+------------+----------------+---------------------+--------------+------------+---------------+---------+--------------+---------+-----------+--------------+---------------------------+-----------+
|year|        product_name|              brands|categories_en| countries_en|    ingredients_text|   serving_size|additives_n|      additives_en|ingredients_from_palm_oil_n|ingredients_that_may_be_from_palm_oil_n|energy_100g|fat_100g|saturated-fat_100g

In [133]:
# Report how many null/NaN values are still left in each column
null_report = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns])
null_report.show()

# Drop rows whose "year" is null (num of rows down to 297603).
# NOTE: in the recorded run this cell executed out of order (In [133], after the
# train/test split in In [127]); on Restart & Run All it runs before the split.
df = df.filter(col("year").isNotNull())

+----+------------+------+-------------+------------+----------------+------------+-----------+------------+---------------------------+---------------------------------------+-----------+--------+------------------+------------------------+------------------------+--------------+----------------+------------------+-----------+----------+-------------+---------+-----------+------------+--------------+--------------+--------------+---------------+---------------+---------------+---------------+---------------+------------+----------------+---------------------+--------------+------------+---------------+---------+--------------+---------+-----------+--------------+---------------------------+-----------+
|year|product_name|brands|categories_en|countries_en|ingredients_text|serving_size|additives_n|additives_en|ingredients_from_palm_oil_n|ingredients_that_may_be_from_palm_oil_n|energy_100g|fat_100g|saturated-fat_100g|monounsaturated-fat_100g|polyunsaturated-fat_100g|trans-fat_100g|choles

In [125]:
# List the distinct years of last modification, most recent first
df.select('year').distinct().orderBy(col('year').desc()).show()

# NOTE: For future iterations of the project, look for more recent data!

+----+
|year|
+----+
|2017|
|2016|
|2015|
|2014|
|2013|
|2012|
+----+



In [134]:
# Distribution of missing nutri_score values per country, largest first.
# (The original OR-ed isnull(...) with the identical col(...).isNull(); one test suffices.)
country_df = (
    df.groupBy('countries_en')
    .agg(count(when(col("nutri_score").isNull(), "nutri_score")).alias("total null"))
    .orderBy("total null", ascending=False)
)
country_df.show(n=country_df.count())

# Top 2 countries are US and France
# The rest of the NULL values comprise less than 1% of total data

+--------------------+----------+
|        countries_en|total null|
+--------------------+----------+
|       United States|     32964|
|              France|     16856|
|             Germany|      2108|
|         Switzerland|      1819|
|               Spain|      1605|
|      United Kingdom|      1386|
|              Russia|       984|
|           Australia|       755|
|             Belgium|       408|
|            Portugal|       180|
|               Italy|       160|
|  France,Switzerland|       141|
|France,United Kin...|       126|
|             Austria|       112|
|          Madagascar|       110|
|      Belgium,France|       104|
|              Brazil|        77|
|              Canada|        74|
|         Netherlands|        74|
|       not specified|        73|
|             Ireland|        62|
|              Serbia|        60|
|             Tunisia|        52|
|      France,Germany|        45|
|              Poland|        44|
|             Romania|        40|
|France,United

In [127]:
# US rows without a nutri_score become the (unlabelled) test set for the later XGBoost run;
# every other row is a training candidate.
# Uses a complementary filter instead of df.subtract(test_set): subtract is EXCEPT DISTINCT,
# which silently removes duplicate training rows and forces an extra shuffle.
# eqNullSafe keeps the predicate non-null, so the two filters partition df exactly.
is_us_unscored = col('countries_en').eqNullSafe('United States') & col('nutri_score').isNull()
test_set = df.filter(is_us_unscored)
# Drop the remaining rows that still contain nulls (e.g. non-US rows without a nutri_score)
train_set = df.filter(~is_us_unscored).dropna()

In [128]:
# Convert each numeric nutri-score into a letter grade from A (best) to E (worst).
# Drinks follow a different scale (only water can be ranked A!), so beverages never get A here.
#
# Fixes vs. the previous version:
#  * the beverage pattern was matched against nutri_score (a number), so no row was ever
#    treated as a beverage; it now matches the category text in categories_en;
#  * a food score of exactly -1 fell through to "E" (the test was `< -1`); A now covers -15..-1;
#  * beverage scores of 10 and above were graded "D"; they are now "E".
# The score is cast explicitly instead of relying on implicit string-to-number coercion.
score = col("nutri_score").cast("double")

# Beverages are matched only at the beginning of the category text, to avoid conflict with
# 'plant-based foods and beverages' (which contains solid foods); milk can be matched at any
# position after it; (?i) makes the match case-insensitive.
# NOTE(review): as written this selects only categories that start with "beverages" AND
# mention "milk". Confirm this is the intended beverage subset.
is_beverage = col("categories_en").rlike("(?i)^beverages.*milk.*")

beverage_grade = (
    when(score <= 1, "B")
    .when((score >= 2) & (score <= 5), "C")
    .when((score >= 6) & (score <= 9), "D")
    .otherwise("E")                                   # 10 and above
)

food_grade = (
    when((score >= -15) & (score <= -1), "A")
    .when((score >= 0) & (score <= 2), "B")
    .when((score >= 3) & (score <= 10), "C")
    .when((score >= 11) & (score <= 18), "D")
    .otherwise("E")                                   # 19 and above
)

train_set = train_set.withColumn(
    "nutri_score", when(is_beverage, beverage_grade).otherwise(food_grade))


In [129]:
# Inspect the graded training data (test_set can be inspected the same way)
train_set.show(n=20)

+----+--------------------+--------------------+-------------+-------------+--------------------+--------------------+-----------+--------------------+---------------------------+---------------------------------------+-----------+--------+------------------+------------------------+------------------------+--------------+----------------+------------------+-----------+----------+-------------+---------+-----------+------------+--------------+--------------+--------------+---------------+---------------+---------------+---------------+---------------+------------+----------------+---------------------+--------------+------------+---------------+---------+--------------+---------+-----------+--------------+---------------------------+-----------+
|year|        product_name|              brands|categories_en| countries_en|    ingredients_text|        serving_size|additives_n|        additives_en|ingredients_from_palm_oil_n|ingredients_that_may_be_from_palm_oil_n|energy_100g|fat_100g|satu

In [130]:
# SECTION IN PROGRESS! 

# Installing required modules for performing XGBoost 

# import sys
# !{sys.executable} -m pip install xgboost
from xgboost.spark import SparkXGBRegressor


In [131]:
# Candidate model features: the per-100g nutrient measurements [subject to change!]
_energy_and_macros = [
    'energy_100g', 'fat_100g', 'saturated-fat_100g', 'monounsaturated-fat_100g',
    'polyunsaturated-fat_100g', 'trans-fat_100g', 'cholesterol_100g',
    'carbohydrates_100g', 'sugars_100g', 'fiber_100g', 'proteins_100g',
    'salt_100g', 'sodium_100g', 'alcohol_100g',
]
_vitamins = [
    'vitamin-a_100g', 'vitamin-d_100g', 'vitamin-c_100g', 'vitamin-b1_100g',
    'vitamin-b2_100g', 'vitamin-pp_100g', 'vitamin-b6_100g', 'vitamin-b9_100g',
    'folates_100g', 'vitamin-b12_100g', 'pantothenic-acid_100g',
]
_minerals = [
    'potassium_100g', 'calcium_100g', 'phosphorus_100g', 'iron_100g',
    'magnesium_100g', 'zinc_100g', 'copper_100g', 'manganese_100g',
]
features = _energy_and_macros + _vitamins + _minerals + ['fruits-vegetables-nuts_100g']

# Estimator for model generation.
# TODO: nutri_score now holds letter grades (A-E), so the label must be encoded
#       (e.g. with a StringIndexer in a Pipeline) before fitting.
# NOTE(review): newer xgboost releases deprecate use_gpu in favour of device="cuda";
#       check the installed version.
spark_reg_estimator = SparkXGBRegressor(
    features_col=features,
    label_col="nutri_score",
    use_gpu=True,
)

# Random 70%/30% split, seeded for reproducibility
trainDF, testDF = train_set.randomSplit([0.7, 0.3], seed=24)

# TODO: build the Pipeline, then:
#   xgb_regressor_model = spark_reg_estimator.fit(trainDF)
#   prediction_df = xgb_regressor_model.transform(testDF)


In [132]:
# Future work: edit and evaluate XGBoost model 
#              hyperparameter tuning if needed
#              rerun the model using test_set for testing 
#              check which brands have the highest nutri_scores (groupBy country) -> TBD
#              check which countries render the highest nutri_scores  