In [33]:
# import libraries
import findspark
findspark.init()
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import HashingTF, IDF
#import featureExtractor.tfidf as tfidf
from functools import reduce
from nltk.corpus import stopwords

sc = SparkContext()
sqlContext = SQLContext(sc)

def readData(dataPath):
    #input: file path 
    #output: returns a spark dataframe 
    data = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load(dataPath)
    print("Data Read successfully ...")
    drop_list = ['url']
    
    data = data.select([column for column in data.columns if column not in drop_list])
    return data.dropna()



def tfidf(regexTokenizer, stopwordsRemover, label_stringIdx):
    #returns pipeline using tfidf 
    hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
    idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
    return Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx])

def countVectorizer(regexTokenizer, stopwordsRemover, label_stringIdx):
    # bag of words count
    countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)
    label_stringIdx = StringIndexer(inputCol = "Category", outputCol = "label")
    return Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])   


def get_stopwords():
    stop_words = set(stopwords.words('english'))
    special = ['ms','mr','http','https','amp', 'none', 'i’m', 'th','don’t','it’s','advertisement']
    with open('seostopwords.txt') as f:
        stopLines = f.read().splitlines()

    dump = [stop_words.add(i) for i in special]
    dump = [stop_words.add(i) for i in stopLines]
    return list(stop_words)
    
def classify(featureExtractor, classifier):
    #input : 
        #1)feature extractor (count vectorizer or tfidf)
        #2) classifier algorithm (random forest, naive bayes or linear reg)
        
    # training :80% , testing 10% , validation 10%
    
    
    #Part 1:  Combine data from all sources 
    data_politics = readData('politics.csv')
    data_sports = readData('sports.csv')
    data_business = readData('business.csv')
    data_movies = readData('movies.csv')
    
    
    data_merged = data_politics.union(data_sports)
    data_merged = data_merged.union(data_business)
    data_merged = data_merged.union(data_movies)
    
    
    data = data_merged.withColumnRenamed("category", "Category").withColumnRenamed("content", "Descript")
    print("The total number of articles from 4 categories is " , data.count())
        
    # Part 2 -> build regex tokenizer 
    regexTokenizer = RegexTokenizer(inputCol="Descript", outputCol="words", pattern="\\W")
    
    # stop words ---------- (3)
    add_stopwords = get_stopwords()
    #print(add_stopwords)
    
    # Part 3 -> build stop word tokenizer 
    stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered",stopWords=add_stopwords)
    #print(stopwordsRemover.transform())
    label_stringIdx = StringIndexer(inputCol = "Category", outputCol = "label")

    # Part 4 -> build pipeline 
    if featureExtractor=="cv":
        pipeline = countVectorizer(regexTokenizer, stopwordsRemover, label_stringIdx)
    elif featureExtractor=="tfidf":
        pipeline = tfidf(regexTokenizer, stopwordsRemover, label_stringIdx)
    print("Pipeline constructed successfully..")

    # Fit the pipeline to the model.
    pipelineFit = pipeline.fit(data)
    
    dataset = pipelineFit.transform(data)
    # Part 5 -> split data and classify
    (trainingData, validationData, testData) = dataset.randomSplit([0.8, 0.1, 0.1], seed = 100)
    
    
    print("Training model..")
    if classifier=="lr":
        selectedModel = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
    elif classifier=="nb":
        selectedModel = NaiveBayes(smoothing=1)
    elif classifier=="rf":
        selectedModel = RandomForestClassifier(labelCol="label", \
                            featuresCol="features", \
                            numTrees = 100, \
                            maxDepth = 20, \
                            maxBins = 32)

    model = selectedModel.fit(trainingData)
    print("Model Trained..")
    
    # part 6 -> predict and test model
    print("Test model..")
    predictions = model.transform(validationData)
    predictions.filter(predictions['prediction'] == 0) \
        .select("Descript","Category","probability","label","prediction") \
        .orderBy("probability", ascending=False)

    print("Evaluate model..")
    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
    accuracy = evaluator.evaluate(predictions)
    print("Accuracy: "+str(accuracy))

    

In [34]:
def main():
    print("Please select method for feature extraction. Type\ncv for count vectorizer\ntfidf for term \
frequency inverse document frequency")
    fe_choice = input()
    print("\nPlese enter algorithm for classification. Type\nlr for linear regression\nnb for naive bayes\n\
rf for random forest")
    algo_choice = input()
    classify(fe_choice,algo_choice)
    sc.stop()
    

In [35]:
main()

Please select method for feature extraction. Type
cv for count vectorizer
tfidf for term frequency inverse document frequency
tfidf

Plese enter algorithm for classification. Type
lr for linear regression
nb for naive bayes
rf for random forest
nb
Data Read successfully ...
Data Read successfully ...
Data Read successfully ...
Data Read successfully ...
The total number of articles from 4 categories is  243
Pipeline constructed successfully..
Training model..
Model Trained..
Test model..
Evaluate model..
Accuracy: 0.6915151515151515


In [109]:
    cv    tfidf
lr 0.631  0.733
nb 0.654  0.692
rf 0.737  0.732