# Question 1 Part 1: Building an Explicit Movie Recommendation System with Spark MLlib

## Import required libraries

In [1]:
import subprocess
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

spark = SparkSession \
    .builder \
    .appName("recommendation_explicit") \
    .config("spark.som.config.option", "some-value") \
    .getOrCreate()

## Download/Unzip the MovieLens 1M dataset from http://grouplens.org/datasets/movielens

In [2]:
subprocess.call(["wget", "http://files.grouplens.org/datasets/movielens/ml-1m.zip"])
subprocess.call(["unzip", "ml-1m.zip"])

1

## Read and Convert ratings data to a DataFrame

In [3]:
lines = spark.read.text("./ml-1m/ratings.dat").rdd
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)

## Show the number of ratings in the dataset

In [4]:
print("Number of ratings = " + str(ratings.count()))

Number of ratings = 1000209


## Show a sample of the Ratings DataFrame

In [5]:
ratings.sample(False, 0.0001, seed=0).show(10)

+-------+------+---------+------+
|movieId|rating|timestamp|userId|
+-------+------+---------+------+
|   2908|   5.0|977895809|    68|
|   3730|   5.0|978554445|   173|
|   2917|   2.0|976301830|   456|
|    589|   4.0|976161565|   526|
|   2348|   3.0|976207524|   533|
|   1285|   4.0|979154572|   588|
|   1206|   4.0|980628867|   711|
|   3361|   4.0|975510209|   730|
|   3203|   5.0|975435824|   779|
|   1196|   4.0|975356701|   843|
+-------+------+---------+------+
only showing top 10 rows



## Show sample number of ratings per user

In [6]:
grouped_ratings = ratings.groupBy("userId").count().withColumnRenamed("count", "No. of ratings")
grouped_ratings.show(10)

+------+--------------+
|userId|No. of ratings|
+------+--------------+
|    26|           400|
|    29|           108|
|   474|           318|
|   964|            78|
|  1677|            43|
|  1697|           354|
|  1806|           214|
|  1950|           137|
|  2040|            46|
|  2214|            81|
+------+--------------+
only showing top 10 rows



## Show the number of users in the dataset

In [7]:
print("Number of users = " + str(grouped_ratings.count()))

Number of users = 6040


## Split Ratings data into Training (80%) and Test (20%) datasets

In [8]:
(training, test) = ratings.randomSplit([0.8, 0.2])


## Show resulting Ratings dataset counts

In [9]:
trainingRatio = float(training.count())/float(ratings.count())*100
testRatio = float(test.count())/float(ratings.count())*100

print("Total number of ratings = " + str(ratings.count()))
print("Training dataset count = " + str(training.count()) + ", " + str(trainingRatio) + "%")
print("Test dataset count = " + str(test.count()) + ", " + str(testRatio) + "%")

Total number of ratings = 1000209
Training dataset count = 800880, 80.07126510559293%
Test dataset count = 199329, 19.928734894407068%


## Build the recommendation model on the training data using ALS-explicit

In [10]:
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)

## Run the model against the Test data and show a sample of the predictions

In [11]:
predictions = model.transform(test).na.drop()
predictions.show(10)

+-------+------+---------+------+----------+
|movieId|rating|timestamp|userId|prediction|
+-------+------+---------+------+----------+
|    148|   1.0|976295338|   840| 2.9349167|
|    148|   2.0|974875106|  1150| 2.9894443|
|    148|   2.0|974178993|  2456| 3.9975448|
|    463|   5.0|968916009|  3151|  3.967182|
|    463|   3.0|963746396|  4858| 2.0730953|
|    463|   4.0|973625620|  2629| 3.1774714|
|    463|   1.0|966523740|  3683| 1.1212827|
|    463|   2.0|966790403|  3562|  2.780132|
|    463|   4.0|975775726|   721| 3.3978982|
|    463|   3.0|965308300|  4252| 0.9944763|
+-------+------+---------+------+----------+
only showing top 10 rows



## Evaluate the model by computing the RMSE on the test data

In [12]:
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.8908929362860674


## Show that a smaller value of rmse is better
This is obviously the case since RMSE is an aggregation of all the error. Thus evaluator.isLargerBetter should be 'false'.

In [13]:
evaluator.isLargerBetter()

False

## Make movie recommendations

In [14]:
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

## Show sample recommendations per user

In [15]:
userRecs.sample(False, 0.01).show(10, False)

