In [0]:
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

In [0]:
# Obtain the active SparkSession, creating one if none exists yet.
spark = SparkSession.builder.getOrCreate()
# Keep the underlying SparkContext; it is passed to HiveContext in a later cell.
sc = spark.sparkContext

In [0]:
# Location of the Kindle review CSV that the rest of the notebook works on.
filename = '/FileStore/tables/kindle_reduced_clean-3.csv'
# The first row supplies the column names; column types are inferred from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(filename)
)

In [0]:
# Preview the rating and the two free-text columns for the first 5 rows.
df.select("overall","summary","reviewText").show(5)

In [0]:
# For every column, count the rows in which that column is null.
# when() without otherwise() yields null for non-matching rows, and count()
# ignores nulls, so only the null cells are counted.
null_counts = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(*null_counts).show()

In [0]:
# Discard every row that has a null in at least one column.
# NOTE(review): this runs before the next cell drops index/reviewerName/etc.,
# so a null in one of those soon-to-be-dropped columns also removes the row —
# confirm that is intended.
df = df.na.drop(how='any')

In [0]:
# Columns that are removed from the frame before any further processing.
unused_columns = ["index", "reviewerName", "unixReviewTime", "helpful", "HasHelpful"]
df = df.drop(*unused_columns)

In [0]:
# Strip '.', ',' and '$' from the review text in a single pass: translate()
# deletes every character of the matching string that has no counterpart in
# the (empty) replacement string.
# NOTE(review): the characters are deleted, not replaced by a space, so words
# separated only by punctuation are glued together (tokens such as
# "reviewthis" appear in the displayed reviewText_clean output) — confirm
# this is acceptable.
df = df.withColumn('reviewText', translate('reviewText', '.,$', ''))

In [0]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover

# Split each review into an array of word tokens (new column reviewText_token).
tokenizer = Tokenizer().setInputCol('reviewText').setOutputCol('reviewText_token')
df_token = tokenizer.transform(df)

# Filter stop words out of the token array (new column reviewText_clean).
remover = StopWordsRemover().setInputCol('reviewText_token').setOutputCol('reviewText_clean')
df_stop = remover.transform(df_token)

In [0]:
# Same two steps for the summary column, applied on top of the frame that
# already carries the cleaned review tokens.

# Split each summary into an array of word tokens (new column summary_token).
tokenizer = Tokenizer().setInputCol('summary').setOutputCol('summary_token')
df_token = tokenizer.transform(df_stop)

# Filter stop words out of the token array (new column summary_clean).
remover = StopWordsRemover().setInputCol('summary_token').setOutputCol('summary_clean')
df_stop = remover.transform(df_token)

In [0]:
# Only the cleaned token arrays are kept; the raw text and the intermediate
# token columns are removed.
superseded_columns = ("reviewText", "summary", "reviewText_token", "summary_token")
df_stop = df_stop.drop(*superseded_columns)
df_stop.show(5)

In [0]:
# Render the cleaned review token arrays.
# display() is not imported in the visible cells — presumably supplied by the notebook environment.
display(df_stop.select("reviewText_clean"))

