In [None]:
from pyspark.ml.feature import RegexTokenizer,Tokenizer,CountVectorizer
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer,StringIndexer, RegexTokenizer,StopWordsRemover, HashingTF, IDF
from pyspark.sql.functions import col, udf,regexp_replace,isnull
from pyspark.sql.types import StringType,IntegerType
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier, LogisticRegression, DecisionTreeClassifier, GBTClassifier,OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from sklearn.metrics import classification_report, confusion_matrix

##TEXT PROCESSING


The data is read in as csv and is converted to Spark dataframe. Before writing to csv, is.ascii() is used to ensure that reviews written in alphabets other than English are removed. 

For text preprocesing:
Some rows had the value "null" or "." These were dropped. Rows with NA were also dropped.
Punctuation marks such as ",","!", etc. and digits were removed using regex pattern. 
Stop words were removed.
An 80-20 split with random state 20 is considered.
The data is then tokenized using RegexTokenizer znd stop words are removed using StopWordsRemover. 
In addition, count vectorization, word2vec and tf-idf are considered in conjunction with following models:
logistic regression, naive bayes classifier, decision tree classifier, random forest classifier, linear support vector classifier and 
A pipeline is constructed with the following stages for classification models with count vectorization or word2vec:
tokenization->stop words removal->count vectorization/word2vec->model
In the case of tf-idf, the pipeline is:
tokenization->stop words removal->hashingTF->idf->model

In addition, the label column is of type string and it is cast into integer type for natural language processing.

In [7]:
import os
os.getcwd()
os.chdir("C:/................../spark_instructions")
os.getcwd()

'C:\\Users\\RimJhim\\Desktop\\a3\\spark_instructions'

In [66]:
spark = SparkSession.builder.appName("new").getOrCreate()
df = (spark.read
          .format("csv")
          .option('header', 'true')
          .load("d.csv"))

In [67]:
df.show(5)

+---+---------+-------+--------------------+-----+
|_c0|review_id| app_id|         review_text|label|
+---+---------+-------+--------------------+-----+
|  0|138198607|1277920|although this gam...|    1|
|  0|138198396|1277920|It still is not p...|    0|
|  0|138235603|1277920|If they keep at i...|    1|
|  0|138236287|1669000|Age of Wonders 3 ...| null|
| ,1|     null|   null|                null| null|
+---+---------+-------+--------------------+-----+
only showing top 5 rows



In [82]:
##dropping missing values-rows with NA or null or . , removing digits and punctuation

df2 = df.filter((df.label != 'null') | (df.review_text != 'null')|(df.review_text !=".")) ##doesn't remove null labels
df2=df2.filter((df2.label==1)|(df2.label==0))
##drop any na's
df2=df2.dropna()  ##302 reviews from 444

df2 = df2.withColumn("text",regexp_replace(col('review_text'), '\d+', ''))
##removing any punctuations from reviews
df2 = df2.withColumn("text", regexp_replace(col('review_text'), "[\"$#,<>+@=?!'/]",''))





In [83]:
df2.show(5)

+---+---------+-------+--------------------+-----+--------------------+
|_c0|review_id| app_id|         review_text|label|                text|
+---+---------+-------+--------------------+-----+--------------------+
|  0|138198607|1277920|although this gam...|    1|although this gam...|
|  0|138198396|1277920|It still is not p...|    0|It still is not p...|
|  0|138235603|1277920|If they keep at i...|    1|If they keep at i...|
|  0|138236059|1669000|Just... one more ...|    1|Just... one more ...|
|  0|138235741|1669000|First time agreei...|    1|First time agreei...|
+---+---------+-------+--------------------+-----+--------------------+
only showing top 5 rows



In [84]:
df2.show(10)

+---+---------+-------+--------------------+-----+--------------------+
|_c0|review_id| app_id|         review_text|label|                text|
+---+---------+-------+--------------------+-----+--------------------+
|  0|138198607|1277920|although this gam...|    1|although this gam...|
|  0|138198396|1277920|It still is not p...|    0|It still is not p...|
|  0|138235603|1277920|If they keep at i...|    1|If they keep at i...|
|  0|138236059|1669000|Just... one more ...|    1|Just... one more ...|
|  0|138235741|1669000|First time agreei...|    1|First time agreei...|
|  0|138236247|2272420|second easiest 10...|    1|second easiest 10...|
|  0|138233648|2272420|cant wait for the...|    1|cant wait for the...|
|  0|138235383|1566200|Great game and i ...|    1|Great game and i ...|
|  0|138233754|1566200|Fun game! It's Ro...|    1|Fun game Its Rogu...|
|  0|138235556|1494420|This game might b...|    1|This game might b...|
+---+---------+-------+--------------------+-----+--------------

In [85]:
##Train and test sets before further word processing-80-20 split 
df2=df2.select("text","label")
train,test = df2.randomSplit([0.8, 0.2], seed = 20)

In [80]:
##Tokenizing
re= RegexTokenizer(inputCol="text", outputCol="text2", pattern="\\W")
words = re.transform(train)


