## Part 2 : DATA ANALYTICS PIPELINE USING APACHE SPARK
### 1. Tejas Dhrangadharia(tejassha)
### 2. Karan Nisar(karankir)

In [1]:
# Import packages
import pyspark
from pyspark.ml import Pipeline
from pyspark.ml.classification import *
from pyspark.ml.feature import *
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import *
from pyspark.sql.types import *

sc=pyspark.SparkContext()
spark=pyspark.SQLContext(sc)

## 2. Data Cleaning

In [367]:
Topics=["Business","Movies","Sports","Politics"]
schema = StructType([StructField('Keywords', StringType(), False),StructField('category', StringType(), True)])
df=spark.createDataFrame(sc.emptyRDD(), schema)

for i in Topics:
    data=sc.wholeTextFiles(i)
    data=data.values()
    data=data.flatMap(lambda x: x.split(" "))
    data=spark.createDataFrame(data,StringType())
    data=data.select(regexp_replace(col("value"),"[^aA-zZ]"," ")).withColumnRenamed("regexp_replace(value, [^aA-zZ],  )","Keywords")
    data=data.filter("Keywords!=''")
    data=data.withColumn("category",lit(i))
    # dropping null values
    data=data.dropna()
    df=df.union(data)

## 3. Feature Engineering

In [368]:
# Tokenizing 
inx_regex=RegexTokenizer(inputCol="Keywords",outputCol="words")

# Remove Stop words
remover=StopWordsRemover(inputCol="words",outputCol="cleanwords")

# TF-IDF 
h=HashingTF(inputCol="cleanwords",outputCol="filtered")
i=IDF(inputCol="filtered",outputCol="features")

string=StringIndexer(inputCol="category",outputCol="label")

In [369]:
# Build a pipeline
pipe=Pipeline(stages=[inx_regex,remover,h,i,string])

In [370]:
inx=pipe.fit(df).transform(df)

In [371]:
# Split data into train(70%) and test(30%)
(train,test)=inx.randomSplit([0.7,0.3])

## 4. Multi-class Classification

## Logistic Regression

In [372]:
# Fit logistic regression model
lr=LogisticRegression(featuresCol="features",labelCol="label",maxIter=20)
lx=lr.fit(train)

In [373]:
# Test the model using test data
ly=lx.transform(test)

In [374]:
evaluator=MulticlassClassificationEvaluator()

In [375]:
# Evaluate the model
evaluator.evaluate(ly)

0.7156237232501345

### Accuracy: 71.56%

## Naive Bayes

In [376]:
# Tokenizing
inx_regex=RegexTokenizer(inputCol="Keywords",outputCol="words")

# Remove Stop words
remover=StopWordsRemover(inputCol="words",outputCol="cleanwords")

# TF-IDF 
h=HashingTF(inputCol="cleanwords",outputCol="filtered")
i=IDF(inputCol="filtered",outputCol="features")
string=StringIndexer(inputCol="category",outputCol="label")

# Build a pipeline
pipe=Pipeline(stages=[inx_regex,remover,h,i,string])
inx=pipe.fit(df).transform(df)

# Split data into train(70%) and test(30%)
(train,test)=inx.randomSplit([0.7,0.3])

In [377]:
# Fit logistic regression model
nb=NaiveBayes(smoothing=1.0)
mod=nb.fit(train)

In [379]:
# Test the model using test data
model = mod.transform(test)

In [380]:
# Evaluate the model
evaluator=MulticlassClassificationEvaluator()
evaluator.evaluate(model)

0.7260387782876818

### Accuracy: 72.60%

## 5. Testing

In [457]:
schema = StructType([StructField('Keywords', StringType(), False),StructField('category', StringType(), True)])
Directory=["Business","Movies","Politics","Sports"]
df=spark.createDataFrame(sc.emptyRDD(), schema)

for i in Directory:
    data=sc.wholeTextFiles(i)
    data=data.values()
    data=data.flatMap(lambda x: x.split(" "))
    data=spark.createDataFrame(data,StringType())
    data=data.select(regexp_replace(col("value"),"[^aA-zZ]"," ")).withColumnRenamed("regexp_replace(value, [^aA-zZ],  )","Keywords")
    data=data.filter("Keywords!=''")
    data=data.withColumn("category",lit(i))
    # dropping null values
    data=data.dropna()
    df=df.union(data)

In [458]:
inx=pipe.fit(df).transform(df)

### Logistic Regression

In [459]:
ly=lx.transform(inx)
evaluator=MulticlassClassificationEvaluator()
evaluator.evaluate(ly)

0.5500619233315466

### Accuracy: 55.00%

### Naive Bayes

In [460]:
model = mod.transform(inx)
evaluator=MulticlassClassificationEvaluator()
evaluator.evaluate(model)

0.5572045986833497

### Accuracy: 55.72%