reviewText_clean
"List(arc, provided, author, exchange, honest, reviewthis, first, time, read, book, miranda, p, charles, lastthis, book, -, broken, hearts, twisted, stories, lies, scared, confused, lovers, zach, rebecca, met, hired, private, nurse, grandmother, surgery, dated, four, months, rebecca, said, three, little, words, zach, say, back, rebecca, breaks, chance, tomonths, later, hired, join, grandmother, month, long, cruise, birthday, nurse, grandmother, also, friends, grandsons, joining, birthday, celebration, , cruise, around, rebecca, zach, work, fears, misunderstandings, find, love, again?a, sexy, quick, read, able, put, down!!!)"
"List(wild, ride, nancy, warrenchanging, gears, seriesduncan, forbes, professor, sabbatical, writes, searches, lost, stolen, art, following, lead, long, lost, van, gogh, leads, small, town, swiftcurrent, oregonwith, sexiest, librarian, ever, seenalexandra, forrest, agenda, grandfather, passed, away, plans, finishing, details, writing, memoirs, packing, home, sell, move, big, city, complete, life, plan, one, definitely, include, sexy, stranger, librarygillian, forrest, munn, messed, life, big, time, teen, small, towns, forget, husband, left, alone, officer, tom, perkins, seems, really, believe, changeda, dead, body, library, quiet, town, changes, everything, things, like, happen, steamy, romance, murder, mystery, two, happenings, going, another, page, turner, story**strong, sexual, content, language)"
"List(well, thought, story, many, things, going, time, alien, race, jumps, earth, orbit, destroys, major, earth, cities, tells, us, stop, technologies, fun, begins!, helpful, alien, side, love, story, twist!, good, read!)"
"List(autistic, frequent, reading, difficulties, especially, third-person, stories, though, read, lacuna, twice, still, appreciate, although, bad, book)"
"List(book, four, five, part, serial, , suspense, highest, , , liking, jess, less, less, book, , learning, none, makes, sense, , feeling, going, end, badly)"
"List(really, enjoyed, book, kept, interested, page, one, way, end, author, made, story, feel, real, knew, draw, audience, keep, fully, interested, story, scott, winnie, previously, meet, changing, game, highly, recommend, read, first, chemistry, two, explosive, swear, thought, kindle, going, melt, book, interesting, turns, events, along, way, meet, characters, forget, embark, helluva, journey, sex, love, forgiveness)"
"List(pleasuring, lady, jess, michaels, exciting, regency, historical, romance, set, 1814, london, 2, 8220the, pleasure, wars8221but, read, stand, alone, see8221taken, duke8221, interesting, tale, man, believes, fall, love, love, anyone, woman, proves, wrong, join, lady, portia, , marquis, miles, weatherfield, journey, discovery, passion, feelings, discovery, things, far, deeper, flesh, filled, liessecretsvoyeurismpassionscandalsizzling, sensualitya, brief, threesome, encounterkindnessvery, wicked, encountersand, power, love, erotic, romance!, well, written!, sensual, encounters, well, written, tastebut, 8220pleasuring, lady8221, sensual, encountersit, much, erotic, tale!, ms, michaels, masterful, storyteller, passion, love, love, series, looking, forward, next, installment, well, done!!, must, read, enjoy, historical, romanceswith, twists, turns, well, sensual, encounters, loving, kind, received, honest, review, authorrating:, 45heat, rating:, wild, ridereviewed, by:, aprilr, review, courtesy, book, addiction)"
"List(numerous, books, written, subject, survival, disaster, preparedness, contain, good, information, really, make, top, heapin, guide, specifically, marketed, beginners, really, expect, ton, detail, book, surprise, decent, amount, detail, yes, helpful, information, book, even, decent, amount, writing, style, numerous, grammatical, errors, found, throughout, lead, pass, recommendationi, gave, amazon, 3-stars, ok, glad, picked, available, free, also, glad, pay, itif, enjoy, reading, prepping, survival, preparedness, also, recommendultralight, survival:, make, small, light, bug, bag, save, life)"
"List(story, mom, , duck, making, baby, duck, mom, duck, , three, baby, , one, big, baby, mom, duck, think, aturkey)"
"List(book, suddenly, appeared, book, list, ordered, way, canceling, even, thought, pricewas, extremely, fair)"


In [0]:
# Show the column names and types of the cleaned frame.
df_stop.printSchema()

In [0]:
# Preview the first 5 rows of the cleaned frame.
df_stop.show(5)

In [0]:
#Exploratory Data Analysis

In [0]:
# Summary statistics for the cleaned frame.
df_stop.describe().show()

In [0]:
#Start of Pipelines

In [0]:
from pyspark import HiveContext
from pyspark.ml.feature import HashingTF

# NOTE(review): hiveContext is not referenced by any other cell visible here —
# confirm it is still needed.
hiveContext = HiveContext(sc)

# Hash each cleaned token array into a sparse term-frequency vector:
# reviewText_clean -> review_features, summary_clean -> summary_features.
ht = HashingTF(inputCol="reviewText_clean", outputCol="review_features")
ht1 = HashingTF(inputCol="summary_clean", outputCol="summary_features")
result = ht1.transform(ht.transform(df_stop))
result.show(2)


In [0]:
# The token arrays have been hashed into the *_features vectors, so drop them.
result=result.drop("reviewText_clean","summary_clean")

In [0]:
# Inspect the Python type of `result`.
type(result)

