In [86]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, RegexTokenizer, CountVectorizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, MultilayerPerceptronClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [2]:
spark = SparkSession.builder.appName('SentimentClassification').getOrCreate()

In [8]:
df = spark.read.csv('datasets/sentiments.csv', header=True)

In [9]:
df.show(5)

+--------------------+---------+
|                text|sentiment|
+--------------------+---------+
|Kickers on my wat...|        1|
|user: AAP MOVIE. ...|        1|
|user I'd be afrai...|        1|
|   MNTA Over 12.00  |        1|
|    OI  Over 21.37  |        1|
+--------------------+---------+
only showing top 5 rows



In [10]:
df.printSchema

<bound method DataFrame.printSchema of DataFrame[text: string, sentiment: string]>

In [13]:
df.count()

5792

In [23]:
ff = df.na.drop()

In [24]:
ff.count()

5791

In [40]:
train, test = ff.randomSplit([0.8, 0.2])

In [41]:
train.count()

4655

In [42]:
test.count()

1136

In [55]:
indexer = StringIndexer(inputCol='sentiment', outputCol='label')
tokenizer = RegexTokenizer(inputCol='text', outputCol='tokens', pattern='\\W')
vectorizer = CountVectorizer(inputCol='tokens', outputCol='vector')
classifier = LogisticRegression(featuresCol='vector', labelCol='label')
mlp_classifier = MultilayerPerceptronClassifier(featuresCol='vector', labelCol='label', layers=[8683, 50, 2])

pipeline = Pipeline(stages=[indexer, tokenizer, vectorizer, classifier])
pipeline_2 = Pipeline(stages=[indexer, tokenizer, vectorizer, mlp_classifier])

In [43]:
model = pipeline.fit(train)
predict = model.transform(test)

In [47]:
predict.show(5)

+--------------------+---------+-----+--------------------+--------------------+--------------------+--------------------+----------+
|                text|sentiment|label|              tokens|              vector|       rawPrediction|         probability|prediction|
+--------------------+---------+-----+--------------------+--------------------+--------------------+--------------------+----------+
| ES,EC,JNK,DAX  I...|       -1|  1.0|[es, ec, jnk, dax...|(8683,[0,10,17,24...|[11.4207341120768...|[0.99998903437389...|       0.0|
| NEW Blog Post: T...|        1|  0.0|[new, blog, post,...|(8683,[5,17,52,15...|[-11.364508037238...|[1.15998355446098...|       1.0|
|#Gold prices slip...|       -1|  1.0|[gold, prices, sl...|(8683,[8,11,12,31...|[-58.618891627509...|[3.4844868580198E...|       1.0|
|#ed obin GB boing!  |        1|  0.0|[ed, obin, gb, bo...|(8683,[211,2709,4...|[40.3793836952215...|           [1.0,0.0]|       0.0|
|#vmware VMW where...|       -1|  1.0|[vmware, vmw, whe...|(86

In [37]:
evaluator = BinaryClassificationEvaluator()

In [44]:
evaluator.evaluate(predict)

0.8022719896137605

In [None]:
model_2 = pipeline_2.fit(train)
predict_2 = model_2.transform(test)

In [52]:
evaluator.evaluate(predict_2)

0.7981926578861974

### Exercise:

In [57]:
train = spark.read.csv('datasets/sent_train.csv', header=True)
valid = spark.read.csv('datasets/sent_valid.csv', header=True)

In [67]:
train = train.na.drop()
valid = valid.na.drop()

In [59]:
train.printSchema

<bound method DataFrame.printSchema of DataFrame[text: string, label: string]>

In [68]:
indexer = StringIndexer(inputCol='label', outputCol='class')
tokenizer = RegexTokenizer(inputCol='text', outputCol='tokens', pattern='\\W')
vectorizer = CountVectorizer(inputCol='tokens', outputCol='vector')
classifier = LogisticRegression(featuresCol='vector', labelCol='class')

pipeline = Pipeline(stages=[indexer, tokenizer, vectorizer, classifier])


In [69]:
model = pipeline.fit(train)
predict = model.transform(valid)

In [82]:
predict.show()

+--------------------+-----+-----+--------------------+--------------------+--------------------+--------------------+----------+
|                text|label|class|              tokens|              vector|       rawPrediction|         probability|prediction|
+--------------------+-----+-----+--------------------+--------------------+--------------------+--------------------+----------+
|$ALLY - Ally Fina...|    0|  2.0|[ally, ally, fina...|(20413,[0,1,2,97,...|[26.2749520552343...|[0.99998394736861...|       0.0|
|$DELL $HPE - Dell...|    0|  2.0|[dell, hpe, dell,...|(20413,[0,1,2,9,6...|[-42.184339346812...|[2.03479513399581...|       2.0|
|$PRTY - Moody's t...|    0|  2.0|[prty, moody, s, ...|(20413,[0,1,2,5,9...|[-14.948512742105...|[2.51480040296357...|       2.0|
|$SAN: Deutsche Ba...|    0|  2.0|[san, deutsche, b...|(20413,[3,85,182,...|[-1.3100604844918...|[8.78121485885613...|       2.0|
|$SITC: Compass Po...|    0|  2.0|[sitc, compass, p...|(20413,[3,182,243...|[-8.6380744848

In [87]:
evaluator = MulticlassClassificationEvaluator(labelCol='class')

In [89]:
evaluator.evaluate(predict)

Py4JJavaError: An error occurred while calling o2396.evaluate.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 378.0 failed 1 times, most recent failure: Lost task 0.0 in stage 378.0 (TID 338) (Quangg executor driver): org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`StringIndexerModel$$Lambda$3714/0x0000000101615040`: (string) => double).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:197)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:621)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:624)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Unseen label:  when it comes. To handle unseen labels, set Param handleInvalid to keep.
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1(StringIndexer.scala:406)
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1$adapted(StringIndexer.scala:391)
	... 22 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2898)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2834)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2833)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2833)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1253)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1253)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3102)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3036)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3025)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:995)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$collectAsMap$1(PairRDDFunctions.scala:738)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:737)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.confusions$lzycompute(MulticlassMetrics.scala:61)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.confusions(MulticlassMetrics.scala:52)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.labelCountByClass$lzycompute(MulticlassMetrics.scala:66)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.labelCountByClass(MulticlassMetrics.scala:64)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.weightedFMeasure(MulticlassMetrics.scala:227)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.weightedFMeasure$lzycompute(MulticlassMetrics.scala:235)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.weightedFMeasure(MulticlassMetrics.scala:235)
	at org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator.evaluate(MulticlassClassificationEvaluator.scala:152)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`StringIndexerModel$$Lambda$3714/0x0000000101615040`: (string) => double).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:197)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:621)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:624)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: org.apache.spark.SparkException: Unseen label:  when it comes. To handle unseen labels, set Param handleInvalid to keep.
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1(StringIndexer.scala:406)
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1$adapted(StringIndexer.scala:391)
	... 22 more
