In [1]:
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
import time
import numpy as np

In [2]:
df = pd.read_csv('training_test_data.txt')

In [3]:
df.head()

Unnamed: 0,topic,body
0,grain,the u.s. agriculture department reported the f...
1,wheat,"the commodity credit corporation, ccc, has acc..."
2,coffee,"international coffee organization, ico, produc..."
3,sugar,sugar imports subject to the u.s. sugar import...
4,trade,"inflation plan, initially hailed at home and a..."


In [4]:
vectorizer = TfidfVectorizer()
start = time.clock()
df['body'] = vectorizer.fit_transform(df['body'])
end = time.clock()
sklearn_duration = end - start
print(sklearn_duration)
df.head()

0.5838239999999999


Unnamed: 0,topic,body
0,grain,"(0, 15051)\t0.0269008829518\n (0, 1549)\t0...."
1,wheat,"(0, 15051)\t0.0269008829518\n (0, 1549)\t0...."
2,coffee,"(0, 15051)\t0.0269008829518\n (0, 1549)\t0...."
3,sugar,"(0, 15051)\t0.0269008829518\n (0, 1549)\t0...."
4,trade,"(0, 15051)\t0.0269008829518\n (0, 1549)\t0...."


In [5]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark import SparkConf, SparkContext
sc = SparkContext()

In [6]:
# Make the DataFrame
sqlContext = SQLContext(sc)
sdf = sqlContext.read.format('csv').options(header='true').load('training_test_data.txt')


tokenizer = Tokenizer(inputCol="body", outputCol="words")
tokenized_sdf = tokenizer.transform(sdf)
start = time.clock()
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures")
featurized_sdf = hashingTF.transform(tokenized_sdf)
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurized_sdf)
final = idfModel.transform(featurized_sdf).select("topic", "features")
end = time.clock()
pyspark_duration = end - start
print(pyspark_duration)
final = final.select(col("topic").alias("label"), col("features").alias("features"))

0.010882000000000058


In [7]:
final.show()

