In [1]:
import sys
import subprocess
import findspark
findspark.init()
findspark.find()

from pyspark.sql import SQLContext
from pyspark.sql import SparkSession


from pyspark.sql.types import (StructType, StructField, 
                               IntegerType, StringType, Row, FloatType)
from pyspark.sql.functions import * 

#### Entry Points of Spark

In [3]:
spark = SparkSession\
    .builder\
    .master('local')\
    .appName('food_data_pipeline')\
    .config("spark.driver.extraClassPath", "/home/joseluis/driver_jdbc/postgresql-42.6.0.jar")\
    .getOrCreate()

sqlContext = SQLContext(spark)

print("Spark object is created...")

23/05/04 20:03:27 WARN Utils: Your hostname, lamamalona resolves to a loopback address: 127.0.1.1; using 192.168.1.78 instead (on interface eno1)
23/05/04 20:03:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/04 20:03:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark object is created...




#### Download of the data

In [None]:
url = "https://static.openfoodfacts.org/data/en.openfoodfacts.org.products.csv"
output_file = "./download/openfoodfacts-database.csv"

In [None]:
try:
    subprocess.run(['wget','-O', output_file, url], check=True)
except:
    print("Theres something wrong with the download")
    sys.exit(1)
else:
    print("File downloaded successfully...")

#### Open the file as a CSV 

SOME ERROR HERE

In [19]:
""" 
Defining the first column of the dataset as a string because every id it's too long
to represent it as a int 
"""

customSchema = StructType([StructField("code", StringType(), True),
                           StructField("energy-kcal_100g", FloatType(), True),
                           StructField("energy_100g", FloatType(), True),
                           StructField("fat_100g", FloatType(), True),
                           StructField("saturated-fat_100g", FloatType(), True),
                           StructField("carbohydrates_100g", FloatType(), True),
                           StructField("sugars_100g", FloatType(), True),
                           StructField("fiber_100g", FloatType(), True),
                           StructField("proteins_100g", FloatType(), True),
                           StructField("salt_100g", FloatType(), True),
                           StructField("sodium_100g", FloatType(), True),
                           
                           ])


In [20]:
openfoodfactsDF = (
    spark
    .read
    .format("csv")
    .option("header","true")
    .option("schema",customSchema)
    .option("delimiter","\t")
    .option("mode","FAILFAST")
    .load("./download/openfoodfacts-database.csv")
    )

#### Saving as a Parquet

In [None]:
openfoodfactsDF.write.parquet("./download/openfoodfacts-database.parquet")

##### Opening the parquet file

In [4]:
FoodDatabaseDF = (
    spark
    .read
    .format("parquet")
    .load("./download/openfoodfacts-database.parquet/")
)

In [5]:
FoodDatabaseDF = FoodDatabaseDF.select("code",
"url",
"product_name",
"brands",
"categories_en",
"countries_en",
"ingredients_text",
"additives_n",
"additives_en",
"main_category_en",
"image_url",
"energy-kcal_100g",
"energy_100g",
"fat_100g",
"saturated-fat_100g",
"carbohydrates_100g",
"sugars_100g",
"fiber_100g",
"proteins_100g",
"salt_100g",
"sodium_100g",
)

##### Data exploration

In [5]:
FoodDatabaseDF.show(5)

+--------------------+--------------------+----------------+-------+--------------------+------------+--------------------+-----------+------------+----------------+--------------------+----------------+-----------+--------+------------------+------------------+-----------+----------+-------------+---------+-----------+
|                code|                 url|    product_name| brands|       categories_en|countries_en|    ingredients_text|additives_n|additives_en|main_category_en|           image_url|energy-kcal_100g|energy_100g|fat_100g|saturated-fat_100g|carbohydrates_100g|sugars_100g|fiber_100g|proteins_100g|salt_100g|sodium_100g|
+--------------------+--------------------+----------------+-------+--------------------+------------+--------------------+-----------+------------+----------------+--------------------+----------------+-----------+--------+------------------+------------------+-----------+----------+-------------+---------+-----------+
|   00000000000000225|http://world


Amount of rows in the dataset

In [6]:
FoodDatabaseDF.count()

