## Analysis on Titanic Data:

In [592]:
import time
import pyspark
import os
import csv
from numpy import array
from pyspark.mllib.regression import LabeledPoint
from pyspark import SparkContext, SparkConf
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer, OneHotEncoder, VectorAssembler, IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import *

conf = pyspark.SparkConf()
# sc = pyspark.SparkContext(conf=conf) //RUN THIS IF YOUR SPARK CONTEXT IS NOT CREATED
conf.getAll()

# Reading from the hdfs, removing the header
trainTitanic= sc.textFile("train.csv")
trainHeader = trainTitanic.first()
trainTitanic = trainTitanic.filter(lambda line: line != trainHeader).mapPartitions(lambda x: csv.reader(x))
trainTitanic.first()
 
# Data preprocessing
def sexTransformMapper(elem):
    '''Function which transform "male" into 1 and else things into 0
    - elem : string
    - return : vector
    '''
     
    if elem == 'male' :
        return [0]
    else :
        return [1]
 
# Data Transformations and filter lines with empty strings
trainTitanic=trainTitanic.map(lambda line: line[1:3]+sexTransformMapper(line[4])+line[5:11])
trainTitanic=trainTitanic.filter(lambda line: line[3] != '' ).filter(lambda line: line[4] != '' )
trainTitanic.take(10)
 
# creating "labeled point" rdds specific to MLlib "(label (v1, v2...vp])"
trainTitanicLP=trainTitanic.map(lambda line: LabeledPoint(line[0],[line[1:5]]))
trainTitanicLP.first()
 
# splitting dataset into train and test set
(trainData, testData) = trainTitanicLP.randomSplit([0.7, 0.3])
 
# Random forest : same parameters as sklearn (?)
from pyspark.mllib.tree import RandomForest
 
time_start=time.time()
model_rf = RandomForest.trainClassifier(trainData, numClasses = 2,
        categoricalFeaturesInfo = {}, numTrees = 100,
        featureSubsetStrategy='auto', impurity='gini', maxDepth=12,
        maxBins=32, seed=None)
 
  
model_rf.numTrees()
model_rf.totalNumNodes()
time_end=time.time()
time_rf=(time_end - time_start)
print("RF takes %d s" %(time_rf))
 
# Predictions on test set
predictions = model_rf.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
 
# first metrics
from pyspark.mllib.evaluation import BinaryClassificationMetrics
metrics = BinaryClassificationMetrics(labelsAndPredictions)
 
# Area under precision-recall curve
print("Area under PR = %s" % metrics.areaUnderPR)
 
# Area under ROC curve
print("Area under ROC = %s" % metrics.areaUnderROC)


# Creatingt Spark SQL environment
from pyspark.sql import SparkSession, HiveContext
SparkContext.setSystemProperty("hive.metastore.uris", "thrift://nn1:9083")
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
 
# spark is an existing SparkSession
train = spark.read.csv("train.csv", header = True)
# Displays the content of the DataFrame to stdout
train.show(10)
 
# String to float on some columns of the dataset : creates a new dataset
train = train.select(col("Survived"),col("Sex"),col("Embarked"),col("Pclass").cast("float"),col("Age").cast("float"),col("SibSp").cast("float"),col("Fare").cast("float"))
 
# dropping null values
train = train.dropna()
 
# Spliting in train and test set. Beware : It sorts the dataset
(traindf, testdf) = train.randomSplit([0.7,0.3])












# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
genderIndexer = StringIndexer(inputCol="Sex", outputCol="indexedSex")
embarkIndexer = StringIndexer(inputCol="Embarked", outputCol="indexedEmbarked")
 
surviveIndexer = StringIndexer(inputCol="Survived", outputCol="indexedSurvived")
 
# One Hot Encoder on indexed features
genderEncoder = OneHotEncoder(inputCol="indexedSex", outputCol="sexVec")
embarkEncoder = OneHotEncoder(inputCol="indexedEmbarked", outputCol="embarkedVec")
 
# Create the vector structured data (label,features(vector))
assembler = VectorAssembler(inputCols=["Pclass","sexVec","Age","SibSp","Fare","embarkedVec"],outputCol="features")
 
# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedSurvived", featuresCol="features")
 
# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[surviveIndexer, genderIndexer, embarkIndexer, genderEncoder,embarkEncoder, assembler, rf]) # genderIndexer,embarkIndexer,genderEncoder,embarkEncoder,
 
# Train model.  This also runs the indexers.
model = pipeline.fit(traindf)
 
# Predictions
predictions = model.transform(testdf)
 
# Select example rows to display.
predictions.columns 
 
# Select example rows to display.
predictions.select("prediction", "Survived", "features").show(5)
 