+--------------------+-----+--------------------+
|                text|label|               text2|
+--------------------+-----+--------------------+
| Chronicles of th...|    1|[chronicles, of, ...|
| and is a good ex...|    1|[and, is, a, good...|
| not a technical ...|    0|[not, a, technica...|
|                   .|    1|                  []|
|15 hours of my li...|    1|[15, hours, of, m...|
+--------------------+-----+--------------------+
only showing top 5 rows



In [45]:
##Stop words removal
remove= StopWordsRemover(inputCol="text2", outputCol="text3")
words2= remove.transform(words)
words2.select("review_text","label","text","text2","text3")
words2.show(5)

+---+---------+-------+--------------------+-----+--------------------+--------------------+--------------------+
|_c0|review_id| app_id|         review_text|label|                text|               text2|               text3|
+---+---------+-------+--------------------+-----+--------------------+--------------------+--------------------+
|  0|137794199|1366850|The servers don't...|    0|The servers dont ...|[the, servers, do...|[servers, dont, h...|
|  0|138198396|1277920|It still is not p...|    0|It still is not p...|[it, still, is, n...|[still, playable,...|
|  0|138233648|2272420|cant wait for the...|    1|cant wait for the...|[cant, wait, for,...|[cant, wait, full...|
|  0|138233754|1566200|Fun game! It's Ro...|    1|Fun game Its Rogu...|[fun, game, its, ...|[fun, game, rogue...|
|  0|138234373|2381160|Love this game. I...|    1|Love this game. I...|[love, this, game...|[love, game, shor...|
+---+---------+-------+--------------------+-----+--------------------+-----------------

In [47]:
##CountVectorizer ##Also do Tfidf ##Word2vec
count= CountVectorizer(inputCol="text3", outputCol="features")
count2 = count.fit(words2)
count3 = count2.transform(words2)
count3.show(5)

+---+---------+-------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+
|_c0|review_id| app_id|         review_text|label|                text|               text2|               text3|            features|
+---+---------+-------+--------------------+-----+--------------------+--------------------+--------------------+--------------------+
|  0|137794199|1366850|The servers don't...|    0|The servers dont ...|[the, servers, do...|[servers, dont, h...|(1356,[12,194,253...|
|  0|138198396|1277920|It still is not p...|    0|It still is not p...|[it, still, is, n...|[still, playable,...|(1356,[0,3,7,12,2...|
|  0|138233648|2272420|cant wait for the...|    1|cant wait for the...|[cant, wait, for,...|[cant, wait, full...|(1356,[0,48,69,95...|
|  0|138233754|1566200|Fun game! It's Ro...|    1|Fun game Its Rogu...|[fun, game, its, ...|[fun, game, rogue...|(1356,[0,2,4,17,4...|
|  0|138234373|2381160|Love this game. I...|    1|Love 

In [87]:
###PIPELINE-countvectorizer


##set up pipeline
from pyspark.ml import Pipeline
token=RegexTokenizer(inputCol="text", outputCol="text2", pattern="\\W")
remove= StopWordsRemover(inputCol=token.getOutputCol(), outputCol="text3")
count= CountVectorizer(inputCol="text3", outputCol="features")


##putting above:tokenizing,stop words removal, count vectorizing and model in a pipeline
pipeline = Pipeline(stages=[token,remove,count,model])





Exception ignored in: <function JavaWrapper.__del__ at 0x000001DD77D82D30>
Traceback (most recent call last):
  File "C:\Users\RimJhim\anaconda3\lib\site-packages\pyspark\ml\wrapper.py", line 53, in __del__
AttributeError: 'LogisticRegression' object has no attribute '_java_obj'


In [75]:
train.select("text","label")
train.show(5)

+--------------------+-----+
|                text|label|
+--------------------+-----+
| Chronicles of th...|    1|
| and is a good ex...|    1|
| not a technical ...|    0|
|"Only ran into on...|    0|
|"Really wasted mo...|    0|
+--------------------+-----+
only showing top 5 rows



In [91]:
train.schema["label"].dataType ##label is stringtype and so need to change it to integer

StringType()

In [92]:
##So need to change label to integer
from pyspark.sql.types import IntegerType
train = train.withColumn("label", train["label"].cast(IntegerType()))
test = test.withColumn("label",test["label"].cast(IntegerType()))


In [93]:
train.schema["label"].dataType

IntegerType()

In [94]:
##logistic regression
model_lr=pipeline.fit(train)

In [95]:
##logistic regression prediction
prediction= model_lr.transform(test)

In [None]:
Several evaluation criteria are used to assess model performance. These are: area under curve(AUC),accuracy, test error, 
precision,recall and confusion matrix. 
When count vectorizer is used, logistic regression and one-vs best have the best performance in terms of test error 
Naive Bayes classifier has the same AUC as logistic regression but much lower accuracy(higher test error).

##COUNT VECTORIZATION

In [97]:
##Evaluation of Logistic Regression
eval_lr = BinaryClassificationEvaluator()
auc_lr=eval_lr.evaluate(prediction)
print('Area Under ROC',auc_lr)

Area Under ROC 0.868274582560297


In [128]:
##Logistic Model Accuracy
pred=prediction.select("label","prediction")
evallr = MulticlassClassificationEvaluator()
evallr.setPredictionCol("prediction")
accuracy_lr=evallr.evaluate(pred)
print("Accuracy of Logistic Regression = %g"% (accuracy_lr))
print("Test Error of Logistic Regression = %g"% (1-accuracy_lr))

Accuracy of Logistic Regression = 0.847186
Test Error of Logistic Regression = 0.152814


In [104]:
###Logistic Model Confusion matrix, precision,recall
lab_true = pred.select(['label']).collect()
lab_pred = pred.select(['prediction']).collect()
print(classification_report(lab_true, lab_pred))
print(confusion_matrix(lab_true, lab_pred))

              precision    recall  f1-score   support

           0       0.60      0.55      0.57        11
           1       0.90      0.92      0.91        49

    accuracy                           0.85        60
   macro avg       0.75      0.73      0.74        60
weighted avg       0.85      0.85      0.85        60

[[ 6  5]
 [ 4 45]]


In [99]:
##fit naive bayes classifier
model2 = NaiveBayes(modelType="multinomial",smoothing=1)
pipeline2 = Pipeline(stages=[token,remove,count,model2])
nb = pipeline2.fit(train)
prediction_nb= nb.transform(test)
eval_nb=BinaryClassificationEvaluator()
auc_nb=eval_nb.evaluate(prediction)
print('Area Under ROC', auc_nb)




Area Under ROC 0.868274582560297


In [101]:
prediction.show(5)

+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|                text|label|               text2|               text3|            features|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
| but theres enoug...|    1|[but, theres, eno...|[theres, enough, ...|(1302,[24,242,247...|[9.37783933451508...|[0.99991542940331...|       0.0|
|A little jank but...|    1|[a, little, jank,...|[little, jank, lo...|(1302,[0,1,7,10,2...|[38.5117839679455...|           [1.0,0.0]|       0.0|
|Addictive fun and...|    1|[addictive, fun, ...|[addictive, fun, ...|(1302,[1,573],[1....|[-18.984533136250...|[5.69012772411034...|       1.0|
|All a board on th...|    1|[all, a, board, o...|[board, choo, cho...|(1302,[438,974],[...|[-20.124545837917...|[1.81978797528947.

In [127]:
##Accuracy,f1-score,recall and confusion matrix with Naive Bayes classifier

##Accuracy
pred_nb=prediction_nb.select("label","prediction")
evallnb = MulticlassClassificationEvaluator()
evallnb.setPredictionCol("prediction")
accuracy_nb=evallnb.evaluate(pred_nb)
print("Accuracy of Naive Bayes Classifier = %g"% (accuracy_nb))
print("Test Error of Naive Bayes Classifier = %g"% (1-accuracy_nb))

##f1 score
lab_true_nb = pred_nb.select(['label']).collect()
lab_pred_nb = pred_nb.select(['prediction']).collect()
print(classification_report(lab_true_nb, lab_pred_nb))
print(confusion_matrix(lab_true_nb, lab_pred_nb))


Accuracy of Naive Bayes Classifier = 0.760915
Test Error of Naive Bayes Classifier = 0.239085
              precision    recall  f1-score   support

           0       0.50      0.09      0.15        11
           1       0.83      0.98      0.90        49

    accuracy                           0.82        60
   macro avg       0.66      0.54      0.53        60
weighted avg       0.77      0.82      0.76        60

[[ 1 10]
 [ 1 48]]


In [122]:
##fit decision trees classifier
model3 = DecisionTreeClassifier()
pipeline3 = Pipeline(stages=[token,remove,count,model3])
dt = pipeline3.fit(train)
prediction_dt= dt.transform(test)
eval_dt=BinaryClassificationEvaluator()
auc_dt=eval_dt.evaluate(prediction_dt)
print('Area Under ROC', auc_dt)


Area Under ROC 0.6289424860853432


In [123]:
##Accuracy,f1-score,recall and confusion matrix with random forest classifier

##Accuracy
pred_dt=prediction_dt.select("label","prediction")
evalldt = MulticlassClassificationEvaluator()
evalldt.setPredictionCol("prediction")
accuracy_dt=evalldt.evaluate(pred_dt)
print("Accuracy of Decision Tree Classifier = %g"% (accuracy_dt))
print("Test Error of Decision Tree Classifier = %g"% (1-accuracy_dt))

##f1 score,recall,precision
lab_true_dt = pred_dt.select(['label']).collect()
lab_pred_dt = pred_dt.select(['prediction']).collect()
print(classification_report(lab_true_dt, lab_pred_dt))
print(confusion_matrix(lab_true_dt, lab_pred_dt))


Accuracy of Decision Tree Classifier = 0.833603
Test Error of Decision Tree Classifier = 0.166397
              precision    recall  f1-score   support

           0       1.00      0.27      0.43        11
           1       0.86      1.00      0.92        49

    accuracy                           0.87        60
   macro avg       0.93      0.64      0.68        60
weighted avg       0.89      0.87      0.83        60

[[ 3  8]
 [ 0 49]]


In [112]:
##fit random firest classifier
model4 = RandomForestClassifier()
pipeline4 = Pipeline(stages=[token,remove,count,model4])
rf = pipeline4.fit(train)
prediction_rf= rf.transform(test)
eval_rf=BinaryClassificationEvaluator()
auc_rf=eval_rf.evaluate(prediction_rf)
print('Area Under ROC', auc_rf)

Area Under ROC 0.7411873840445269


In [124]:
##Accuracy,f1-score,recall and confusion matrix with random forest classifier

##Accuracy
pred_rf=prediction_rf.select("label","prediction")
evallrf = MulticlassClassificationEvaluator()
evallrf.setPredictionCol("prediction")
accuracy_rf=evallrf.evaluate(pred_rf)
print("Accuracy of Random Forest Classifier = %g"% (accuracy_rf))
print("Test Error of Random Forest Classifier = %g"% (1-accuracy_rf))

##f1 score
lab_true_rf = pred_rf.select(['label']).collect()
lab_pred_rf = pred_rf.select(['prediction']).collect()
print(classification_report(lab_true_rf, lab_pred_rf))
print(confusion_matrix(lab_true_rf, lab_pred_rf))




Accuracy of Random Forest Classifier = 0.734251
Test Error of Random Forest Classifier = 0.265749
              precision    recall  f1-score   support

           0       0.00      0.00      0.00        11
           1       0.82      1.00      0.90        49

    accuracy                           0.82        60
   macro avg       0.41      0.50      0.45        60
weighted avg       0.67      0.82      0.73        60

[[ 0 11]
 [ 0 49]]


  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [114]:
##fit gradient boosted classifier
model5 = GBTClassifier(maxIter=10)
pipeline5= Pipeline(stages=[token,remove,count,model5])
gbc = pipeline5.fit(train)
prediction_gbc= gbc.transform(test)
eval_gbc=BinaryClassificationEvaluator()
auc_gbc=eval_gbc.evaluate(prediction_gbc)
print('Area Under ROC', auc_gbc)


Area Under ROC 0.6539888682745826


In [125]:
##Accuracy,f1-score,recall and confusion matrix with gradient boosted classifier

##Accuracy
pred_gbc=prediction_gbc.select("label","prediction")
evallgbc = MulticlassClassificationEvaluator()
evallgbc.setPredictionCol("prediction")
accuracy_gbc=evallgbc.evaluate(pred_gbc)
print("Accuracy of Gradient Boosted Classifier = %g"% (accuracy_gbc))
print("Test Error of Gradient Boosted Classifier = %g"% (1-accuracy_gbc))

##f1 score
lab_true_gbc = pred_gbc.select(['label']).collect()
lab_pred_gbc = pred_gbc.select(['prediction']).collect()
print(classification_report(lab_true_gbc, lab_pred_gbc))
print(confusion_matrix(lab_true_gbc, lab_pred_gbc))

Accuracy of Gradient Boosted Classifier = 0.833603
Test Error of Gradient Boosted Classifier = 0.166397
              precision    recall  f1-score   support

           0       1.00      0.27      0.43        11
           1       0.86      1.00      0.92        49

    accuracy                           0.87        60
   macro avg       0.93      0.64      0.68        60
weighted avg       0.89      0.87      0.83        60

[[ 3  8]
 [ 0 49]]


In [116]:
from pyspark.ml.classification import LinearSVC

model6=LinearSVC(maxIter=10, regParam=0.1)
pipeline6= Pipeline(stages=[token,remove,count,model5])
lsvc = pipeline6.fit(train)
prediction_lsvc= lsvc.transform(test)
eval_lsvc=BinaryClassificationEvaluator()
auc_lsvc=eval_lsvc.evaluate(prediction_lsvc)
print('Area Under ROC', auc_lsvc)
 


Area Under ROC 0.6539888682745826


AttributeError: 'PipelineModel' object has no attribute 'coefficients'

In [126]:
##Accuracy,f1-score,recall and confusion matrix with linear SVC classifier

##Accuracy
pred_svc=prediction_lsvc.select("label","prediction")
evallsvc = MulticlassClassificationEvaluator()
evallsvc.setPredictionCol("prediction")
accuracy_svc=evallsvc.evaluate(pred_svc)
print("Accuracy of linear SVC = %g"% (accuracy_svc))
print("Test Error of linear SVC = %g"% (1-accuracy_svc))

##f1 score,precision,recall
lab_true_svc = pred_svc.select(['label']).collect()
lab_pred_svc = pred_svc.select(['prediction']).collect()
print(classification_report(lab_true_svc, lab_pred_svc))
print(confusion_matrix(lab_true_svc, lab_pred_svc))

Accuracy of linear SVC = 0.833603
Test Error of linear SVC = 0.166397
              precision    recall  f1-score   support

           0       1.00      0.27      0.43        11
           1       0.86      1.00      0.92        49

    accuracy                           0.87        60
   macro avg       0.93      0.64      0.68        60
weighted avg       0.89      0.87      0.83        60

[[ 3  8]
 [ 0 49]]


In [118]:
##One-vs-rest classifier

#base classifier-logistic regression for One-vs-rest classifier
lr = LogisticRegression(maxIter=10, tol=1E-6, fitIntercept=True)
model7 = OneVsRest(classifier=lr)
pipeline7= Pipeline(stages=[token,remove,count,model5])
ovr = pipeline7.fit(train)
prediction_ovr= ovr.transform(test)
eval_ovr=BinaryClassificationEvaluator()
auc_ovr=eval_ovr.evaluate(prediction_ovr)
print('Area Under ROC', auc_ovr)
 

Area Under ROC 0.6539888682745826


In [225]:
##Accuracy
#pred_ovr=prediction_ovr.select("label","prediction")
evallovr = MulticlassClassificationEvaluator(metricName="accuracy")
accuracy_ovr=evallovr.evaluate(prediction_ovr)
print("Accuracy of One-Vs-Rest Classifier = %g"% (accuracy_ovr))
print("Test Error = %g" % (1.0 - accuracy_ovr))


##f1 score and confusion matrix
lab_true_ovr = prediction_ovr.select(['label']).collect()
lab_pred_ovr = prediction_ovr.select(['prediction']).collect()
print(classification_report(lab_true_ovr, lab_pred_ovr))
print(confusion_matrix(lab_true_ovr, lab_pred_ovr))

Py4JJavaError: An error occurred while calling o13371.evaluate.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1762.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1762.0 (TID 1561) (10.46.235.68 executor driver): java.io.FileNotFoundException: C:\Users\RimJhim\AppData\Local\Temp\blockmgr-52dbc4a1-6b46-461c-86a1-a66c3e16bb1e\29\shuffle_411_1561_0.data.eae1c190-5009-470a-8947-820245b7d7f4 (The system cannot find the path specified)
	at java.base/java.io.FileOutputStream.open0(Native Method)
	at java.base/java.io.FileOutputStream.open(FileOutputStream.java:298)
	at java.base/java.io.FileOutputStream.<init>(FileOutputStream.java:237)
	at org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter.initStream(LocalDiskShuffleMapOutputWriter.java:144)
	at org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter.access$200(LocalDiskShuffleMapOutputWriter.java:45)
	at org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter$LocalDiskShufflePartitionWriter.openStream(LocalDiskShuffleMapOutputWriter.java:177)
	at org.apache.spark.shuffle.ShufflePartitionPairsWriter.open(ShufflePartitionPairsWriter.scala:68)
	at org.apache.spark.shuffle.ShufflePartitionPairsWriter.write(ShufflePartitionPairsWriter.scala:59)
	at org.apache.spark.util.collection.WritablePartitionedIterator.writeNext(WritablePartitionedPairCollection.scala:83)
	at org.apache.spark.util.collection.ExternalSorter.$anonfun$writePartitionedMapOutput$1(ExternalSorter.scala:720)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.util.collection.ExternalSorter.writePartitionedMapOutput(ExternalSorter.scala:723)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1019)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1018)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$collectAsMap$1(PairRDDFunctions.scala:738)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:737)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.confusions$lzycompute(MulticlassMetrics.scala:61)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.confusions(MulticlassMetrics.scala:52)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.tpByClass$lzycompute(MulticlassMetrics.scala:78)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.tpByClass(MulticlassMetrics.scala:76)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.accuracy$lzycompute(MulticlassMetrics.scala:188)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.accuracy(MulticlassMetrics.scala:188)
	at org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator.evaluate(MulticlassClassificationEvaluator.scala:153)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.FileNotFoundException: C:\Users\RimJhim\AppData\Local\Temp\blockmgr-52dbc4a1-6b46-461c-86a1-a66c3e16bb1e\29\shuffle_411_1561_0.data.eae1c190-5009-470a-8947-820245b7d7f4 (The system cannot find the path specified)
	at java.base/java.io.FileOutputStream.open0(Native Method)
	at java.base/java.io.FileOutputStream.open(FileOutputStream.java:298)
	at java.base/java.io.FileOutputStream.<init>(FileOutputStream.java:237)
	at org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter.initStream(LocalDiskShuffleMapOutputWriter.java:144)
	at org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter.access$200(LocalDiskShuffleMapOutputWriter.java:45)
	at org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter$LocalDiskShufflePartitionWriter.openStream(LocalDiskShuffleMapOutputWriter.java:177)
	at org.apache.spark.shuffle.ShufflePartitionPairsWriter.open(ShufflePartitionPairsWriter.scala:68)
	at org.apache.spark.shuffle.ShufflePartitionPairsWriter.write(ShufflePartitionPairsWriter.scala:59)
	at org.apache.spark.util.collection.WritablePartitionedIterator.writeNext(WritablePartitionedPairCollection.scala:83)
	at org.apache.spark.util.collection.ExternalSorter.$anonfun$writePartitionedMapOutput$1(ExternalSorter.scala:720)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.util.collection.ExternalSorter.writePartitionedMapOutput(ExternalSorter.scala:723)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [135]:
###Now use word2vec and repeat the process above 

from pyspark.ml.feature import Word2Vec
token=RegexTokenizer(inputCol="text", outputCol="text2", pattern="\\W")
remove= StopWordsRemover(inputCol=token.getOutputCol(), outputCol="text3")
w2v=Word2Vec(inputCol="text3", outputCol="features",vectorSize= 100, minCount= 5)
model=LogisticRegression()
#model2=NaiveBayes(modelType="multinomial",smoothing=1)
#model3=DecisionTreeClassifier
#model4=RandomForestClassifier
#model5=GBTClassifier(maxIter=10)
#model6=
#model7=
pipeline1 = Pipeline(stages= [token,remove,w2v,model])
pipeline2 = Pipeline(stages= [token,remove,w2v,model2])
pipeline3 = Pipeline(stages= [token,remove,w2v,model3])
pipeline4 = Pipeline(stages= [token,remove,w2v,model4])
pipeline5 = Pipeline(stages= [token,remove,w2v,model5])
pipeline6 = Pipeline(stages= [token,remove,w2v,model6])
pipeline7 = Pipeline(stages= [token,remove,w2v,model7])



In [139]:
##fitting the models and making predictions


lr_wv = pipeline1.fit(train)
prediction_lr_wv= lr_wv.transform(test)

#nb_wv = pipeline2.fit(train)
#prediction_nb_wv= nb_wv.transform(test)

dt_wv = pipeline3.fit(train)
prediction_dt_wv= dt_wv.transform(test)

rf_wv = pipeline4.fit(train)
prediction_rf_wv= rf_wv.transform(test)

gbc_wv = pipeline5.fit(train)
prediction_gbc_wv= gbc_wv.transform(test)

lsvc_wv= pipeline6.fit(train)
prediction_lsvc_wv= lsvc_wv.transform(test)

ovr_wv = pipeline7.fit(train)
prediction_ovr_wv= ovr_wv.transform(test)
















In [142]:
prediction_dt_wv= dt_wv.transform(test)
nb_wv = pipeline2.fit(train)
prediction_nb_wv= nb_wv.transform(test)  ##Naive Bayes is not working with word2vec

Py4JJavaError: An error occurred while calling o1380.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1094.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1094.0 (TID 1032) (10.46.235.68 executor driver): java.lang.RuntimeException: Vector values MUST NOT be Negative, NaN or Infinity, but got [-3.5843619843944907E-4,1.143665382793794E-4,-3.519348263378358E-4,2.440131849855081E-4,-3.424555406026128E-5,-2.459082881816559E-4,-5.363427367733999E-4,1.838261070790597E-4,-1.8081811463667286E-4,1.0001053063509364E-4,-3.999982824704299E-4,-8.439564391867154E-4,-5.516021661201698E-4,-5.093515711551945E-4,-6.861005911357804E-4,-3.88080985026641E-5,1.1722860088209725E-4,-6.61263656285074E-4,2.6825293187155493E-4,-2.2023555357009172E-4,-3.1708823452289734E-4,-1.7855643818620592E-5,-2.734604511513478E-4,1.3192514759591883E-4,4.908330787050847E-4,2.805259739236337E-4,1.9223266604563428E-4,1.23567969745232E-4,-2.2764063517873484E-4,-7.076301553752273E-4,6.236794231679393E-6,2.342866921228253E-4,-9.619481990941697E-4,7.721688469044035E-4,-3.16053939362367E-4,2.656221331562847E-4,-5.471269283184988E-4,-4.3285829694165534E-4,-1.1889816960319877E-4,-2.907096576463017E-5,8.219532901421189E-5,-4.970469227474596E-5,-6.635692746688922E-5,8.59491035549177E-4,-6.354417483736243E-5,1.1546562503402431E-4,4.332182288635522E-4,7.151071137438217E-5,5.716221445860962E-4,2.851764050622781E-5,1.649152060660223E-5,-2.610742166224453E-4,9.958047950122919E-5,2.0790754933841527E-4,-9.759265716032435E-5,4.438423137697908E-4,5.342397715948108E-4,-3.674044791195128E-4,-1.08897279359452E-4,-1.9934550639138452E-4,-4.937033744580629E-4,-2.823768785068144E-4,3.979036894937356E-6,8.519979245546791E-4,-9.055458293813797E-5,-2.947525936178863E-4,-2.6386563291048835E-4,-2.8086936799809337E-4,4.708440974354744E-4,7.650458656927286E-4,3.4626584965735674E-4,-5.864471523737948E-4,-4.159428256874283E-4,7.816068925118694E-4,-4.2997936058479047E-4,6.152409785297803E-4,2.082427252187497E-5,3.111958681579886E-4,-3.335202442637334E-4,-5.708756895425419E-4,-6.992324617587858E-6,2.971530387488504E-4,2.160734956204477E-4,-5.913317145314068E-4,-5.558519462485694E-4,-4.172072254328264E-5,-6.908525327970791E-4,-1.878479896630678E-5,-5.613716526163948E-4,-7.751209907130235E-5,2.2180631316991314E-4,-2.474600591489838E-4,4.4360844751483656E-4,1.013254675652004E-4,5.716359719372122E-4,-6.571152836032625E-4,7.04923392428706E-6,-1.0576288655607236E-4,4.963414491309474E-4,-5.511862206428001E-4]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:92)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:875)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:875)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.RuntimeException: Vector values MUST NOT be Negative, NaN or Infinity, but got [-3.5843619843944907E-4,1.143665382793794E-4,-3.519348263378358E-4,2.440131849855081E-4,-3.424555406026128E-5,-2.459082881816559E-4,-5.363427367733999E-4,1.838261070790597E-4,-1.8081811463667286E-4,1.0001053063509364E-4,-3.999982824704299E-4,-8.439564391867154E-4,-5.516021661201698E-4,-5.093515711551945E-4,-6.861005911357804E-4,-3.88080985026641E-5,1.1722860088209725E-4,-6.61263656285074E-4,2.6825293187155493E-4,-2.2023555357009172E-4,-3.1708823452289734E-4,-1.7855643818620592E-5,-2.734604511513478E-4,1.3192514759591883E-4,4.908330787050847E-4,2.805259739236337E-4,1.9223266604563428E-4,1.23567969745232E-4,-2.2764063517873484E-4,-7.076301553752273E-4,6.236794231679393E-6,2.342866921228253E-4,-9.619481990941697E-4,7.721688469044035E-4,-3.16053939362367E-4,2.656221331562847E-4,-5.471269283184988E-4,-4.3285829694165534E-4,-1.1889816960319877E-4,-2.907096576463017E-5,8.219532901421189E-5,-4.970469227474596E-5,-6.635692746688922E-5,8.59491035549177E-4,-6.354417483736243E-5,1.1546562503402431E-4,4.332182288635522E-4,7.151071137438217E-5,5.716221445860962E-4,2.851764050622781E-5,1.649152060660223E-5,-2.610742166224453E-4,9.958047950122919E-5,2.0790754933841527E-4,-9.759265716032435E-5,4.438423137697908E-4,5.342397715948108E-4,-3.674044791195128E-4,-1.08897279359452E-4,-1.9934550639138452E-4,-4.937033744580629E-4,-2.823768785068144E-4,3.979036894937356E-6,8.519979245546791E-4,-9.055458293813797E-5,-2.947525936178863E-4,-2.6386563291048835E-4,-2.8086936799809337E-4,4.708440974354744E-4,7.650458656927286E-4,3.4626584965735674E-4,-5.864471523737948E-4,-4.159428256874283E-4,7.816068925118694E-4,-4.2997936058479047E-4,6.152409785297803E-4,2.082427252187497E-5,3.111958681579886E-4,-3.335202442637334E-4,-5.708756895425419E-4,-6.992324617587858E-6,2.971530387488504E-4,2.160734956204477E-4,-5.913317145314068E-4,-5.558519462485694E-4,-4.172072254328264E-5,-6.908525327970791E-4,-1.878479896630678E-5,-5.613716526163948E-4,-7.751209907130235E-5,2.2180631316991314E-4,-2.474600591489838E-4,4.4360844751483656E-4,1.013254675652004E-4,5.716359719372122E-4,-6.571152836032625E-4,7.04923392428706E-6,-1.0576288655607236E-4,4.963414491309474E-4,-5.511862206428001E-4]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:92)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:875)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:875)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [149]:
##AUC
eval_wv=BinaryClassificationEvaluator()
auc_lr_wv=eval_wv.evaluate(prediction_lr_wv)
#auc_nb=eval_wv.evaluate(prediction_nb_wv)
auc_dt_wv=eval_wv.evaluate(prediction_dt_wv)
auc_rf_wv=eval_wv.evaluate(prediction_rf_wv)
auc_gbc_wv=eval_wv.evaluate(prediction_gbc_wv)
auc_lsvc_wv=eval_wv.evaluate(prediction_lsvc_wv)
#auc_ovr_wv=eval_wv.evaluate(prediction_ovr_wv)  ##auc for ovr is not working

print('Area Under ROC for Logistic Regression', auc_lr_wv)
#print('Area Under ROC for Naive Bayes', auc_nb_wv)
print('Area Under ROC for Decision Trees', auc_dt_wv)
print('Area Under ROC for Random Forest', auc_rf_wv)
print('Area Under ROC for Gradient Boosted Classifier', auc_gbc_wv)
print('Area Under ROC for Linear SVC', auc_lsvc_wv)
#print('Area Under ROC for One-Vs-All', auc_ovr_wv)

Area Under ROC for Logistic Regression 0.6029684601113172
Area Under ROC for Decision Trees 0.6502782931354361
Area Under ROC for Random Forest 0.6623376623376623
Area Under ROC for Gradient Boosted Classifier 0.6141001855287569
Area Under ROC for Linear SVC 0.6382189239332097


In [152]:
###Accuracy
pred_lr_wv=prediction_lr_wv.select("label","prediction")
evallr_wv = MulticlassClassificationEvaluator()
evallr_wv.setPredictionCol("prediction")
accuracy_lr_wv=evallr_wv.evaluate(pred_lr_wv)
print("Accuracy of Logistic Regression = %g"% (accuracy_lr_wv))
print("Test Error of Logistic Regression= %g"% (1-accuracy_lr_wv))

pred_dt_wv=prediction_dt_wv.select("label","prediction")
evaldt_wv = MulticlassClassificationEvaluator()
evaldt_wv.setPredictionCol("prediction")
accuracy_dt_wv=evaldt_wv.evaluate(pred_dt_wv)
print("Accuracy of Decision Tree Classifier = %g"% (accuracy_dt_wv))
print("Test Error of Decision Tree Classifier= %g"% (1-accuracy_dt_wv))

pred_rf_wv=prediction_rf_wv.select("label","prediction")
evalrf_wv = MulticlassClassificationEvaluator()
evalrf_wv.setPredictionCol("prediction")
accuracy_rf_wv=evalrf_wv.evaluate(pred_rf_wv)
print("Accuracy of Random Forest Classifier = %g"% (accuracy_rf_wv))
print("Test Error of Random Forest Classifier= %g"% (1-accuracy_rf_wv))

pred_gbc_wv=prediction_gbc_wv.select("label","prediction")
evalgbc_wv = MulticlassClassificationEvaluator()
evalgbc_wv.setPredictionCol("prediction")
accuracy_gbc_wv=evalgbc_wv.evaluate(pred_gbc_wv)
print("Accuracy of Gradient Boosted Classifier = %g"% (accuracy_gbc_wv))
print("Test Error of Gradient Boosted Classifier= %g"% (1-accuracy_gbc_wv))

pred_ovr_wv=prediction_ovr_wv.select("label","prediction")
evalovr_wv = MulticlassClassificationEvaluator()
evalovr_wv.setPredictionCol("prediction")
accuracy_ovr_wv=evalovr_wv.evaluate(pred_ovr_wv)
print("Accuracy of One-Vs-All Classifier = %g"% (accuracy_ovr_wv))
print("Test Error of One-Vs-All Classifier= %g"% (1-accuracy_ovr_wv))





##One-Vs-All not working here






Accuracy of Logistic Regression = 0.766667
Test Error of Logistic Regression= 0.233333
Accuracy of Decision Tree Classifier = 0.813228
Test Error of Decision Tree Classifier= 0.186772
Accuracy of Random Forest Classifier = 0.771605
Test Error of Random Forest Classifier= 0.228395
Accuracy of Gradient Boosted Classifier = 0.792
Test Error of Gradient Boosted Classifier= 0.208


Py4JJavaError: An error occurred while calling o7551.evaluate.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1387.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1387.0 (TID 1201) (10.46.235.68 executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:192)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:166)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:82)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.sql.execution.SQLExecutionRDD.$anonfun$compute$1(SQLExecutionRDD.scala:52)
	at org.apache.spark.sql.internal.SQLConf$.withExistingConf(SQLConf.scala:158)
	at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.waitForNewConnection(Native Method)
	at java.base/java.net.PlainSocketImpl.socketAccept(PlainSocketImpl.java:163)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:474)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:565)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:533)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:179)
	... 59 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1019)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1018)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$collectAsMap$1(PairRDDFunctions.scala:738)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:737)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.confusions$lzycompute(MulticlassMetrics.scala:61)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.confusions(MulticlassMetrics.scala:52)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.labelCountByClass$lzycompute(MulticlassMetrics.scala:66)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.labelCountByClass(MulticlassMetrics.scala:64)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.weightedFMeasure(MulticlassMetrics.scala:227)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.weightedFMeasure$lzycompute(MulticlassMetrics.scala:235)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.weightedFMeasure(MulticlassMetrics.scala:235)
	at org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator.evaluate(MulticlassClassificationEvaluator.scala:152)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:192)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:166)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:82)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.sql.execution.SQLExecutionRDD.$anonfun$compute$1(SQLExecutionRDD.scala:52)
	at org.apache.spark.sql.internal.SQLConf$.withExistingConf(SQLConf.scala:158)
	at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.waitForNewConnection(Native Method)
	at java.base/java.net.PlainSocketImpl.socketAccept(PlainSocketImpl.java:163)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:474)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:565)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:533)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:179)
	... 59 more


In [154]:
##f1,precision,recall
lab_true_lr_wv = pred_lr_wv.select(['label']).collect()
lab_pred_lr_wv = pred_lr_wv.select(['prediction']).collect()
print(classification_report(lab_true_lr_wv, lab_pred_lr_wv))
print(confusion_matrix(lab_true_lr_wv, lab_pred_lr_wv))

lab_true_dt_wv = pred_dt_wv.select(['label']).collect()
lab_pred_dt_wv = pred_dt_wv.select(['prediction']).collect()
print(classification_report(lab_true_dt_wv, lab_pred_dt_wv))
print(confusion_matrix(lab_true_dt_wv, lab_pred_dt_wv))

lab_true_rf_wv = pred_rf_wv.select(['label']).collect()
lab_pred_rf_wv = pred_rf_wv.select(['prediction']).collect()
print(classification_report(lab_true_rf_wv, lab_pred_rf_wv))
print(confusion_matrix(lab_true_rf_wv, lab_pred_rf_wv))


lab_true_gbc_wv = pred_gbc_wv.select(['label']).collect()
lab_pred_gbc_wv = pred_gbc_wv.select(['prediction']).collect()
print(classification_report(lab_true_gbc_wv, lab_pred_gbc_wv))
print(confusion_matrix(lab_true_gbc_wv, lab_pred_gbc_wv))

#lab_true_ovr_wv = pred_ovr_wv.select(['label']).collect()
#lab_pred_ovr_wv = pred_ovr_wv.select(['prediction']).collect()
#print(classification_report(lab_true_ovr_wv, lab_pred_ovr_wv))
#print(confusion_matrix(lab_true_ovr_wv, lab_pred_ovr_wv))-ovr not working too well 




              precision    recall  f1-score   support

           0       0.36      0.36      0.36        11
           1       0.86      0.86      0.86        49

    accuracy                           0.77        60
   macro avg       0.61      0.61      0.61        60
weighted avg       0.77      0.77      0.77        60

[[ 4  7]
 [ 7 42]]
              precision    recall  f1-score   support

           0       0.50      0.45      0.48        11
           1       0.88      0.90      0.89        49

    accuracy                           0.82        60
   macro avg       0.69      0.68      0.68        60
weighted avg       0.81      0.82      0.81        60

[[ 5  6]
 [ 5 44]]
              precision    recall  f1-score   support

           0       1.00      0.09      0.17        11
           1       0.83      1.00      0.91        49

    accuracy                           0.83        60
   macro avg       0.92      0.55      0.54        60
weighted avg       0.86      0.83   

Py4JJavaError: An error occurred while calling o7633.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1400.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1400.0 (TID 1213) (10.46.235.68 executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:192)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:166)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:82)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.waitForNewConnection(Native Method)
	at java.base/java.net.PlainSocketImpl.socketAccept(PlainSocketImpl.java:163)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:474)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:565)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:533)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:179)
	... 43 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1019)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1018)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3997)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4167)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4165)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3994)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:192)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:166)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:82)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:131)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.waitForNewConnection(Native Method)
	at java.base/java.net.PlainSocketImpl.socketAccept(PlainSocketImpl.java:163)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:474)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:565)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:533)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:179)
	... 43 more