In [0]:
# Turn the star rating into a binary target: 1 when overall >= 2.5, else 0.
is_positive = col('overall') >= 2.5
df_sp = result.withColumn('overall', when(is_positive, 1).otherwise(0))

In [0]:
# Preview 3 rows after the rating has been binarised.
df_sp.show(3)

In [0]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler 
from pyspark.ml.feature import OneHotEncoder
from pyspark.mllib.linalg import Vectors

In [0]:
# Show the column names and types before encoding the identifier columns.
df_sp.printSchema()

In [0]:
# Label-encode the three string identifier columns.
# Each indexer is fitted on the current frame and its transform appends a
# numeric *_num column; handleInvalid="skip" filters out rows with invalid values.

asin_indexer = StringIndexer(
    inputCol='asin', outputCol='asin_num', handleInvalid="skip"
).fit(df_sp)
df_sp = asin_indexer.transform(df_sp)

reviewTime_indexer = StringIndexer(
    inputCol='reviewTime', outputCol='reviewTime_num', handleInvalid="skip"
).fit(df_sp)
df_sp = reviewTime_indexer.transform(df_sp)

reviewerID_indexer = StringIndexer(
    inputCol='reviewerID', outputCol='reviewerID_num', handleInvalid="skip"
).fit(df_sp)
df_sp = reviewerID_indexer.transform(df_sp)


In [0]:
# One-hot encode each numeric index column produced by the StringIndexers.
# Every encoder is fitted on the current frame and appends a *_vector column.

# asin_num -> asin_vector
asin_onehoter = OneHotEncoder(inputCol='asin_num', outputCol='asin_vector').fit(df_sp)
df_sp = asin_onehoter.transform(df_sp)

# reviewTime_num -> reviewTime_vector
reviewTime_onehoter = OneHotEncoder(inputCol='reviewTime_num', outputCol='reviewTime_vector').fit(df_sp)
df_sp = reviewTime_onehoter.transform(df_sp)

# reviewerID_num -> reviewerID_vector
reviewerID_onehoter = OneHotEncoder(inputCol='reviewerID_num', outputCol='reviewerID_vector').fit(df_sp)
df_sp = reviewerID_onehoter.transform(df_sp)


In [0]:
# Render the frame with the index and one-hot columns added.
display(df_sp)