19529


Creating a function that calculate the amount of missing data per column in the dataframe

In [7]:
def missing_values_count(DF):
    
    missing_values_count = DF\
    .select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in DF.columns])

    return missing_values_count


Amount of missing data per column in the dataframe

In [8]:
result = missing_values_count(FoodDatabaseDF)
result.show()

+----+---+------------+------+-------------+------------+----------------+-----------+------------+----------------+---------+----------------+-----------+--------+------------------+------------------+-----------+----------+-------------+---------+-----------+
|code|url|product_name|brands|categories_en|countries_en|ingredients_text|additives_n|additives_en|main_category_en|image_url|energy-kcal_100g|energy_100g|fat_100g|saturated-fat_100g|carbohydrates_100g|sugars_100g|fiber_100g|proteins_100g|salt_100g|sodium_100g|
+----+---+------------+------+-------------+------------+----------------+-----------+------------+----------------+---------+----------------+-----------+--------+------------------+------------------+-----------+----------+-------------+---------+-----------+
|   0|  0|         564|  6952|         9323|          46|            9185|       9185|       12708|            9323|     8934|            2660|       2604|    2740|              4076|              2695|       3266|

                                                                                


Calculating the amount of fields in all the dataset

In [9]:
rows = FoodDatabaseDF.count()
columns = len(FoodDatabaseDF.columns)
total_cells = rows*columns
print(total_cells)

410109



trying to the missing data of all the dataset

In [10]:
missing_values_count.select(sum(*[missing_values_count[column] for column in missing_values_count.columns]))

AttributeError: 'function' object has no attribute 'select'


Showing the categories of the dataset and the number of times that is repeated

In [11]:
categories_table = FoodDatabaseDF.groupBy("categories_en").count().orderBy("count", ascending=False)
categories_table.show()

+--------------------+-----+
|       categories_en|count|
+--------------------+-----+
|                null| 9323|
|              Snacks|  734|
|Snacks,Sweet snac...|  320|
|Condiments,Sauces...|  320|
|Snacks,Sweet snac...|  317|
|       Salted-snacks|  313|
|Snacks,Sweet snac...|  280|
|Desserts,Frozen f...|  258|
|Dairies,Fermented...|  254|
|Snacks,Sweet snac...|  164|
|Plant-based foods...|  164|
|Condiments,Groceries|  148|
|Plant-based foods...|  138|
|        Frozen foods|  132|
|Plant-based foods...|  126|
|Plant-based foods...|  114|
|Meats and their p...|  114|
|Plant-based foods...|  109|
|Dairies,Fermented...|  106|
|Plant-based foods...|   98|
+--------------------+-----+
only showing top 20 rows



Showing the countries of the dataset and the number of times that is repeated

In [12]:
countries_table = FoodDatabaseDF.groupBy("countries_en").count().orderBy("count", ascending=False)
countries_table.show()

+--------------------+-----+
|        countries_en|count|
+--------------------+-----+
|       United States|15013|
|              France| 2268|
|      United Kingdom|  377|
|               Spain|  366|
|              Canada|  297|
|               Italy|  239|
|             Germany|  159|
|France,United Kin...|  132|
|France,United States|   81|
|             Belgium|   63|
|             Ireland|   61|
|                null|   46|
|         Switzerland|   35|
|           Australia|   27|
|      France,Germany|   25|
|         Puerto Rico|   22|
|             Romania|   17|
|              Mexico|   16|
|Germany,United St...|   10|
|              Panama|    9|
+--------------------+-----+
only showing top 20 rows



Showing the number of additives grouped by quantity in the dataset and the number of times that is repeated

In [13]:
additives_table = FoodDatabaseDF.groupBy("additives_n").count().orderBy("count", ascending=False)
additives_table.show()

+-----------+-----+
|additives_n|count|
+-----------+-----+
|       null| 9185|
|          0| 3523|
|          1| 1686|
|          2| 1130|
|          3|  824|
|          4|  694|
|          5|  567|
|          6|  422|
|          7|  404|
|          8|  307|
|          9|  186|
|         10|  129|
|         11|   98|
|         12|   67|
|         13|   63|
|         15|   31|
|         14|   30|
|         16|   27|
|         20|   22|
|         23|   19|
+-----------+-----+
only showing top 20 rows