In [193]:
token=RegexTokenizer(inputCol="text", outputCol="text2", pattern="\\W")
remove= StopWordsRemover(inputCol=token.getOutputCol(), outputCol="text3")
htf = HashingTF(inputCol="text3", outputCol="features", numFeatures=100000)
idf = IDF(inputCol="features", outputCol="tf")

pipeline1 = Pipeline(stages= [token,remove,htf,idf,model])
pipeline2 = Pipeline(stages= [token,remove,htf,idf,model2])##not working
pipeline3 = Pipeline(stages= [token,remove,htf,idf,model3])##not working
pipeline4 = Pipeline(stages= [token,remove,htf,idf,model4])##not working
pipeline5 = Pipeline(stages= [token,remove,htf,idf,model5])##not working
pipeline6 = Pipeline(stages= [token,remove,htf,idf,model6])
pipeline7 = Pipeline(stages= [token,remove,htf,idf,model7])##not working

In [194]:
##fitting the models and making predictions


lr_tf = pipeline1.fit(train)
prediction_lr_tf= lr_tf.transform(test)

#nb_tf = pipeline2.fit(train)
#prediction_nb_tf= nb_tf.transform(test)

#dt_tf = pipeline3.fit(train)
#prediction_dt_tf= dt_tf.transform(test)

