In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.master("local").appName("Spark CSV Reader").getOrCreate()

In [3]:
df = spark.read.format("csv").option("header", "true").option("mode", "DROPMALFORMED").load('../compounded.csv')
df.show(5)

+--------------------+------------------+
|             Remarks|              rate|
+--------------------+------------------+
|Plan 4 (1,230 sq....|positiveOrNegative|
|This is your chan...|      veryPositive|
|Luxurious living ...|      veryPositive|
|A new Lennar Moon...|      veryPositive|
|BRAND NEW CONSTRU...|      veryPositive|
+--------------------+------------------+
only showing top 5 rows



In [4]:
from pyspark.sql.functions import length
data = df.withColumn('length', length(df['Remarks']))
data.show(5)

+--------------------+------------------+------+
|             Remarks|              rate|length|
+--------------------+------------------+------+
|Plan 4 (1,230 sq....|positiveOrNegative|   496|
|This is your chan...|      veryPositive|  1042|
|Luxurious living ...|      veryPositive|  1320|
|A new Lennar Moon...|      veryPositive|   715|
|BRAND NEW CONSTRU...|      veryPositive|   635|
+--------------------+------------------+------+
only showing top 5 rows



In [5]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer

# Create all the features to the data set
pos_neg_to_num = StringIndexer(inputCol='rate',outputCol='label')
tokenizer = Tokenizer(inputCol="Remarks", outputCol="token_remarks")
stopremove = StopWordsRemover(inputCol='token_remarks',outputCol='stop_tokens')
hashingTF = HashingTF(inputCol="token_remarks", outputCol='hash_token')
idf = IDF(inputCol='hash_token', outputCol='idf_token')

In [6]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')

In [7]:
# Create a and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[pos_neg_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

In [8]:
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(data)
cleaned = cleaner.transform(data)

In [9]:
cleaned.columns

['Remarks',
 'rate',
 'length',
 'label',
 'token_remarks',
 'stop_tokens',
 'hash_token',
 'idf_token',
 'features']

In [10]:
# Show label of ham spame and resulting features
cleaned.select(['label', 'features']).show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|(262145,[5903,687...|
|  0.0|(262145,[3067,395...|
|  0.0|(262145,[2548,319...|
|  0.0|(262145,[1846,188...|
|  0.0|(262145,[1998,618...|
+-----+--------------------+
only showing top 5 rows



In [11]:
from pyspark.ml.classification import NaiveBayes
# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3])

# Create a Naive Bayes model and fit training data
nb = NaiveBayes()
predictor = nb.fit(training)

In [12]:
test_results = predictor.transform(testing)
test_results.show(5)

+--------------------+------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|             Remarks|              rate|length|label|       token_remarks|         stop_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+--------------------+------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|!! PRICE REDUCED ...|positiveOrNegative|   359|  1.0|[!!, price, reduc...|[!!, price, reduc...|(262144,[9639,141...|(262144,[9639,141...|(262145,[9639,141...|[-1406.6031222466...|[0.99994627131965...|       0.0|
|!!! HUGE PRICE RE...|      veryPositive|   577|  0.0|[!!!, huge, price...|[!!!, huge, price...|(262144,[6872,963...|(262144,[6872,963...|(262145,[6

In [13]:
# Use the Class Evaluator for a cleaner description
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
acc

0.7311584410151937

In [14]:
spark.stop()