# Task 1

## Recipes data exploration

### Download dataset and install dependencies

In [1]:
# !wget https://s3-eu-west-1.amazonaws.com/dwh-test-resources/recipes.json -O /tmp/recipes.json
# !pip install -U pandas openpyxl

### Import dependencies

In [2]:
from pyspark.sql.types import StringType, StructType, StructField
from pyspark.sql.functions import *
from pyspark.sql import SparkSession, DataFrame
from functools import reduce
from datetime import date

SRC_PATH = '/tmp/recipes.json'
DEST_PATH = '/tmp/result'

### Configure SparkSession

In [3]:
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

### First impressions of dataset

In [4]:
input_df = spark.read.json(SRC_PATH)
input_df

cookTime,datePublished,description,image,ingredients,name,prepTime,recipeYield,url
PT,2013-04-01,Got leftover East...,http://static.the...,12 whole Hard Boi...,Easter Leftover S...,PT15M,8,http://thepioneer...
PT10M,2011-06-06,I finally have ba...,http://static.the...,3/4 cups Fresh Ba...,Pasta with Pesto ...,PT6M,8,http://thepioneer...
PT15M,2011-09-15,This was yummy. A...,http://static.the...,2 whole Pork Tend...,Herb Roasted Pork...,PT5M,12,http://thepioneer...
PT20M,2012-04-23,I made this for a...,http://static.the...,1 pound Penne 4 w...,Chicken Florentin...,PT10M,10,http://thepioneer...
PT,2011-06-13,Iced coffee is my...,http://static.the...,1 pound Ground Co...,Perfect Iced Coffee,PT8H,24,http://thepioneer...
PT10M,2012-05-31,When I was in Alb...,http://static.the...,"1 whole Onion, Di...",Easy Green Chile ...,PT5M,4,http://thepioneer...
PT5M,2013-03-25,Imagine the Easte...,http://static.the...,4 Tablespoons But...,Krispy Easter Eggs,PT20M,12,http://thepioneer...
PT25M,2012-08-06,Who doesn't love ...,http://static.the...,1 stick Butter 1 ...,Patty Melts,PT10M,4,http://thepioneer...
PT2M,2012-08-10,Note from PW: On ...,http://static.the...,Doughnuts 1-1/8 ...,Yum. Doughnuts!,PT25M,18,http://thepioneer...
PT15M,2012-08-01,This is just a qu...,http://static.the...,1 pound Pasta (fe...,Buttery Lemon Par...,PT5M,8,http://thepioneer...


In [5]:
input_df.printSchema()

root
 |-- cookTime: string (nullable = true)
 |-- datePublished: string (nullable = true)
 |-- description: string (nullable = true)
 |-- image: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- name: string (nullable = true)
 |-- prepTime: string (nullable = true)
 |-- recipeYield: string (nullable = true)
 |-- url: string (nullable = true)



In [6]:
input_df.describe()

summary,cookTime,datePublished,description,image,ingredients,name,prepTime,recipeYield,url
count,1042,1042,1042,1042,1042,1042,1042,1042,1042
mean,,,,,,,,9.738404452690167,
stddev,,,,,,,,6.529901914334942,
min,,2003-05-27,IMPORTANT: ...,http://static.the...,(Quantities Depe...,,,,http://thepioneer...
max,PT9M,2013-04-01,We just had a ...,http://www.101coo...,~1 1/2 cups (7 ou...,Zucchini Ricotta ...,PT950M,Serves six.,http://www.101coo...


### Now that I know the names and types of the dataset fields and the null count, I can enforce a simple schema

In [7]:
schema = StructType([
    StructField('cookTime', StringType()),
    StructField('datePublished', StringType()),
    StructField('description', StringType()),
    StructField('image', StringType()),
    StructField('ingredients', StringType()),
    StructField('name', StringType()),
    StructField('prepTime', StringType()),
    StructField('recipeYield', StringType()),
    StructField('url', StringType()),
])

input_df = spark.read.json(SRC_PATH, schema=schema)
input_df.createOrReplaceTempView('raw')

### Validating if 'datePublished' can be converted to date

In [8]:
spark.sql("SELECT count_if(to_date(datePublished) IS NULL) FROM raw")

count_if((to_date(raw.`datePublished`) IS NULL))
0


