# Jimmy Huang Jia Yi Chen 

In [1]:
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
import random
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
import string
import math
import os

[nltk_data] Downloading package stopwords to /Users/admin/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [2]:
sc = SparkContext.getOrCreate()

In [3]:
# Get all the files from our category directories with the spark wholeTextFiles function

techFiles = sc.wholeTextFiles(os.getcwd() + '/Articles/technology/')
businessFiles = sc.wholeTextFiles(os.getcwd() + '/Articles/business/')
sportsFiles = sc.wholeTextFiles(os.getcwd() + '/Articles/sports/')
politicsFiles = sc.wholeTextFiles(os.getcwd() + '/Articles/politics/')

In [4]:
print(techFiles)
sortedTechFiles = sorted(techFiles.collect())
sortedBusinessFiles = sorted(businessFiles.collect())
sortedSportsFiles = sorted(sportsFiles.collect())
sortedPoliticsFiles = sorted(politicsFiles.collect())
print(type(sortedPoliticsFiles))

org.apache.spark.api.java.JavaPairRDD@1945a535
<class 'list'>


In [5]:
print(sortedTechFiles[0][1])
print(type(sortedTechFiles[0][1]))

class="story-body-text story-content" data-para-count="424" data-total-count="424">Scientists searching for a toxic strain of E. coli that has raced across 25 states, sickening 121 people and <a href="https://www.nytimes.com/2018/05/02/health/e-coli-romaine-update.html">killing one</a>, have been able to identify the general source as the Yuma, Ariz., growing region. But as the outbreak enters its second month, they still cannot find the contamination itself — it could be lurking in the area’s fields, water sources, harvesting equipment, processing plants or distribution centers.</p>,class="story-body-text story-content" data-para-count="346" data-total-count="770">Federal officials predict that the outbreak, linked to <a href="https://www.nytimes.com/2018/04/19/well/eat/romaine-lettuce-salad-food-poisoning-e-coli.html" title="consumer advice">romaine lettuce</a>, will continue for several weeks. It is the largest American E. coli flare-up since 2006, <a href="https://www.cdc.gov/ecoli

In [6]:
rand_tech_training = [sortedTechFiles[i] for i in (random.sample(range(len(sortedTechFiles)), math.ceil(len(sortedTechFiles) * 0.8)))]
rand_tech_testing = [x for x in sortedTechFiles if x not in rand_tech_training]

rand_business_training = [sortedBusinessFiles[i] for i in (random.sample(range(len(sortedBusinessFiles)), math.ceil(len(sortedBusinessFiles) * 0.8)))]
rand_business_testing = [x for x in sortedBusinessFiles if x not in rand_business_training]

rand_sports_training = [sortedSportsFiles[i] for i in (random.sample(range(len(sortedSportsFiles)), math.ceil(len(sortedSportsFiles) * 0.8)))]
rand_sports_testing = [x for x in sortedSportsFiles if x not in rand_sports_training]

rand_politics_training = [sortedPoliticsFiles[i] for i in (random.sample(range(len(sortedPoliticsFiles)), math.ceil(len(sortedPoliticsFiles) * 0.8)))]
rand_politics_testing = [x for x in sortedPoliticsFiles if x not in rand_politics_training]

In [7]:
stops = set(stopwords.words('english'))

In [8]:
def cleanedFiles(file):
    cleaned = ''
    for paragraph in file[1].split('<p'):
        # create a new paragraph that removes all the punctuations
        new_paragraph = [word.lower().strip().translate(word.maketrans('', '', string.punctuation)) 
        # Remove all words that contain carrots to remove tags
        for word in paragraph.split() if '<' not in word and '=' not in word and '>' not in word and 'story-content' not in word] 
        # Remove all words that are not words(non-alphabetic) or if they are in the stop words set
        cleaned_paragraph = [(word + " ") for word in new_paragraph if word.isalpha() and word not in stops]
        # Save each paragraph in the format we want in the paragraph file
        cleaned = cleaned + (''.join(cleaned_paragraph)) + " "
    return (file[0], cleaned)

In [9]:
# Clean the articles individually and save them in a list of tuples : cleanedCategoryArticles [(title, content)]
cleanedTechTrainArticles = [cleanedFiles(file) for file in rand_tech_training]
cleanedTechTestArticles = [cleanedFiles(file) for file in rand_tech_testing]