asin,overall,reviewTime,reviewerID,HelpfulRecords,weightedRating,review_features,summary_features,asin_num,reviewTime_num,reviewerID_num,asin_vector,reviewTime_vector,reviewerID_vector
B00J4S6YWC,1,"06 21, 2014",AUSBN91MCI3WM,0.0,5.0,"Map(vectorType -> sparse, length -> 262144, indices -> List(1546, 11941, 16757, 25764, 34343, 38640, 39143, 41931, 42882, 45155, 50793, 53570, 60345, 68044, 71961, 75181, 75836, 77751, 81103, 84028, 84696, 84933, 90859, 91878, 93307, 96005, 101464, 121517, 122682, 124360, 124403, 130047, 130846, 132133, 132270, 133834, 136350, 139891, 146929, 147136, 151864, 165682, 168976, 171222, 173955, 181494, 186480, 188523, 188835, 189113, 193920, 196689, 206312, 207834, 228967, 235240, 235962, 236008, 239362, 242022, 242438, 248572, 248630, 249180, 254661, 256961, 257872), values -> List(1.0, 1.0, 2.0, 1.0, 1.0, 1.0, 3.0, 2.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 4.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 3.0, 1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 262144, indices -> List(16757, 84933), values -> List(1.0, 1.0))",3959.0,58.0,4189.0,"Map(vectorType -> sparse, length -> 4431, indices -> List(3959), values -> List(1.0))","Map(vectorType -> sparse, length -> 972, indices -> List(58), values -> List(1.0))","Map(vectorType -> sparse, length -> 4347, indices -> List(4189), values -> List(1.0))"
B00HCZUBH8,1,"03 3, 2014",A141H51I3H4B1S,0.5,5.0,"Map(vectorType -> sparse, length -> 262144, indices -> List(6346, 6872, 9129, 9781, 11203, 11275, 16725, 17893, 19153, 19684, 21823, 25231, 27308, 30686, 33358, 34121, 37908, 41198, 49407, 54245, 62499, 66187, 66208, 68595, 69060, 71949, 72208, 73018, 73342, 75707, 77470, 79055, 83261, 84933, 92607, 98194, 99179, 100314, 102032, 102382, 109944, 110427, 112747, 116836, 117554, 121517, 123445, 123940, 131391, 133567, 134992, 138905, 152275, 153272, 154336, 159066, 170414, 172517, 172888, 172931, 175966, 185228, 187722, 192356, 199643, 201457, 203802, 205764, 206312, 206924, 207499, 208258, 208787, 210587, 212846, 214676, 216318, 221821, 223651, 225157, 225567, 229264, 229543, 232427, 232519, 248899, 249556, 250904, 250984, 253382, 253415, 258728), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 2.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 262144, indices -> List(50415, 131391, 137819), values -> List(1.0, 1.0, 1.0))",3518.0,7.0,525.0,"Map(vectorType -> sparse, length -> 4431, indices -> List(3518), values -> List(1.0))","Map(vectorType -> sparse, length -> 972, indices -> List(7), values -> List(1.0))","Map(vectorType -> sparse, length -> 4347, indices -> List(525), values -> List(1.0))"
B006RZNR3Y,1,"07 10, 2014",AP8TKDM76TROZ,0.0,4.0,"Map(vectorType -> sparse, length -> 262144, indices -> List(2325, 23087, 37255, 84131, 87607, 97376, 102382, 109156, 111767, 113432, 115996, 121517, 142343, 149272, 150535, 151058, 154469, 174582, 186480, 186925, 202268, 205317, 214676, 218026, 227686, 236725, 245044), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 2.0, 1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 262144, indices -> List(51852, 55639, 107472), values -> List(1.0, 1.0, 1.0))",106.0,167.0,393.0,"Map(vectorType -> sparse, length -> 4431, indices -> List(106), values -> List(1.0))","Map(vectorType -> sparse, length -> 972, indices -> List(167), values -> List(1.0))","Map(vectorType -> sparse, length -> 4347, indices -> List(393), values -> List(1.0))"
B006RZNR3Y,1,"02 1, 2014",A22GGHISKRVAOX,0.0,4.0,"Map(vectorType -> sparse, length -> 262144, indices -> List(11995, 31536, 41294, 53570, 58227, 84028, 96611, 119116, 122403, 129422, 145380, 189113, 211985, 219766, 224909, 234233), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 262144, indices -> List(97078, 108258, 137819, 190787), values -> List(1.0, 1.0, 1.0, 1.0))",106.0,26.0,43.0,"Map(vectorType -> sparse, length -> 4431, indices -> List(106), values -> List(1.0))","Map(vectorType -> sparse, length -> 972, indices -> List(26), values -> List(1.0))","Map(vectorType -> sparse, length -> 4347, indices -> List(43), values -> List(1.0))"
B00J47H8H8,1,"03 21, 2014",A19DWIC1T7127Y,0.75,3.0,"Map(vectorType -> sparse, length -> 262144, indices -> List(8804, 11825, 12710, 24397, 29514, 29546, 93307, 102382, 105627, 110743, 137733, 156917, 163984, 189113, 199581, 201511, 234593, 234654, 249180), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 5.0))","Map(vectorType -> sparse, length -> 262144, indices -> List(139030, 166839), values -> List(1.0, 1.0))",3957.0,331.0,107.0,"Map(vectorType -> sparse, length -> 4431, indices -> List(3957), values -> List(1.0))","Map(vectorType -> sparse, length -> 972, indices -> List(331), values -> List(1.0))","Map(vectorType -> sparse, length -> 4347, indices -> List(107), values -> List(1.0))"
B00LRZLRMM,1,"07 14, 2014",AM5P5MI4PU2KH,0.0,5.0,"Map(vectorType -> sparse, length -> 262144, indices -> List(991, 3338, 6696, 12784, 16108, 19633, 19698, 21823, 32890, 50415, 51471, 53570, 61899, 65343, 68228, 73018, 94900, 102382, 106691, 110427, 111767, 116312, 133955, 138895, 140784, 142343, 153524, 156917, 159927, 166936, 174475, 176156, 182804, 186480, 188835, 189113, 192213, 195761, 208694, 212560, 212761, 221223, 229234, 229264, 234706, 240840, 241856, 248630, 258728), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 262144, indices -> List(58732, 78987), values -> List(1.0, 1.0))",4428.0,20.0,3949.0,"Map(vectorType -> sparse, length -> 4431, indices -> List(4428), values -> List(1.0))","Map(vectorType -> sparse, length -> 972, indices -> List(20), values -> List(1.0))","Map(vectorType -> sparse, length -> 4347, indices -> List(3949), values -> List(1.0))"
B00DWGFFBI,1,"11 19, 2013",AM09IO8QXEB1B,0.75,5.0,"Map(vectorType -> sparse, length -> 262144, indices -> List(2701, 3023, 4558, 5257, 10244, 11825, 12524, 16108, 19263, 19624, 27308, 30429, 32930, 36999, 37086, 37521, 49574, 51081, 53570, 54961, 59957, 60741, 61318, 68891, 69099, 73508, 74014, 76764, 78896, 79132, 79292, 85602, 90867, 92068, 100928, 103064, 105259, 109753, 111798, 111823, 120593, 128582, 131593, 132133, 133640, 133955, 135290, 135923, 139241, 139305, 143531, 149079, 149540, 152275, 154977, 160668, 163000, 165678, 171785, 172164, 175966, 177633, 177873, 180558, 181337, 182556, 185212, 186480, 186925, 189113, 192905, 201440, 204787, 205430, 206596, 208016, 208053, 209518, 214676, 215737, 216145, 218172, 226965, 229407, 234706, 235240, 237407, 240879, 240932, 248711, 249180, 250193, 256986, 258474, 260202), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 2.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 3.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 5.0, 4.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 262144, indices -> List(63678, 73508, 160668, 181337, 199122, 229407), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0))",2804.0,40.0,23.0,"Map(vectorType -> sparse, length -> 4431, indices -> List(2804), values -> List(1.0))","Map(vectorType -> sparse, length -> 972, indices -> List(40), values -> List(1.0))","Map(vectorType -> sparse, length -> 4347, indices -> List(23), values -> List(1.0))"
B00ALUMW96,1,"08 5, 2013",A1SKME00QMJR6,1.0,3.0,"Map(vectorType -> sparse, length -> 262144, indices -> List(2701, 4214, 5365, 6886, 9781, 15885, 18176, 26620, 29292, 33358, 36319, 38574, 39794, 61318, 68716, 68947, 80808, 82597, 87405, 88845, 89717, 89721, 109208, 113089, 113432, 122954, 125752, 129422, 132543, 133243, 134304, 138036, 139321, 139462, 141075, 147136, 153946, 163240, 167207, 172517, 174582, 174966, 184280, 185228, 189113, 196543, 198197, 199496, 204971, 217281, 229264, 233502, 237761, 239713, 251870, 252754, 254565, 257222, 257685), values -> List(1.0, 2.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 2.0, 1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 2.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 1.0, 1.0, 2.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 262144, indices -> List(36319, 53238, 113432, 166027, 189113), values -> List(1.0, 1.0, 1.0, 1.0, 1.0))",1971.0,501.0,1251.0,"Map(vectorType -> sparse, length -> 4431, indices -> List(1971), values -> List(1.0))","Map(vectorType -> sparse, length -> 972, indices -> List(501), values -> List(1.0))","Map(vectorType -> sparse, length -> 4347, indices -> List(1251), values -> List(1.0))"
B004SUP1XO,1,"12 4, 2012",AIE9O8ERPUDFP,0.0,3.0,"Map(vectorType -> sparse, length -> 262144, indices -> List(21823, 96005, 111767, 141269, 146870, 153423, 172888, 176996, 214985, 248069, 249180), values -> List(1.0, 1.0, 1.0, 3.0, 1.0, 1.0, 1.0, 3.0, 4.0, 1.0, 3.0))","Map(vectorType -> sparse, length -> 262144, indices -> List(175613, 198150, 200537, 249180), values -> List(1.0, 1.0, 1.0, 1.0))",65.0,187.0,3845.0,"Map(vectorType -> sparse, length -> 4431, indices -> List(65), values -> List(1.0))","Map(vectorType -> sparse, length -> 972, indices -> List(187), values -> List(1.0))","Map(vectorType -> sparse, length -> 4347, indices -> List(3845), values -> List(1.0))"
B004SUP1XO,0,"05 27, 2014",A3Q1KLWRFQ59JM,0.0,3.0,"Map(vectorType -> sparse, length -> 262144, indices -> List(14376, 32392, 51471, 71707, 102076, 125537, 127501, 142343, 174966, 189113, 227001, 239685), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0))","Map(vectorType -> sparse, length -> 262144, indices -> List(41601, 94555, 98717, 113673, 189113, 227001), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0))",65.0,217.0,3228.0,"Map(vectorType -> sparse, length -> 4431, indices -> List(65), values -> List(1.0))","Map(vectorType -> sparse, length -> 972, indices -> List(217), values -> List(1.0))","Map(vectorType -> sparse, length -> 4347, indices -> List(3228), values -> List(1.0))"


