In [0]:
from pyspark.sql import SparkSession

# Obtain (or reuse) the Spark session named 'nlp'; every later cell goes through it.
spark = (
    SparkSession.builder
    .appName('nlp')
    .getOrCreate()
)

In [0]:
from pyspark.ml.feature import Tokenizer,RegexTokenizer

In [0]:
from pyspark.sql.functions import col,udf
from pyspark.sql.types import IntegerType

In [0]:
# Five sample sentences keyed by id. Rows 0-3 are space-separated prose; row 4
# separates its words with commas instead of spaces.
sentence_df = spark.createDataFrame(
    [
        (0, 'Hello everyone and welcome to the tools for natural language processing lecture part 1.'),
        (1, 'Before we jump straight into the code along project I want to take a brief moment to explore a few of the tools that SPARC has for dealing with text data understanding these tools.'),
        (2, 'Were going to be able to use them easily in our custom coatl project.'),
        (3, 'So were going to learn a lot of these basic features that youll find yourself using all the time.'),
        (4, 'If,you,end,up,dealing,with,texts,they,show,a,spark,and,Python.'),
    ],
    schema=['id', 'sentence'],
)

In [0]:
# Preview the raw sentences (show() defaults spelled out: 20 rows, long cells truncated).
sentence_df.show(n=20, truncate=True)

In [0]:
# Basic tokenizer: reads the 'sentence' column and writes its tokens to 'words'.
tokenizer = Tokenizer(
    inputCol='sentence',
    outputCol='words',
)

In [0]:
# Regex-driven tokenizer over the same columns, using the pattern '\\W'
# (regex for "any non-word character").
# NOTE(review): with RegexTokenizer's default gaps=True the pattern is treated
# as the delimiter between tokens - confirm against the installed PySpark version.
regex_tokenizer = RegexTokenizer(
    inputCol='sentence',
    outputCol='words',
    pattern='\\W',
)

In [0]:
# UDF returning the number of elements in an array column as an integer.
count_tokens = udf(
    lambda words: len(words),
    IntegerType(),
)

In [0]:
# Apply the basic tokenizer; the result gains a 'words' column.
df_tokenized = tokenizer.transform(dataset=sentence_df)

In [0]:
# Preview the tokenized sentences (default 20 rows, truncated cells).
df_tokenized.show(n=20, truncate=True)

In [0]:
# Add a 'tokens' column holding the per-row token count from the basic
# tokenizer, then display the result.
(
    df_tokenized
    .withColumn('tokens', count_tokens(col('words')))
    .show()
)

In [0]:
# Apply the regex tokenizer to the same sentences for comparison.
regex_tokenized = regex_tokenizer.transform(dataset=sentence_df)

In [0]:
# Same token-count display as for the basic tokenizer, now on the regex output.
(
    regex_tokenized
    .withColumn('tokens', count_tokens(col('words')))
    .show()
)

In [0]:
from pyspark.ml.feature import StopWordsRemover

In [0]:
# Hand-built token lists for the stop-word demo. Rows 0-3 are only partially
# split: their final element is the untokenized remainder of the sentence.
sentence_df2 = spark.createDataFrame(
    [
        (0, ['Hello', 'everyone', 'and', 'welcome to the tools for natural language processing lecture part 1.']),
        (1, ['Before', 'we', 'jump', 'straight into the code along project I want to take a brief moment to explore a few of the tools that SPARC has for dealing with text data understanding these tools.']),
        (2, ['Were', 'going', 'to', 'be able to use them easily in our custom coatl project.']),
        (3, ['So', 'were', 'going', 'to learn a lot of these basic features that youll find yourself using all the time.']),
        (4, ['If', 'you', 'end', 'up', 'dealing', 'with', 'texts', 'they', 'show', 'a', 'spark', 'and', 'Python.']),
    ],
    schema=['id', 'tokens'],
)

