In [1]:
import os
os.environ['PYSPARK_PYTHON'] = '/opt/conda/envs/python2/bin/python2'

from pyspark import SparkConf, SparkContext
sconf = SparkConf()
sconf.set("spark.driver.memory", "32g")
sconf.setMaster("local[*]")
sc = SparkContext(conf=sconf)

In [2]:
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)

In [3]:
import numpy ; numpy.version.version

'1.10.1'

In [4]:
data = sqlCtx.read.json('20newsgroups.labelled.json.gz')

In [5]:
data.printSchema()

root
 |-- approved: string (nullable = true)
 |-- article-id: string (nullable = true)
 |-- content: string (nullable = true)
 |-- date: string (nullable = true)
 |-- distribution: string (nullable = true)
 |-- followup-to: string (nullable = true)
 |-- from: string (nullable = true)
 |-- in-reply-to: string (nullable = true)
 |-- keywords: string (nullable = true)
 |-- label: string (nullable = true)
 |-- lines: string (nullable = true)
 |-- message-id: string (nullable = true)
 |-- newsgroups: string (nullable = true)
 |-- nntp-posting-host: string (nullable = true)
 |-- organization: string (nullable = true)
 |-- originator: string (nullable = true)
 |-- path: string (nullable = true)
 |-- references: string (nullable = true)
 |-- reply-to: string (nullable = true)
 |-- sender: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- x-newsreader: string (nullable = true)
 |-- xref: string (nullable = true)



In [6]:
data.registerTempTable('newsgroups')
docs = sqlCtx.sql("SELECT label, content FROM newsgroups")
labelsDistinct = sqlCtx.sql("SELECT DISTINCT(label) FROM newsgroups")
print sqlCtx.sql("SELECT COUNT(DISTINCT(label)) as labelCount FROM newsgroups").collect()

[Row(labelCount=20)]


In [7]:
print labelsDistinct.count()
labelList = labelsDistinct.map(lambda r: str(r.label)).collect()
sc.broadcast(labelList)
from IPython.display import display, HTML
th = "<th>Label</th>"
td = ["<tr><td>" + d + "</td></tr>" for d in labelList]
display(HTML("<table><thead><tr>" + "".join(th) + "</tr></thead><tbody>" + "".join(td) + "</tbody></table>"))

20


Label
rec.sport.hockey
sci.electronics
sci.med
rec.autos
comp.sys.mac.hardware
comp.windows.x
rec.sport.baseball
comp.sys.ibm.pc.hardware
misc.forsale
rec.motorcycles


In [8]:
labels = sqlCtx.sql("SELECT label FROM newsgroups")
labelCounts = labels.map(lambda r: (str(r.label), 1)).reduceByKey(lambda v1, v2: v1 + v2).collect()

from IPython.display import display, HTML
th = "<th>ID</th><th>Label</th><th>Messages</th>"
td = ["<tr><td>" + str(labelList.index(l)) +"</td><td>" + l + "</td><td>" + str(m) +"</tr>" for (l,m) in labelCounts]
display(HTML("<table><thead><tr>" + "".join(th) + "</tr></thead><tbody>" + "".join(td) + "</tbody></table>"))

ID,Label,Messages
9,rec.motorcycles,1000
4,comp.sys.mac.hardware,1000
11,talk.politics.misc,1000
15,soc.religion.christian,997
12,comp.graphics,1000
19,talk.religion.misc,1000
5,comp.windows.x,1000
7,comp.sys.ibm.pc.hardware,1000
16,talk.politics.guns,1000
13,alt.atheism,1000


In [22]:
from pyspark.mllib.feature import HashingTF
htf = HashingTF()
labels = docs.map(lambda r: (labelList.index(r.label)))
tfDocs = htf.transform(docs.map(lambda r: r.content.encode('utf-8')))
tfDocs.cache()
print tfDocs.first()

(1048576,[8148,19142,30136,60275,71269,82263,93257,104251,115245,134386,145380,156374,167368,178362,189356,208501,219495,230489,241483,252477,263471,274465,282612,293606,304600,315594,337582,356727,367721,375612,378715,389709,400703,411697,422691,430838,441832,463820,485808,496802,504953,515947,523838,537935,548929,559923,570917,579064,590058,612046,623040,634034,653179,664173,686161,697155,708149,738284,741131,760272,771266,782260,793254,812399,834387,845381,856375,867369,886510,897504,908498,919492,930486,935534,941480,960625,971619,982613,993607,1004601,1034736,1045730],[23.0,33.0,31.0,485.0,269.0,9.0,41.0,26.0,117.0,572.0,890.0,21.0,51.0,3.0,65.0,632.0,164.0,5.0,11.0,1.0,146.0,1960.0,189.0,129.0,4.0,18.0,4.0,76.0,363.0,1.0,3.0,29.0,1.0,53.0,91.0,99.0,661.0,36.0,63.0,1.0,26.0,4.0,1.0,46.0,2.0,40.0,2.0,122.0,56.0,32.0,98.0,18.0,11.0,319.0,40.0,49.0,32.0,199.0,291.0,1.0,45.0,24.0,19.0,562.0,52.0,38.0,18.0,25.0,616.0,586.0,58.0,48.0,41.0,1.0,25.0,137.0,108.0,60.0,28.0,32.0,7.0,227.0])


In [23]:
from pyspark.mllib.feature import IDF
hidf = IDF()
hidf = hidf.fit(tfDocs)
tfIdfDocs = hidf.transform(tfDocs)
print tfIdfDocs.first()

featureAndLabelData = labels.zip(tfIdfDocs)

print featureAndLabelData.first()


(1048576,[8148,19142,30136,60275,71269,82263,93257,104251,115245,134386,145380,156374,167368,178362,189356,208501,219495,230489,241483,252477,263471,274465,282612,293606,304600,315594,337582,356727,367721,375612,378715,389709,400703,411697,422691,430838,441832,463820,485808,496802,504953,515947,523838,537935,548929,559923,570917,579064,590058,612046,623040,634034,653179,664173,686161,697155,708149,738284,741131,760272,771266,782260,793254,812399,834387,845381,856375,867369,886510,897504,908498,919492,930486,935534,941480,960625,971619,982613,993607,1004601,1034736,1045730],[16.4658488032,19.2724426797,17.6080522841,2.40695283093,2.16087114383,9.94711346486,17.1408894998,6.35426667785,7.3834102957,2.78122806646,3.70155620884,8.33127780896,2.4206340085,5.33682207546,10.4731668274,2.66025648347,3.26337720704,8.36405165195,8.73492091711,0.709481482023,1.51175012391,4.61188227504,1.89015822685,2.55376484612,3.73589158668,16.6292090246,4.16021885638,3.43605254593,3.00747370688,5.76025282114,

In [24]:
from pyspark.mllib.regression import LabeledPoint
inputData = featureAndLabelData.map(lambda (c,v): LabeledPoint(c,v))

In [25]:
training, testing, validation = inputData.randomSplit([0.9,0.1,0.1], 5)
training.cache()
print training.count()

16320


In [26]:
from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel
# Train a naive Bayes model.
model = NaiveBayes.train(training, 1.0)

# Make prediction and test accuracy.
#predictionAndLabel = testing.map(lambda p : (model.predict(p.features), p.label))
#accuracy = 1.0 * predictionAndLabel.filter(lambda (x, v): x == v).count() / testing.count()

In [27]:
predictionAndLabel = testing.map(lambda p : (model.predict(p.features), p.label))
accuracy = 1.0 * predictionAndLabel.filter(lambda (x, v): x == v).count() / testing.count()
print accuracy

0.0577777777778
