In [113]:
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,udf,lit,lower,unix_timestamp,count,coalesce, regexp_extract,year,when,lit,avg
from pyspark.sql.window import Window
import os 

In [2]:
# Local Spark session with a single worker thread ("local"); getOrCreate()
# returns the already-running session if this cell is re-executed.
spark = (
    SparkSession.builder
    .master("local")
    .appName("HelloFreshTest")
    .getOrCreate()
)

21/12/04 15:00:22 WARN Utils: Your hostname, Sannis-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.29.191 instead (on interface en0)
21/12/04 15:00:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/12/04 15:00:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Directory holding the raw recipe JSON files (relative to the working directory).
path = "input/"

In [4]:
# Load the raw recipes with an inferred schema (used only for exploration below).
df = spark.read.format("json").load(path)

                                                                                

In [17]:
# Inspect the schema Spark inferred from the raw JSON files.
df.printSchema()

root
 |-- cookTime: string (nullable = true)
 |-- datePublished: string (nullable = true)
 |-- description: string (nullable = true)
 |-- image: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- name: string (nullable = true)
 |-- prepTime: string (nullable = true)
 |-- recipeYield: string (nullable = true)
 |-- url: string (nullable = true)



In [28]:
# Frequency of each raw prepTime string ("PT…" durations), to see which
# formats the duration parser later has to cope with.
(
    df.groupBy("prepTime")
    .agg(count("*"))
    .show()
)

+--------+--------+
|prepTime|count(1)|
+--------+--------+
|    PT2M|       7|
|   PT15M|     137|
|    PT1H|      11|
|   PT24H|       3|
|  PT950M|       1|
|    PT5M|     108|
|    PT6M|       1|
|    PT3M|       2|
|      PT|       3|
|    PT1M|       5|
|   PT40M|       5|
|   PT20M|      99|
| PT1H15M|       1|
|    PT6H|       3|
|   PT60M|       7|
|    PT4H|       4|
|   PT18H|       1|
|  PT900M|       1|
|   PT65M|       1|
|   PT35M|       3|
+--------+--------+
only showing top 20 rows



In [30]:
# Frequency of each free-text recipeYield value, shown untruncated.
(
    df.groupBy("recipeYield")
    .agg(count("*"))
    .show(truncate=False)
)

+--------------------------------------------+--------+
|recipeYield                                 |count(1)|
+--------------------------------------------+--------+
|Makes four sandwiches.                      |1       |
|Serves about 4 - 6 as a side.               |1       |
|Makes two mega scones.                      |1       |
|Makes about 1 1/2 to 2 cups of puree.       |1       |
|7                                           |1       |
|Serves 2 - 4.                               |4       |
|Makes ~1 1/2 cups of spread.                |1       |
|Makes 18 - 24 medium cookies.               |1       |
|Makes a big pot - enough for 8 -10 servings.|1       |
|Serves 12 - 16 modest slices.               |1       |
|Makes 2 dozen wide-cut fries.               |1       |
|Serves about 12.                            |2       |
|Makes one generous, family-style platter.   |1       |
|Makes one carrot cake.                      |1       |
|Serves 6 or more.                           |1 

In [15]:
# Total number of raw recipe rows.
df.count()

1042

In [26]:
# Recipes whose ingredient text contains "beef" anywhere (case-insensitive
# substring match, so e.g. "beefsteak" would also match).
beefDF = df.where(lower(col("ingredients")).contains("beef"))

In [31]:
# Number of raw recipes matching the "beef" ingredient filter.
beefDF.count()

47

In [46]:
# Sanity check: display any rows whose ingredients field is null or an empty string.
missing_ingredients = col("ingredients").isNull() | (col("ingredients") == "")
df.where(missing_ingredients).show()

+--------+-------------+-----------+-----+-----------+----+--------+-----------+---+
|cookTime|datePublished|description|image|ingredients|name|prepTime|recipeYield|url|
+--------+-------------+-----------+-----+-----------+----+--------+-----------+---+
+--------+-------------+-----------+-----+-----------+----+--------+-----------+---+



In [47]:
# ---- Pre-processing: explicit schema, durations in seconds, partitioned by year ----

In [61]:
from pyspark.sql.types import DateType, StringType, StructField, StructType


class RecipeSchema:
    """Explicit Spark schemas for the recipe JSON input.

    ``RECIPE`` declares every field as a string except ``datePublished``,
    which is parsed as a date.

    NOTE: ``name`` and ``ingredients`` are declared non-nullable, but the
    nullability flag is not enforced when Spark reads the files with this
    schema. printSchema() after loading still reports them as nullable.
    """

    RECIPE = (
        StructType()
        .add("name", StringType(), nullable=False)
        .add("ingredients", StringType(), nullable=False)
        .add("url", StringType())
        .add("image", StringType())
        .add("cookTime", StringType())
        .add("recipeYield", StringType())
        .add("datePublished", DateType())
        .add("prepTime", StringType())
        .add("description", StringType())
    )