In [0]:
# Preview the hand-built token lists (default 20 rows, truncated cells).
sentence_df2.show(n=20, truncate=True)

In [0]:
# Stop-word filter: reads the 'tokens' array and writes the kept tokens to 'filtered'.
remover = StopWordsRemover(
    inputCol='tokens',
    outputCol='filtered',
)

In [0]:
# Run the stop-word filter and display the original tokens next to the filtered ones.
(
    remover
    .transform(sentence_df2)
    .show()
)

In [0]:
from pyspark.ml.feature import NGram

In [0]:
# Input for the n-gram demo: the same rows as sentence_df2, but with the array
# column named 'words'.
word_df = spark.createDataFrame(
    [
        (0, ['Hello', 'everyone', 'and', 'welcome to the tools for natural language processing lecture part 1.']),
        (1, ['Before', 'we', 'jump', 'straight into the code along project I want to take a brief moment to explore a few of the tools that SPARC has for dealing with text data understanding these tools.']),
        (2, ['Were', 'going', 'to', 'be able to use them easily in our custom coatl project.']),
        (3, ['So', 'were', 'going', 'to learn a lot of these basic features that youll find yourself using all the time.']),
        (4, ['If', 'you', 'end', 'up', 'dealing', 'with', 'texts', 'they', 'show', 'a', 'spark', 'and', 'Python.']),
    ],
    schema=['id', 'words'],
)

In [0]:
# Bigram generator (n=2): reads 'words' and writes the n-grams to 'grams'.
ngram = NGram(
    n=2,
    inputCol='words',
    outputCol='grams',
)

In [0]:
# Generate the bigrams and display all columns (cells truncated by default).
(
    ngram
    .transform(word_df)
    .show()
)

In [0]:
# Display only the 'grams' column, untruncated, so the full bigrams are readable.
(
    ngram
    .transform(word_df)
    .select('grams')
    .show(truncate=False)
)

In [0]:
from pyspark.ml.feature import HashingTF,IDF,Tokenizer

In [0]:
# Re-display the tokenized sentences without truncation; they feed the TF-IDF demo.
df_tokenized.show(n=20, truncate=False)

In [0]:
# Term-frequency stage: hashes the 'words' array into the 'rawFeatures' vector column.
hashing_tf = HashingTF(
    inputCol='words',
    outputCol='rawFeatures',
)

In [0]:
# Compute the hashed term-frequency vectors for the tokenized sentences.
df_featurized = hashing_tf.transform(dataset=df_tokenized)

In [0]:
# IDF estimator for the small demo: rescales 'rawFeatures' into 'features'.
# NOTE: the name `idf` is rebound further down for the spam pipeline with
# different columns, so this cell must run before the fit that follows it.
idf = IDF(
    inputCol='rawFeatures',
    outputCol='features',
)

In [0]:
# Fit the IDF estimator on the term-frequency vectors.
idf_model = idf.fit(dataset=df_featurized)

In [0]:
# Apply the fitted IDF model, adding the 'features' column.
df_rescaled = idf_model.transform(dataset=df_featurized)

In [0]:
# Display each row's id next to its TF-IDF vector.
(
    df_rescaled
    .select('id', 'features')
    .show()
)

In [0]:
from pyspark.ml.feature import CountVectorizer

In [0]:
# Two tiny documents over the vocabulary {a, b, c} for the CountVectorizer demo.
df = spark.createDataFrame(
    [
        (0, 'a b c'.split(' ')),
        (1, 'a b b c a'.split(' ')),
    ],
    schema=['id', 'words'],
)

In [0]:
# Count vectorizer over 'words' -> 'features' with vocabSize=3 and minDF=2.0.
# NOTE(review): per the CountVectorizer docs, vocabSize caps the vocabulary and
# a minDF >= 1 is a minimum document count - confirm for the installed version.
cv = CountVectorizer(
    inputCol='words',
    outputCol='features',
    vocabSize=3,
    minDF=2.0,
)

