In [1]:
import pyspark

In [2]:
import re
from nltk.stem import PorterStemmer

In [3]:
from pyspark.ml.feature import *

In [4]:
f = open('stopwords.txt','r')
stop = f.read().split(',')

In [5]:
global stopwords
stopwords = {}
for word in stop:
    stopwords[word] = True

In [6]:
sc = pyspark.SparkContext.getOrCreate()

# load text files from multiple folders as an rdd

In [7]:
text_file = sc.textFile("data/Sports,data/Politics,data/Science,data/Business")

In [8]:
def stopWords(word):
    stemmer = PorterStemmer()
    try:
        if stopwords[word]:
            return False #this word should be abandoned when return true
    except KeyError:
        return True

In [9]:
def splitwords(line):
    words = line.split(" ")
    returnwords = []
    stemmer = PorterStemmer()
    for word in words:
        if(re.match('http.*',word)):
                continue
        word = ''.join(re.findall(r'[0-9a-zA-Z]*', word))
        if(len(word)>10):
            continue
        if(word.isdigit()):
            continue
        word = word.lower()
        word = stemmer.stem(word)
        returnwords.append(word)
    return returnwords

In [10]:
def mapper(word):
    return (word,1)

## compute the word count for all articals and extract the top 1000 words as features

In [11]:
counts = text_file.flatMap(splitwords).filter(stopWords).map(mapper).reduceByKey(lambda a, b: a + b).\
takeOrdered(1000, key=lambda x: -x[1])

In [12]:
global feature
feature = {}
for i in range(len(counts)):
    feature[counts[i][0]] = i

# read files into pandas dataframe

In [13]:
import os
import pandas as pd
import numpy as np

In [14]:
label_list = os.listdir("data")

In [15]:
def textdf(list_, folder,label):
    t = []
    for i in range(len(list_)):
        filename = "data/"+folder + "/" + list_[i]
        f = open(filename,"r",encoding = "utf-8")
        t.append(f.read())
    df = pd.DataFrame({"content":t, "label":[label]*len(t)})
    return df

In [16]:
label_list  

['Politics', 'Sports', 'Science', 'Business']

### get labels from the folder names

In [17]:
label = label_list[0]
file_list = os.listdir(f"data/{label}")
Politics_df = textdf(file_list,label,0)

In [18]:
label = label_list[1]
file_list = os.listdir(f"data/{label}")
Sports_df = textdf(file_list,label,1)

label = label_list[2]
file_list = os.listdir(f"data/{label}")
Science_df = textdf(file_list,label,2)

label = label_list[3]
file_list = os.listdir(f"data/{label}")
Business_df = textdf(file_list,label,3)

### randomly seperate data into train set and test set

In [52]:
resultdf = Science_df.append([Business_df, Politics_df, Sports_df],ignore_index=True)

In [74]:
resultdf = resultdf.reindex(np.random.permutation(resultdf.index))

In [75]:
twenty = round(len(resultdf)/5)
train_set = resultdf[0:(twenty*3)]
test_set = resultdf[(twenty*3):(twenty*4)]
cv_set = resultdf[(twenty*4):]

## transform pandas dataframe into spark dataframe

In [76]:
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)

In [77]:
df_train = sqlCtx.createDataFrame(train_set)

In [78]:
df_test = sqlCtx.createDataFrame(test_set)

In [79]:
df_cv = sqlCtx.createDataFrame(cv_set)

In [31]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

## for words in one artical, once a word in feature appears, the feature vector +=1

In [32]:
def feature_ab(line):
    f = [0]*1000
    text = line[0]
    label = line[1]
    stemmer = PorterStemmer()
    words = text.lower().split(" ")
    for i in words:
        i = stemmer.stem(i)
        try:
            num = feature[i]
            f[num] +=1 #or just = 1
        except KeyError:
            continue
    return LabeledPoint(label,f)

In [80]:
train_data = df_train.rdd.map(feature_ab)

# Logistic regression

In [81]:
lrm = LogisticRegressionWithLBFGS.train(train_data, iterations=100, numClasses=4)

In [82]:
labelsAndPreds = train_data.map(lambda p: (p.label, lrm.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(train_data.count())
print("trainning Accuracy = " + str((1-trainErr)*100)+"%")

trainning Accuracy = 98.34162520729684%


In [83]:
test_data = df_test.rdd.map(feature_ab)

In [84]:
labelsAndPreds_test = test_data.map(lambda p: (p.label, lrm.predict(p.features)))
testErr = labelsAndPreds_test.filter(lambda lp: lp[0] != lp[1]).count() / float(test_data.count())
print("testing Accuracy = " + str((1-testErr)*100)+"%")

testing Accuracy = 93.47014925373134%


In [85]:
cross_validation = df_cv.rdd.map(feature_ab)

In [86]:
labelsAndPreds_cv = cross_validation.map(lambda p: (p.label, lrm.predict(p.features)))
cvErr = labelsAndPreds_cv.filter(lambda lp: lp[0] != lp[1]).count() / float(cross_validation.count())
print("Cross Validation Accuracy = " + str((1-cvErr)*100)+"%")

Cross Validation Accuracy = 94.5273631840796%


# Naive Bayes

In [65]:
from pyspark.mllib.classification import NaiveBayes

In [87]:
model = NaiveBayes.train(train_data, 1.0)

In [88]:
labelsAndPreds_bayes = train_data.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds_bayes.filter(lambda lp: lp[0] != lp[1]).count() / float(train_data.count())
print("trainning Accuracy = " + str((1-trainErr)*100)+"%")

trainning Accuracy = 95.93698175787728%


In [89]:
labelsAndPreds_test_bayes = test_data.map(lambda p: (p.label, model.predict(p.features)))
testErr = labelsAndPreds_test_bayes.filter(lambda lp: lp[0] != lp[1]).count() / float(test_data.count())
print("testing Accuracy = " + str((1-testErr)*100)+"%")

testing Accuracy = 95.1492537313433%


In [90]:
labelsAndPreds_cv_bayes = cross_validation.map(lambda p: (p.label, model.predict(p.features)))
cvErr = labelsAndPreds_cv_bayes.filter(lambda lp: lp[0] != lp[1]).count() / float(cross_validation.count())
print("Cross Validation Accuracy = " + str((1-cvErr)*100)+"%")

Cross Validation Accuracy = 96.01990049751244%