In [14]:
energy_table = FoodDatabaseDF.groupBy("energy-kcal_100g").count().orderBy("count", ascending=False)
energy_table.show()

+----------------+-----+
|energy-kcal_100g|count|
+----------------+-----+
|            null| 2660|
|               0|  797|
|             500|  351|
|             400|  287|
|             357|  227|
|             200|  210|
|             250|  157|
|             300|  155|
|             375|  153|
|             333|  151|
|             100|  149|
|              50|  137|
|             600|  129|
|             571|  123|
|             286|  111|
| 357.14285714286|  108|
|             429|   97|
|             393|   96|
| 333.33333333333|   90|
|             350|   81|
+----------------+-----+
only showing top 20 rows



In [15]:
energy1_table = FoodDatabaseDF.groupBy("energy_100g").count().orderBy("count", ascending=False)
energy1_table.show()

+-----------+-----+
|energy_100g|count|
+-----------+-----+
|       null| 2604|
|          0|  799|
|       1494|  353|
|       2092|  348|
|       1674|  284|
|        837|  208|
|       1644|  189|
|       1255|  154|
|       1046|  152|
|       1569|  150|
|       1393|  150|
|        418|  149|
|        209|  137|
|       2510|  129|
|       2389|  123|
|       1197|  106|
|       1395|  102|
|        448|   99|
|       1795|   97|
|       1623|   80|
+-----------+-----+
only showing top 20 rows



In [16]:
fat_table = FoodDatabaseDF.groupBy("fat_100g").count().orderBy("count", ascending=False)
fat_table.show()

+--------+-----+
|fat_100g|count|
+--------+-----+
|       0| 4493|
|    null| 2740|
|      50|  234|
|      10|  183|
|      25|  177|
|      20|  161|
|      30|  147|
|     0.5|  131|
|       4|  128|
|    1.79|  119|
|   32.14|  104|
|   28.57|  102|
|       1|   93|
|   16.67|   85|
|   21.43|   82|
|       6|   79|
|      40|   79|
|      12|   77|
|      15|   73|
|    3.57|   72|
+--------+-----+
only showing top 20 rows



In [16]:
s_fat_table = FoodDatabaseDF.groupBy("saturated-fat_100g").count().orderBy("count", ascending=False)
s_fat_table.show()

+------------------+-----+
|saturated-fat_100g|count|
+------------------+-----+
|                 0| 5270|
|              null| 4076|
|                 5|  198|
|              3.57|  144|
|                 1|  140|
|               0.1|  135|
|              7.14|  133|
|                10|  132|
|              3.33|  127|
|                20|  126|
|               0.2|  114|
|               0.5|  109|
|                 3|  104|
|             14.29|   92|
|             17.86|   89|
|   6.6666666666667|   84|
|                 2|   84|
|              6.67|   79|
|              5.36|   79|
|              6.25|   74|
+------------------+-----+
only showing top 20 rows



In [17]:
carbo_table = FoodDatabaseDF.groupBy("carbohydrates_100g").count().orderBy("count", ascending=False)
carbo_table.show()

+------------------+-----+
|carbohydrates_100g|count|
+------------------+-----+
|              null| 2695|
|                 0| 1778|
|                50|  215|
|              3.57|  206|
|                 5|  188|
|                10|  188|
|                60|  181|
|                20|  176|
|                75|  144|
|                80|  136|
|              6.67|  133|
|                40|  127|
|   3.5714285714286|  120|
|                25|  119|
|               100|  117|
|             66.67|  104|
|                12|   97|
|             28.57|   92|
|                70|   87|
|              3.33|   83|
+------------------+-----+
only showing top 20 rows



In [18]:
sug_table = FoodDatabaseDF.groupBy("sugars_100g").count().orderBy("count", ascending=False)
sug_table.show()