In [0]:
# Fit the count vectorizer on the two-document frame.
cv_model = cv.fit(dataset=df)

In [0]:
# Turn each document's words into a count vector using the fitted model.
results = cv_model.transform(dataset=df)

In [0]:
# Display the count vectors without truncation.
results.show(n=20, truncate=False)

In [0]:
# Load the tab-separated SMS spam collection. No header option is passed, so the
# columns arrive with the auto-generated names that a later cell renames.
df_spam = spark.read.csv(
    '/FileStore/tables/SMSSpamCollection',
    sep='\t',
    inferSchema=True,
)

In [0]:
# Preview the raw SMS rows (default 20 rows, truncated cells).
df_spam.show(n=20, truncate=True)

In [0]:
# Give the two auto-named columns meaningful names: _c0 -> class, _c1 -> text.
df_spam = (
    df_spam
    .withColumnRenamed('_c0', 'class')
    .withColumnRenamed('_c1', 'text')
)

In [0]:
from pyspark.sql.functions import length

In [0]:
# Add a 'length' column holding the character length of each message text.
df_spam = df_spam.withColumn(
    'length',
    length(df_spam['text']),
)

In [0]:
# Preview the SMS rows again, now including the 'length' column.
df_spam.show(n=20, truncate=True)

In [0]:
# Average of the numeric columns per class, displayed as a table.
(
    df_spam
    .groupBy('class')
    .mean()
    .show()
)

In [0]:
from pyspark.ml.feature import StringIndexer

In [0]:
# Feature-engineering stages for the SMS data. The text stages are chained by
# column name: text -> token_text -> stop_token -> count_vec -> tf_idf.
spam_tokenizer = Tokenizer(
    inputCol='text',
    outputCol='token_text',
)
spam_stop_remover = StopWordsRemover(
    inputCol='token_text',
    outputCol='stop_token',
)
spam_count_vectorizer = CountVectorizer(
    inputCol='stop_token',
    outputCol='count_vec',
)
# NOTE: this rebinds `idf`, which an earlier cell created with different columns.
idf = IDF(
    inputCol='count_vec',
    outputCol='tf_idf',
)
# Separate from the text chain: indexes the string 'class' column into 'label'.
ham_spam_to_numeric = StringIndexer(
    inputCol='class',
    outputCol='label',
)

In [0]:
from pyspark.ml.feature import VectorAssembler

In [0]:
# Combine the TF-IDF vector and the message length into a single 'features' vector.
data_cleanser = VectorAssembler(
    inputCols=['tf_idf', 'length'],
    outputCol='features',
)

In [0]:
from pyspark.ml.classification import NaiveBayes

In [0]:
# Naive Bayes classifier; the column names written out here are its defaults and
# match the 'features' / 'label' columns produced by the preprocessing stages.
nb = NaiveBayes(featuresCol='features', labelCol='label')

In [0]:
from pyspark.ml import Pipeline

In [0]:
# Preprocessing pipeline: label indexing, then the text feature chain, then
# assembly of the final 'features' vector. No classifier is included here.
data_preprocessing_pipeline = Pipeline(
    stages=[
        ham_spam_to_numeric,
        spam_tokenizer,
        spam_stop_remover,
        spam_count_vectorizer,
        idf,
        data_cleanser,
    ]
)

In [0]:
# Fit every preprocessing stage on the full SMS DataFrame.
job_clean = data_preprocessing_pipeline.fit(dataset=df_spam)

In [0]:
# Run the fitted preprocessing pipeline over the SMS data.
df_clean = job_clean.transform(dataset=df_spam)

In [0]:
# Display the column names of the fitted pipeline's output.
df_clean.columns

In [0]:
# Keep only the two columns the classifiers consume.
df_clean = df_clean.select(
    'label',
    'features',
)