+------+--------------------+
| label|            features|
+------+--------------------+
| grain|(262144,[1846,328...|
| wheat|(262144,[9639,158...|
|coffee|(262144,[4081,508...|
| sugar|(262144,[353,2366...|
| trade|(262144,[3283,380...|
|  ship|(262144,[1846,963...|
| grain|(262144,[9639,101...|
|  ship|(262144,[444,2776...|
|coffee|(262144,[9677,154...|
| grain|(262144,[4737,963...|
| sugar|(262144,[227,2366...|
| grain|(262144,[27570,28...|
| grain|(262144,[2410,328...|
| crude|(262144,[4900,910...|
| grain|(262144,[5213,963...|
| crude|(262144,[7838,967...|
|   cpi|(262144,[733,1846...|
| grain|(262144,[640,4842...|
| crude|(262144,[5381,910...|
| grain|(262144,[5381,678...|
+------+--------------------+
only showing top 20 rows



In [8]:
from pyspark.ml.feature import StringIndexer

In [11]:
indexer = StringIndexer(inputCol="label", outputCol="indexed")
final = indexer.fit(final).transform(final)
final = final.select(col("indexed").alias("label"), col("features").alias("features"))
final.show()
df.write.format('com.databricks.spark.csv').save('final.csv')

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(262144,[1846,328...|
| 15.0|(262144,[9639,158...|
|  6.0|(262144,[4081,508...|
|  5.0|(262144,[353,2366...|
|  2.0|(262144,[3283,380...|
|  4.0|(262144,[1846,963...|
|  0.0|(262144,[9639,101...|
|  4.0|(262144,[444,2776...|
|  6.0|(262144,[9677,154...|
|  0.0|(262144,[4737,963...|
|  5.0|(262144,[227,2366...|
|  0.0|(262144,[27570,28...|
|  0.0|(262144,[2410,328...|
|  1.0|(262144,[4900,910...|
|  0.0|(262144,[5213,963...|
|  1.0|(262144,[7838,967...|
| 10.0|(262144,[733,1846...|
|  0.0|(262144,[640,4842...|
|  1.0|(262144,[5381,910...|
|  0.0|(262144,[5381,678...|
+-----+--------------------+
only showing top 20 rows



AttributeError: 'DataFrame' object has no attribute 'write'

In [10]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [11]:
data, validate = final.randomSplit([.9, .1])

In [None]:
splits = [
    [0.55555, 0.44445],
    [0.66666, 0.33334],
    [0.77777, 0.22223],
]
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
models = {i: [] for i in range(3)}
accs = {i: [] for i in range(3)}
for s, split in enumerate(splits):
    for i in range(6):
        train, test = data.randomSplit(split)
        model = nb.fit(train)
        predictions = model.transform(test)
        # print(predictions.head().prediction)
        accuracy = evaluator.evaluate(predictions)
        print(accuracy)
        accs[s].append(accuracy)
        models[s].append(model)
    print("AVG: {}".format(np.mean(accs[s])))
        

0.7040229885057471
0.7159647404505387
0.7010816125860374
0.7040714995034757
0.7170923379174853
0.7105263157894737
AVG: 0.7087932491254597
0.7046894803548795
0.7100515463917526
0.6958277254374159
0.7313624678663239
0.7029449423815621
0.7397078353253652
AVG: 0.7140973329595499
0.7035490605427975
0.7470817120622568
0.7542857142857143
0.7325349301397206


--- Logging error ---
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:50278)
Traceback (most recent call last):
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/IPython/core/interactiveshell.py", line 2881, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-12-0d2e9e79bdf6>", line 13, in <module>
    model = nb.fit(train)
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/ml/base.py", line 64, in fit
    return self._fit(dataset)
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/ml/wrapper.py", line 265, in _fit
    java_model = self._fit_java(dataset)
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/ml/wrapper.py", line 262, in _fit_java
    return self._java_obj.fit(dataset._jdf)
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:50278)
Traceback (most recent call last):
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/IPython/core/interactiveshell.py", line 2881, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-12-0d2e9e79bdf6>", line 13, in <module>
    model = nb.fit(train)
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/ml/base.py", line 64, in fit
    return self._fit(dataset)
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/ml/wrapper.py", line 265, in _fit
    java_model = self._fit_java(dataset)
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/ml/wrapper.py", line 262, in _fit_java
    return self._java_obj.fit(dataset._jdf)
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:50278)
Traceback (most recent call last):
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/IPython/core/interactiveshell.py", line 2881, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-12-0d2e9e79bdf6>", line 13, in <module>
    model = nb.fit(train)
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/ml/base.py", line 64, in fit
    return self._fit(dataset)
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/ml/wrapper.py", line 265, in _fit
    java_model = self._fit_java(dataset)
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/ml/wrapper.py", line 262, in _fit_java
    return self._java_obj.fit(dataset._jdf)
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:50278)
Traceback (most recent call last):
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/IPython/core/interactiveshell.py", line 2881, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-12-0d2e9e79bdf6>", line 13, in <module>
    model = nb.fit(train)
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/ml/base.py", line 64, in fit
    return self._fit(dataset)
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/ml/wrapper.py", line 265, in _fit
    java_model = self._fit_java(dataset)
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/ml/wrapper.py", line 262, in _fit_java
    return self._java_obj.fit(dataset._jdf)
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:50278)
Traceback (most recent call last):
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/IPython/core/interactiveshell.py", line 2881, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-12-0d2e9e79bdf6>", line 13, in <module>
    model = nb.fit(train)
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/ml/base.py", line 64, in fit
    return self._fit(dataset)
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/ml/wrapper.py", line 265, in _fit
    java_model = self._fit_java(dataset)
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/ml/wrapper.py", line 262, in _fit_java
    return self._java_obj.fit(dataset._jdf)
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/IPython/core/interactiveshell.py", line 2881, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-12-0d2e9e79bdf6>", line 13, in <module>
    model = nb.fit(train)
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/ml/base.py", line 64, in fit
    return self._fit(dataset)
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/ml/wrapper.py", line 265, in _fit
    java_model = self._fit_java(dataset)
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark/ml/wrapper.py", line 262, in _fit_java
    return self._java_obj.fit(dataset._jdf)
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/bin/anaconda3/lib/python3.5/site-packages/pyspark