In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("CW") \
    .getOrCreate()

sc = spark.sparkContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [2]:
from preprocessing.Preprocessor import Preprocessor
from metrics.metrics import Metrics
pp = Preprocessor(spark)

In [3]:
spark

In [4]:
# Create ML Model

In [5]:
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

# BoW

In [6]:
train = pp.to_bow('Cleaned Data/train.csv', min_frequency=275.)  # eg.: load in bag-of-words encoding
test = pp.to_bow('Cleaned Data/val.csv')

#train = pp.to_bow('/Users/jackrh2019/train.csv', min_frequency=500.) 
#test = pp.to_bow('/Users/jackrh2019/test.csv')

In [7]:
train.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(1948,[0,1,2,3,4,...|    2|
|(1948,[0,1,6,7,16...|    0|
|(1948,[0,1,2,3,4,...|    0|
|(1948,[5,27,34,74...|    2|
|(1948,[0,1,2,4,5,...|    2|
+--------------------+-----+
only showing top 5 rows



In [8]:
train.cache()
test.cache()

DataFrame[features: vector, label: int]

In [9]:
# Chain indexers and tree in a Pipeline

In [10]:
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(train)
featureIndexer=VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(train)
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

In [11]:
model = pipeline.fit(train)

In [12]:
# Make predictions.
predictions = model.transform(test)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)


+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       0.0|         1.0|(1948,[0,1,2,4,6,...|
|       0.0|         0.0|(1948,[0,1,2,3,4,...|
|       0.0|         1.0|(1948,[2,3,4,5,7,...|
|       0.0|         0.0|(1948,[0,1,2,3,4,...|
|       0.0|         0.0|(1948,[1,2,3,4,5,...|
+----------+------------+--------------------+
only showing top 5 rows



In [13]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

Py4JJavaError: An error occurred while calling o410.evaluate.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 25.0 failed 1 times, most recent failure: Lost task 0.0 in stage 25.0 (TID 78) (sachins-mbp.home executor driver): org.apache.spark.SparkException: Failed to execute user defined function(VectorIndexerModel$$Lambda$3654/0x0000000801dd3040: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_subExpr_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.spark.SparkException: VectorIndexer encountered invalid value 4.0 on feature index 1280. To handle or skip invalid value, try setting VectorIndexer.handleInvalid.
	at org.apache.spark.ml.feature.VectorIndexerModel.$anonfun$transformFunc$1(VectorIndexer.scala:398)
	at org.apache.spark.ml.feature.VectorIndexerModel.$anonfun$transform$1(VectorIndexer.scala:427)
	... 20 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2267)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$collectAsMap$1(PairRDDFunctions.scala:737)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:736)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.confusions$lzycompute(MulticlassMetrics.scala:61)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.confusions(MulticlassMetrics.scala:52)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.tpByClass$lzycompute(MulticlassMetrics.scala:78)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.tpByClass(MulticlassMetrics.scala:76)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.accuracy$lzycompute(MulticlassMetrics.scala:188)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.accuracy(MulticlassMetrics.scala:188)
	at org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator.evaluate(MulticlassClassificationEvaluator.scala:154)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function(VectorIndexerModel$$Lambda$3654/0x0000000801dd3040: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_subExpr_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	... 1 more
Caused by: org.apache.spark.SparkException: VectorIndexer encountered invalid value 4.0 on feature index 1280. To handle or skip invalid value, try setting VectorIndexer.handleInvalid.
	at org.apache.spark.ml.feature.VectorIndexerModel.$anonfun$transformFunc$1(VectorIndexer.scala:398)
	at org.apache.spark.ml.feature.VectorIndexerModel.$anonfun$transform$1(VectorIndexer.scala:427)
	... 20 more


In [None]:
val_m = Metrics()
val_pred_and_labels = model.transform(test).select('prediction', 'label')
val_m.f1(val_pred_and_labels)

# IDF

In [None]:
train_idf = pp.to_tfidf('Cleaned Data/train.csv', min_frequency=275.)
val_idf = pp.to_tfidf('Cleaned Data/val.csv')


In [None]:
train_idf.show(5)

In [None]:
train_idf.cache()
val_idf.cache()

In [None]:
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(train_idf)
featureIndexer=VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(train_idf)
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

In [None]:
model = pipeline.fit(train_idf)

In [None]:
val_m = Metrics()
val_pred_and_labels = model.transform(val_idf).select('prediction', 'label')
val_m.f1(val_pred_and_labels)

# word2vec

In [None]:
train_2vec = pp.to_word2vec('Cleaned Data/train.csv', vector_size=256)
val_2vec = pp.to_word2vec(('Cleaned Data/train.csv'))


In [None]:
train_2vec.show(5)

In [None]:
train_2vec.cache()
val_2vec.cache()

In [None]:
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(train_2vec)
featureIndexer=VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(train_2vec)
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

In [None]:
model = pipeline.fit(train_2vec)

In [None]:
val_m = Metrics()
val_pred_and_labels = model.transform(val_2vec).select('prediction', 'label')
val_m.f1(val_pred_and_labels)