+------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                               |
+------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|148   |[[1780,7.2854385], [1369,6.99533], [666,6.6703053], [2892,6.5549903], [1741,6.528875], [3523,6.07751], [572,6.003775], [2127,5.859668], [1164,5.6353364], [649,5.5918784]]    |
|5173  |[[3245,7.7563887], [1038,7.52281], [3867,7.2047706], [632,7.0838833], [37,7.0073814], [751,6.936385], [1369,6.471981], [645,6.453275], [1664,6.23118], [1543,6.188328]]       |
|5695  |[[1458,9.663776], [3855,9.074218], [3106,9.053921], [2837,9.043263], [21

## Show sample recommendations per user

In [16]:
movieRecs.sample(False, 0.01).show(10, False)

+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|movieId|recommendations                                                                                                                                                                |
+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|3844   |[[1213,7.3201046], [2441,6.9640417], [5297,6.8789372], [2549,6.8698826], [2816,6.507644], [1971,6.458085], [2160,6.4162674], [3915,6.402381], [4544,6.17197], [2560,6.119645]] |
|1031   |[[1070,5.9382234], [4143,5.8492775], [3897,5.841146], [2755,5.6947303], [4282,5.6827908], [527,5.6089225], [1728,5.5674863], [5052,5.52997], [5983,5.419548], [1459,5.4131107]]|
|26     |[[1213,7.0531287], [2640,6.3756685], [879,6.1351347], [2502,6

# Question 1 Part 2: Building an Implicit Music Recommendation System with Spark MLlib

## Import required libraries

In [2]:
import subprocess
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.recommendation import ALSModel
from pyspark.sql import Row

spark = SparkSession \
    .builder \
    .appName("recommendation_implicit") \
    .config("spark.som.config.option", "some-value") \
    .getOrCreate()

## Download/Unzip Audioscrobbler dataset from http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz

In [2]:
# subprocess.call(["wget", "http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz"])

## Read and Convert ratings data to a DataFrame

In [3]:
lines = spark.read.text("./profiledata_06-May-2005/user_artist_data.txt").rdd
parts = lines.map(lambda row: row.value.split(" ")).filter(lambda line: line!=None)
userArtistRDD = parts.map(lambda p: Row(userId=int(p[0]), artistId=int(p[1]),
                                     count=int(p[2])))
userArtist = spark.createDataFrame(userArtistRDD)

## Show the number of userArtist in the dataset

In [6]:
print("Number of userArtist = " + str(userArtist.count()))

Number of userArtist = 24296858


## Show a sample of the userArtist DataFrame

In [4]:
userArtist.sample(False, 0.0001, seed=23).show(10)

+--------+-----+-------+
|artistId|count| userId|
+--------+-----+-------+
| 1054292|    1|1000127|
| 1033246|    1|1000215|
|    1269|   13|1000357|
|     630|    1|1000410|
| 1000428|    2|1000657|
| 1234327|    1|1000911|
|      45|   34|1000923|
|     969|    4|1000927|
|    1235|    1|1000928|
|    3950|    2|1001009|
+--------+-----+-------+
only showing top 10 rows



## Split userArtist data into Training (80%) and Test (20%) datasets

In [3]:
(training, test) = userArtist.randomSplit([0.8, 0.2])

## Show resulting userArtist dataset counts

In [9]:
trainingRatio = float(training.count())/float(userArtist.count())*100
testRatio = float(test.count())/float(userArtist.count())*100

print("Total number of userArtist = " + str(userArtist.count()))
print("Training dataset count = " + str(training.count()) + ", " + str(trainingRatio) + "%")
print("Test dataset count = " + str(test.count()) + ", " + str(testRatio) + "%")

## Build the recommendation model on the training data using ALS-implicit

In [7]:
als = ALS(maxIter=5, regParam=0.01, implicitPrefs=True, userCol="userId", itemCol="artistId", ratingCol="count")
model = als.fit(training)

## Save and load model

In [4]:
# from pyspark.mllib.recommendation import MatrixFactorizationModel
# model.save("hdfs://localhost:9000/user/vibrioh/implicit_model")
sameModel = ALSModel.load("hdfs://localhost:9000/user/vibrioh/implicit_model")

## Run the model against the Test data and show a sample of the predictions

In [5]:
predictions = sameModel.transform(test).na.drop()
predictions.show(10)

+--------+-----+-------+----------+
|artistId|count| userId|prediction|
+--------+-----+-------+----------+
|     463|    1|1000117| 1.0815843|
|     463|    1|1000221| 0.3066332|
|     463|    1|1000401|0.41309264|
|     463|    1|1000463|0.92721707|
|     463|    1|1000614|0.66710955|
|     463|    1|1000745| 0.5759305|
|     463|    1|1000771|0.10025656|
|     463|    1|1000920|0.14627631|
|     463|    1|1001192| 0.4186052|
|     463|    1|1001239|0.11049299|
+--------+-----+-------+----------+
only showing top 10 rows



## Show recommendations high and low

In [6]:
predictions.registerTempTable("predictions")
spark.sql("SELECT * FROM predictions ORDER BY prediction DESC").show()
spark.sql("SELECT * FROM predictions ORDER BY prediction").show()

+--------+-----+-------+----------+
|artistId|count| userId|prediction|
+--------+-----+-------+----------+
|     393|   44|1044648|  2.048578|
| 1002862|    3|1000764| 2.0438032|
| 1245208|   16|1077252| 1.9122567|
|    1457|    3|1052461| 1.8336266|
|    4538|    1|1044648| 1.8334882|
| 1003361|    8|1052461| 1.7632964|
| 1105069|   12|1045876| 1.7556672|
|     670|   22|2058707| 1.7466245|
| 1034635|  208|1038380| 1.7438686|
| 1043348|   56|1021501| 1.7395341|
| 1003133|    3|1044648| 1.7320846|
| 1299851|   55|1007308| 1.7286341|
| 1296257|   27|1007308| 1.7270842|
| 1034635|   54|1047668| 1.7216785|
| 1031984|    3|1038826|  1.719785|
| 1001169|    9|1072865| 1.7158957|
|    2842|    3|1077252| 1.7109416|
|    2017|  128|2089146|  1.698533|
| 1012494|    5|1070844| 1.6947658|
| 1034635|    4|1001562| 1.6939092|
+--------+-----+-------+----------+
only showing top 20 rows



+--------+-----+-------+-----------+
|artistId|count| userId| prediction|
+--------+-----+-------+-----------+
|     606|    1|1072273| -1.1034482|
| 1062984|    5|1052461| -1.0605614|
| 1002751|    1|2005325| -0.9165062|
| 1000660|    1|2114152|-0.82818604|
| 1000270|    1|1070177| -0.7904455|
| 1001428|    1|2019216|-0.78354007|
|    4569|    4|1072273| -0.7598609|
| 1010728|   22|1054893| -0.7471355|
| 1006594|    3|2231283| -0.7382127|
| 1020783|    6|2200013| -0.6836228|
|    1400|    4|2287446| -0.6611168|
| 1003458|    1|1003897| -0.6341311|
| 1007027|    8|1000647|  -0.631236|
| 1003084|    1|1063655|-0.62958777|
|    2138|    1|2147892| -0.6157164|
|    1179|    2|1055807| -0.6106846|
|    1223|    1|2269169| -0.6081734|
| 1000418|    1|1020855| -0.5983083|
| 1002451|    2|2216293| -0.5928024|
| 1027267|    1|2200013|-0.58894813|
+--------+-----+-------+-----------+
only showing top 20 rows



# Question 2 Part 1: Clustering on News Articles

## Download news articles from: https://www.kaggle.com/asad1m9a9h6mood/news-articles
### This Dataset is scraped from https://www.thenews.com.pk website. It has news articles from 2015 till date related to business and sports. It Contains the Heading of the particular Article, Its content and its date. The content also contains the place from where the statement or Article was published.

## Import required libraries

In [6]:
from pyspark.ml import Pipeline
from pyspark.ml.clustering import BisectingKMeans, KMeans, GaussianMixture
from pyspark.ml.feature import HashingTF, Tokenizer, NGram, IDF, StopWordsRemover
from pyspark.sql import Row
import os
import pandas as pd
from pyspark.sql import SparkSession


spark = SparkSession \
    .builder \
    .appName("news_clustering") \
    .config("spark.som.config.option", "some-value") \
    .getOrCreate()

## Explore the dataset in Pandas

In [7]:
data_df = pd.read_csv("./Articles.csv",sep=",", encoding = "ISO-8859-1")
data_df.head()

Unnamed: 0,Article,Date,Heading,NewsType
0,KARACHI: The Sindh government has decided to b...,1/1/2015,sindh govt decides to cut public transport far...,business
1,HONG KONG: Asian markets started 2015 on an up...,1/2/2015,asia stocks up in new year trad,business
2,HONG KONG: Hong Kong shares opened 0.66 perce...,1/5/2015,hong kong stocks open 0.66 percent lower,business
3,HONG KONG: Asian markets tumbled Tuesday follo...,1/6/2015,asian stocks sink euro near nine year,business
4,NEW YORK: US oil prices Monday slipped below $...,1/6/2015,us oil prices slip below 50 a barr,business


In [8]:
data_df.describe()

Unnamed: 0,Article,Date,Heading,NewsType
count,2692,2692,2692,2692
unique,2584,666,2581,2
top,strong>TOKYO: Tokyo stocks climbed in early tr...,8/1/2016,Tokyo stocks open lower after BoJ under,sports
freq,5,27,5,1408


In [9]:
data = spark.createDataFrame(data_df)
data.printSchema()
data.sample(False, 0.05).show(5)

root
 |-- Article: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Heading: string (nullable = true)
 |-- NewsType: string (nullable = true)



+--------------------+---------+--------------------+--------+
|             Article|     Date|             Heading|NewsType|
+--------------------+---------+--------------------+--------+
|HONG KONG: Asian ...| 1/6/2015|asian stocks sink...|business|
|ISLAMABAD: The Na...|1/23/2015|nepra prevents k ...|business|
|ISLAMABAD: Pakist...|1/26/2015|pakistan fuel cri...|business|
|ISLAMABAD: Federa...| 3/4/2015|pact with k elect...|business|
|London: Oil price...| 3/6/2015|oil prices rise b...|business|
+--------------------+---------+--------------------+--------+
only showing top 5 rows



## Split data into Training (80%) and Test (20%) datasets

In [10]:
(training, test) = data.randomSplit([0.8, 0.2])

## Configure an ML pipeline

In [12]:
tokenizer = Tokenizer(inputCol="Article", outputCol="words")
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(),outputCol="filtered")   
hashingTF = HashingTF(inputCol=remover.getOutputCol(), outputCol="features",numFeatures=1048576)

## Clustering by K-MEANS

In [13]:
kmeans = KMeans().setK(2).setSeed(1)
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, kmeans])
model = pipeline.fit(training)

## Make predictions on test and print interested columns of different clusters

In [20]:
predictions = model.transform(test)
predictions.registerTempTable("predictions")
spark.sql("SELECT Article, NewsType, prediction FROM predictions WHERE NewsType = 'business'").show(10)
spark.sql("SELECT Article, NewsType, prediction FROM predictions WHERE NewsType = 'sports'").show(10)

+--------------------+--------+----------+
|             Article|NewsType|prediction|
+--------------------+--------+----------+
|A major rally in ...|business|         1|
|ATLANTA: Twelve P...|business|         1|
|BEIJING: Pakistan...|business|         0|
|Brussels: The EU ...|business|         0|
|DUBAI: Talks betw...|business|         0|
|HONG KONG: Hong K...|business|         0|
|Hong Kong: Asian ...|business|         1|
|Hong Kong: Asian ...|business|         1|
|Hong Kong: Asian ...|business|         1|
|Hong Kong: Asian ...|business|         1|
+--------------------+--------+----------+
only showing top 10 rows



+--------------------+--------+----------+
|             Article|NewsType|prediction|
+--------------------+--------+----------+
|AUCKLAND: Martin ...|  sports|         0|
|Australia win run...|  sports|         0|
|CAPE TOWN: Alex H...|  sports|         0|
|CAPE TOWN: Captai...|  sports|         0|
|CAPE TOWN: Poor w...|  sports|         0|
|CAPE TOWN: Tiny T...|  sports|         0|
|DHAKA: Bangladesh...|  sports|         0|
|DHAKA: Bangladesh...|  sports|         0|
|DHAKA: Captain of...|  sports|         0|
|DHAKA: Hasan Mohs...|  sports|         0|
+--------------------+--------+----------+
only showing top 10 rows



# Question 2 Part 2: Clustering on Wikipedia articles

## Import required libraries

In [51]:
from pyspark.ml import Pipeline
from pyspark.ml.clustering import BisectingKMeans, KMeans, GaussianMixture
from pyspark.ml.feature import HashingTF, Tokenizer, NGram, IDF, StopWordsRemover
from pyspark.sql import Row
import os
import pandas as pd
from pyspark.sql import SparkSession
import subprocess
import json


spark = SparkSession \
    .builder \
    .appName("wiki_clustering") \
    .config("spark.som.config.option", "some-value") \
    .getOrCreate()

## Download/Unzip https://dumps.wikimedia.org/enwiki/20170920/enwiki-20170920-pages-articles14.xml-p7697599p7744799.bz2

In [52]:
#subprocess.call(["wget", "https://dumps.wikimedia.org/enwiki/20170920/enwiki-20170920-pages-articles14.xml-p7697599p7744799.bz2"])

## Parse xml to json using WikiExtractorw (https://github.com/attardi/wikiextractor)

In [53]:
# subprocess.call(["python3", "WikiExtractor.py", "-o wiki_extracted", "--json", "-b 230M", "/Users/vibrioh/Downloads/enwiki-20170920-pages-articles14.xml-p7697599p7744799.bz2"])

## Explore the dataset in Pandas

In [54]:
with open('/Users/vibrioh/local_projects/spark/ wiki_extracted/AA/wiki_00', encoding='utf-8') as f:
    data_json = []
    for line in f:
        data_json.append(json.loads(line))
pd_df = pd.DataFrame(data_json)
pd_df.head()

Unnamed: 0,id,text,title,url
0,7697605,Konica Minolta Cup\n\nKonica Minolta Cup may r...,Konica Minolta Cup,https://en.wikipedia.org/wiki?curid=7697605
1,7697611,Archer (typeface)\n\nArcher is a slab serif ty...,Archer (typeface),https://en.wikipedia.org/wiki?curid=7697611
2,7697612,Stockton Airport\n\nStockton Airport may refer...,Stockton Airport,https://en.wikipedia.org/wiki?curid=7697612
3,7697626,Ricky Minard\n\nRicky Donell Minard Jr. (born ...,Ricky Minard,https://en.wikipedia.org/wiki?curid=7697626
4,7697641,Alexander Peya\n\nAlexander Peya (born 27 June...,Alexander Peya,https://en.wikipedia.org/wiki?curid=7697641


In [55]:
pd_df.describe()

Unnamed: 0,id,text,title,url
count,4577,4577,4577,4577
unique,4577,4577,4577,4577
top,7736826,Lebe lauter\n\nLebe lauter () is the third stu...,Cyprus at the 1988 Winter Olympics,https://en.wikipedia.org/wiki?curid=7716931
freq,1,1,1,1


In [56]:
data = spark.createDataFrame(pd_df)
data.printSchema()
data.sample(False, 0.27).show(3)

root
 |-- id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)

+-------+--------------------+--------------------+--------------------+
|     id|                text|               title|                 url|
+-------+--------------------+--------------------+--------------------+
|7697612|Stockton Airport
...|    Stockton Airport|https://en.wikipe...|
|7697675|Lobo (wrestler)

...|     Lobo (wrestler)|https://en.wikipe...|
|7697715|Anti-submarine mi...|Anti-submarine mi...|https://en.wikipe...|
+-------+--------------------+--------------------+--------------------+
only showing top 3 rows



## Split data into Training (80%) and Test (20%) datasets

In [57]:
(training, test) = data.randomSplit([0.8, 0.2])

## Configure an ML pipeline

In [58]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(),outputCol="filtered")   
hashingTF = HashingTF(inputCol=remover.getOutputCol(), outputCol="features",numFeatures=350)

## Clustering by K-MEANS

In [59]:
kmeans = KMeans().setK(2).setSeed(1)
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, kmeans])
model = pipeline.fit(data)