#rf_tf=pipeline4.fit(train)
#prediction_rf_tf= rf_tf.transform(test)

#gbc_tf = pipeline5.fit(train)
#prediction_gbc_tf= gbc_tf.transform(test)

lsvc_tf= pipeline6.fit(train)
prediction_lsvc_tf= lsvc_tf.transform(test)

#ovr_tf = pipeline7.fit(train)
#prediction_ovr_tf= ovr_tf.transform(test)

In [217]:
eval_tf=BinaryClassificationEvaluator()
pred_lr_tf=prediction_lr_tf.select("label","prediction")
#auc_lr_tf=eval_tf.evaluate(pred_lr_tf)
#auc_lsvc_tf=eval_tf.evaluate(prediction_lsvc_tf)

#print('Area Under ROC for Logistic Regression', auc_lr_tf)
#print('Area Under ROC for Linear SVC', auc_lsvc_tf)

##Accuracy
pred_lr_tf.show(10)

+-----+----------+
|label|prediction|
+-----+----------+
|    1|       0.0|
|    1|       0.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    0|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    0|       0.0|
+-----+----------+
only showing top 10 rows



In [166]:
##Break down the steps and fit the models for which pipeline did not work
htf = HashingTF(inputCol="text3", outputCol="features", numFeatures=20)
htf2 = htf.transform(words2)
idf = IDF(inputCol="features", outputCol="tf")
idf2= idf.fit(htf2)
td= idf2.transform(htf2)