In [9]:
spark.sql("SELECT min(to_date(datePublished)), max(to_date(datePublished)) FROM raw")

min(to_date(raw.`datePublished`)),max(to_date(raw.`datePublished`))
2003-05-27,2013-04-01


### Validating if 'url' and 'image' have a valid URL format

In [10]:
spark.sql(r"""
    SELECT count_if(url NOT RLIKE '(https?)://(?:[^/$.?#]|\S).\S*') AS url,
           count_if(image NOT RLIKE '(https?)://(?:[^/$.?#]|\S).\S*') AS image 
    FROM raw
""")

url,image
0,0


### Validating if 'name' column has any empty value

In [11]:
spark.sql("SELECT * FROM raw WHERE length(name) = 0")

cookTime,datePublished,description,image,ingredients,name,prepTime,recipeYield,url
,2003-05-27,101 Cookbooks: Pe...,http://www.101coo...,Salt One 35-ounce...,,,,http://www.101coo...
,2006-01-07,A twist on a clas...,http://www.101coo...,onions - 4 medium...,,,,http://www.101coo...
PT45M,2009-10-05,In the realm of g...,http://www.101coo...,1 quart (4 cups) ...,,PT10M,,http://www.101coo...


These three empty occurrences can be filled based on the values of description, image and url

### Validating if 'ingredients' can be converted to array[string\]

In [12]:
spark.sql("""
    SELECT explode(split(ingredients, '\n'))
    FROM raw
    ORDER BY 1 DESC
""")

col
For the glaze:
Fine-grain sea s...
5 1/2 cups / 1.3...
3/4 teaspoon bak...
3/4 cup / 180 ml...
"3 large eggs, at..."
"3 cloves garlic,..."
2/3 cup / 3 oz /...
"2 large eggs, li..."
1/4 teaspoon fin...


In [13]:
spark.sql("""
    SELECT explode(split(ingredients, '\n'))
    FROM raw
    ORDER BY 1
""")