In [0]:
# Columns that are concatenated into the single 'features' vector used by the models.
feature_columns = [
    'review_features',
    'summary_features',
    'weightedRating',
    'asin_vector',
    'reviewTime_vector',
    'reviewerID_vector',
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
df_assem = assembler.transform(df_sp)

In [0]:
#Train Test Split
# Fixed seed and 80/20 weights for the random split.
seed = 314
train_test = [0.8, 0.2]

# Only the assembled feature vector and the binary target are carried forward.
data_set = df_assem.select('features', 'overall')
train_df, test_df = data_set.randomSplit(weights=train_test, seed=seed)

In [0]:
#Linear Regression

In [0]:
from pyspark.ml.regression import LinearRegression
# Regularised linear regression (regParam=0.3, elasticNetParam=0.8) of 'overall'
# on the assembled feature vector, limited to 10 iterations.
# NOTE(review): 'overall' was binarised to 0/1 in an earlier cell, so this
# regresses onto a binary target — confirm that is intended.
lr = LinearRegression(featuresCol = 'features', labelCol='overall', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)

In [0]:
# Print the fitted linear model's coefficients and intercept.
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

In [0]:
# Training summary of the fitted linear model; report its root mean squared error.
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)


In [0]:
# Summary statistics of the training split.
train_df.describe().show()

