In [1]:
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

In [2]:
spark = SparkSession \
    .builder \
    .appName('Indian Cuisine Data') \
    .getOrCreate()

In [3]:
spark

In [4]:
df = (spark.read
          .format("csv")
          .options(header='True', nullValue=-1, inferSchema='True')
          .csv("indian_food.csv"))

## 1. Some statistics

In [5]:
df.toPandas().head()

Unnamed: 0,name,ingredients,diet,prep_time,cook_time,flavor_profile,course,state,region
0,Balu shahi,"Maida flour, yogurt, oil, sugar",vegetarian,45.0,25.0,sweet,dessert,West Bengal,East
1,Boondi,"Gram flour, ghee, sugar",vegetarian,80.0,30.0,sweet,dessert,Rajasthan,West
2,Gajar ka halwa,"Carrots, milk, sugar, ghee, cashews, raisins",vegetarian,15.0,60.0,sweet,dessert,Punjab,North
3,Ghevar,"Flour, ghee, kewra, milk, clarified butter, su...",vegetarian,15.0,30.0,sweet,dessert,Rajasthan,West
4,Gulab jamun,"Milk powder, plain flour, baking powder, ghee,...",vegetarian,15.0,40.0,sweet,dessert,West Bengal,East


In [6]:
df.count()

255

In [7]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- ingredients: string (nullable = true)
 |-- diet: string (nullable = true)
 |-- prep_time: integer (nullable = true)
 |-- cook_time: integer (nullable = true)
 |-- flavor_profile: string (nullable = true)
 |-- course: string (nullable = true)
 |-- state: string (nullable = true)
 |-- region: string (nullable = true)



In [8]:
df.summary().toPandas()

Unnamed: 0,summary,name,ingredients,diet,prep_time,cook_time,flavor_profile,course,state,region
0,count,255,255,255,225.0,227.0,226,255,231,241
1,mean,,,,35.38666666666666,38.91189427312775,,,,
2,stddev,,,,76.24108144563532,49.42171058724399,,,,
3,min,Adhirasam,"Aloo, tomatoes, mustard oil, bay leaf, cinnamo...",non vegetarian,5.0,2.0,bitter,dessert,Andhra Pradesh,Central
4,25%,,,,10.0,20.0,,,,
5,50%,,,,10.0,30.0,,,,
6,75%,,,,20.0,45.0,,,,
7,max,Zunka,"Yogurt, milk, nuts, sugar",vegetarian,500.0,720.0,sweet,starter,West Bengal,West


In [9]:
#check duplicates
if df.count() > df.dropDuplicates(df.columns).count():
    raise ValueError('Data has duplicates')

## 2. Missing data

In [10]:
df.select([
    (1 - (f.count(c)/f.count('*'))).alias(c) for c in df.columns]).toPandas()

Unnamed: 0,name,ingredients,diet,prep_time,cook_time,flavor_profile,course,state,region
0,0.0,0.0,0.0,0.117647,0.109804,0.113725,0.0,0.094118,0.054902


In [11]:
from pyspark.ml.feature import Imputer

df2 = df.drop("state", "region")
df2 = df2.dropna(subset='cook_time')
imputer = Imputer()\
    .setInputCol('prep_time')\
    .setStrategy('mode')\
    .setOutputCol('prep_time')

In [12]:
df2 = imputer.fit(df2).transform(df2)
df2.show(5)

+--------------+--------------------+----------+---------+---------+--------------+-------+
|          name|         ingredients|      diet|prep_time|cook_time|flavor_profile| course|
+--------------+--------------------+----------+---------+---------+--------------+-------+
|    Balu shahi|Maida flour, yogu...|vegetarian|       45|       25|         sweet|dessert|
|        Boondi|Gram flour, ghee,...|vegetarian|       80|       30|         sweet|dessert|
|Gajar ka halwa|Carrots, milk, su...|vegetarian|       15|       60|         sweet|dessert|
|        Ghevar|Flour, ghee, kewr...|vegetarian|       15|       30|         sweet|dessert|
|   Gulab jamun|Milk powder, plai...|vegetarian|       15|       40|         sweet|dessert|
+--------------+--------------------+----------+---------+---------+--------------+-------+
only showing top 5 rows