## Make predictions on test and print interested columns of different clusters

In [61]:
predictions = model.transform(data)
predictions.registerTempTable("predictions")
spark.sql("SELECT id, title, prediction FROM predictions WHERE prediction = '0'").show(10)
spark.sql("SELECT id, title, prediction FROM predictions WHERE prediction = '1'").show(10)
spark.sql("SELECT count(*) FROM predictions GROUP BY prediction").show()

+-------+--------------------+----------+
|     id|               title|prediction|
+-------+--------------------+----------+
|7697605|  Konica Minolta Cup|         0|
|7697611|   Archer (typeface)|         0|
|7697612|    Stockton Airport|         0|
|7697626|        Ricky Minard|         0|
|7697641|      Alexander Peya|         0|
|7697655|  Swiss chalet style|         0|
|7697664|European Federati...|         0|
|7697671|The Best Is Yet t...|         0|
|7697675|     Lobo (wrestler)|         0|
|7697715|Anti-submarine mi...|         0|
+-------+--------------------+----------+
only showing top 10 rows



+-------+--------------------+----------+
|     id|               title|prediction|
+-------+--------------------+----------+
|7698038|      Radamel Falcao|         1|
|7698053| Panjshir offensives|         1|
|7698941|Istanbul High School|         1|
|7699151|The Market for Li...|         1|
|7699200|     Parchís (group)|         1|
|7700918|            Manikata|         1|
|7701000|2007 Major League...|         1|
|7701470|World War II pers...|         1|
|7701711|      Luck by Chance|         1|
|7702313|       Yoga Vasistha|         1|
+-------+--------------------+----------+
only showing top 10 rows



