https://medium.com/trustyou-engineering/topic-modelling-with-pyspark-and-spark-nlp-a99d063f1a6e

In [67]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Spark NLP")\
    .master("local[4]")\
    .config("spark.driver.memory","16G")\
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.6.3")\
    .config("spark.kryoserializer.buffer.max", "1000M")\
    .getOrCreate()

spark

In [51]:
from pyspark.sql import functions as F
path = 'data/Musical_instruments_reviews.csv'
data = spark.read.csv(path, header=True)
text_col = 'reviewText'
review_text = data.select(text_col).filter(F.col(text_col).isNotNull())
review_text

DataFrame[reviewText: string]

In [68]:
data.select(text_col).show()

+--------------------+
|          reviewText|
+--------------------+
|    that's just like|
|The product does ...|
|The primary job o...|
|Nice windscreen p...|
|This pop filter i...|
|So good that I bo...|
|I have used monst...|
|I now use this ca...|
|Perfect for my Ep...|
|Monster makes the...|
|Monster makes a w...|
|I got it to have ...|
|If you are not us...|
|I love it, I used...|
|I bought this to ...|
|I bought this to ...|
|This Fender cable...|
|wanted it just on...|
|I've been using t...|
|Fender cords look...|
+--------------------+
only showing top 20 rows



In [28]:
from sparknlp.base import DocumentAssembler
documentAssembler = DocumentAssembler().setInputCol(text_col).setOutputCol('document')
documentAssembler

DocumentAssembler_fd4a738ba700

In [29]:
from sparknlp.annotator import Tokenizer
tokenizer = Tokenizer() \
     .setInputCols(['document']) \
     .setOutputCol('tokenized')
tokenizer

Tokenizer_44fbea5a4aef

In [30]:
from sparknlp.annotator import Normalizer
normalizer = Normalizer() \
     .setInputCols(['tokenized']) \
     .setOutputCol('normalized') \
     .setLowercase(True)

In [31]:
from sparknlp.annotator import LemmatizerModel
lemmatizer = LemmatizerModel.pretrained() \
     .setInputCols(['normalized']) \
     .setOutputCol('lemmatized')

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]


In [38]:
import nltk
from nltk.corpus import stopwords
nltk.download('stopwords')
eng_stopwords = stopwords.words('english')

[nltk_data] Downloading package stopwords to /home/ubuntu/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [41]:
from sparknlp.annotator import StopWordsCleaner
stopwords_cleaner = StopWordsCleaner() \
     .setInputCols(['lemmatized']) \
     .setOutputCol('no_stop_lemmatized') \
     .setStopWords(eng_stopwords)

stopwords_cleaner

StopWordsCleaner_b0209b9bee71

In [42]:
from sparknlp.annotator import PerceptronModel
pos_tagger = PerceptronModel.pretrained('pos_anc') \
     .setInputCols(['document', 'lemmatized']) \
     .setOutputCol('pos')

pos_anc download started this may take some time.
Approximate size to download 4.3 MB
[OK!]


In [44]:
from sparknlp.annotator import Chunker
allowed_tags = ['<JJ>+<NN>', '<NN>+<NN>']
chunker = Chunker() \
     .setInputCols(['document', 'pos']) \
     .setOutputCol('ngrams') \
     .setRegexParsers(allowed_tags)
chunker

Chunker_20016ff25b6d

In [56]:
from sparknlp.base import Finisher
# finisher = Finisher().setInputCols(['unigrams', 'ngrams'])
finisher = Finisher().setInputCols(['ngrams'])
finisher

Finisher_ea2eaf2d963a

In [57]:
from pyspark.ml import Pipeline
pipeline = Pipeline() \
     .setStages([documentAssembler,
                 tokenizer,
                 normalizer,
                 lemmatizer,
                 stopwords_cleaner,
                 pos_tagger,
                 chunker,
                 finisher])
pipeline

Pipeline_a2cbc3446c94

In [59]:
processed_review = pipeline.fit(review_text).transform(review_text)
processed_review

DataFrame[reviewText: string, finished_ngrams: array<string>]

In [62]:
from pyspark.sql.functions import concat
processed_review = processed_review.withColumn('final',
     concat(
#          F.col('finished_unigrams'), 
            F.col('finished_ngrams')))
processed_review

DataFrame[reviewText: string, finished_ngrams: array<string>, final: array<string>]

In [72]:
processed_review.show()

+--------------------+--------------------+--------------------+
|          reviewText|     finished_ngrams|               final|
+--------------------+--------------------+--------------------+
|    that's just like|                  []|                  []|
|The product does ...|[double screened,...|[double screened,...|
|The primary job o...|[primary job, not...|[primary job, not...|
|Nice windscreen p...|[Nice windscreen,...|[Nice windscreen,...|
|This pop filter i...|[pop filter, stud...|[pop filter, stud...|
|So good that I bo...|[heavy cord, last...|[heavy cord, last...|
|I have used monst...|[good reason, Sim...|[good reason, Sim...|
|I now use this ca...|[pedal board, hig...|[pedal board, hig...|
|Perfect for my Ep...|[Epiphone Sherato...|[Epiphone Sherato...|
|Monster makes the...|[best cables, Mon...|[best cables, Mon...|
|Monster makes a w...|[wide array, high...|[wide array, high...|
|I got it to have ...|                  []|                  []|
|If you are not us...|[la