# PySpark NLP with Spark NLP

Set up Spark NLP


In [None]:
# Disabled environment setup: uncommenting the line below downloads the
# "colab.sh" script from setup.johnsnowlabs.com and pipes it straight into bash.
# NOTE(review): the URL is plain http and the script would run unreviewed --
# inspect it before enabling this line.
#!wget http://setup.johnsnowlabs.com/colab.sh -O - | bash

Get the data

In [None]:
# The dataset download steps below sit inside a bare string literal instead of
# being live `!` shell lines, so this cell only evaluates to that string.
# NOTE(review): presumably they were disabled because yelp_labelled.txt was
# already present -- confirm. To fetch the data, move the commands out of the
# triple quotes.
"""
!wget https://archive.ics.uci.edu/ml/machine-learning-databases/00331/sentiment%20labelled%20sentences.zip
!unzip 'sentiment labelled sentences'
!mkdir data
!mv  "sentiment labelled sentences"/* data
!cp -r data/yelp_labelled.txt ./ 
!ls
"""

Read the data

In [None]:
import sparknlp

# Start the Spark session through Spark NLP's helper.
spark = sparknlp.start()

# Load the tab-separated, header-less file and give the two auto-named columns
# (_c0, _c1) meaningful names in a single chained expression.
df = (
    spark.read
    .csv("yelp_labelled.txt", sep='\t', header=False, inferSchema=True)
    .withColumnRenamed("_c0", "text")
    .withColumnRenamed("_c1", "label")
)

# Preview the rows and the inferred column types.
df.show()
df.printSchema()

Create and assemble the pipeline


In [None]:
from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, StringIndexer
from pyspark.ml import Pipeline

# Text-preprocessing and feature-extraction pipeline. Each stage reads the
# output column of the stage before it.

# Wrap the raw 'text' column as a Spark NLP document annotation.
assembler = DocumentAssembler().setInputCol('text').setOutputCol('document')

# Split each document into tokens.
tokenizer = Tokenizer().setInputCols(['document']).setOutputCol('tokenized')

# Normalize the tokens (Normalizer with its default settings).
normalizer = Normalizer().setInputCols(['tokenized']).setOutputCol('normalized')

# Remove stop words, matching case-insensitively. The input column is passed
# as a list, like every other annotator in this pipeline.
cleaner = (
    StopWordsCleaner()
    .setInputCols(['normalized'])
    .setOutputCol('cleaned')
    .setCaseSensitive(False)
)

# NOTE: this stage is a Stemmer, not a lemmatizer. The variable name and the
# 'lemmatized' column name are kept as they were so that anything already
# referring to them keeps working.
lemmatizer = Stemmer().setInputCols(['cleaned']).setOutputCol('lemmatized')

# Convert the annotations into a plain array column named 'finished'; the
# intermediate annotation columns are kept (setCleanAnnotations(False)).
finisher = (
    Finisher()
    .setInputCols(['lemmatized'])
    .setOutputCols(['finished'])
    .setOutputAsArray(True)
    .setCleanAnnotations(False)
)

# Turn the finished token arrays into count vectors in the 'features' column.
cv = CountVectorizer(inputCol='finished', outputCol='features')

pipeline = Pipeline(
    stages=[
        assembler,
        tokenizer,
        normalizer,
        cleaner,
        lemmatizer,
        finisher,
        cv,
    ]
)

# NOTE(review): the pipeline (including the CountVectorizer vocabulary) is
# fitted on the full DataFrame, i.e. before any train/test split, so the
# vocabulary is learned from rows that later end up in the test set. Consider
# splitting first and fitting on the training rows only.
model = pipeline.fit(df)
ready = model.transform(df)
ready.show()

Split Data

In [None]:
# 70/30 train/test split. A fixed seed makes the split reproducible, so the
# metrics computed later can be compared between runs.
train, test = ready.randomSplit([0.7, 0.3], seed=42)

# show() prints the preview itself and returns None, so it is called directly;
# wrapping it in print() would add a stray "None" line to the output.
print(train.count())
train.show()
print(test.count())
test.show()

Testing Models

In [None]:
from pyspark.ml.classification import LinearSVC

# Fit a linear SVM (default parameters) on the training split, then score the
# held-out test split.
lsvcModel = LinearSVC().fit(train)
predictions = lsvcModel.transform(test)

# Show the review text next to its true label and the predicted label.
results = predictions.select(["text", "label", "prediction"])
results.show()

In [None]:
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score

# Collect labels and predictions with ONE toPandas() call so each label stays
# paired with its own prediction and the Spark job is not executed twice.
scored = predictions.select("label", "prediction").toPandas()

# `true` and `pred` remain single-column pandas DataFrames, as before.
true = scored[["label"]]
pred = scored[["prediction"]]

# Every metric receives the 1-D columns; confusion_matrix previously got the
# whole DataFrames, unlike the other two calls.
print(classification_report(true.label, pred.prediction))
print(confusion_matrix(true.label, pred.prediction))
print(accuracy_score(true.label, pred.prediction))

In [None]:
from pyspark.ml.classification import LogisticRegression

# Fit a logistic-regression classifier (default parameters) on the training
# split, then score the held-out test split.
lrModel = LogisticRegression().fit(train)
predictions = lrModel.transform(test)

# Show the review text next to its true label and the predicted label.
results = predictions.select(["text", "label", "prediction"])
results.show()

In [None]:
# Collect labels and predictions with ONE toPandas() call so each label stays
# paired with its own prediction and the Spark job is not executed twice.
scored = predictions.select("label", "prediction").toPandas()

# `true` and `pred` remain single-column pandas DataFrames, as before.
true = scored[["label"]]
pred = scored[["prediction"]]

# Every metric receives the 1-D columns; confusion_matrix previously got the
# whole DataFrames, unlike the other two calls.
print(classification_report(true.label, pred.prediction))
print(confusion_matrix(true.label, pred.prediction))
print(accuracy_score(true.label, pred.prediction))

Note: Tuning the random forest classifier's hyperparameters (for example, the number of trees or the maximum tree depth) can greatly improve its performance.

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Configure the random forest through its setters, naming the label and
# feature columns explicitly.
rf = (
    RandomForestClassifier()
    .setLabelCol("label")
    .setFeaturesCol("features")
)

# Fit on the training split, then score the held-out test split.
rfModel = rf.fit(train)
predictions = rfModel.transform(test)

# Show the review text next to its true label and the predicted label.
results = predictions.select(["text", "label", "prediction"])
results.show()

In [None]:
# Collect labels and predictions with ONE toPandas() call so each label stays
# paired with its own prediction and the Spark job is not executed twice.
scored = predictions.select("label", "prediction").toPandas()

# `true` and `pred` remain single-column pandas DataFrames, as before.
true = scored[["label"]]
pred = scored[["prediction"]]

# Every metric receives the 1-D columns; confusion_matrix previously got the
# whole DataFrames, unlike the other two calls.
print(classification_report(true.label, pred.prediction))
print(confusion_matrix(true.label, pred.prediction))
print(accuracy_score(true.label, pred.prediction))