In [13]:
df2.select([
    (1 - (f.count(c)/f.count('*'))).alias(c) for c in df2.columns]).toPandas()

Unnamed: 0,name,ingredients,diet,prep_time,cook_time,flavor_profile,course
0,0.0,0.0,0.0,0.0,0.0,0.105727,0.0


## 3. One hot encode categorical data

In [14]:
from pyspark.sql.functions import array
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.feature import Tokenizer
tokenizer = Tokenizer().setInputCol("ingredients").setOutputCol("tokenized")
wordsData = tokenizer.transform(df2)
hashingTF = HashingTF(numFeatures=pow(2, 9), inputCol='tokenized', outputCol="features")
tf = hashingTF.transform(wordsData)
idf = IDF(inputCol="features", outputCol="outfeats").fit(tf)
tfidf = idf.transform(tf)
tfidf.show(5)

+--------------+--------------------+----------+---------+---------+--------------+-------+--------------------+--------------------+--------------------+
|          name|         ingredients|      diet|prep_time|cook_time|flavor_profile| course|           tokenized|            features|            outfeats|
+--------------+--------------------+----------+---------+---------+--------------+-------+--------------------+--------------------+--------------------+
|    Balu shahi|Maida flour, yogu...|vegetarian|       45|       25|         sweet|dessert|[maida, flour,, y...|(512,[48,243,362,...|(512,[48,243,362,...|
|        Boondi|Gram flour, ghee,...|vegetarian|       80|       30|         sweet|dessert|[gram, flour,, gh...|(512,[117,143,456...|(512,[117,143,456...|
|Gajar ka halwa|Carrots, milk, su...|vegetarian|       15|       60|         sweet|dessert|[carrots,, milk,,...|(512,[35,143,176,...|(512,[35,143,176,...|
|        Ghevar|Flour, ghee, kewr...|vegetarian|       15|       30|  

In [15]:
df2 = tfidf.drop("tokenized", "features")

In [16]:
from pyspark.ml.feature import Bucketizer
bucketizer = Bucketizer(splits=[ 0, 30, 60, float('Inf')],inputCol="cook_time", outputCol="bin_cook_time")
df2 = bucketizer.transform(df2).drop("cook_time")

In [44]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

strIdx = StringIndexer(inputCols=['diet', 'course', 'flavor_profile'], outputCols=['dietIdx', 'courseIdx', 'flavorIdx'])
strIdx.setHandleInvalid('skip')
ohe = OneHotEncoder(inputCols=['dietIdx', 'courseIdx', 'flavorIdx'], outputCols=['dietOhe', 'courseOhe', 'flavorOhe'])
pipeline = Pipeline(stages=[strIdx, ohe])
df3 = pipeline.fit(df2).transform(df2).drop("diet", "course", "name", "ingredients", "flavor_profile", 'dietIdx', 'courseIdx', 'flavorIdx')

In [46]:
df3.show(5)

+---------+--------------------+-------------+-------------+-------------+-------------+
|prep_time|            outfeats|bin_cook_time|      dietOhe|    courseOhe|    flavorOhe|
+---------+--------------------+-------------+-------------+-------------+-------------+
|       45|(512,[48,243,362,...|          0.0|(1,[0],[1.0])|(3,[1],[1.0])|(3,[1],[1.0])|
|       80|(512,[117,143,456...|          1.0|(1,[0],[1.0])|(3,[1],[1.0])|(3,[1],[1.0])|
|       15|(512,[35,143,176,...|          2.0|(1,[0],[1.0])|(3,[1],[1.0])|(3,[1],[1.0])|
|       15|(512,[29,52,55,64...|          1.0|(1,[0],[1.0])|(3,[1],[1.0])|(3,[1],[1.0])|
|       15|(512,[45,63,93,14...|          1.0|(1,[0],[1.0])|(3,[1],[1.0])|(3,[1],[1.0])|
+---------+--------------------+-------------+-------------+-------------+-------------+
only showing top 5 rows