# Select (prediction, true label) and compute test error
predictions = predictions.select(col("Survived").cast("Float"),col("prediction"))
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
 
rfModel = model.stages[6]
print(rfModel)  # summary only
 
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % accuracy)
 
evaluatorf1 = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="f1")
f1 = evaluatorf1.evaluate(predictions)
print("f1 = %g" % f1)
 
evaluatorwp = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="weightedPrecision")
wp = evaluatorwp.evaluate(predictions)
print("weightedPrecision = %g" % wp)
 
evaluatorwr = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="weightedRecall")
wr = evaluatorwr.evaluate(predictions)
print("weightedRecall = %g" % wr)
 
# close sparkcontext
sc.stop()


RF takes 3 s
Area under PR = 0.5752939929838485
Area under ROC = 0.7584256138661533
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|  22|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|  38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|  26|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|  35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|  35|    0|    0|          373450|   8.05| null|      

## Data Flow Explanation: We collected our articles from NYT using the scipt at the bottom. Our categories are Politics, Food, Business, and Sports. We used the method wholeTextFiles to import our articles into Spark. Once our data for all four categories are in Spark we call our main function which returns the data as a string for each category. We then create a dataframe for each category and appended the category. We then merge the dataframes creating a master dataframe. In the master dataframe, we remove the regx, remove the stopwords, calculate the TF IDF. The output is the feature vector. This gets submitted to the selected ML models and we print our accuracy. 

### Import statements for code

In [571]:
from pyspark import SparkConf, SparkContext
from operator import add
from operator import add
import re
import sys
import numpy as np
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.classification import *
from pyspark.ml.feature import *
from pyspark.ml import *
from pyspark.ml.evaluation import *

## Original remove Regx and Stop word remover - These methods are not used.

In [572]:
## Constants
APP_NAME = "Lab 3"
##OTHER FUNCTIONS/CLASSES
def func(iterator):
    global_list=[]
    with open("stopWords_2.txt", "r") as ins:
        stopWords = []
        for line in ins:
            line=re.sub('[^A-Za-z0-9]+', '', line)
            line=line.lower()
            stopWords.append(line)
    print(stopWords)
    for x in iterator:
        list=x.split()
        for word in list:
            word = re.sub('[^A-Za-z0-9]+', '', word)
            if word.lower() not in stopWords:
                global_list.append((word,1))
    return global_list

def regex(iterator):
   # global_list=[]
    for x,y in iterator:
        y=re.sub(r'[^\x00-\x7F]+',' ', y)
    return iterator

## Main method returns a list of words for each category.

In [573]:
def main(sc,data):    
   dataMerge = data.reduceByKey(lambda x,y:x+y)
   dataList = dataMerge.values().collect()
   return dataList

## Method for removing stop words.
## Method for calculating word count.
## Spark Confoguration declerations, import data from file destinations, and call the main method.

In [574]:
def preprocess(raw_text):
    #extract words from rdd.
    dirtyWords = dataAdd.collect

    # keep only words
    letters_only_text = re.sub("[^a-zA-Z]", " ", dirtyWords)

    # convert to lower case and split 
    words = letters_only_text.lower().split()

    # remove stopwords
    stopword_set = set(stopwords.words("english"))
    meaningful_words = [w for w in words if w not in stopword_set]

    # join the cleaned words in a list
    cleaned_word_list = " ".join(meaningful_words)

    return cleaned_word_list

def wordCount(data):
    #words = textRDD.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1))
    #wordcount = data.reduceByKey(add).collect()
    for wc in data:
      print('aman1\n',wordcount)

if __name__ == "__main__":

   # Configure Spark
   conf = SparkConf().setAppName(APP_NAME)
   conf = conf.setMaster("local[*]")
#    sc   = SparkContext(conf=conf)
   # Execute Main functionality
   databond = sc.wholeTextFiles("finaldata/datafinal1/business/")
   dataobama = sc.wholeTextFiles("finaldata/datafinal1/food/")
   datatravel = sc.wholeTextFiles("finaldata/datafinal1/politics/")
   datayankees = sc.wholeTextFiles("finaldata/datafinal1/sports/")

   aman=main(sc,databond)
   aman1=main(sc,dataobama)
   aman2=main(sc,datatravel)
   aman3=main(sc,datayankees)


## Create dataframes for each category and append the category.

In [575]:
df=spark.createDataFrame(aman,StringType()).na.drop()
df=df.withColumn("category",lit(('business')))

In [576]:
df1=spark.createDataFrame(aman1,StringType()).na.drop()
df1=df1.withColumn("category",lit(('food')))

In [577]:
df2=spark.createDataFrame(aman2,StringType()).na.drop()
df2=df2.withColumn("category",lit(('politics')))