In [0]:
# Preview the model-ready rows (default 20 rows, truncated cells).
df_clean.show(n=20, truncate=True)

In [0]:
# 70/30 train/test split. The explicit seed pins the random row assignment so
# that re-running the notebook (on the same input and partitioning) trains and
# scores every model on the same rows; without it each run gives different
# splits and the reported scores cannot be compared.
train_df, test_df = df_clean.randomSplit([0.7, 0.3], seed=42)

In [0]:
# Train the Naive Bayes classifier on the training split.
spam_detector = nb.fit(dataset=train_df)

In [0]:
# Print the schema of the SMS DataFrame (the renamed class/text columns plus
# the added length column), not of the model-ready df_clean.
df_spam.printSchema()

In [0]:
# Predict on the held-out test split with the trained Naive Bayes model.
test_results = spam_detector.transform(dataset=test_df)

In [0]:
# Preview the Naive Bayes predictions (default 20 rows, truncated cells).
test_results.show(n=20, truncate=True)

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [0]:
# Evaluator shared by every model below. The metric is set explicitly because
# MulticlassClassificationEvaluator defaults to metricName='f1'; without this
# argument the values this notebook prints as "Accuracy of ..." would be F1
# scores rather than accuracy.
acc_eval = MulticlassClassificationEvaluator(metricName='accuracy')

In [0]:
# Score the Naive Bayes predictions with the shared evaluator.
acc_nb = acc_eval.evaluate(dataset=test_results)

In [0]:
# Report the Naive Bayes score: heading on one line, value on the next.
print('Accuracy of Naive Bayes Model', acc_nb, sep='\n')

In [0]:
from pyspark.ml.classification import LogisticRegression,DecisionTreeClassifier,RandomForestClassifier,GBTClassifier

In [0]:
# Alternative classifiers to compare against Naive Bayes, all on their default
# 'features' / 'label' columns.
logreg = LogisticRegression()
dtc = DecisionTreeClassifier()
# The tree ensembles take a seed; fixing it keeps their results repeatable
# across runs instead of depending on a per-session default.
rfc = RandomForestClassifier(seed=42)
gbt = GBTClassifier(seed=42)

In [0]:
# Train logistic regression on the training split.
spam_detector_logreg = logreg.fit(dataset=train_df)

In [0]:
# Predict on the held-out test split with the logistic regression model.
logreg_test_results = spam_detector_logreg.transform(dataset=test_df)

In [0]:
# Score the logistic regression predictions with the shared evaluator.
acc_logreg = acc_eval.evaluate(dataset=logreg_test_results)

In [0]:
# Report the logistic regression score: heading on one line, value on the next.
print('Accuracy of Logistic Regression Model', acc_logreg, sep='\n')

In [0]:
# Decision tree: train on the training split, predict on the test split,
# score with the shared evaluator, then report.
spam_detector_dtc = dtc.fit(dataset=train_df)
dtc_test_results = spam_detector_dtc.transform(dataset=test_df)
acc_dtc = acc_eval.evaluate(dataset=dtc_test_results)

print('Accuracy of Decision Tree Model', acc_dtc, sep='\n')

In [0]:
# Random forest: train on the training split, predict on the test split,
# score with the shared evaluator, then report.
spam_detector_rfc = rfc.fit(dataset=train_df)
rfc_test_results = spam_detector_rfc.transform(dataset=test_df)
acc_rfc = acc_eval.evaluate(dataset=rfc_test_results)

print('Accuracy of Random Forest Model', acc_rfc, sep='\n')

In [0]:
# Gradient-boosted trees: train on the training split, predict on the test
# split, score with the shared evaluator, then report.
spam_detector_gbt = gbt.fit(dataset=train_df)
gbt_test_results = spam_detector_gbt.transform(dataset=test_df)
acc_gbt = acc_eval.evaluate(dataset=gbt_test_results)

print('Accuracy of Gradient Boosted Trees Model', acc_gbt, sep='\n')