+--------+
|count(1)|
+--------+
|     154|
|    4423|
+--------+



# Question 3 Part 1: Fist Dataset with Logistic Regression and NaiveBayes classification

## Import required labraries

In [100]:
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.clustering import BisectingKMeans, KMeans, GaussianMixture
from pyspark.ml.feature import HashingTF, Tokenizer, NGram, IDF, StopWordsRemover
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression, NaiveBayes
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

spark = SparkSession \
    .builder \
    .appName("dataset1_classification") \
    .config("spark.som.config.option", "some-value") \
    .getOrCreate()

## Read and Vectorize the raw data

In [101]:
with open('/Users/vibrioh/local_projects/spark/datasets/ta/set1/pima-indians-diabetes.data', encoding='utf-8') as f:
    col1 = []
    col2 = []
    for line in f:
        parts = line.split(',')
        label = float(parts[len(parts)-1])
        features = Vectors.dense([float(parts[x]) for x in range(0,len(parts)-1)])
        col1.append(label)
        col2.append(features)
    dict = {"label": col1, "features": col2}
pd_d1 = pd.DataFrame(dict)
pd_d1.head()

Unnamed: 0,features,label
0,"[6.0, 148.0, 72.0, 35.0, 0.0, 33.6, 0.627, 50.0]",1.0
1,"[1.0, 85.0, 66.0, 29.0, 0.0, 26.6, 0.351, 31.0]",0.0
2,"[8.0, 183.0, 64.0, 0.0, 0.0, 23.3, 0.672, 32.0]",1.0
3,"[1.0, 89.0, 66.0, 23.0, 94.0, 28.1, 0.167, 21.0]",0.0
4,"[0.0, 137.0, 40.0, 35.0, 168.0, 43.1, 2.288, 3...",1.0