In [218]:
##f1 score,precision,recall
lab_true_lr_tf = prediction_lr_tf.select(['label']).collect()
lab_pred_lr_tf = prediction_lr_tf.select(['prediction']).collect()
print(classification_report(lab_true_lr_tf, lab_pred_lr_tf))
print(confusion_matrix(lab_true_lr_tf, lab_pred_lr_tf))

lab_true_lsvc_tf = prediction_lsvc_tf.select(['label']).collect()
lab_pred_lsvc_tf = prediction_lsvc_tf.select(['prediction']).collect()
print(classification_report(lab_true_lsvc_tf, lab_pred_lsvc_tf))
print(confusion_matrix(lab_true_lsvc_tf, lab_pred_lsvc_tf))

              precision    recall  f1-score   support

           0       0.60      0.55      0.57        11
           1       0.90      0.92      0.91        49

    accuracy                           0.85        60
   macro avg       0.75      0.73      0.74        60
weighted avg       0.85      0.85      0.85        60

[[ 6  5]
 [ 4 45]]
              precision    recall  f1-score   support

           0       0.64      0.64      0.64        11
           1       0.92      0.92      0.92        49

    accuracy                           0.87        60
   macro avg       0.78      0.78      0.78        60
weighted avg       0.87      0.87      0.87        60

