In [22]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from typing import Optional
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

In [2]:
# Allocate 8GB of RAM to the driver
spark = (
    SparkSession.builder.appName("Dessert or Not?")
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)

# Set the maximum number of pd rows to display to make data exploration easier
pd.set_option("display.max_rows", 1000) 


### Exploratory Data Analysis

In [3]:
food = spark.read.csv('./data/epi_r.csv', inferSchema=True, header=True)

print(food.count(), len(food.columns))

food.printSchema()

20057 680
root
 |-- title: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- calories: string (nullable = true)
 |-- protein: double (nullable = true)
 |-- fat: double (nullable = true)
 |-- sodium: double (nullable = true)
 |-- #cakeweek: double (nullable = true)
 |-- #wasteless: double (nullable = true)
 |-- 22-minute meals: double (nullable = true)
 |-- 3-ingredient recipes: double (nullable = true)
 |-- 30 days of groceries: double (nullable = true)
 |-- advance prep required: double (nullable = true)
 |-- alabama: double (nullable = true)
 |-- alaska: double (nullable = true)
 |-- alcoholic: double (nullable = true)
 |-- almond: double (nullable = true)
 |-- amaretto: double (nullable = true)
 |-- anchovy: double (nullable = true)
 |-- anise: double (nullable = true)
 |-- anniversary: double (nullable = true)
 |-- anthony bourdain: double (nullable = true)
 |-- aperitif: double (nullable = true)
 |-- appetizer: double (nullable = true)
 |-- apple: double (nullabl

Many columns contain undesirable characters such as # (from hashtags), and invalid characters. These should be standardised.

In [4]:
def sanitize_column_name(name):
    """Remove unwanted characters from a column name."""
    out = name
    for i, j in ((" ", "_"), ("-", "_"), ("/", "_"), ("&", "and")):
        out = out.replace(i, j)
    # Keep only letters, numbers and underscores
    return "".join(
        [char for char in out if char.isalpha() or char.isdigit() or char == "-"]
    )

In [5]:
food = food.toDF(*[sanitize_column_name(name) for name in food.columns])

Identify binary columns 

In [6]:
is_binary = food.agg(
    # If the number of distinct values in the column is 2, then it's probably binary
    *[(F.size(F.collect_set(x)) == 2).alias(x) for x in food.columns]
).toPandas()

# Unpivot the dataframe to make it easier to read in the terminal
is_binary.unstack().sort_values()

title                  0    False
rating                 0    False
calories               0    False
protein                0    False
fat                    0    False
sodium                 0    False
cakeweek               0    False
wasteless              0    False
passionfruit           0     True
passover               0     True
pasta                  0     True
pastamaker             0     True
pastry                 0     True
pea                    0     True
peach                  0     True
peanut                 0     True
pecan                  0     True
peanutfree             0     True
pear                   0     True
pasadena               0     True
pennsylvania           0     True
pepper                 0     True
pernod                 0     True
persiannewyear         0     True
persimmon              0     True
peanutbutter           0     True
party                  0     True
paris                  0     True
parsley                0     True
orange        

`cakeweek` and `wasteless` do not appear to be binary:

In [7]:
# Show the distinct values for the suspect columns
food.agg(*[F.collect_set(x) for x in ("cakeweek", "wasteless")]).show(1, False)

# Show the first and last few records with non-binary values to check for data alignment issues
(
    food
    .where("cakeweek > 1.0 or wasteless > 1.0")
    .select("title", "rating", "wasteless", "cakeweek", food.columns[-1])
    .show(truncate=False)
)

+-------------------------------+----------------------+
|collect_set(cakeweek)          |collect_set(wasteless)|
+-------------------------------+----------------------+
|[0.0, 1.0, 1188.0, 24.0, 880.0]|[0.0, 1.0, 1439.0]    |
+-------------------------------+----------------------+

+----------------------------------------------------------------+------------------------+---------+--------+------+
|title                                                           |rating                  |wasteless|cakeweek|turkey|
+----------------------------------------------------------------+------------------------+---------+--------+------+
|"Beet Ravioli with Pine Nut ""Goat Cheese"" Rosemary-Cream Sauce| Aged Balsamic Vinegar "|0.0      |880.0   |0.0   |
|"Seafood ""Cataplana"" with Saffron                             | Vermouth               |1439.0   |24.0    |0.0   |
|"""Pot Roast"" of Seitan                                        | Aunt Gloria-Style "    |0.0      |1188.0  |0.0   |
+-----

Since this is only a small number of records compared to the total dataset size, we remove them.

In [8]:
# Keep only legit values for cakeweek and wasteless
food = (
    food.where(
        (F.col("cakeweek").isin([0.0, 1.0]) | F.col("cakeweek").isNull())
        &
        (F.col("wasteless").isin([0.0, 1.0]) | F.col("wasteless").isNull())   
    )
)

print(food.count(), len(food.columns))

20054 680


Classifying variable types

In [9]:
# Columns containing information unique to each record
IDENTIFIERS = ["title"]

# Columns containing ML features
CONTINUOUS_COLUMNS = [
    "rating",
    "calories",
    "protein",
    "fat",
    "sodium"
]

# Columns containing the features we wish to predict
TARGET_COLUMN = ["dessert"]

# Columns containing binary features
BINARY_COLUMNS = [
    x for x in food.columns 
    if x not in CONTINUOUS_COLUMNS 
    and x not in TARGET_COLUMN
    and x not in IDENTIFIERS
]

We remove records that have only `null` values. After that, we equate `null` to `False` and fill zero as a default value. 

In [10]:
# Remove records that have only null values
food = food.dropna(
    how="all",
    subset=[x for x in food.columns if x not in IDENTIFIERS]
)

food = food.dropna(subset=TARGET_COLUMN)

print(food.count(), len(food.columns))

20049 680


In [11]:
food = food.fillna(0.0, subset=BINARY_COLUMNS)

print(food.where(F.col(BINARY_COLUMNS[0]).isNull()).count())

0


Cleaning continuous columns

In [12]:
# If the value is not null, try casting it to a float.
# If this fails, it's not a number.
@F.udf(T.BooleanType())
def is_a_number(value: Optional[str]) -> bool:
    if not value:
        return True
    try:
        _ = float(value)
    except ValueError:
        return False
    return True

In [13]:
# Show non-numerical values in the continuous columns
food.where(~is_a_number(F.col("rating"))).select(*CONTINUOUS_COLUMNS).show()

+---------+------------+-------+----+------+
|   rating|    calories|protein| fat|sodium|
+---------+------------+-------+----+------+
| Cucumber| and Lemon "|   3.75|null|  null|
+---------+------------+-------+----+------+



In [14]:
# Remove rogue continuous column values, cast remaining values to double
for column in CONTINUOUS_COLUMNS:
    food = food.where(is_a_number(F.col(column)))
    food = food.withColumn(column, F.col(column).cast(T.DoubleType()))

print(food.count(), len(food.columns))

20048 680


We examine the summary statistics of our continuous columns to look for remaining non-sensible values.

In [15]:
food.select(CONTINUOUS_COLUMNS).summary(
    "mean",
    "stddev",
    "min",
    "1%",
    "5%",
    "50%",
    "95%",
    "99%",
    "max"
).show()

+-------+------------------+------------------+------------------+-----------------+-----------------+
|summary|            rating|          calories|           protein|              fat|           sodium|
+-------+------------------+------------------+------------------+-----------------+-----------------+
|   mean| 3.714460295291301|6324.0634571930705|100.17385283565179|346.9398083953107|6226.927244193346|
| stddev|1.3409187660508959|359079.83696340164|3840.6809971287403|20458.04034412409|333349.5680370268|
|    min|               0.0|               0.0|               0.0|              0.0|              0.0|
|     1%|               0.0|              18.0|               0.0|              0.0|              1.0|
|     5%|               0.0|              62.0|               0.0|              0.0|              5.0|
|    50%|             4.375|             331.0|               8.0|             17.0|            294.0|
|    95%|               5.0|            1315.0|              75.0|       

Some remaining nutrition values are very high (way higher than the 75th percentile). We cap the values at the 99th percentile.

In [16]:
# Remove rows with values above the 99th percentile.
# Use hardcoded maxima to ensure analysis is consistent across runs.
maximum = {
    "calories": 3184.0,
    "protein": 173.0,
    "fat": 207.0,
    "sodium": 5649.0
}

# Replace large values while holding onto null values:
for k, v in maximum.items():
    food = food.withColumn(
        k,
        F.when(F.isnull(F.col(k)), F.col(k)).otherwise(
            F.least(F.col(k), F.lit(v))
        )
    )   

Weed out binary columns which are not present enough to be reliable predictors. 

In [17]:
# Remove binary features that occur too little or too often
# We choose a threshold of 10.
inst_sum_of_binary_columns = [
    F.sum(F.col(x)).alias(x) for x in BINARY_COLUMNS
]

# Using .head() brings data back from the distributed cluster
# to the Spark driver
sum_of_binary_columns = (
    food.select(*inst_sum_of_binary_columns).head().asDict()
)

num_rows = food.count()
# If the count of the ones / sum of the column is below 10 or above
# the number of records - 10, we remove the column.
too_rare_features = [
    k for k, v in sum_of_binary_columns.items()
    if v < 10 or v > (num_rows - 10)
]

# Rather than deleting the columns from the food dataframe, we remove
# them from the BINARY_COLUMNS list
BINARY_COLUMNS = list(set(BINARY_COLUMNS) - set(too_rare_features))

len(too_rare_features)

print(too_rare_features)

['cakeweek', 'wasteless', '30daysofgroceries', 'alabama', 'alaska', 'anthonybourdain', 'applejuice', 'arizona', 'aspen', 'atlanta', 'australia', 'beverlyhills', 'biscuit', 'boston', 'bran', 'brooklyn', 'brownie', 'buffalo', 'bulgaria', 'burrito', 'cambridge', 'camping', 'canada', 'caviar', 'chicago', 'chili', 'cobblercrumble', 'columbus', 'cooklikeadiner', 'cookbookcritic', 'costamesa', 'cranberrysauce', 'crêpe', 'crmedecacao', 'cuba', 'cupcake', 'custard', 'dallas', 'denver', 'digestif', 'dominicanrepublic', 'doriegreenspan', 'eaudevie', 'eggnog', 'egypt', 'emerillagasse', 'england', 'entertaining', 'epiushg', 'epilovesthemicrowave', 'flatbread', 'frankenrecipe', 'freezerfood', 'friendsgiving', 'frittata', 'fritter', 'germany', 'grains', 'grandmarnier', 'granola', 'grappa', 'guam', 'haiti', 'hamburger', 'hawaii', 'healdsburg', 'hollywood', 'housecocktail', 'houston', 'hummus', 'icedcoffee', 'idaho', 'illinois', 'indiana', 'iowa', 'israel', 'italy', 'jamaica', 'japan', 'juicer', 'kansa

Feature engineering and refinement

- Create a few custom features using continuous feature columns
- Measure correlation between original and generated continuous features

Since we know that there are 4kcal per g of protein and 9kcal per g of fat, we can create feature columns representing the proportion of the recipe's calories that are made up by protein and fat. Perhaps desserts have a higher proportion of fat calories?

In [18]:
# Create columns representing the proportion of calories contributed
# by protein and fat
food = (
    food
    .withColumn("protein_ratio", F.col("protein") * 4 / F.col("calories"))
    .withColumn("fat_ratio", F.col("fat") * 9 / F.col("calories"))
)

# Fill the columns where NA
food.fillna(0.0, subset=["protein_ratio", "fat_ratio"])

# Add the new columns to the CONTINUOUS_COLUMNS list
CONTINUOUS_COLUMNS += ["protein_ratio", "fat_ratio"]

Removing highly correlated features

The `Correlation` object's `corr` method computes the correlation between features in a `Vector`.

We use the `VectorAssembler` transformer to create a `continuous_features` column containing a `Vector` of continuous features.

Correlation won't work well if we blend categorical and/or binary features together. Correlation coefficients, such as Pearson correlation coefficient, are used to quantify the strength and direction of a linear relationship between two numerical variables. They are not suitable for categorical or binary variables because these types of variables do not have a natural linear ordering or continuous values.

In [21]:
continuous_features = VectorAssembler(
    inputCols=CONTINUOUS_COLUMNS, outputCol="continuous_features"
)

# Prepare continuous columns to be transformed into a vector.
# Vector columns can't have null values, so we remove them
vector_food = food.select(CONTINUOUS_COLUMNS)
for x in CONTINUOUS_COLUMNS:
    vector_food = vector_food.where(~F.isnull(F.col(x)))

vector_variable = continuous_features.transform(vector_food)

vector_variable.select("continuous_features").show(3, False)

vector_variable.select("continuous_features").printSchema()

+---------------------------------------------------------------------+
|continuous_features                                                  |
+---------------------------------------------------------------------+
|[2.5,426.0,30.0,7.0,559.0,0.28169014084507044,0.14788732394366197]   |
|[4.375,403.0,18.0,23.0,1439.0,0.17866004962779156,0.5136476426799007]|
|[3.75,165.0,6.0,7.0,165.0,0.14545454545454545,0.38181818181818183]   |
+---------------------------------------------------------------------+
only showing top 3 rows

root
 |-- continuous_features: vector (nullable = true)



In [23]:
# Take a data frame and a Vector column and generate a single-row,
# single-column data frame containing the correlation matrix
# Uses Pearson correlation coefficient by default.
correlation = Correlation.corr(
    vector_variable, "continuous_features"
)

correlation.printSchema()

root
 |-- pearson(continuous_features): matrix (nullable = false)



In [38]:
# The resulting DenseMatrix is a column in a row in a Pyspark DataFrame.
# It is small enough to retrieve to the driver node using collect().
print(correlation.collect()[0][0])

DenseMatrix([[1.        , 0.10220829, 0.11315532, 0.11135616, 0.06561056,
              0.09412346, 0.12947093],
             [0.10220829, 1.        , 0.75817328, 0.91809823, 0.5170557 ,
              0.16501637, 0.17709785],
             [0.11315532, 0.75817328, 1.        , 0.66480495, 0.58562236,
              0.6003379 , 0.10906338],
             [0.11135616, 0.91809823, 0.66480495, 1.        , 0.42208754,
              0.1254943 , 0.42534458],
             [0.06561056, 0.5170557 , 0.58562236, 0.42208754, 1.        ,
              0.33943422, 0.03396702],
             [0.09412346, 0.16501637, 0.6003379 , 0.1254943 , 0.33943422,
              1.        , 0.02374545],
             [0.12947093, 0.17709785, 0.10906338, 0.42534458, 0.03396702,
              0.02374545, 1.        ]])


In [41]:
correlation_array = correlation.collect()[0][0].toArray()
correlation_pd = pd.DataFrame(
    correlation_array,
    index = CONTINUOUS_COLUMNS,
    columns = CONTINUOUS_COLUMNS
)

In [42]:
# Show the Pearson correlations
print(correlation_pd)

                 rating  calories   protein       fat    sodium  \
rating         1.000000  0.102208  0.113155  0.111356  0.065611   
calories       0.102208  1.000000  0.758173  0.918098  0.517056   
protein        0.113155  0.758173  1.000000  0.664805  0.585622   
fat            0.111356  0.918098  0.664805  1.000000  0.422088   
sodium         0.065611  0.517056  0.585622  0.422088  1.000000   
protein_ratio  0.094123  0.165016  0.600338  0.125494  0.339434   
fat_ratio      0.129471  0.177098  0.109063  0.425345  0.033967   

               protein_ratio  fat_ratio  
rating              0.094123   0.129471  
calories            0.165016   0.177098  
protein             0.600338   0.109063  
fat                 0.125494   0.425345  
sodium              0.339434   0.033967  
protein_ratio       1.000000   0.023745  
fat_ratio           0.023745   1.000000  