In [0]:
# Score the held-out split with the fitted linear model and preview 5 rows.
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","overall","features").show(5)

from pyspark.ml.evaluation import RegressionEvaluator
# R^2 of the predictions against the 'overall' label.
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="overall",metricName="r2")
# Apply the evaluator to the test predictions so the configured metric is
# actually reported instead of being constructed and left unused.
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

In [0]:
# Evaluate the linear model on the test split and report its RMSE.
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

In [0]:
from pyspark.ml.classification import LogisticRegression

# Fit a logistic-regression classifier on the training split with 'overall' as the label.
log_reg = LogisticRegression(labelCol='overall').fit(train_df)

# Predictions of the fitted model on the same data it was trained on.
train_pred = log_reg.evaluate(train_df).predictions

# Show up to 10 untruncated rows where both the label and the prediction are 1.
both_positive = (train_pred['overall'] == 1) & (train_pred['prediction'] == 1)
train_pred.where(both_positive).select('overall', 'prediction', 'probability').show(10, False)

In [0]:
# Evaluate on testdata

# Predictions of the logistic-regression model on the test split.
# This rebinds test_result, which previously held the linear-regression evaluation summary.
test_result = log_reg.evaluate(test_df).predictions
test_result.show(3)

In [0]:
# Accuracy computation
#
# Confusion-matrix counts for the logistic-regression test predictions,
# treating 1 as the positive class:
#   tp: label 1, predicted 1      fn: label 1, predicted 0
#   fp: label 0, predicted 1      tn: label 0, predicted 0

tp = test_result[(test_result.overall == 1) & (test_result.prediction == 1)].count()
# True negatives must filter on prediction == 0; filtering on prediction == 1
# would duplicate fp and make accuracy count false positives as correct.
tn = test_result[(test_result.overall == 0) & (test_result.prediction == 0)].count()
fp = test_result[(test_result.overall == 0) & (test_result.prediction == 1)].count()
fn = test_result[(test_result.overall == 1) & (test_result.prediction == 0)].count()

# accuracy = correctly classified rows / all rows
print('test accuracy is : %f'%((tp+tn)/(tp+tn+fp+fn)))