cleanedBusinessTrainArticles = [cleanedFiles(file) for file in rand_business_training]
cleanedBusinessTestArticles = [cleanedFiles(file) for file in rand_business_testing]

cleanedSportsTrainArticles = [cleanedFiles(file) for file in rand_sports_training]
cleanedSportsTestArticles = [cleanedFiles(file) for file in rand_sports_testing]

cleanedPoliticsTrainArticles = [cleanedFiles(file) for file in rand_politics_training]
cleanedPoliticsTestArticles = [cleanedFiles(file) for file in rand_politics_testing]

In [10]:
for f in cleanedTechTrainArticles:
    print('----------------------------------------------------------------------------------------------------------------')
    print(f[0])
    print('----------------------------------------------------------------------------------------------------------------')
    print(f[1])

----------------------------------------------------------------------------------------------------------------
file:/Users/admin/Lab3/Articles/technology/57.txt
----------------------------------------------------------------------------------------------------------------
snap inc fell short wall street forecasts revenue regular users tuesday redesign snapchat messaging app turned longtime fans advertisers sending shares tumbling growth likely slow substantially second quarter company said showing still faces uphill battle app overhaul meant fend bigger rival facebook inc adds snapchatlike number daily active users snapchat crucial generating advertising revenue rose million quarter ended march short consensus expectations million according thomson reuters figure percent higher year earlier compared growth percent previous quarter shares plunged percent afterhours trading extending slide stock since february shares surged percent feb topping snaps ipo price first time months hopes r

In [11]:
allCleanedTrainingArticles = [cleanedTechTrainArticles, cleanedBusinessTrainArticles, cleanedSportsTrainArticles, cleanedPoliticsTrainArticles]
for articleList in allCleanedTrainingArticles:
    for article in articleList:
        print(article[0])
allCleanedTestingArticles = [cleanedTechTestArticles, cleanedBusinessTestArticles, cleanedSportsTestArticles, cleanedPoliticsTestArticles]
for articleTestList in allCleanedTestingArticles:
    for testArticle in articleTestList:
        print(testArticle[0])

file:/Users/admin/Lab3/Articles/technology/57.txt
file:/Users/admin/Lab3/Articles/technology/15.txt
file:/Users/admin/Lab3/Articles/technology/51.txt
file:/Users/admin/Lab3/Articles/technology/21.txt
file:/Users/admin/Lab3/Articles/technology/24.txt
file:/Users/admin/Lab3/Articles/technology/56.txt
file:/Users/admin/Lab3/Articles/technology/16.txt
file:/Users/admin/Lab3/Articles/technology/14.txt
file:/Users/admin/Lab3/Articles/technology/17.txt
file:/Users/admin/Lab3/Articles/technology/2.txt
file:/Users/admin/Lab3/Articles/technology/32.txt
file:/Users/admin/Lab3/Articles/technology/63.txt
file:/Users/admin/Lab3/Articles/technology/48.txt
file:/Users/admin/Lab3/Articles/technology/46.txt
file:/Users/admin/Lab3/Articles/technology/30.txt
file:/Users/admin/Lab3/Articles/technology/60.txt
file:/Users/admin/Lab3/Articles/technology/11.txt
file:/Users/admin/Lab3/Articles/technology/62.txt
file:/Users/admin/Lab3/Articles/technology/22.txt
file:/Users/admin/Lab3/Articles/technology/7.txt
fi

In [12]:
toBecomeTrainDF = []
toBecomeTestDF = []

categoryMap = {
    'business' : 1, 
    'politics' : 2, 
    'sports' : 3, 
    'technology' : 4
}

for articleList in allCleanedTrainingArticles:
    for index, article in enumerate(articleList):
        theCategory = article[0].split('/')[-2]
        toBecomeTrainDF.append((article[0].split('/')[-2]+article[0].split('/')[-1], categoryMap.get(theCategory) , article[1]))
print(len(toBecomeTrainDF))
for articleList in allCleanedTestingArticles:
    for index, article in enumerate(articleList):
        testCategory = article[0].split('/')[-2]
        toBecomeTestDF.append((article[0].split('/')[-2]+article[0].split('/')[-1], categoryMap.get(testCategory) , article[1]))
print(len(toBecomeTestDF))

171
42


In [27]:
#https://spark.apache.org/docs/2.1.0/ml-features.html

