In [None]:
import os

# Install a headless Java 8 JDK through apt. `-qq` and the redirect to
# /dev/null keep the package-manager output out of the cell.
! apt-get update -qq
! apt-get install -y openjdk-8-jdk-headless -qq > /dev/null

# Point JAVA_HOME at the JDK 8 directory and put its bin/ folder first on PATH.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
# Print the Java version as a sanity check.
! java -version

openjdk version "1.8.0_275"
OpenJDK Runtime Environment (build 1.8.0_275-8u275-b01-0ubuntu1~18.04-b01)
OpenJDK 64-Bit Server VM (build 25.275-b01, mixed mode)


In [None]:
# Install pinned versions of PySpark and the Spark NLP Python package.
# `--ignore-installed` reinstalls even if a copy is already present;
# `-q` reduces pip's output.
! pip install --ignore-installed -q pyspark==2.4.4
! pip install --ignore-installed -q spark-nlp==2.6.2

[K     |████████████████████████████████| 215.7MB 69kB/s 
[K     |████████████████████████████████| 204kB 45.3MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
[K     |████████████████████████████████| 133kB 4.3MB/s 
[?25h

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.functions import *
import sparknlp
from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *
from sparknlp.training import CoNLL
from sparknlp.embeddings import *

import pyspark.sql.functions as F

In [None]:

def start(gpu=False):
    """Build (or reuse) a local SparkSession with the Spark NLP jar attached.

    Args:
        gpu: When True, request the ``spark-nlp-gpu`` artifact instead of the
            CPU ``spark-nlp`` artifact.

    Returns:
        The SparkSession returned by ``builder.getOrCreate()``.
    """
    # Take the jar version from the installed Python package so the JVM and
    # Python sides stay in sync (the notebook installs spark-nlp 2.6.2, while
    # the coordinate used to be hard-coded to 2.5.1).
    artifact = "spark-nlp-gpu_2.11" if gpu else "spark-nlp_2.11"
    package = "com.johnsnowlabs.nlp:{}:{}".format(artifact, sparknlp.version())

    return (
        SparkSession.builder
        .appName("Spark NLP")
        .master("local[*]")
        .config("spark.driver.memory", "8G")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.kryoserializer.buffer.max", "1000M")
        .config("spark.jars.packages", package)
        .getOrCreate()
    )

  
# NOTE: this calls the library's `sparknlp.start()`, not the `start()` helper
# defined in this cell.
spark = sparknlp.start()

In [None]:
# Report the Spark NLP and Apache Spark versions of the running session.
print("Spark NLP version: ", sparknlp.version())
print("Apache Spark version: ", spark.version)

Spark NLP version:  2.6.2
Apache Spark version:  2.4.4


In [None]:
# Long listing of the dataset folder, sorted by modification time, oldest first.
!ls -ltr drive/MyDrive/dataset

total 842908
-rw------- 1 root root     82639 Dec 10 04:07 food_categoies.json
-rw------- 1 root root  11772774 Dec 10 05:40 formatted_recipes.csv
-rw------- 1 root root     13500 Dec 10 06:02 recipes_train.csv
drwx------ 2 root root      4096 Dec 10 14:24 tmp_classifierDL_model
-rw------- 1 root root 851263885 Dec 11 06:40 search-engine-demo.mov


In [None]:
# Load the labelled training recipes; the first CSV line is the header row.
train_ds = (
    spark.read
    .option("header", "true")
    .csv('drive/MyDrive/dataset/recipes_train.csv')
)

In [None]:
# Preview the training data (default `show()` limit: first 20 rows, truncated cells).
train_ds.show()

+----------+--------------------+
|  category|                 ing|
+----------+--------------------+
|vegeterian|Apples baked stuf...|
|vegeterian|Broth mushroom Di...|
|vegeterian|Burger/Patty mush...|
|vegeterian|Burger/Patty veg ...|
|vegeterian|Burger/patty veg ...|
|vegeterian|Burger/patty veg ...|
|vegeterian|Burger/patty veg ...|
|vegeterian|Burger/patty veg ...|
|vegeterian|Cake apple spice ...|
|vegeterian|Cake chocolate cr...|
|vegeterian|Cake gingerbread ...|
|vegeterian|Candy almost inst...|
|vegeterian|Cappuccino soy mi...|
|vegeterian|Cashew Balls Grav...|
|vegeterian|Cereal granola go...|
|vegeterian|Cereal oatmeal po...|
|vegeterian| Cheese gooda Hummus|
|vegeterian|Cheesecake berry ...|
|vegeterian|Cheesecake lem un...|
|vegeterian|Chili w/texture s...|
+----------+--------------------+
only showing top 20 rows



In [None]:
# Print the column names and types of the training data.
train_ds.printSchema()

root
 |-- category: string (nullable = true)
 |-- ing: string (nullable = true)



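Skip the next step. (The cell below selects columns such as `gmofree` and `halal`, which do not appear in the `train_ds` schema printed above.)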

In [None]:
def arrToStr(arr):
  """Flatten each row of `arr` into one space-separated string.

  Args:
    arr: Iterable of rows; each row is itself an iterable of values.

  Returns:
    A list with one string per row. Each string is the row's values
    converted with `str`, joined by single spaces, with every '[' and ']'
    character removed. An empty `arr` yields an empty list.
  """
  ret = []
  for row in arr:
    joined = ' '.join(str(elem) for elem in row)
    # Strip list brackets that come from stringified list values.
    ret.append(joined.replace('[', '').replace(']', ''))
  # Return the accumulated list (previously only the last string was returned).
  return ret

import pandas as pd

# One row per diet category: the category name and the text taken from the
# first collected row of the same-named `train_ds` column.
# NOTE(review): the schema printed for `train_ds` lists only `category` and
# `ing`, so these column selections look like they would fail — confirm.
df = pd.DataFrame({
    'category': ['gmofree', 'halal', 'kosher', 'vegan', 'lowcarb', 'vegetarian'],
    'ing': arrToStr([
        train_ds.select('gmofree').collect()[0],
        train_ds.select('halal').collect()[0],
        train_ds.select('kosher').collect()[0],
        train_ds.select('vegan').collect()[0],
        train_ds.select('lowcarb').collect()[0],
        train_ds.select('vegetarian').collect()[0],
    ]),
})

In [None]:
# Raw text column `ing` -> `document` annotations.
document = (
    DocumentAssembler()
    .setInputCol("ing")
    .setOutputCol("document")
)

# `document` -> `sentence` annotations.
sentence = (
    SentenceDetector()
    .setInputCols(['document'])
    .setOutputCol('sentence')
)

# `sentence` -> `token` annotations.
tokenizer = (
    Tokenizer()
    .setInputCols(["sentence"])
    .setOutputCol("token")
)

In [None]:
# Pretrained BERT sentence embeddings, written to `sentence_embeddings`.
# NOTE(review): both `document` and `token` are passed as inputs — confirm the
# token column is actually required by this annotator.
bert_sent = (
    BertSentenceEmbeddings.pretrained('sent_small_bert_L12_768')
    .setInputCols(["document", "token"])
    .setOutputCol("sentence_embeddings")
)

sent_small_bert_L12_768 download started this may take some time.
Approximate size to download 392.9 MB
[OK!]


In [None]:
# Default pretrained Universal Sentence Encoder; it writes to the same
# `sentence_embeddings` column name as `bert_sent`.
# NOTE(review): `use` is not referenced by the pipelines visible in this
# notebook — confirm whether it is still needed.
use = (
    UniversalSentenceEncoder.pretrained()
    .setInputCols(["document"])
    .setOutputCol("sentence_embeddings")
)

tfhub_use download started this may take some time.
Approximate size to download 923.7 MB
[OK!]


In [None]:
#use = UniversalSentenceEncoder.pretrained("tfhub_use_lg","en").setInputCols(["document","token"]).setOutputCol("sentence_embeddings")

tfhub_use_lg download started this may take some time.
Approximate size to download 753.3 MB
[OK!]


In [None]:
from sparknlp.annotator import *

In [None]:
# Trainable classifier: reads `sentence_embeddings`, learns the `category`
# label, and writes predictions to `class`.
# NOTE(review): batch size 235 and 768 max epochs are unexplained here —
# confirm they are intentional.
classsifierdl = (
    ClassifierDLApproach()
    .setBatchSize(235)
    .setMaxEpochs(768)
    .setInputCols(["sentence_embeddings"])
    .setOutputCol("class")
    .setLabelColumn("category")
    .setEnableOutputLogs(True)
)

In [None]:
# Training pipeline: text -> sentences -> tokens -> BERT embeddings -> classifier.
trainPL = Pipeline(stages=[document, sentence, tokenizer, bert_sent, classsifierdl])

In [None]:
# Fit the training pipeline on the labelled recipes in `train_ds`.
trainModel = trainPL.fit(train_ds)

In [None]:
# Persist only the last pipeline stage (the classifier), replacing any
# previous save at this path.
trainModel.stages[-1].write().overwrite().save('./tmp_classifierDL_model')

In [None]:
# Copy the saved model directory into drive/MyDrive/dataset.
!cp -r ./tmp_classifierDL_model drive/MyDrive/dataset

In [None]:
# Reload the classifier saved to local disk and wire it to the embeddings column.
classsifierdl = (
    ClassifierDLModel.load("./tmp_classifierDL_model")
    .setInputCols(["sentence_embeddings"])
    .setOutputCol("class")
)

# Inference pipeline: same preprocessing stages, ending in the loaded classifier.
pipeline = Pipeline(stages=[document, sentence, tokenizer, bert_sent, classsifierdl])

In [None]:
# Three hand-written ingredient strings used as a smoke test.
text = [
    ['potato paneer'],
    ['boneless chicken'],
    ['chicken'],
]

# Single-column DataFrame whose column name matches the pipeline input (`ing`).
spark_df = spark.createDataFrame(text).toDF("ing")
spark_df.show(truncate=False)

+----------------+
|ing             |
+----------------+
|potato paneer   |
|boneless chicken|
|chicken         |
+----------------+



In [None]:
# Run the inference pipeline on the sample rows. The classifier stage was
# loaded from disk, so `fit` here presumably does not retrain it — confirm.
prediction = pipeline.fit(spark_df).transform(spark_df)

In [None]:

# Predicted label for each sample row.
prediction.select("class.result").show()

# Annotation metadata for each prediction, shown without truncation.
prediction.select("class.metadata").show(truncate=False)

+------------+
|      result|
+------------+
|[vegeterian]|
|    [nonveg]|
|    [nonveg]|
+------------+

+-------------------------------------------------------------------+
|metadata                                                           |
+-------------------------------------------------------------------+
|[[sentence -> 0, vegeterian -> 0.9526913, nonveg -> 0.04730869]]   |
|[[sentence -> 0, vegeterian -> 5.6160206E-4, nonveg -> 0.99943835]]|
|[[sentence -> 0, vegeterian -> 0.051200792, nonveg -> 0.9487992]]  |
+-------------------------------------------------------------------+



Classify the entire dataset

In [None]:
# Long listing of the dataset folder, sorted by modification time, oldest first.
!ls -ltr drive/MyDrive/dataset/

total 11596
-rw------- 1 root root    82639 Dec 10 04:07 food_categoies.json
-rw------- 1 root root 11772774 Dec 10 05:40 formatted_recipes.csv
-rw------- 1 root root    13500 Dec 10 06:02 recipes_train.csv
drwx------ 4 root root     4096 Dec 10 14:24 tmp_classifierDL_model


In [None]:
# Load the full recipe set to classify; the first CSV line is the header row.
classify_ds = (
    spark.read
    .option("header", "true")
    .csv('drive/MyDrive/dataset/formatted_recipes.csv')
)

In [None]:
# Preview the rows to classify. A DataFrame has no `print` attribute; `show()`
# renders the table.
classify_ds.show()

+--------------------+--------------------+
|                  id|                 ing|
+--------------------+--------------------+
|rmK12Uau.ntP510Ke...|4 skinless bonele...|
|5ZpZE8hSVdPk2ZXo1...|2 (10.75 ounce) c...|
|clyYQv.CplpwJtjNa...|1/2 cup packed br...|
|BmqFAmCrDHiKNwX.I...|1 cup butter soft...|
|N.jCksRjB4MFwbgPF...|8 ounces whole wh...|
|kq.naD.8G19M4UU9d...|2 cups all-purpos...|
|lYrgWNn00EXblOupz...|For potato crust:...|
|Fu0DgGYFUGwc0BBlN...|3 cups all-purpos...|
|MBRNtqELRRuv8zJH4...|1 1/2 cups butter...|
|ZPyPoMiNvgAfrKcRp...|Sauce:  1/2 cup k...|
|ATMxl11LFhuvTbTi9...|1 (15 ounce) can ...|
|2GV8OxOPn2uKhaCVo...|2 cups all-purpos...|
|ybAYM2rjpeMIxNBRf...|1 pound cauliflow...|
|MubUROykDQT0pSI.F...|1 1/2 pounds thic...|
|MV4wp/7ZxjlGseBT0...|1 1/2 cups all-pu...|
|VQF9R/8uOgzCTKrPg...|1/2 cup Parmesan ...|
|wSt5d14gCsdnoX1mc...|4 1/2 cups all-pu...|
|unlpjtMrJRWxUfBoS...|1 pound sweet Ita...|
|Kt0fyyj39oSYwc0nb...|1/2 cup butter  1...|
|6IxFBbINcu1cvybDM...|1/2 cup wh

In [None]:
# Fit the inference pipeline on the small sample frame, apply it to the full
# recipe set, and cache the result.
predictionDS = (
    pipeline
    .fit(spark_df)
    .transform(classify_ds)
    .cache()
)

In [None]:
# Preview the classified rows with every pipeline output column (default row limit).
predictionDS.show()

In [22]:
# Show each ingredient text next to its predicted label, without truncating cells.
predictionDS.select('ing','class.result').show(truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
|ing                                                                                                                                                                                                                                                                                                                                                             