<a href="https://colab.research.google.com/github/kshitijmamgain/NLP/blob/master/Kshitij_Mamgain_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

To run Spark in Colab, we first need to install its dependencies in the Colab environment: Java 8, a Spark 2.4.4 build for Hadoop 2.7, and findspark. The commands below do that.

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

Now that Java and Spark are installed, set the JAVA_HOME and SPARK_HOME environment variables so that PySpark can find them in the Colab environment.

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

Initialize Spark and create a test DataFrame.

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [5]:
df = spark.createDataFrame([{"hello": "world"} for x in range(1000)])
df.show(3)



+-----+
|hello|
+-----+
|world|
|world|
|world|
+-----+
only showing top 3 rows



In [10]:
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


In [11]:
!ls

drive  sample_data  spark-2.4.4-bin-hadoop2.7  spark-2.4.4-bin-hadoop2.7.tgz


In [0]:

# Read the tab-separated reviews file from the mounted Drive, inferring column types from the data.
dataset = spark.read.option("delimiter", "\t").option("mode", "PERMISSIVE").csv(r'/content/drive/My Drive/Machine Learning/ML/nltk/Restaurant_Reviews.tsv', inferSchema=True, header=True)

In [13]:
dataset.printSchema()

root
 |-- Review: string (nullable = true)
 |-- Liked: integer (nullable = true)



In [14]:
dataset.show()

+--------------------+-----+
|              Review|Liked|
+--------------------+-----+
|Wow... Loved this...|    1|
|  Crust is not good.|    0|
|Not tasty and the...|    0|
|Stopped by during...|    1|
|The selection on ...|    1|
|Now I am getting ...|    0|
|Honestly it didn'...|    0|
|The potatoes were...|    0|
|The fries were gr...|    1|
|      A great touch.|    1|
|Service was very ...|    1|
|  Would not go back.|    0|
|The cashier had n...|    0|
|I tried the Cape ...|    1|
|I was disgusted b...|    0|
|I was shocked bec...|    0|
| Highly recommended.|    1|
|Waitress was a li...|    0|
|This place is not...|    0|
|did not like at all.|    0|
+--------------------+-----+
only showing top 20 rows



Our Spark DataFrame is now ready.

In [0]:
# dropna() returns a new DataFrame, so assign the result back for it to take effect.
dataset = dataset.dropna()
dataset.count()

1000

In [15]:
from pyspark.ml.feature import Tokenizer
tokenization=Tokenizer(inputCol='Review',outputCol='tokens')
tokenized_df=tokenization.transform(dataset)

tokenized_df.show(4,False)

+---------------------------------------------------------------------------------------+-----+-------------------------------------------------------------------------------------------------------+
|Review                                                                                 |Liked|tokens                                                                                                 |
+---------------------------------------------------------------------------------------+-----+-------------------------------------------------------------------------------------------------------+
|Wow... Loved this place.                                                               |1    |[wow..., loved, this, place.]                                                                          |
|Crust is not good.                                                                     |0    |[crust, is, not, good.]                                                                                |


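The tokenizer is expected to split each review into words, but the output above shows a problem: the tokens still carry punctuation, such as the ellipsis in 'wow...' and the full stop in 'place.'. Since we are going to vectorize the words next, we need to get rid of these as well, so we switch to a RegexTokenizer that splits on non-word characters.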
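
The difference between the two tokenizers can be illustrated without Spark. The sketch below is a plain-Python approximation, not Spark's implementation: `whitespace_tokens` and `regex_tokens` are hypothetical helper names, and the assumption is that both tokenizers lowercase the text and that the regex variant discards empty tokens.

```python
import re

def whitespace_tokens(text):
    # Lowercase and split on whitespace only, so punctuation stays attached.
    return text.lower().split()

def regex_tokens(text, pattern=r"\W"):
    # Lowercase, split on every non-word character, and drop empty strings.
    return [tok for tok in re.split(pattern, text.lower()) if tok]

review = "Wow... Loved this place."
print(whitespace_tokens(review))  # ['wow...', 'loved', 'this', 'place.']
print(regex_tokens(review))       # ['wow', 'loved', 'this', 'place']
```

Splitting on `\W` treats each dot as a separator, which is why the empty strings between consecutive dots have to be filtered out.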

In [27]:
from pyspark.ml.feature import RegexTokenizer
regexTokenizer = RegexTokenizer(inputCol="Review", outputCol="tokens", pattern="\\W")
tokenized_df=regexTokenizer.transform(dataset)

tokenized_df.show(4,False)

+---------------------------------------------------------------------------------------+-----+------------------------------------------------------------------------------------------------------+
|Review                                                                                 |Liked|tokens                                                                                                |
+---------------------------------------------------------------------------------------+-----+------------------------------------------------------------------------------------------------------+
|Wow... Loved this place.                                                               |1    |[wow, loved, this, place]                                                                             |
|Crust is not good.                                                                     |0    |[crust, is, not, good]                                                                                |
|Not 

In [33]:
# stopwords removal
from pyspark.ml.feature import StopWordsRemover
stopword_removal=StopWordsRemover(inputCol='tokens',outputCol='refined_tokens')
refined_df=stopword_removal.transform(tokenized_df)
refined_df.show(10,False)

+---------------------------------------------------------------------------------------------------------------+-----+-------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------+
|Review                                                                                                         |Liked|tokens                                                                                                                               |refined_tokens                                                         |
+---------------------------------------------------------------------------------------------------------------+-----+-------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------+
|Wow... Loved this pla
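
As a rough picture of what the stop-word stage does, here is a minimal plain-Python sketch. The stop list is a tiny hypothetical one chosen for illustration (Spark's default English list is much longer), and `remove_stop_words` is not a Spark function.

```python
# Hypothetical, tiny stop list for illustration only.
STOP_WORDS = {"this", "is", "not", "the", "and"}

def remove_stop_words(tokens, stop_words=STOP_WORDS):
    # Keep only the tokens that are not in the stop list (case-insensitive).
    return [tok for tok in tokens if tok.lower() not in stop_words]

print(remove_stop_words(["wow", "loved", "this", "place"]))  # ['wow', 'loved', 'place']
print(remove_stop_words(["crust", "is", "not", "good"]))     # ['crust', 'good']
```

Note that dropping "not" turns "Crust is not good." into `['crust', 'good']`, so negation is lost at this stage; that may matter for a sentiment task like this one.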

In [29]:
# Count Vectorizer
from pyspark.ml.feature import CountVectorizer
count_vec=CountVectorizer(inputCol='refined_tokens',outputCol='features')
cv_df=count_vec.fit(refined_df).transform(refined_df)
cv_df.show(4,False)

+---------------------------------------------------------------------------------------+-----+------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------+--------------------------------------------------------------------------------------+
|Review                                                                                 |Liked|tokens                                                                                                |refined_tokens                                                         |features                                                                              |
+---------------------------------------------------------------------------------------+-----+------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------+--------------

In [30]:
count_vec.fit(refined_df).vocabulary

['food',
 'place',
 'good',
 'service',
 'great',
 'back',
 'like',
 'go',
 'time',
 'really',
 'best',
 'ever',
 'friendly',
 'also',
 'one',
 'never',
 'nice',
 'restaurant',
 'amazing',
 've',
 'delicious',
 'vegas',
 'm',
 'experience',
 'pretty',
 'came',
 'even',
 'definitely',
 'staff',
 'love',
 'disappointed',
 'minutes',
 'get',
 'eat',
 'us',
 'won',
 'going',
 'bad',
 'much',
 'got',
 'first',
 'made',
 'chicken',
 'say',
 'think',
 'better',
 'way',
 'worst',
 'salad',
 'menu',
 'stars',
 'pizza',
 'always',
 'well',
 'ordered',
 'steak',
 'wait',
 'quality',
 'fresh',
 'wasn',
 'want',
 'didn',
 'server',
 'sushi',
 'times',
 'll',
 'taste',
 'went',
 'flavor',
 'burger',
 'enough',
 'everything',
 'night',
 'awesome',
 'fantastic',
 'buffet',
 'slow',
 'recommend',
 'know',
 'come',
 'bland',
 'feel',
 'next',
 'meal',
 'atmosphere',
 'still',
 'tasty',
 'order',
 'sauce',
 'little',
 'perfect',
 'worth',
 'coming',
 'lunch',
 'waited',
 'breakfast',
 'selection',
 'exce
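
The vocabulary above starts with the most frequent terms ('food', 'place', 'good', ...). A minimal plain-Python sketch of that idea follows; it assumes the vocabulary is ordered by total corpus frequency and represents each document as a sparse index-to-count mapping. The helper names and the toy documents are made up for illustration.

```python
from collections import Counter

def build_vocabulary(docs):
    # Order terms by total frequency across all documents, most frequent first.
    counts = Counter(tok for doc in docs for tok in doc)
    return [term for term, _ in counts.most_common()]

def count_vector(doc, vocabulary):
    # Sparse representation: {vocabulary index: count} for terms present in doc.
    index = {term: i for i, term in enumerate(vocabulary)}
    counts = Counter(index[tok] for tok in doc if tok in index)
    return dict(sorted(counts.items()))

docs = [["food", "good"], ["food", "place", "food"], ["place", "food"]]
vocab = build_vocabulary(docs)
print(vocab)                         # ['food', 'place', 'good']
print(count_vector(docs[1], vocab))  # {0: 2, 1: 1}
```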

In [47]:
#Tf-idf
from pyspark.ml.feature import HashingTF,IDF
hashing_vec=HashingTF(inputCol='refined_tokens',outputCol='tf_features')
hashing_df=hashing_vec.transform(refined_df)
hashing_df.select(['refined_tokens','tf_features']).show(5,False)

+-----------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+
|refined_tokens                                                         |tf_features                                                                                            |
+-----------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+
|[wow, loved, place]                                                    |(262144,[33933,59414,61231],[1.0,1.0,1.0])                                                             |
|[crust, good]                                                          |(262144,[113432,153353],[1.0,1.0])                                                                     |
|[tasty, texture, nasty]                                                |(262144,[63367,93123,227406],[1.0,1.0
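
Unlike CountVectorizer, HashingTF needs no fitted vocabulary: each token is hashed straight to a bucket index in a vector of fixed size (262144, i.e. 2**18, in the output above). The sketch below shows the idea in plain Python. It uses `zlib.crc32` as a stand-in hash, which is an assumption made for illustration; Spark uses a MurmurHash3-based function, so the indices produced here will not match the ones printed above.

```python
import zlib
from collections import Counter

NUM_FEATURES = 262144  # 2**18, the vector size shown in the output above

def hashed_tf(tokens, num_features=NUM_FEATURES):
    # Hash each token to a bucket index, then count occurrences per bucket.
    # zlib.crc32 is only a stand-in for the hash function Spark uses.
    buckets = Counter(zlib.crc32(tok.encode("utf-8")) % num_features for tok in tokens)
    return dict(sorted(buckets.items()))

vec = hashed_tf(["wow", "loved", "place", "place"])
print(vec)
```

The trade-off is that two different tokens can land in the same bucket, and the original word can no longer be recovered from an index.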

In [38]:
tf_idf_vec=IDF(inputCol='tf_features',outputCol='tf_idf_features')
tf_idf_df=tf_idf_vec.fit(hashing_df).transform(hashing_df)
tf_idf_df.select(['tf_idf_features']).show(5,False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|tf_idf_features                                                                                                                                                                                                                    |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|(262144,[33933,59414,61231],[4.51085950651685,5.52246041819533,2.264363880173848])                                                                                                                                                 |
|(262144,[113432,153353],[2.3869662022661804,5.810142490647111])                
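
The weights above can be checked by hand. Spark's IDF stage uses the smoothed formula log((m + 1) / (df + 1)), where m is the number of documents and df is the number of documents containing the term, and multiplies it by the term frequency. The sketch below is a plain-Python version of that formula; the document frequencies passed in are assumptions chosen for illustration.

```python
import math

def idf(num_docs, doc_freq):
    # Smoothed inverse document frequency: log((m + 1) / (df + 1)).
    return math.log((num_docs + 1) / (doc_freq + 1))

# With 1000 reviews, a term found in only 2 of them gets a high weight,
print(idf(1000, 2))
# while a term found in half of them gets a weight close to log(2).
print(idf(1000, 500))
```

With 1000 reviews, `idf(1000, 2)` comes out at about 5.8101, which agrees with one of the values in the second row above and would be consistent with that hashed term occurring in two reviews.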

In [44]:
tf_idf_df.columns

['Review',
 'Liked',
 'tokens',
 'refined_tokens',
 'tf_features',
 'tf_idf_features']

In [64]:
train_df, test_df = tf_idf_df.randomSplit([0.80, 0.20], seed=0)
train_df.count(), test_df.count()

(806, 194)
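
The split is 806/194 rather than exactly 800/200 because the weights act as probabilities, not exact quotas. A minimal plain-Python sketch of that behaviour, assuming each row is assigned independently with a seeded random draw (`random_split` here is a hypothetical helper, not Spark's sampler):

```python
import random

def random_split(rows, train_fraction=0.8, seed=0):
    # Each row is assigned independently, so split sizes vary around the target.
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

train, test = random_split(list(range(1000)))
# The sizes always sum to 1000 but are not necessarily exactly 800 and 200.
print(len(train), len(test))
```

Fixing the seed makes the split reproducible across runs.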

In [0]:
from pyspark.ml.classification import LogisticRegression
LR = LogisticRegression().setLabelCol('Liked').setFeaturesCol('tf_idf_features').setMaxIter(100)

In [0]:
# Fit the logistic regression model on the training set.
model = LR.fit(train_df)


In [67]:
# Make predictions.
predictions = model.transform(test_df)

# Select example rows to display.
predictions.show(5)

+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|              Review|Liked|              tokens|      refined_tokens|         tf_features|     tf_idf_features|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|!....THE OWNERS R...|    0|[the, owners, rea...|[owners, really, ...|(262144,[14,12946...|(262144,[14,12946...|[7.34284573033089...|[0.99935321248107...|       0.0|
|* Both the Hot & ...|    1|[both, the, hot, ...|[hot, sour, egg, ...|(262144,[35263,42...|(262144,[35263,42...|[-80.079608881996...|[1.66673958331062...|       1.0|
|- They never brou...|    0|[they, never, bro...|[never, brought, ...|(262144,[84488,17...|(262144,[84488,17...|[61.3825137191872...|[1.0,2.1974196632...|       0.0|
|Aft
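
The `rawPrediction` and `probability` columns are related through the logistic (sigmoid) function. The sketch below checks this for the first row, using the raw value as displayed (the table truncates it, so the result is approximate).

```python
import math

def sigmoid(z):
    # Logistic function: maps a raw margin to a probability in (0, 1).
    return 1.0 / (1.0 + math.exp(-z))

# First rawPrediction entry of the first row, as displayed in the table above.
raw_class0 = 7.34284573033089
# Close to the 0.99935... shown in the probability column for that row.
print(sigmoid(raw_class0))
```

The same relation accounts for the second row: a raw value near -80 gives a class-0 probability that is vanishingly small, so the model predicts 1.0 there.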

In [0]:
# Compute accuracy on the test set by comparing predictions with the true labels
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="Liked", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

In [69]:
accuracy

0.7422680412371134
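
Accuracy is simply the share of test rows whose prediction equals the label. A minimal plain-Python sketch follows; the toy label lists are made up for illustration.

```python
def accuracy(labels, predictions):
    # Fraction of positions where the prediction matches the true label.
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

print(accuracy([0, 1, 0, 1], [0, 1, 1, 1]))  # 0.75
print(144 / 194)
```

The reported value is consistent with 144 correct predictions out of the 194 test rows, since 144 / 194 is about 0.7423.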

In [70]:
# ROC curve from the model's training summary (computed on train_df, not test_df)
model.summary.roc.show()

+--------------------+------------------+
|                 FPR|               TPR|
+--------------------+------------------+
|                 0.0|               0.0|
|                 0.0| 0.821608040201005|
|                 0.0|0.8316582914572864|
|                 0.0|0.8417085427135679|
|                 0.0|0.8517587939698492|
|                 0.0|0.8618090452261307|
|                 0.0| 0.871859296482412|
|                 0.0|0.8819095477386935|
|                 0.0|0.8919597989949749|
|                 0.0|0.9020100502512562|
|                 0.0|0.9120603015075377|
|                 0.0|0.9221105527638191|
|                 0.0|0.9321608040201005|
|                 0.0|0.9422110552763819|
|                 0.0|0.9522613065326633|
|                 0.0|0.9623115577889447|
|                 0.0|0.9723618090452262|
|                 0.0|0.9824120603015075|
|                 0.0| 0.992462311557789|
|0.002450980392156...|               1.0|
+--------------------+------------
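
Each row of the table above is one point on the ROC curve: the false positive rate and true positive rate obtained at some score threshold. The sketch below shows one way such points can be computed in plain Python. It assumes binary 0/1 labels and no tied scores, and the toy data is made up.

```python
def roc_points(labels, scores):
    # Sweep the threshold from high to low; each step admits one more positive prediction.
    pos = sum(labels)
    neg = len(labels) - pos
    ranked = sorted(zip(scores, labels), key=lambda pair: -pair[0])
    points, tp, fp = [(0.0, 0.0)], 0, 0
    for _, label in ranked:
        if label == 1:
            tp += 1
        else:
            fp += 1
        points.append((fp / neg, tp / pos))
    return points

labels = [1, 1, 0, 1, 0]
scores = [0.9, 0.8, 0.7, 0.6, 0.2]
for fpr, tpr in roc_points(labels, scores):
    print(fpr, tpr)
```

A curve that reaches a high TPR while the FPR is still 0.0, as in the table, means the highest-scored rows are almost all true positives.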

In [0]:
from pyspark.ml import Pipeline
# Configure an ML pipeline with five stages: tokenizer, stop-word remover, HashingTF, IDF, and logistic regression.
regexTokenizer = RegexTokenizer(inputCol="Review", outputCol="tokens", pattern="\\W")
stopword_removal=StopWordsRemover(inputCol=regexTokenizer.getOutputCol(),outputCol='refined_tokens')
hashing_vec=HashingTF(inputCol=stopword_removal.getOutputCol(),outputCol='tf_features')
tf_idf_vec=IDF(inputCol=hashing_vec.getOutputCol(),outputCol='tf_idf_features')

pipeline = Pipeline(stages=[regexTokenizer, stopword_removal, hashing_vec, tf_idf_vec, LR])
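
The point of the pipeline is that each stage reads the column written by the previous one. The following plain-Python sketch mimics that chaining for the first two stages only. The function names and the toy stop list are hypothetical, and the fitting step that stages such as IDF and logistic regression require is left out.

```python
import re

def tokenize(row):
    # Stage 1: reads "Review", writes "tokens".
    return {**row, "tokens": [t for t in re.split(r"\W", row["Review"].lower()) if t]}

def drop_stop_words(row, stop_words=frozenset({"is", "not", "this"})):
    # Stage 2: reads "tokens", writes "refined_tokens". The stop list is a toy one.
    return {**row, "refined_tokens": [t for t in row["tokens"] if t not in stop_words]}

def run_pipeline(stages, row):
    # Apply the stages in order; each one adds a column for the next to consume.
    for stage in stages:
        row = stage(row)
    return row

result = run_pipeline([tokenize, drop_stop_words], {"Review": "Crust is not good.", "Liked": 0})
print(result["refined_tokens"])  # ['crust', 'good']
```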

In [0]:
from pyspark.ml.classification import LinearSVC

lsvc = LinearSVC(maxIter=100, regParam=0.1).setLabelCol('Liked').setFeaturesCol('tf_idf_features')

modelSV = lsvc.fit(train_df)


In [72]:
predictionsv = modelSV.transform(test_df)
accuracy = evaluator.evaluate(predictionsv)
print(accuracy)
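# Note: `model` is the logistic regression model fitted earlier, so this shows its training ROC again, not a curve for the LinearSVC model.
model.summary.roc.show()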

0.7680412371134021
+--------------------+------------------+
|                 FPR|               TPR|
+--------------------+------------------+
|                 0.0|               0.0|
|                 0.0| 0.821608040201005|
|                 0.0|0.8316582914572864|
|                 0.0|0.8417085427135679|
|                 0.0|0.8517587939698492|
|                 0.0|0.8618090452261307|
|                 0.0| 0.871859296482412|
|                 0.0|0.8819095477386935|
|                 0.0|0.8919597989949749|
|                 0.0|0.9020100502512562|
|                 0.0|0.9120603015075377|
|                 0.0|0.9221105527638191|
|                 0.0|0.9321608040201005|
|                 0.0|0.9422110552763819|
|                 0.0|0.9522613065326633|
|                 0.0|0.9623115577889447|
|                 0.0|0.9723618090452262|
|                 0.0|0.9824120603015075|
|                 0.0| 0.992462311557789|
|0.002450980392156...|               1.0|
+--------------