In [76]:
def iso_duration_to_seconds(col_name):
    """Build a Column converting a ``PT#H#M#S`` duration string column to integer seconds.

    Each hour/minute/second component is optional. A missing component, a null
    value, or a bare ``"PT"`` contributes 0, so the result is never null.

    Limitations: day/week/year components (``P1DT2H``) are ignored, and an
    ``M`` before the ``T`` (months) would be read as minutes.

    Approach based on
    https://stackoverflow.com/questions/67338933/how-to-convert-a-time-value-inside-a-string-from-pt-format-to-seconds
    """

    def _component(unit, multiplier):
        digits = regexp_extract(col(col_name), rf"(\d+){unit}", 1)
        # regexp_extract returns "" when the unit is absent. Only cast non-empty
        # matches, so this also works when ANSI mode makes CAST('' AS INT) fail.
        return coalesce(when(digits != "", digits.cast("int")), lit(0)) * multiplier

    return _component("H", 3600) + _component("M", 60) + _component("S", 1)


# Re-read with the explicit schema and add duration-in-seconds and publication-year columns.
df = (
    spark.read.json(path, schema=RecipeSchema.RECIPE)
    .withColumn("cookTime_sec", iso_duration_to_seconds("cookTime"))
    .withColumn("prepTime_sec", iso_duration_to_seconds("prepTime"))
    .withColumn("year", year(col("datePublished")))
)

In [77]:
# Confirm the typed schema plus the derived cookTime_sec / prepTime_sec / year columns.
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- url: string (nullable = true)
 |-- image: string (nullable = true)
 |-- cookTime: string (nullable = true)
 |-- recipeYield: string (nullable = true)
 |-- datePublished: date (nullable = true)
 |-- prepTime: string (nullable = true)
 |-- description: string (nullable = true)
 |-- cookTime_sec: integer (nullable = false)
 |-- prepTime_sec: integer (nullable = false)
 |-- year: integer (nullable = true)



In [78]:
# Preview the first rows of the pre-processed frame (long values are truncated).
df.show()

+--------------------+--------------------+--------------------+--------------------+--------+-----------+-------------+--------+--------------------+------------+------------+----+
|                name|         ingredients|                 url|               image|cookTime|recipeYield|datePublished|prepTime|         description|cookTime_sec|prepTime_sec|year|
+--------------------+--------------------+--------------------+--------------------+--------+-----------+-------------+--------+--------------------+------------+------------+----+
|Creamy Cheese Gri...|4-1/2 cups Water
...|http://thepioneer...|http://static.the...|   PT45M|          8|   2010-10-14|    PT5M|I have a good, ba...|        2700|         300|2010|
|     Big Steak Salad|2 whole Rib-eye O...|http://thepioneer...|http://static.the...|   PT20M|          4|   2010-10-20|    PT1M|There are few thi...|        1200|          60|2010|
|My Favorite Turke...|3 cups Apple Juic...|http://thepioneer...|http://static.the...|   PT

In [80]:
# Number of recipes per publication year.
(
    df.groupBy("year")
    .agg(count("*"))
    .show()
)

                                                                                

+----+--------+
|year|count(1)|
+----+--------+
|2003|       4|
|2007|      94|
|2006|      35|
|2013|      36|
|2004|      12|
|2012|     137|
|2009|     172|
|2005|      44|
|2010|     170|
|2011|     157|
|2008|     181|
+----+--------+



In [82]:
# Persist the pre-processed recipes as Parquet, one sub-directory per year.
# NOTE: the partition count (2) is a placeholder to be tuned to the data volume;
# with round-robin repartitioning each year directory may receive up to 2 files.
(
    df.repartition(2)
    .write
    .partitionBy("year")
    .mode("overwrite")
    .parquet("pre_processed_data")
)

                                                                                

In [90]:
# Reload the pre-processed Parquet data, keeping only recipes whose
# ingredients mention "beef" (case-insensitive substring match).
df = (
    spark.read.parquet("pre_processed_data")
    .where(lower(col("ingredients")).contains("beef"))
)

In [92]:
# Number of beef recipes after the Parquet round-trip (should match beefDF.count()).
df.count()



47

In [123]:
# Classify each beef recipe by total time (cook + prep, in seconds) and report
# the average total time per difficulty class, in minutes.
#   EASY   : total < 30 min
#   MEDIUM : 30 min <= total <= 60 min
#   HARD   : total > 60 min
# when() branches are first-match, so the MEDIUM test only needs the upper bound.
# No otherwise(): a NULL total still yields a NULL difficulty, as before.
# NOTE(review): recipes whose durations are missing (e.g. "PT") parse to 0 seconds
# and would land in EASY, lowering its average — confirm that is intended.
op = (
    df.withColumn("total_cook_time", col("cookTime_sec") + col("prepTime_sec"))
    .withColumn(
        "difficulty",
        when(col("total_cook_time") < 1800, lit("EASY"))
        .when(col("total_cook_time") <= 3600, lit("MEDIUM"))
        .when(col("total_cook_time") > 3600, lit("HARD")),
    )
    .groupBy("difficulty")
    # seconds -> minutes
    .agg((avg(col("total_cook_time")) / 60).alias("avg_total_cooking_time"))
)

In [124]:
# Display the average total cooking time (minutes) per difficulty class.
op.show()



+----------+----------------------+
|difficulty|avg_total_cooking_time|
+----------+----------------------+
|      EASY|                19.625|
|    MEDIUM|                  45.0|
|      HARD|     194.3913043478261|
+----------+----------------------+



In [125]:
# Write the per-difficulty averages to ./output as one CSV part file with a header row.
(
    op.repartition(1)
    .write
    .option("header", "true")
    .mode("overwrite")
    .csv("output")
)

                                                                                