[[ 7  4]
 [ 4 45]]


In [212]:
##Compute accuracy by hand since above method is not working for some reason
prediction_lr_tf.show()


+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|                text|label|               text2|               text3|            features|                  tf|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
| but theres enoug...|    1|[but, theres, eno...|[theres, enough, ...|(100000,[12106,23...|(100000,[12106,23...|[9.28235268071518...|[0.99990695671363...|       0.0|
|A little jank but...|    1|[a, little, jank,...|[little, jank, lo...|(100000,[756,2543...|(100000,[756,2543...|[39.7453827394167...|           [1.0,0.0]|       0.0|
|Addictive fun and...|    1|[addictive, fun, ...|[addictive, fun, ...|(100000,[12067,35...|(100000,[12067,35...|[-19.045901986797...|[5.35143015993216...|       1.0|
|All

In [171]:
td.show(5)
td=td.select("label","tf")
td.show(5)

+-----+--------------------+
|label|                  tf|
+-----+--------------------+
|    0|(20,[5,12,17,19],...|
|    0|(20,[0,1,2,3,4,5,...|
|    1|(20,[6,15,17],[1....|
|    1|(20,[3,6,7,8,10,1...|
|    1|(20,[0,1,2,3,4,5,...|
+-----+--------------------+
only showing top 5 rows

+-----+--------------------+
|label|                  tf|
+-----+--------------------+
|    0|(20,[5,12,17,19],...|
|    0|(20,[0,1,2,3,4,5,...|
|    1|(20,[6,15,17],[1....|
|    1|(20,[3,6,7,8,10,1...|
|    1|(20,[0,1,2,3,4,5,...|
+-----+--------------------+
only showing top 5 rows



In [184]:
##Naive Bayes
td = td.withColumn("label", td["label"].cast(IntegerType()))
test = test.withColumn("label",test["label"].cast(IntegerType()))
##Processing test data as well
test2 = re.transform(test)
test3= remove.transform(test2)
test4 = htf.transform(test3)
test5= idf.fit(test4)
td2= test5.transform(test4)
td2.show(5)
td2=td2.select("label","tf")


nb_td = NaiveBayes(smoothing=1.0)
nb_td2=nb_td.fit(td)
#pred_nb_td = nb_td2.transform(td2)

# Accuracy
#pred_nb_tfidf=pred_nb_td.select("label","prediction")
#eval_nb = MulticlassClassificationEvaluator(metricName="accuracy")
#accuracy_nb_td = eval_nb.evaluate(pred_nb_tfidf)
#print("Accuracy of Naive Bayes=  %g"% (accuracy_nb_td))
#print("Test Error of Naive Bayes= %g"% (1-accuracy_nb_td))

#val_nb_tfidf = nb_tfidf.transform(df_val)
#val_nb_tfidf.select(['label','prediction','probability']).show(5, False)
# Compute accuracy on validation set
#accuracy = evaluator.evaluate(val_nb_tfidf)
#print("Validation set accuracy = " + str(accuracy))

#pred_nb_tfidf = nb_tfidf.transform(df_test)
#pred_nb_tfidf.select(['label','prediction','probability']).show(5, False)
# Compute accuracy on test set
#accuracy = evaluator.evaluate(pred_nb_tfidf)
#print("Test set accuracy = " + str(accuracy))

+--------------------+-----+--------------------+--------------------+--------------------+--------------------+
|                text|label|               text2|               text3|            features|                  tf|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+
| but theres enoug...|    1|[but, theres, eno...|[theres, enough, ...|(20,[3,6,8,19],[1...|(20,[3,6,8,19],[0...|
|A little jank but...|    1|[a, little, jank,...|[little, jank, lo...|(20,[0,1,2,3,4,5,...|(20,[0,1,2,3,4,5,...|
|Addictive fun and...|    1|[addictive, fun, ...|[addictive, fun, ...|(20,[1,7,14,19],[...|(20,[1,7,14,19],[...|
|All a board on th...|    1|[all, a, board, o...|[board, choo, cho...|(20,[3,14,17],[2....|(20,[3,14,17],[1....|
|Bought it soon af...|    1|[bought, it, soon...|[bought, soon, re...|(20,[1,2,3,5,6,8,...|(20,[1,2,3,5,6,8,...|
+--------------------+-----+--------------------+--------------------+--------------------+-----

IllegalArgumentException: features does not exist. Available: label, tf

In [180]:
td2.show(5)

+-----+--------------------+
|label|                  tf|
+-----+--------------------+
|    1|(20,[3,6,8,19],[0...|
|    1|(20,[0,1,2,3,4,5,...|
|    1|(20,[1,7,14,19],[...|
|    1|(20,[3,14,17],[1....|
|    1|(20,[1,2,3,5,6,8,...|
+-----+--------------------+
only showing top 5 rows



Py4JJavaError: An error occurred while calling o11515.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1667.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1667.0 (TID 1479) (10.46.235.68 executor driver): java.io.FileNotFoundException: C:\Users\RimJhim\AppData\Local\Temp\blockmgr-52dbc4a1-6b46-461c-86a1-a66c3e16bb1e\3c\temp_shuffle_beac3bda-6c20-4486-b2f3-1efe8b57d73e (The system cannot find the path specified)
	at java.base/java.io.FileOutputStream.open0(Native Method)
	at java.base/java.io.FileOutputStream.open(FileOutputStream.java:298)
	at java.base/java.io.FileOutputStream.<init>(FileOutputStream.java:237)
	at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:140)
	at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:159)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:306)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:171)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.io.FileNotFoundException: C:\Users\RimJhim\AppData\Local\Temp\blockmgr-52dbc4a1-6b46-461c-86a1-a66c3e16bb1e\3c\temp_shuffle_beac3bda-6c20-4486-b2f3-1efe8b57d73e (The system cannot find the path specified)
	at java.base/java.io.FileOutputStream.open0(Native Method)
	at java.base/java.io.FileOutputStream.open(FileOutputStream.java:298)
	at java.base/java.io.FileOutputStream.<init>(FileOutputStream.java:237)
	at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:140)
	at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:159)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:306)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:171)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [203]:
pred_gbc.schema["prediction"].dataType

DoubleType()

In [185]:
td.show(2)

+-----+--------------------+
|label|                  tf|
+-----+--------------------+
|    1|(20,[3,6,8,19],[0...|
|    1|(20,[0,1,2,3,4,5,...|
+-----+--------------------+
only showing top 2 rows



In [None]:
##CROSS VALIDATION
lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
evaluator = BinaryClassificationEvaluator()
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
    )
counts4=count3.select("label","text")
counts4=counts4.withColumn("label", counts4["label"].cast(IntegerType()))
cvModel = cv.fit(counts4)