In [578]:
df3=spark.createDataFrame(aman3,StringType()).na.drop()
df3=df3.withColumn("category",lit(('sports')))

## Merge the dataframes.

In [596]:
from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)

df4 =unionAll(df,df1,df2,df3)

## Remove the regx, stopwords, and add category labels.

In [583]:
regToken = RegexTokenizer(inputCol="value", outputCol="words", pattern="\\W")
stopwords = StopWordsRemover(inputCol="words", outputCol="filtered")
labels = StringIndexer(inputCol = "category", outputCol = "label")

## Calculate the TF IDF assign it to rawFeatures. Create the pipeline and fit it.

In [584]:
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=5000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
pipeline = Pipeline(stages=[regToken, stopwords, hashingTF, idf, labels])
pipelineFit = pipeline.fit(df4)
dataset = pipelineFit.transform(df4)
dataset.show()

+--------------------+--------+--------------------+--------------------+--------------------+--------------------+-----+
|               value|category|               words|            filtered|         rawFeatures|            features|label|
+--------------------+--------+--------------------+--------------------+--------------------+--------------------+-----+
|AdvertisementAdve...|business|[advertisementadv...|[advertisementadv...|(5000,[7,26,29,34...|(5000,[7,26,29,34...|  0.0|
|AdvertisementAdve...|business|[advertisementadv...|[advertisementadv...|(5000,[20,24,32,5...|(5000,[20,24,32,5...|  0.0|
|AdvertisementAdve...|business|[advertisementadv...|[advertisementadv...|(5000,[15,21,24,9...|(5000,[15,21,24,9...|  0.0|
|AdvertisementAdve...|business|[advertisementadv...|[advertisementadv...|(5000,[0,87,125,1...|(5000,[0,87,125,1...|  0.0|
|AdvertisementAdve...|business|[advertisementadv...|[advertisementadv...|(5000,[1,10,69,70...|(5000,[1,10,69,70...|  0.0|
|AdvertisementAdve...|bu

## Split the data between train and test set. Run the logistic regression model.

In [585]:
(trainingData, testData) = dataset.randomSplit([0.75, 0.25], seed = 10)
(trainingDatafinal, validationData) = trainingData.randomSplit([0.7, 0.23], seed = 10)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))
print("validation Dataset Count: " + str(validationData.count()))
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

Training Dataset Count: 294
Test Dataset Count: 106
validation Dataset Count: 74


In [586]:
print("The accuracy is :"+str(evaluator.evaluate(predictions)*100))

The accuracy is :73.71977105960539


## Logistic regression on train and validation data for the testing of the random articles.

In [587]:
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingDatafinal)
predictions = lrModel.transform(validationData)

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

In [588]:
print("The accuracy is :"+str(evaluator.evaluate(predictions)*100))

The accuracy is :71.38129921987651


## Random Forest on test and train data.

In [589]:
rf = RandomForestClassifier(labelCol="label", \
                            featuresCol="features", \
                            numTrees = 50, \
                            maxDepth = 4, \
                            maxBins = 32)
# Train model with Training Data
rfModel = rf.fit(trainingData)
predictions = rfModel.transform(testData)

In [590]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print("The accuracy is :"+str(evaluator.evaluate(predictions)*100))

The accuracy is :78.50253060509209


## New York Times Scraping Script

In [None]:
import os.path
import requests
import json
import time
from newspaper import Article
from bs4 import BeautifulSoup

MY_API_KEY = '2a58a1936b8c465c994b5f77c73a68b8'

save_path = 'C:/Users/aman/Desktop/datafinal/sports/'

session = requests.Session()

def get_articles(api_key,date,category):
    url = 'http://api.nytimes.com/svc/search/v2/articlesearch.json'
    url += '?q=%s&begin_date=%s&api-key=%s&page=9' % (category,date,api_key)
    #print(url)
    response = requests.get(url)
    data = response.json()
    #data = json.loads(response.text)
    articles = data['response']['docs']
    i=81
    for article1 in articles:
        test = "testfile"
        test +='%s'%(i)
        i=i+1
        print(article1['web_url'])
        req = session.get(article1['web_url'])
        soup = BeautifulSoup(req.text,'lxml')
        paragraphs = soup.find_all('p')
        article = paragraphs[0].get_text()
#         article12=Article(article1['web_url'])
#         article12.download()
#         article12.parse()
#         article12.nlp()
        for p in paragraphs:
            article = article + p.get_text()
#         a=article12.text
        name_of_file = test
        completeName = os.path.join(save_path, name_of_file+".txt")         
        file1 = open(completeName, "w",encoding="utf-8")
        file1.write(article)
        file1.close()

get_articles(MY_API_KEY,"20161001","sports")