sqlContext = SQLContext(sc)
trainingDataFrame = sqlContext.createDataFrame(toBecomeTrainDF
, ["id", "label" , "article"])

tokenizer = Tokenizer(inputCol="article", outputCol="words")
wordsData = tokenizer.transform(trainingDataFrame)

hashingTF = HashingTF(numFeatures=5000, inputCol="words", outputCol="rawFeatures")
featurizedTrainData = hashingTF.transform(wordsData)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedTrainData)
rescaledTrainIDFData = idfModel.transform(featurizedTrainData)


rescaledTrainIDFData.select("id", "label", 'words', 'rawFeatures' ,"features").show(1000)

+----------------+-----+--------------------+--------------------+--------------------+
|              id|label|               words|         rawFeatures|            features|
+----------------+-----+--------------------+--------------------+--------------------+
|technology57.txt|    4|[snap, inc, fell,...|(5000,[7,45,142,1...|(5000,[7,45,142,1...|
|technology15.txt|    4|[years, later, pr...|(5000,[1,6,25,31,...|(5000,[1,6,25,31,...|
|technology51.txt|    4|[electronic, arts...|(5000,[8,32,157,2...|(5000,[8,32,157,2...|
|technology21.txt|    4|[wednesday, witne...|(5000,[19,32,59,6...|(5000,[19,32,59,6...|
|technology24.txt|    4|[scott, pruitt, h...|(5000,[1,15,60,72...|(5000,[1,15,60,72...|
|technology56.txt|    4|[akamai, technolo...|(5000,[2,8,26,56,...|(5000,[2,8,26,56,...|
|technology16.txt|    4|[summer, engineer...|(5000,[1,15,24,26...|(5000,[1,15,24,26...|
|technology14.txt|    4|[ever, wanted, ta...|(5000,[26,77,81,9...|(5000,[26,77,81,9...|
|technology17.txt|    4|[pretty,

In [28]:
#https://spark.apache.org/docs/2.1.0/ml-features.html

sqlContext = SQLContext(sc)
testingDataFrame = sqlContext.createDataFrame(toBecomeTestDF
, ["id", "label" , "article"])

tokenizer = Tokenizer(inputCol="article", outputCol="words")
wordsData = tokenizer.transform(testingDataFrame)

hashingTF = HashingTF(numFeatures=5000, inputCol="words", outputCol="rawFeatures")
featurizedTestData = hashingTF.transform(wordsData)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedTestData)
rescaledTestIDFData = idfModel.transform(featurizedTestData)


rescaledTestIDFData.select("id", "label", 'words', 'rawFeatures' ,"features").show(1000)

+----------------+-----+--------------------+--------------------+--------------------+
|              id|label|               words|         rawFeatures|            features|
+----------------+-----+--------------------+--------------------+--------------------+
|technology10.txt|    4|[night, early, ja...|(5000,[0,1,2,8,9,...|(5000,[0,1,2,8,9,...|
|technology18.txt|    4|[fall, russian, b...|(5000,[1,4,9,15,3...|(5000,[1,4,9,15,3...|
|technology19.txt|    4|[phone, reading, ...|(5000,[26,34,48,6...|(5000,[26,34,48,6...|
|technology27.txt|    4|[early, spring, l...|(5000,[1,5,9,15,2...|(5000,[1,5,9,15,2...|
|technology33.txt|    4|[years, environme...|(5000,[1,6,9,15,1...|(5000,[1,6,9,15,1...|
|technology34.txt|    4|[ranh, bay, vietn...|(5000,[1,87,91,12...|(5000,[1,87,91,12...|
|technology35.txt|    4|[thursday, trump,...|(5000,[1,2,7,8,15...|(5000,[1,2,7,8,15...|
|technology36.txt|    4|[switzerland, big...|(5000,[23,34,39,6...|(5000,[23,34,39,6...|
|technology39.txt|    4|[b, whit

In [29]:
from pyspark.ml.classification import LogisticRegression

# regPara: lasso regularisation parameter (L1)
lr = LogisticRegression(maxIter = 100, regParam = 0.5).fit(rescaledTrainIDFData)
rescaledLRData = lr.transform(rescaledTestIDFData)

from pyspark.ml.evaluation import BinaryClassificationEvaluator

rescaledLRData.select('id', 'label', 'words', 'rawFeatures' ,'features', 'probability', 'prediction').show(1000)

+----------------+-----+--------------------+--------------------+--------------------+--------------------+----------+
|              id|label|               words|         rawFeatures|            features|         probability|prediction|
+----------------+-----+--------------------+--------------------+--------------------+--------------------+----------+
|technology10.txt|    4|[night, early, ja...|(5000,[0,1,2,8,9,...|(5000,[0,1,2,8,9,...|[1.06918249667609...|       1.0|
|technology18.txt|    4|[fall, russian, b...|(5000,[1,4,9,15,3...|(5000,[1,4,9,15,3...|[7.12597373226932...|       1.0|
|technology19.txt|    4|[phone, reading, ...|(5000,[26,34,48,6...|(5000,[26,34,48,6...|[0.00219081077762...|       3.0|
|technology27.txt|    4|[early, spring, l...|(5000,[1,5,9,15,2...|(5000,[1,5,9,15,2...|[4.76000302219730...|       3.0|
|technology33.txt|    4|[years, environme...|(5000,[1,6,9,15,1...|(5000,[1,6,9,15,1...|[2.79986644294424...|       1.0|
|technology34.txt|    4|[ranh, bay, viet

In [30]:
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
 
dt = DecisionTreeClassifier(maxDepth = 15).fit(rescaledTrainIDFData)
rescaledDTData = dt.transform(rescaledTestIDFData)
rescaledDTData.select('id', 'label', 'words' ,'features', 'probability', 'prediction').show(1000)

+----------------+-----+--------------------+--------------------+--------------------+----------+
|              id|label|               words|            features|         probability|prediction|
+----------------+-----+--------------------+--------------------+--------------------+----------+
|technology10.txt|    4|[night, early, ja...|(5000,[0,1,2,8,9,...|[0.0,0.0,0.0,0.0,...|       4.0|
|technology18.txt|    4|[fall, russian, b...|(5000,[1,4,9,15,3...|[0.0,1.0,0.0,0.0,...|       1.0|
|technology19.txt|    4|[phone, reading, ...|(5000,[26,34,48,6...|[0.0,0.0,0.0,0.0,...|       4.0|
|technology27.txt|    4|[early, spring, l...|(5000,[1,5,9,15,2...|[0.0,0.0,0.0,1.0,...|       3.0|
|technology33.txt|    4|[years, environme...|(5000,[1,6,9,15,1...|[0.0,0.0,0.0,1.0,...|       3.0|
|technology34.txt|    4|[ranh, bay, vietn...|(5000,[1,87,91,12...|[0.0,0.0,0.0,0.0,...|       4.0|
|technology35.txt|    4|[thursday, trump,...|(5000,[1,2,7,8,15...|[0.0,1.0,0.0,0.0,...|       1.0|
|technolog

In [31]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(rescaledDTData)
print("Test Accuray = %g " % (accuracy))

Test Accuray = 0.452381 


In [32]:
rf = RandomForestClassifier(numTrees = 150).fit(rescaledTrainIDFData)
rescaledRFData = rf.transform(rescaledTestIDFData)
rescaledRFData.select('id', 'label', 'words','features', 'probability', 'prediction').show(1000)

+----------------+-----+--------------------+--------------------+--------------------+----------+
|              id|label|               words|            features|         probability|prediction|
+----------------+-----+--------------------+--------------------+--------------------+----------+
|technology10.txt|    4|[night, early, ja...|(5000,[0,1,2,8,9,...|[0.0,0.3822824249...|       1.0|
|technology18.txt|    4|[fall, russian, b...|(5000,[1,4,9,15,3...|[0.0,0.4882876085...|       1.0|
|technology19.txt|    4|[phone, reading, ...|(5000,[26,34,48,6...|[0.0,0.1651518686...|       4.0|
|technology27.txt|    4|[early, spring, l...|(5000,[1,5,9,15,2...|[0.0,0.3513821698...|       1.0|
|technology33.txt|    4|[years, environme...|(5000,[1,6,9,15,1...|[0.0,0.3061553561...|       2.0|
|technology34.txt|    4|[ranh, bay, vietn...|(5000,[1,87,91,12...|[0.0,0.1668035490...|       2.0|
|technology35.txt|    4|[thursday, trump,...|(5000,[1,2,7,8,15...|[0.0,0.6243600648...|       1.0|
|technolog

In [33]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(rescaledRFData)
print("Test Accuracy = %g " % (accuracy))

Test Accuracy = 0.52381 