+---------------+-----+
|    sugars_100g|count|
+---------------+-----+
|           null| 3266|
|              0| 3191|
|           3.57|  338|
|           3.33|  210|
|              1|  170|
|             20|  163|
|              5|  149|
|             10|  147|
|3.5714285714286|  140|
|3.3333333333333|  139|
|             25|  133|
|             40|  130|
|           6.67|  114|
|            0.5|  110|
|             50|  109|
|              2|  104|
|           7.14|  103|
|              4|   87|
|          38.89|   78|
|          33.33|   72|
+---------------+-----+
only showing top 20 rows



In [19]:
fib_table = FoodDatabaseDF.groupBy("fiber_100g").count().orderBy("count", ascending=False)
fib_table.show()

+---------------+-----+
|     fiber_100g|count|
+---------------+-----+
|           null| 7742|
|              0| 3995|
|            3.6|  324|
|            0.8|  200|
|              2|  194|
|            1.4|  186|
|            2.4|  185|
|            1.2|  182|
|            1.8|  176|
|            3.3|  164|
|             10|  162|
|            1.6|  159|
|              1|  151|
|            7.1|  143|
|              5|  138|
|            2.5|  132|
|3.5714285714286|  126|
|            2.8|  123|
|            1.7|  116|
|            2.3|  112|
+---------------+-----+
only showing top 20 rows



In [20]:
proteins_table = FoodDatabaseDF.groupBy("proteins_100g").count().orderBy("count", ascending=False)
proteins_table.show()

+---------------+-----+
|  proteins_100g|count|
+---------------+-----+
|              0| 3669|
|           null| 2711|
|             10|  303|
|           7.14|  232|
|             25|  211|
|              5|  205|
|           3.33|  203|
|           6.67|  183|
|             20|  183|
|           12.5|  174|
|3.3333333333333|  138|
|6.6666666666667|  127|
|           3.57|  125|
|              4|  121|
|           5.56|  119|
|              8|  114|
|            2.5|  110|
|7.1428571428571|  108|
|           0.83|   98|
|5.5555555555556|   97|
+---------------+-----+
only showing top 20 rows



In [21]:
salt_table = FoodDatabaseDF.groupBy("salt_100g").count().orderBy("count", ascending=False)
salt_table.show()

+---------+-----+
|salt_100g|count|
+---------+-----+
|     null| 4139|
|        0| 2284|
|      0.1|  160|
|    0.125|  141|
|     1.25|  123|
|        1|  119|
|     0.25|  105|
|      1.5|   99|
|     0.03|   99|
|     0.02|   91|
|        2|   87|
|      2.5|   82|
|     0.01|   81|
|      0.3|   77|
|    1.875|   71|
|      0.5|   69|
|   1.5175|   67|
|      0.6|   66|
|      1.1|   62|
|     0.15|   61|
+---------+-----+
only showing top 20 rows



In [22]:
salt_table = FoodDatabaseDF.groupBy("sodium_100g").count().orderBy("count", ascending=False)
salt_table.show()

+-----------+-----+
|sodium_100g|count|
+-----------+-----+
|       null| 4139|
|          0| 2284|
|       0.04|  159|
|       0.05|  141|
|        0.5|  123|
|        0.4|  119|
|        0.1|  105|
|      0.012|   99|
|        0.6|   99|
|      0.008|   91|
|        0.8|   87|
|          1|   82|
|      0.004|   81|
|       0.12|   77|
|       0.75|   72|
|        0.2|   70|
|      0.607|   67|
|       0.24|   66|
|       0.44|   62|
|       0.06|   61|
+-----------+-----+
only showing top 20 rows



#### Cleaning the data


Drop rows with null data of the columns "product_name" and "brands"

In [17]:
FoodDatabaseDF = FoodDatabaseDF.na.drop(subset=["product_name","brands"])

In [18]:
FoodDatabaseDF.count()

12418

Handling with missing values in categorical data

In the next cases, we will imput the value "missing", to treats it as a separate category

In [19]:
FoodDatabaseDF = FoodDatabaseDF.na.fill(value="missing",subset=["categories_en"])

In [20]:
FoodDatabaseDF = FoodDatabaseDF.na.fill(value="missing",subset=["ingredients_text"])

In [21]:
FoodDatabaseDF = FoodDatabaseDF.na.fill(value='missing',subset=["additives_en"])