In [102]:
data1 = spark.createDataFrame(pd_d1)
data1.printSchema()
data1.sample(False, 0.27).show(3)

root
 |-- features: vector (nullable = true)
 |-- label: double (nullable = true)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[8.0,183.0,64.0,0...|  1.0|
|[1.0,89.0,66.0,23...|  0.0|
|[10.0,115.0,0.0,0...|  0.0|
+--------------------+-----+
only showing top 3 rows



## Split data into Training (80%) and Test (20%) datasets

In [103]:
(training, test) = data1.randomSplit([0.8, 0.2], seed=23)

## Train Logistic Regression and Naive Bayes models

In [104]:
# create the trainer and set its parameters
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(training)

# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# train the model
nbModel = nb.fit(training)

## Display the predictions

In [106]:
lrPredictions = lrModel.transform(test)
print("Logistic Regression classifier predictions")
lrPredictions.show()
nbPredictions = nbModel.transform(test)
print("Naive Bayes classifier predictions")
nbPredictions.show()

Logistic Regression classifier predictions
+--------------------+-----+--------------------+-----------+----------+
|            features|label|       rawPrediction|probability|prediction|
+--------------------+-----+--------------------+-----------+----------+
|[0.0,118.0,84.0,4...|  1.0|[0.61903920840622...|[0.65,0.35]|       0.0|
|[0.0,119.0,64.0,1...|  0.0|[0.61903920840622...|[0.65,0.35]|       0.0|
|[0.0,125.0,96.0,0...|  0.0|[0.61903920840622...|[0.65,0.35]|       0.0|
|[0.0,146.0,82.0,0...|  0.0|[0.61903920840622...|[0.65,0.35]|       0.0|
|[1.0,0.0,74.0,20....|  0.0|[0.61903920840622...|[0.65,0.35]|       0.0|
|[1.0,89.0,66.0,23...|  0.0|[0.61903920840622...|[0.65,0.35]|       0.0|
|[1.0,95.0,66.0,13...|  0.0|[0.61903920840622...|[0.65,0.35]|       0.0|
|[1.0,101.0,50.0,1...|  0.0|[0.61903920840622...|[0.65,0.35]|       0.0|
|[1.0,109.0,56.0,2...|  0.0|[0.61903920840622...|[0.65,0.35]|       0.0|
|[1.0,115.0,70.0,3...|  1.0|[0.61903920840622...|[0.65,0.35]|       0.0|
|[1.0,12

## Evalue the modles with AUC and overall Accuracy

In [107]:
evaluator1 = BinaryClassificationEvaluator().setMetricName("areaUnderROC")
evaluator2 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
print ("Logistic Regression: \n", "\t Area under ROC curve (AUC):", evaluator1.evaluate(lrPredictions), "\n\t Accuracy: ", evaluator2.evaluate(lrPredictions))
print ("Naive Bayes: \n", "\t Area under ROC curve (AUC):", evaluator1.evaluate(nbPredictions), "\n\t Accuracy: ", evaluator2.evaluate(nbPredictions))

Logistic Regression: 
 	 Area under ROC curve (AUC): 0.5 
	 Accuracy:  0.6554054054054054


Naive Bayes: 
 	 Area under ROC curve (AUC): 0.29694764503739646 
	 Accuracy:  0.7027027027027027


## A simple comparison on the two modles's performance

### Overall Accuracy:

- Naive Bayes classifier carried out a higher overall accuracy (0.70 vs 0.66)
- Both modle can yield predictions accuracy than 0.5 of random
- So Naive Bayes made better classification over Logistic Regression on this training/test set

### Area under ROC curve (AUC):

- Logistic Regression classifier carried out a higher AUC (0.5 vs 0.3)
- Logistic Regression classifier has higher discriminative power over class distribution

### In summary, on the selected training/test set, Naive Bayes classifier has the better result. However, the Logistic Regression classifier may be more stable on other datasets.

# Question 3 Part 2: Second Dataset with Logistic Regression and NaiveBayes classification

## Import required labraries

In [108]:
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.clustering import BisectingKMeans, KMeans, GaussianMixture
from pyspark.ml.feature import HashingTF, Tokenizer, NGram, IDF, StopWordsRemover
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression, NaiveBayes
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

spark = SparkSession \
    .builder \
    .appName("dataset2_classification") \
    .config("spark.som.config.option", "some-value") \
    .getOrCreate()

## Read and Vectorize the raw data

In [109]:
with open('/Users/vibrioh/local_projects/spark/datasets/ta/set2/australian.dat', encoding='utf-8') as f:
    col1 = []
    col2 = []
    for line in f:
        parts = line.split(' ')
        label = float(parts[len(parts)-1])
        features = Vectors.dense([float(parts[x]) for x in range(0,len(parts)-1)])
        col1.append(label)
        col2.append(features)
    dict = {"label": col1, "features": col2}
pd_d2 = pd.DataFrame(dict)
pd_d2.head()

Unnamed: 0,features,label
0,"[1.0, 22.08, 11.46, 2.0, 4.0, 4.0, 1.585, 0.0,...",0.0
1,"[0.0, 22.67, 7.0, 2.0, 8.0, 4.0, 0.165, 0.0, 0...",0.0
2,"[0.0, 29.58, 1.75, 1.0, 4.0, 4.0, 1.25, 0.0, 0...",0.0
3,"[0.0, 21.67, 11.5, 1.0, 5.0, 3.0, 0.0, 1.0, 1....",1.0
4,"[1.0, 20.17, 8.17, 2.0, 6.0, 4.0, 1.96, 1.0, 1...",1.0


In [110]:
data2 = spark.createDataFrame(pd_d2)
data2.printSchema()
data2.sample(False, 0.27).show(3)

root
 |-- features: vector (nullable = true)
 |-- label: double (nullable = true)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.0,21.67,11.5,1...|  1.0|
|[1.0,20.17,8.17,2...|  1.0|
|[0.0,15.83,0.585,...|  1.0|
+--------------------+-----+
only showing top 3 rows



## Split data into Training (80%) and Test (20%) datasets

In [111]:
(training, test) = data2.randomSplit([0.8, 0.2], seed=23)

## Train Logistic Regression and Naive Bayes models

In [112]:
# create the trainer and set its parameters
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(training)

# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# train the model
nbModel = nb.fit(training)

## Display the predictions

In [114]:
lrPredictions = lrModel.transform(test)
print("Logistic Regression classifier predictions")
lrPredictions.show()
nbPredictions = nbModel.transform(test)
print("Naive Bayes classifier predictions")
nbPredictions.show()

Logistic Regression classifier predictions
+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[0.0,20.42,10.5,1...|  0.0|[0.53467906597270...|[0.63057376704516...|       0.0|
|[0.0,20.75,10.25,...|  1.0|[-0.1767811960708...|[0.45591944020243...|       1.0|
|[0.0,20.75,10.335...|  1.0|[-0.1767811960708...|[0.45591944020243...|       1.0|
|[0.0,22.67,7.0,2....|  0.0|[0.53467906597270...|[0.63057376704516...|       0.0|
|[0.0,24.75,12.5,2...|  1.0|[-0.1767811960708...|[0.45591944020243...|       1.0|
|[0.0,30.67,12.0,2...|  1.0|[-0.1767811960708...|[0.45591944020243...|       1.0|
|[0.0,32.17,1.46,2...|  1.0|[-0.1767811960708...|[0.45591944020243...|       1.0|
|[0.0,35.75,0.915,...|  1.0|[-0.1767811960708...|[0.45591944020243...|       1.0|
|[0.0,38.92,1.665,...|  0.0|[0.53467906597270...|[0.630

## Evalue the modles with AUC and overall Accuracy

In [116]:
evaluator1 = BinaryClassificationEvaluator().setMetricName("areaUnderROC")
evaluator2 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
print ("Logistic Regression: \n", "\t Area under ROC curve (AUC):", evaluator1.evaluate(lrPredictions), "\n\t Accuracy: ", evaluator2.evaluate(lrPredictions))
print ("Naive Bayes: \n", "\t Area under ROC curve (AUC):", evaluator1.evaluate(nbPredictions), "\n\t Accuracy: ", evaluator2.evaluate(nbPredictions))

Logistic Regression: 
 	 Area under ROC curve (AUC): 0.8758116883116883 
	 Accuracy:  0.8646616541353384


Naive Bayes: 
 	 Area under ROC curve (AUC): 0.3773191094619667 
	 Accuracy:  0.631578947368421


## A simple comparison on the two modles's performance

### Overall Accuracy:

- Logistic Regression classifier carried out a higher overall accuracy (0.86 vs 0.63)
- Both modle can yield predictions accuracy than 0.5 of random
- So Logistic Regression made better classification over Naive Bayes on this training/test set

### Area under ROC curve (AUC):

- Logistic Regression classifier carried out a higher AUC (0.88 vs 0.38)
- Logistic Regression classifier has higher discriminative power over class distribution

### In summary, the Logistic Regression classifier has much better performance over the Naive Bayes, whatever on this perticular training/test set or potential future test datasets.

# Question 3 Part 3: Wikipedia Dataset with Logistic Regression and NaiveBayes classification

## Import required labraries

In [117]:
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.clustering import BisectingKMeans, KMeans, GaussianMixture
from pyspark.ml.feature import HashingTF, Tokenizer, NGram, IDF, StopWordsRemover
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression, NaiveBayes
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

spark = SparkSession \
    .builder \
    .appName("wikipedia_classification") \
    .config("spark.som.config.option", "some-value") \
    .getOrCreate()

## Read the raw data and assigne lables with clustering results by K-MEANS

In [120]:
wikiDF = spark.sql("SELECT id, title, text, prediction as assignedLable FROM predictions")
#print schema
wikiDF.printSchema()
wikiDF.registerTempTable("wikiDF")
wikiDF.show(2)
spark.sql("SELECT count(*) FROM wikiDF GROUP BY assignedLable").show()

root
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- text: string (nullable = true)
 |-- assignedLable: integer (nullable = true)

+-------+------------------+--------------------+-------------+
|     id|             title|                text|assignedLable|
+-------+------------------+--------------------+-------------+
|7697605|Konica Minolta Cup|Konica Minolta Cu...|            0|
|7697611| Archer (typeface)|Archer (typeface)...|            0|
+-------+------------------+--------------------+-------------+
only showing top 2 rows



+--------+
|count(1)|
+--------+
|     154|
|    4423|
+--------+



## Cast assigned lables to double and store to DataFrame

In [121]:
wikiDF = wikiDF.withColumn("label", wikiDF.assignedLable.cast("double"))
wikiDF.printSchema()
wikiDF.sample(False, 0.23).show(2)

root
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- text: string (nullable = true)
 |-- assignedLable: integer (nullable = true)
 |-- label: double (nullable = true)

+-------+--------------------+--------------------+-------------+-----+
|     id|               title|                text|assignedLable|label|
+-------+--------------------+--------------------+-------------+-----+
|7697715|Anti-submarine mi...|Anti-submarine mi...|            0|  0.0|
|7697725|Northern river shark|Northern river sh...|            0|  0.0|
+-------+--------------------+--------------------+-------------+-----+
only showing top 2 rows



## Split data into Training (80%) and Test (20%) datasets

In [123]:
training, test = wikiDF.randomSplit([0.8, 0.2], 2388)
print ("Total document count:",wikiDF.count())
print ("Training-set count:",training.count())
print ("Test-set count:",test.count())

Total document count: 4577


Training-set count: 3645


Test-set count: 932


## Config the pipeines for Logistic Regression and Naive Bayes

In [124]:
tokenizer = Tokenizer().setInputCol("text").setOutputCol("words")
remover= StopWordsRemover().setInputCol("words").setOutputCol("filtered").setCaseSensitive(False)
hashingTF = HashingTF().setNumFeatures(1000).setInputCol("filtered").setOutputCol("rawFeatures")
idf = IDF().setInputCol("rawFeatures").setOutputCol("features").setMinDocFreq(0)
lr = LogisticRegression().setRegParam(0.01).setThreshold(0.5)
nb = NaiveBayes().setModelType("multinomial").setSmoothing(1.0)
lrPipeline=Pipeline(stages=[tokenizer, remover, hashingTF, idf, lr])
nbPipeline=Pipeline(stages=[tokenizer, remover, hashingTF, idf, nb])

## Train  Logistic Regression and Naive Bayes models

In [125]:
lrModel = lrPipeline.fit(training)
nbModel = nbPipeline.fit(training)

## Display predictions on the test data

In [127]:
lrPredictions = lrModel.transform(test)
lrPredictions.registerTempTable("lrPredictions")
print("Logistic Regression classifier predictions")
spark.sql("SELECT id, label, prediction FROM lrPredictions WHERE prediction = '0'").show(5)
spark.sql("SELECT id, label, prediction FROM lrPredictions WHERE prediction = '1'").show(5)
nbPredictions = nbModel.transform(test)
nbPredictions.registerTempTable("nbPredictions")
print("Naive Bayes classifier predictions")
spark.sql("SELECT id, label, prediction FROM nbPredictions WHERE prediction = '0'").show(5)
spark.sql("SELECT id, label, prediction FROM nbPredictions WHERE prediction = '1'").show(5)

Logistic Regression classifier predictions


+-------+-----+----------+
|     id|label|prediction|
+-------+-----+----------+
|7697757|  0.0|       0.0|
|7697782|  0.0|       0.0|
|7697786|  0.0|       0.0|
|7697794|  0.0|       0.0|
|7697805|  0.0|       0.0|
+-------+-----+----------+
only showing top 5 rows



+-------+-----+----------+
|     id|label|prediction|
+-------+-----+----------+
|7698941|  1.0|       1.0|
|7701470|  1.0|       1.0|
|7703762|  1.0|       1.0|
|7705039|  1.0|       1.0|
|7705856|  1.0|       1.0|
+-------+-----+----------+
only showing top 5 rows

Naive Bayes classifier predictions


+-------+-----+----------+
|     id|label|prediction|
+-------+-----+----------+
|7697757|  0.0|       0.0|
|7697782|  0.0|       0.0|
|7697786|  0.0|       0.0|
|7697794|  0.0|       0.0|
|7697808|  0.0|       0.0|
+-------+-----+----------+
only showing top 5 rows



+-------+-----+----------+
|     id|label|prediction|
+-------+-----+----------+
|7697805|  0.0|       1.0|
|7698625|  0.0|       1.0|
|7699045|  0.0|       1.0|
|7699145|  0.0|       1.0|
|7699710|  0.0|       1.0|
+-------+-----+----------+
only showing top 5 rows



## Evaluate the models with Area under ROC curve (AUC) and overall Accuracy

In [128]:
evaluator1 = BinaryClassificationEvaluator().setMetricName("areaUnderROC")
evaluator2 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
print ("Logistic Regression: \n", "\t Area under ROC curve (AUC):", evaluator1.evaluate(lrPredictions), "\n\t Accuracy: ", evaluator2.evaluate(lrPredictions))
print ("Naive Bayes: \n", "\t Area under ROC curve (AUC):", evaluator1.evaluate(nbPredictions), "\n\t Accuracy: ", evaluator2.evaluate(nbPredictions))

Logistic Regression: 
 	 Area under ROC curve (AUC): 0.9990286117979512 
	 Accuracy:  0.9924892703862661


Naive Bayes: 
 	 Area under ROC curve (AUC): 0.00029436006122689424 
	 Accuracy:  0.8594420600858369


## A simple comparison on the two modles's performance

### Overall Accuracy:

- lables are assigned by K-MEANS clustering, so overall accuracy was very high
- Logistic Regression classifier carried out a higher overall accuracy (0.99 vs 0.86)
- Both modle can yield predictions accuracy than 0.5 of random
- So Logistic Regression made better classification over Naive Bayes on this training/test set

### Area under ROC curve (AUC):

- Logistic Regression classifier carried out a much higher AUC (0.85 vs 0.00)
- Logistic Regression classifier has higher discriminative power over class distribution

### In summary, the Logistic Regression classifier has much better performance over the Naive Bayes, whatever on this perticular training/test set or potential future test datasets.

### **In our raw data showed at the beginning, one class had much more data points (4423) than the other class (154). This bias significantly affected Naive Bayes classifier -- it had very hight error costs (false positive and false negative cost), but not the Logistic Regression classifier. So in our experiment, at least we can conclude that under this kind of bias, we should avoid using Naive Bayes classifier.**