col
Cook time: 10 ...
Cook time: 45 ...
(Quantities Depe...
(add 1/2 Teaspoo...
**Slightly Adapt...
- my favorite fi...
1 Or 2 Slices Of...
1 tablespoon ext...
1/2 Cup Olive Oil
1/2 Hot Chili Pe...


For the subjectiveness of the ingredients column, it is better to keep it as a string

### Validating if 'recipeYield' can be converted to integer

In [14]:
spark.sql("SELECT count(1) FROM raw WHERE CAST(recipeYield AS INT) IS NULL")

count(1)
503


In [15]:
spark.sql("SELECT * FROM raw WHERE CAST(recipeYield AS INT) IS NULL")

cookTime,datePublished,description,image,ingredients,name,prepTime,recipeYield,url
,2009-07-06,From the Big Sur ...,http://www.101coo...,5 cups all-purpos...,Big Sur Bakery Hi...,,,http://www.101coo...
PT30M,2009-08-27,An old-fashioned ...,http://www.101coo...,1 cup plus 2 tabl...,Old-Fashioned Blu...,PT10M,Serves 8 - 10.,http://www.101coo...
PT60M,2009-10-25,An apple and carr...,http://www.101coo...,1/4 cup / 2 ounce...,Apple and Carrot ...,PT10M,,http://www.101coo...
PT15M,2009-11-15,Rustic orange-sce...,http://www.101coo...,3 cups whole whea...,Orange and Oat Sc...,PT10M,,http://www.101coo...
PT10M,2009-12-04,I made these for ...,http://www.101coo...,1/2 cup / 3.5 oz ...,Sparkling Ginger ...,PT30M,,http://www.101coo...
,2009-12-09,This olive oil fl...,http://www.101coo...,4 1/2 cups / 1 lb...,Seeded Flatbread ...,,,http://www.101coo...
PT45M,2010-03-10,"These jammy, fig-...",http://www.101coo...,Dry mix: 1 cup / ...,Figgy Buckwheat S...,PT100M,,http://www.101coo...
PT25M,2010-03-30,These muffins are...,http://www.101coo...,butter to grease ...,Lucia Muffins,PT20M,Makes 10 - 12 muf...,http://www.101coo...
PT60M,2010-04-10,"Dense, gooey choc...",http://www.101coo...,butter for greasi...,Chocolate Cherry ...,PT900M,,http://www.101coo...
PT60M,2010-04-24,"A rustic, minimal...",http://www.101coo...,butter to grease ...,Quinoa Skillet Bread,PT10M,Makes one 10 1/2 ...,http://www.101coo...


### Almost half of 'recipeYield' values are not possible to cast to integer

Analyzing the dataset, this column can sometimes have a sentence that describes the recipe's yields, or even does not have any info at all. For this absence of info, I can consider a null value, for the other ones, it is better to further the column analysis.

In [16]:
spark.sql("SELECT recipeYield FROM raw WHERE CAST(recipeYield AS INT) IS NULL AND length(recipeYield) > 0").show(20, False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

There is no good way to infer an integer value to 'recipeYield' value

### Validating if 'cookTime' and 'prepTime' can be converted to integer

In [17]:
spark.sql("SELECT distinct cookTime FROM raw")

cookTime
PT2M
PT9M
PT22M
PT55M
PT15M
PT1H
PT5M
PT1H10M
PT6M
PT8M


In [18]:
spark.sql("SELECT distinct prepTime FROM raw")

prepTime
PT2M
PT15M
PT1H
PT24H
PT950M
PT5M
PT6M
PT3M
PT
PT1M


In [19]:
spark.sql(r"""
    SELECT 'prepTime' AS source, prepTime RLIKE 'PT(?:\d+H)?(?:\d+M)?', count(1) FROM raw GROUP BY 1, 2
    UNION  
    SELECT 'cookTime' AS source, cookTime RLIKE 'PT(?:\d+H)?(?:\d+M)?', count(1) FROM raw GROUP BY 1, 2
""")

source,prepTime RLIKE PT(?:d+H)?(?:d+M)?,count(1)
prepTime,True,739
cookTime,True,716
prepTime,False,303
cookTime,False,326


Both of these columns are usually on **PT(\d+H)?(\d+M)?** pattern, which indicates hours and minutes to prepare or cook a dish. 

- Visiting some recipes on Pioneer Woman website, the absence of a time info indicates that a dish does not need to be cooked or prepared.
- Visiting some recipes on 101 Cookbooks website, the absence of a time info does not necessary indicates a the dish does not need to be cooked or prepared. It can indicate just the absence of the info at all.

In [20]:
spark.sql(r"""
    SELECT regexp_extract(url, '^(?:https?:\/\/)?(?:[^@\/\n]+@)?(?:www\.)?([^:\/?\n]+)', 1) AS domain, 'prepTime' AS source, prepTime RLIKE 'PT(?:\d+H)?(?:\d+M)?', count(1) FROM raw GROUP BY 1, 2, 3
    UNION  
    SELECT regexp_extract(url, '^(?:https?:\/\/)?(?:[^@\/\n]+@)?(?:www\.)?([^:\/?\n]+)', 1) AS domain, 'cookTime' AS source, cookTime RLIKE 'PT(?:\d+H)?(?:\d+M)?', count(1) FROM raw GROUP BY 1, 2, 3
    ORDER BY 1, 2
""")

domain,source,prepTime RLIKE PT(?:d+H)?(?:d+M)?,count(1)
101cookbooks.com,cookTime,True,177
101cookbooks.com,cookTime,False,326
101cookbooks.com,prepTime,False,303
101cookbooks.com,prepTime,True,200
thepioneerwoman.com,cookTime,True,539
thepioneerwoman.com,prepTime,True,539


This pattern can be used to fully convert the time columns of Pioneer Woman website, but only partially from 101 Cookbooks website

In [21]:
spark.sql(r"""
    SELECT source RLIKE '(PT(?:\d+H)?(?:\d+M)?)' AS test, collect_list(source)
    FROM (
        SELECT prepTime AS source FROM raw 
        WHERE regexp_extract(url, '^(?:https?:\/\/)?(?:[^@\/\n]+@)?(?:www\.)?([^:\/?\n]+)', 1) = '101cookbooks.com'
        UNION
        SELECT cookTime AS source FROM raw 
        WHERE regexp_extract(url, '^(?:https?:\/\/)?(?:[^@\/\n]+@)?(?:www\.)?([^:\/?\n]+)', 1) = '101cookbooks.com'
    ) 
    GROUP BY 1
""").show(2, False)

+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|test |collect_list(source)                                                                                                                                                                     |
+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|true |[PT2M, PT15M, PT950M, PT5M, PT8M, PT3M, PT40M, PT20M, PT60M, PT50M, PT900M, PT65M, PT70M, PT35M, PT100M, PT150M, PT180M, PT245M, PT90M, PT240M, PT45M, PT10M, PT30M, PT25M, PT120M, PT7M]|
|false|[]                                                                                                                                                                                       |
+-----+-----------------------

In [22]:
spark.sql(r"""
    SELECT source, count(1)
    FROM (
        SELECT prepTime AS source FROM raw 
        WHERE regexp_extract(url, '^(?:https?:\/\/)?(?:[^@\/\n]+@)?(?:www\.)?([^:\/?\n]+)', 1) = '101cookbooks.com'
        UNION ALL
        SELECT cookTime AS source FROM raw 
        WHERE regexp_extract(url, '^(?:https?:\/\/)?(?:[^@\/\n]+@)?(?:www\.)?([^:\/?\n]+)', 1) = '101cookbooks.com'
    ) 
    GROUP BY 1
    ORDER BY 2 DESC
""")

source,count(1)
,629
PT10M,105
PT5M,67
PT15M,38
PT20M,35
PT30M,24
PT60M,23
PT45M,17
PT25M,15
PT40M,8


### Implementing function to convert  'PT(?:\d+H)?(?:\d+M)?' pattern to minutes with integer type

In [23]:
def cast_pt_time_to_minutes_integer(col):
    hours = coalesce(regexp_extract(col, r'(\d+)H', 1).cast('int'), lit(0))
    minutes = coalesce(regexp_extract(col, r'(\d+)M', 1).cast('int'), lit(0))
    return hours * 60 + minutes

In [24]:
test_df = input_df

for columnName in ['cookTime', 'prepTime']:
    test_df = test_df.withColumn(columnName, cast_pt_time_to_minutes_integer(col(columnName)))
    
test_df

cookTime,datePublished,description,image,ingredients,name,prepTime,recipeYield,url
0,2013-04-01,Got leftover East...,http://static.the...,12 whole Hard Boi...,Easter Leftover S...,15,8,http://thepioneer...
10,2011-06-06,I finally have ba...,http://static.the...,3/4 cups Fresh Ba...,Pasta with Pesto ...,6,8,http://thepioneer...
15,2011-09-15,This was yummy. A...,http://static.the...,2 whole Pork Tend...,Herb Roasted Pork...,5,12,http://thepioneer...
20,2012-04-23,I made this for a...,http://static.the...,1 pound Penne 4 w...,Chicken Florentin...,10,10,http://thepioneer...
0,2011-06-13,Iced coffee is my...,http://static.the...,1 pound Ground Co...,Perfect Iced Coffee,480,24,http://thepioneer...
10,2012-05-31,When I was in Alb...,http://static.the...,"1 whole Onion, Di...",Easy Green Chile ...,5,4,http://thepioneer...
5,2013-03-25,Imagine the Easte...,http://static.the...,4 Tablespoons But...,Krispy Easter Eggs,20,12,http://thepioneer...
25,2012-08-06,Who doesn't love ...,http://static.the...,1 stick Butter 1 ...,Patty Melts,10,4,http://thepioneer...
2,2012-08-10,Note from PW: On ...,http://static.the...,Doughnuts 1-1/8 ...,Yum. Doughnuts!,25,18,http://thepioneer...
15,2012-08-01,This is just a qu...,http://static.the...,1 pound Pasta (fe...,Buttery Lemon Par...,5,8,http://thepioneer...


This function extracts hour and minutes from the columns and sum them with the result in minutes. When there is absence of a value, the extraction returns 0. But for the 101 Cookbook, a 0 is considered to be null if both columns (prepTime and cookTime) are 0.

This demonstrates that there are different rules for each website, so there is a need to split the data source by domain

### Splitting the data source based on website

In [25]:
spark.sql("SELECT distinct regexp_extract(url, '^(?:https?:\/\/)?(?:[^@\/\n]+@)?(?:www\.)?([^:\/?\n]+)', 1) FROM raw")

"regexp_extract(url, ^(?:https?://)?(?:[^@/ ]+@)?(?:www.)?([^:/? ]+), 1)"
thepioneerwoman.com
101cookbooks.com


In this data source, as identified two websites: thepioneerwoman.com and 101cookbooks.com. Based on these values, I can split the dataset into two dataframes.

In [26]:
input_df = input_df.withColumn('url_domain', regexp_extract(input_df.url, r'^(?:https?:\/\/)?(?:[^@\/\n]+@)?(?:www\.)?([^:\/?\n]+)', 1))

pioneer_woman_df = input_df.filter(input_df.url_domain.like('%thepioneerwoman.com')).drop('url_domain')
cook_books_df = input_df.filter(input_df.url_domain.like('%101cookbooks.com')).drop('url_domain')

In [27]:
pioneer_woman_df

cookTime,datePublished,description,image,ingredients,name,prepTime,recipeYield,url
PT,2013-04-01,Got leftover East...,http://static.the...,12 whole Hard Boi...,Easter Leftover S...,PT15M,8,http://thepioneer...
PT10M,2011-06-06,I finally have ba...,http://static.the...,3/4 cups Fresh Ba...,Pasta with Pesto ...,PT6M,8,http://thepioneer...
PT15M,2011-09-15,This was yummy. A...,http://static.the...,2 whole Pork Tend...,Herb Roasted Pork...,PT5M,12,http://thepioneer...
PT20M,2012-04-23,I made this for a...,http://static.the...,1 pound Penne 4 w...,Chicken Florentin...,PT10M,10,http://thepioneer...
PT,2011-06-13,Iced coffee is my...,http://static.the...,1 pound Ground Co...,Perfect Iced Coffee,PT8H,24,http://thepioneer...
PT10M,2012-05-31,When I was in Alb...,http://static.the...,"1 whole Onion, Di...",Easy Green Chile ...,PT5M,4,http://thepioneer...
PT5M,2013-03-25,Imagine the Easte...,http://static.the...,4 Tablespoons But...,Krispy Easter Eggs,PT20M,12,http://thepioneer...
PT25M,2012-08-06,Who doesn't love ...,http://static.the...,1 stick Butter 1 ...,Patty Melts,PT10M,4,http://thepioneer...
PT2M,2012-08-10,Note from PW: On ...,http://static.the...,Doughnuts 1-1/8 ...,Yum. Doughnuts!,PT25M,18,http://thepioneer...
PT15M,2012-08-01,This is just a qu...,http://static.the...,1 pound Pasta (fe...,Buttery Lemon Par...,PT5M,8,http://thepioneer...


In [28]:
cook_books_df

cookTime,datePublished,description,image,ingredients,name,prepTime,recipeYield,url
,2009-07-06,From the Big Sur ...,http://www.101coo...,5 cups all-purpos...,Big Sur Bakery Hi...,,,http://www.101coo...
PT30M,2009-08-27,An old-fashioned ...,http://www.101coo...,1 cup plus 2 tabl...,Old-Fashioned Blu...,PT10M,Serves 8 - 10.,http://www.101coo...
PT60M,2009-10-25,An apple and carr...,http://www.101coo...,1/4 cup / 2 ounce...,Apple and Carrot ...,PT10M,,http://www.101coo...
PT15M,2009-11-15,Rustic orange-sce...,http://www.101coo...,3 cups whole whea...,Orange and Oat Sc...,PT10M,,http://www.101coo...
PT10M,2009-12-04,I made these for ...,http://www.101coo...,1/2 cup / 3.5 oz ...,Sparkling Ginger ...,PT30M,,http://www.101coo...
,2009-12-09,This olive oil fl...,http://www.101coo...,4 1/2 cups / 1 lb...,Seeded Flatbread ...,,,http://www.101coo...
PT45M,2010-03-10,"These jammy, fig-...",http://www.101coo...,Dry mix: 1 cup / ...,Figgy Buckwheat S...,PT100M,,http://www.101coo...
PT25M,2010-03-30,These muffins are...,http://www.101coo...,butter to grease ...,Lucia Muffins,PT20M,Makes 10 - 12 muf...,http://www.101coo...
PT60M,2010-04-10,"Dense, gooey choc...",http://www.101coo...,butter for greasi...,Chocolate Cherry ...,PT900M,,http://www.101coo...
PT60M,2010-04-24,"A rustic, minimal...",http://www.101coo...,butter to grease ...,Quinoa Skillet Bread,PT10M,Makes one 10 1/2 ...,http://www.101coo...


### Implementing domain rules for Pioneer Woman

It was not identified any special rule for this half of the data source, but it is a good practice to include the *cast_pt_time_to_minutes_integer* function here, as if it was called for the whole data source, a new website could not follow the same structure.

In [29]:
def apply_pioneer_woman_rules(df):
    for columnName in ['cookTime', 'prepTime']:
        df = df.withColumn(columnName, cast_pt_time_to_minutes_integer(col(columnName)))
    return df

### Implementing domain rules for 101 Cookbook

- If cookTime and prepTime are both 0, then they should be null

In [30]:
def transform_cook_prepare_time(df):
    for columnName in ['cookTime', 'prepTime']:
        df = df.withColumn(columnName, when(col(columnName) != '', cast_pt_time_to_minutes_integer(col(columnName))))
    return df

- If name is empty, then it should be searched on url, description and image columns

In [31]:
def extract_recipe_name(df):
    def extract_recipe_name_from_url(col):
        url_last_param = reverse(regexp_extract(reverse(col), '(?i)/?(.+?)/', 1))
        param_without_extension_and_numbers = regexp_replace(url_last_param, r'\d+|(?:\..+)', '')
        recipe_name = initcap(translate(param_without_extension_and_numbers, '-', ' '))
        return when(param_without_extension_and_numbers != '', recipe_name)
    
    def extract_recipe_name_from_description(col):
        description_extract = regexp_extract(df.description, r'101\s+Cookbooks:\s+((?:(?:\w+)|(?:\s+))+)', 1)
        return when(description_extract != '', description_extract)
        
    name_from_description = extract_recipe_name_from_description(df.description)
    name_from_url = extract_recipe_name_from_url(df.url)
    name_from_image = extract_recipe_name_from_url(df.image)

    return when(df.name != '', df.name).otherwise(coalesce(name_from_description,name_from_url,name_from_image))

- If recipeYield has more than one line, only the first should me considered

In [32]:
def extract_recipe_yield(col):
    return split(col, '\n').getItem(0)

In [33]:
def apply_cookbooks_rules(df):
    df = transform_cook_prepare_time(df)
    df = df.withColumn('name', extract_recipe_name(df)).withColumn('recipeYield', extract_recipe_yield(df.recipeYield))
    return df

### Consolidate partitions results

In [34]:
domain_rules = {
    'thepioneerwoman.com': apply_pioneer_woman_rules,
    '101cookbooks.com': apply_cookbooks_rules
}

input_df = input_df.withColumn('datePublished', input_df.datePublished.cast('date'))

input_df_partitions = [
    rule(input_df.filter(input_df.url_domain.like(f'%{url}')).drop('url_domain'))
    for url, rule in domain_rules.items()
]

result_df = reduce(DataFrame.unionAll, input_df_partitions)

In [35]:
result_df

cookTime,datePublished,description,image,ingredients,name,prepTime,recipeYield,url
0,2013-04-01,Got leftover East...,http://static.the...,12 whole Hard Boi...,Easter Leftover S...,15,8,http://thepioneer...
10,2011-06-06,I finally have ba...,http://static.the...,3/4 cups Fresh Ba...,Pasta with Pesto ...,6,8,http://thepioneer...
15,2011-09-15,This was yummy. A...,http://static.the...,2 whole Pork Tend...,Herb Roasted Pork...,5,12,http://thepioneer...
20,2012-04-23,I made this for a...,http://static.the...,1 pound Penne 4 w...,Chicken Florentin...,10,10,http://thepioneer...
0,2011-06-13,Iced coffee is my...,http://static.the...,1 pound Ground Co...,Perfect Iced Coffee,480,24,http://thepioneer...
10,2012-05-31,When I was in Alb...,http://static.the...,"1 whole Onion, Di...",Easy Green Chile ...,5,4,http://thepioneer...
5,2013-03-25,Imagine the Easte...,http://static.the...,4 Tablespoons But...,Krispy Easter Eggs,20,12,http://thepioneer...
25,2012-08-06,Who doesn't love ...,http://static.the...,1 stick Butter 1 ...,Patty Melts,10,4,http://thepioneer...
2,2012-08-10,Note from PW: On ...,http://static.the...,Doughnuts 1-1/8 ...,Yum. Doughnuts!,25,18,http://thepioneer...
15,2012-08-01,This is just a qu...,http://static.the...,1 pound Pasta (fe...,Buttery Lemon Par...,5,8,http://thepioneer...


In [36]:
result_df.printSchema()

root
 |-- cookTime: integer (nullable = true)
 |-- datePublished: date (nullable = true)
 |-- description: string (nullable = true)
 |-- image: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- name: string (nullable = true)
 |-- prepTime: integer (nullable = true)
 |-- recipeYield: string (nullable = true)
 |-- url: string (nullable = true)



In [37]:
result_df.describe()

summary,cookTime,description,image,ingredients,name,prepTime,recipeYield,url
count,716.0,1042,1042,1042,1042,739.0,1042,1042
mean,32.65782122905028,,,,,40.53585926928282,9.738404452690167,
stddev,48.14438026738152,,,,,119.58444193548256,6.529901914334942,
min,0.0,IMPORTANT: ...,http://static.the...,(Quantities Depe...,A Blast From the ...,0.0,,http://thepioneer...
max,540.0,We just had a ...,http://www.101coo...,~1 1/2 cups (7 ou...,Zucchini Ricotta ...,1440.0,Serves six.,http://www.101coo...


### Saving result as Parquet

In [38]:
execution_date = date.today().strftime('%Y-%m-%d')
result_df.coalesce(20).write.parquet(f'{DEST_PATH}/execution_date={execution_date}/')
# result_df.toPandas().to_excel('/tmp/result.xlsx')

This file format was chosen because of the following:
- As a compression-optimized, metadata-oriented format, it will have an advantage over plaintext formats, like JSON, CSV.
- As a columnar format, it will have an advantage on ad-hoc queries on columns compared to Avro
- The README from this challenge said that it was used by Kafka extraction jobs, so there is a familiarity/preference of HelloFresh, compared to ORC

The **coalesce** method was used to reduce the possible amount of small files persisted by the write method. This method was used instead of **repartition** to prevent shuffling of data between Spark nodes, at the cost of unbalanced file sizes.

I did not partitioned the dataframe by **datePublished**, because this JSON is a dump from the table, containing historical data. Also, I forced a partition value directly on the write path, was it would be wasteful to create a new column with the execution_date and tell Spark to partition the output by it.

If the process that generates this JSON would just provide the updated/deleted/created information, instead of the entire table, I would consider using a file format that enables CRUD operations (like Apache Hudi, Apache Iceberg and Databricks DeltaLake), and partition by **datePublished** while configuring the history of the file to be kept indefinitely, at the expense of some performance.

Also, if the URL domain could be used as a partition, I would skip entirely the UNION step and persist which partition directly on the filesystem, like the following.

In [39]:
# execution_date = date.today().strftime('%Y-%m-%d')

# for url, rule in domain_rules.items():
#     result_df = rule(input_df.filter(input_df.url_domain.like(f'%{url}')).drop('url_domain'))
#     result_df.coalesce(20).write.parquet(f'/tmp/result/execution_date={execution_date}/url_domain={url}')

# Task 2

### Filtering only recipes that has 'beef' as ingredient

At first, I was only going to consider literal **beef** (ignoring case) on ingredients, but after validating that some recipes did have 'beef' on its name or description, I decided that the ingredient of these recipes should be analyzed.

In [40]:
result_df.createOrReplaceTempView('result')

In [41]:
spark.sql("""
    SELECT count(1) 
    FROM result 
    WHERE lower(ingredients) LIKE '%beef%'
""")

count(1)
47


In [42]:
spark.sql("""
    SELECT count(1) 
    FROM result 
    WHERE lower(ingredients) LIKE '%beef%'
""")

count(1)
47


In [43]:
spark.sql("""
    SELECT url
    FROM result 
    WHERE lower(ingredients) NOT LIKE '%beef%' 
    AND (lower(description) LIKE '%beef%' OR lower(name) LIKE '%beef%') 
""").show(10, False)

+-------------------------------------------------------------------------+
|url                                                                      |
+-------------------------------------------------------------------------+
|http://thepioneerwoman.com/cooking/2011/06/caprese-salad/                |
|http://thepioneerwoman.com/cooking/2010/10/beef-with-snow-peas/          |
|http://thepioneerwoman.com/cooking/2010/08/beer-braised-beef-with-onions/|
|http://thepioneerwoman.com/cooking/2011/03/beef-with-peppers/            |
|http://thepioneerwoman.com/cooking/2011/05/beef-noodle-salad-bowls/      |
|http://thepioneerwoman.com/cooking/2011/02/beef-fajita-nachos/           |
|http://thepioneerwoman.com/cooking/2010/10/chicken-cacciatore/           |
|http://thepioneerwoman.com/cooking/2007/06/olive_cheese_br/              |
|http://www.101cookbooks.com/archives/cracker-lasagna-recipe.html         |
+-------------------------------------------------------------------------+



Then I manually visited these 9 URL's to see the recipe photo and description. I turned out that some occurrences were just mentions and did not mean that the recipe had any beef. But for the four occurrences that really had a beef on the photo, the word 'steak' was on the ingredients list.

In [44]:
from pyspark.ml.feature import StopWordsRemover

remover = StopWordsRemover(inputCol='ingredients', outputCol='final')

df = spark.sql(r"""
    SELECT split(ingredients_element, ' ') AS ingredients
    FROM (
        SELECT explode(split(regexp_replace(lower(ingredients), '[^a-z ]', ''), '\n')) AS ingredients_element
        FROM result
        WHERE lower(ingredients) NOT LIKE '%beef%' 
        AND (lower(description) LIKE '%beef%' OR lower(name) LIKE '%beef%') 
    )
""")

remover.transform(df).where('size(ingredients) > size(final)')\
       .select(explode(array_distinct(col('final'))).alias('ingredients'))\
       .groupby(col('ingredients')).count().orderBy(col('count').desc())

ingredients,count
,8
oil,8
sliced,7
whole,7
olive,7
pepper,6
cup,6
onion,5
salt,5
weight,5


In the end, I preferred to not include any other word on my search.

### Averaging the total cook time based on recipe's difficulty

In [45]:
result_df.printSchema()

root
 |-- cookTime: integer (nullable = true)
 |-- datePublished: date (nullable = true)
 |-- description: string (nullable = true)
 |-- image: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- name: string (nullable = true)
 |-- prepTime: integer (nullable = true)
 |-- recipeYield: string (nullable = true)
 |-- url: string (nullable = true)



In [46]:
spark.sql("""
    SELECT CASE WHEN total_cook_time < 30 THEN 'easy' 
                WHEN total_cook_time < 60 THEN 'medium'
                ELSE 'hard ' END AS difficulty, AVG(total_cook_time) AS avg_total_cooking_time 
    FROM (
        SELECT IFNULL(cookTime, 0) + IFNULL(prepTime, 0) AS total_cook_time
        FROM result
        WHERE lower(ingredients) LIKE '%beef%'
        AND IFNULL(cookTime, prepTime) IS NOT NULL
    )
    GROUP BY 1
""")

difficulty,avg_total_cooking_time
hard,174.4814814814815
medium,40.0
easy,19.625


In [47]:
filtered_df = result_df.filter(lower(result_df.ingredients).like('%beef%') & coalesce(result_df.cookTime, result_df.prepTime).isNotNull())
sum_df = filtered_df.select((coalesce(filtered_df.cookTime, lit(0)) + coalesce(filtered_df.prepTime, lit(0))).alias('total_cook_time'))
classified_df = sum_df.select(when(sum_df.total_cook_time < 30, 'easy').when(sum_df.total_cook_time < 60, 'medium').otherwise('hard').alias('difficulty'), sum_df.total_cook_time)
result_df = classified_df.groupby('difficulty').agg(avg(sum_df.total_cook_time).alias('avg_total_cooking_time'))
result_df

difficulty,avg_total_cooking_time
medium,40.0
hard,174.4814814814815
easy,19.625


In [48]:
result_df.repartition(1).write.csv('/tmp/output', header=True)