In [22]:
FoodDatabaseDF = FoodDatabaseDF.na.fill(value='missing',subset=["main_category_en"])

In [23]:
FoodDatabaseDF = FoodDatabaseDF.na.fill(value='missing',subset=["image_url"])

Creating a function that imputing the most repeated data on the column of origin

In [24]:
def categorical_mode_imputing(DF, table, column_name):

    # getting the bigger value of the agrouped table countries
    max_count = table.where(col(column_name) != 'null').select(max("count")).first()[0]

    #getting the name of the country that is the most repeated 
    table_mode = table.select(column_name).where(col("count") == max_count).first()[0]

    #Filling the null fields
    DF = DF.na.fill(value = table_mode,subset=[column_name])

    return DF

In [25]:
FoodDatabaseDF = categorical_mode_imputing(FoodDatabaseDF, countries_table, "countries_en")

In [26]:
FoodDatabaseDF = categorical_mode_imputing(FoodDatabaseDF, additives_table ,"additives_n")

Imputing numerical data

Creating a function that calculate the mean of a specific column

In [27]:
def mean_numerical_tables(DF, column ):

    table = DF.where(col(column) != 'null').select(column)
    table = table.withColumn(column, col(column).cast('float'))

    total_values = table.count()
    total_sum = table.select(sum(table[column])).first()[0]
    mean = total_sum/total_values

    return mean


Creating a function that imputing the mean in a specific column

In [28]:
def imputing_mean (DF, column):

    mean = mean_numerical_tables(DF, column)
    DF = DF.withColumn(column, col(column).cast('float'))
    DF = DF.na.fill(value = mean ,subset=[column])
    
    return DF

In [29]:
column_list = ["energy-kcal_100g", "energy_100g", "fat_100g", "saturated-fat_100g", "carbohydrates_100g",
                "sugars_100g", "fiber_100g", "proteins_100g", "salt_100g", "sodium_100g"]

In [30]:
for column in column_list:
    FoodDatabaseDF = imputing_mean(FoodDatabaseDF, column)

In [31]:
result = missing_values_count(FoodDatabaseDF)
result.show()

+----+---+------------+------+-------------+------------+----------------+-----------+------------+----------------+---------+----------------+-----------+--------+------------------+------------------+-----------+----------+-------------+---------+-----------+
|code|url|product_name|brands|categories_en|countries_en|ingredients_text|additives_n|additives_en|main_category_en|image_url|energy-kcal_100g|energy_100g|fat_100g|saturated-fat_100g|carbohydrates_100g|sugars_100g|fiber_100g|proteins_100g|salt_100g|sodium_100g|
+----+---+------------+------+-------------+------------+----------------+-----------+------------+----------------+---------+----------------+-----------+--------+------------------+------------------+-----------+----------+-------------+---------+-----------+
|   0|  0|           0|     0|            0|           0|               0|          0|           0|               0|        0|               0|          0|       0|                 0|                 0|          0|

Load of the data

In [32]:
import os

In [33]:
os.environ['DB_USER'] = "user_data"
os.environ['DB_PASSWORD'] = "admin"
os.environ['DB_HOST'] = "localhost"
os.environ['DB_PORT'] = "5432"
os.environ['DB_NAME'] = "myinner_db"

In [34]:
db_user = os.environ['DB_USER'] 
db_password = os.environ['DB_PASSWORD']
db_host = os.environ['DB_HOST']
db_port = os.environ['DB_PORT']
db_name = os.environ['DB_NAME']


In [35]:
POSTGRES_CONFIG = {
    "url":f"jdbc:postgresql://{db_host}:{db_port}/{db_name}",
    "properties":{
        "user": db_user, 
        "password":db_password,
        "driver":"org.postgresql.Driver",
    },
}

In [36]:
POSTGRES_CONFIG

{'url': 'jdbc:postgresql://localhost:5432/myinner_db',
 'properties': {'user': 'user_data',
  'password': 'admin',
  'driver': 'org.postgresql.Driver'}}

In [38]:
FoodDatabaseDF.select("*").write.jdbc(**POSTGRES_CONFIG, table="FoodDatabaseDF", mode="overwrite")

In [96]:
spark.stop()