In [0]:
# Recall and Precision
# Uses the tp / fp / fn counts computed in the accuracy cell.
# recall = tp / (tp + fn); precision = tp / (tp + fp)

print('test recall is : %f'%(tp/(tp+fn)))
print('test precision is : %f'%(tp/(tp+fp)))

In [0]:
# F1 score
# Harmonic mean of precision and recall, from the tp / fp / fn counts computed earlier.

recall = tp/(tp+fn)
precision = tp/(tp+fp)

F1 =  2 * (precision*recall) / (precision + recall)
print('F1 score: %0.3f' % F1)

In [0]:
#NaiveBayes

In [0]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Expose the binary target under the name 'label' in both splits.
train_df, test_df = [
    split.withColumnRenamed('overall', 'label')
    for split in (train_df, test_df)
]

# Multinomial naive Bayes with smoothing 1.0, fitted on the training split.
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
model = nb.fit(train_df)

# Score the test split and display the resulting rows.
predictions = model.transform(test_df)
predictions.show()

# Accuracy of 'prediction' against 'label' on the test split.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

In [0]:
# Accuracy computation
#
# Confusion-matrix counts for the naive Bayes test predictions,
# treating 1 as the positive class:
#   tp: label 1, predicted 1      fn: label 1, predicted 0
#   fp: label 0, predicted 1      tn: label 0, predicted 0

tp = predictions[(predictions.label == 1) & (predictions.prediction == 1)].count()
# True negatives must filter on prediction == 0; filtering on prediction == 1
# would simply repeat the fp count.
tn = predictions[(predictions.label == 0) & (predictions.prediction == 0)].count()
fp = predictions[(predictions.label == 0) & (predictions.prediction == 1)].count()
fn = predictions[(predictions.label == 1) & (predictions.prediction == 0)].count()

# recall = tp / (tp + fn); precision = tp / (tp + fp)
recall = tp/(tp+fn)
precision = tp/(tp+fp)

# Recall and Precision

print('test recall is : %f'% recall)
print('test precision is : %f'% precision)

In [0]:
# F1 score
# Harmonic mean of the precision and recall computed in the preceding cell.

F1 =  2 * (precision*recall) / (precision + recall)
print('F1 score: %0.3f' % F1)

In [0]:
#Random Forest

In [0]:
#train RF model
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Configure a random-forest classifier: 20 trees, fixed seed 314,
# reading the 'features' column and learning the 'label' column.
rf = RandomForestClassifier(labelCol = 'label', featuresCol = "features", numTrees = 20, seed = 314)

In [0]:
# Fit the forest on the training split (rebinds `model`, previously the naive Bayes model).
model = rf.fit(train_df)

In [0]:
# Score the test split with the fitted forest.
pred = model.transform(test_df)

In [0]:
# Evaluator configured to compute accuracy of 'prediction' against 'label'.
evalRF = MulticlassClassificationEvaluator(labelCol = 'label', predictionCol = "prediction", metricName = "accuracy")

In [0]:
# Accuracy of the random-forest predictions on the test split.
acc = evalRF.evaluate(pred)
print("Test set accuracy = " + str(acc))

In [0]:
# Accuracy computation
#
# Confusion-matrix counts for the random-forest test predictions,
# treating 1 as the positive class:
#   tp: label 1, predicted 1      fn: label 1, predicted 0
#   fp: label 0, predicted 1      tn: label 0, predicted 0

tp = pred[(pred.label == 1) & (pred.prediction == 1)].count()
# True negatives must filter on prediction == 0; filtering on prediction == 1
# would simply repeat the fp count.
tn = pred[(pred.label == 0) & (pred.prediction == 0)].count()
fp = pred[(pred.label == 0) & (pred.prediction == 1)].count()
fn = pred[(pred.label == 1) & (pred.prediction == 0)].count()

# recall = tp / (tp + fn); precision = tp / (tp + fp)
recall = tp/(tp+fn)
precision = tp/(tp+fp)

# Recall and Precision

print('test recall is : %f'% recall)
print('test precision is : %f'% precision)

In [0]:
# F1 score
# Harmonic mean of the precision and recall computed in the preceding cell.

F1 =  2 * (precision*recall) / (precision + recall)
print('F1 score: %0.3f' % F1)