In [1]:
# Run command below with docker installed to download and run a spark docker instance:
# docker run --name sparkbook -p 8881:8888 -v "$PWD":/home/jovyan/work jupyter/pyspark-notebook start.sh jupyter lab --LabApp.token=''
# Next, open a web browser and navigate to localhost:8881 to enter docker container. This notebook can then be run.

In [2]:
import sys
!{sys.executable} -m pip install nltk
#Required when running from a newly created spark docker container



In [3]:
import nltk
nltk.download('punkt')
nltk.download('stopwords')
#required for nltk functionality

[nltk_data] Downloading package punkt to /home/jovyan/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /home/jovyan/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [33]:
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import udf
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml import Pipeline
from pyspark.sql.types import ArrayType, FloatType, StringType
from nltk.stem.porter import PorterStemmer
from pyspark import SparkConf, SparkContext
from sklearn.decomposition import NMF
import nltk
from nltk.tokenize import sent_tokenize
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer
import sys
import string
from collections import defaultdict
import numpy as np

In [5]:
#Creates Spark context and session.
conf = SparkConf().setAll([('spark.executor.memory', '15g'), ('spark.driver.memory', '30g')])
sc = SparkContext(conf=conf)
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()

In [6]:
#prints current configuration of spark context
print(sc.getConf().getAll())

[('spark.driver.memory', '30g'), ('spark.app.name', 'SimpleApp'), ('spark.rdd.compress', 'True'), ('spark.driver.port', '40515'), ('spark.app.id', 'local-1562079182261'), ('spark.serializer.objectStreamReset', '100'), ('spark.master', 'local[*]'), ('spark.executor.id', 'driver'), ('spark.submit.deployMode', 'client'), ('spark.driver.host', 'a3343a2ac2ad'), ('spark.executor.memory', '15g'), ('spark.ui.showConsoleProgress', 'true')]


In [7]:
#Creates a spark datafrom from the amazon reviews file. Using .limit(x) will subset the first x items.
df = spark.read.csv("data/gaming_reviews.tsv", sep="\t", header=True, inferSchema=True).limit(500)

In [8]:
df.count()

500

In [9]:
# Stopwords to filter from  
stop_words = set(stopwords.words('english'))
stop_words.add('')

In [37]:
# removes punctuation, adds column 'body_list' which is a list of words, and selects only the star_rating and body_list columns
stop_words = set(stopwords.words('english'))
for word in ['', 1, 2, 3, 4, 5]:
    stop_words.add(word)

def process_str(row):
    """Input: String
    Output: List of strings without punctuation
    
    Removes punctuation using functions from the strings library and stems using NLK. 
    Performs raw string operations in C via a lookup table for maximum performance."""
    stemmer = PorterStemmer()
    word_list = row.translate(str.maketrans('', '', string.punctuation)).split(' ')
    return [stemmer.stem(x.lower()) for x in word_list if x.lower() not in stop_words]

process = udf(process_str, ArrayType(StringType()))

df_new = df.withColumn('body_list', process(df['review_body']))\
        .withColumn('body_list', process(df['review_body']))\
        .select('star_rating', 'body_list')
df_new.head()

Row(star_rating=5, body_list=['use', 'elit', 'danger', 'mac', 'amaz', 'joystick', 'especi', 'love', 'twist', 'stick', 'differ', 'movement', 'bind', 'well', 'move', 'normal', 'way'])

In [36]:
stemmer = PorterStemmer()
t = ['used', 'elite', 'dangerous', 'mac', 'amazing', \
     'joystick', 'especially', 'love', 'twist', 'stick', 'different', 'movement', 'bindings', 'well', 'move', 'normal', 'way']
for token in t:
    print(stemmer.stem(token))

use
elit
danger
mac
amaz
joystick
especi
love
twist
stick
differ
movement
bind
well
move
normal
way


In [49]:
cv = CountVectorizer(inputCol="body_list", outputCol="features", minDF=0.001)
idf = IDF(inputCol='features', outputCol = 'idf_features')
rf = RandomForestClassifier(labelCol='star_rating', featuresCol='idf_features', seed=100)

In [48]:
type(0.001)

float

In [50]:
# Stage one in pipeline the words are separated into count vectors
stage1_fit = cv.fit(df_new)
stage1_transform = stage1_fit.transform(df_new)

In [51]:
# Stage two in pipeline for tf-IDF
stage2_fit = idf.fit(stage1_transform)
stage2_transform = stage2_fit.transform(stage1_transform)

In [52]:
# Stage three in pipeline for random forest
stage3_fit = rf.fit(stage2_transform)
stage3_transform = stage3_fit.transform(stage2_transform)

In [53]:
# Function that prints the top n features
def print_top_features(model, n=10):
    """
    Input: RandomForest model, number of top feature words to print.
    Output: None. Prints top n words to console.
    """
    sorted_indices = np.flip(np.argsort(model.featureImportances.toArray()))
    for i in range(20):
        print(stage1_fit.vocabulary[sorted_indices[i]])
print_top_features(stage3_fit);

open
could
compani
work
play
even
enemi
would
good
useless
unabl
overal
past
return
whole
fix
done
15
suggest
netflix


In [54]:
star_idf = stage3_transform.select('star_rating', 'idf_features')
vocabulary = np.array(stage1_fit.vocabulary)

In [55]:
nmf_model = NMF(n_components=10)

In [56]:
X = [x[0].toArray() for x in star_idf.select('idf_features').collect()] 
nmf_fit = nmf_model.fit(X)
nmf_transform = nmf_model.transform(X)

In [57]:
for i in range(nmf_fit.components_.shape[0]):
    print(vocabulary[np.flip(np.argsort(nmf_fit.components_[i, :]))[0:10]])

['sonic' 'level' '3d' '62' 'youll' 'boom' 'cooper' 'sli' 'charact' 'game']
['dishonor' 'fp' 'play' 'caus' 'thief' 'get' 'gun' 'kill' 'ive' 'gamer']
['disney' 'toybox' 'coaster' 'new' 'war' 'path' 'there' '30' 'level'
 'infin']
['batmobil' 'arkham' 'combat' 'citi' 'game' 'addit' 'commentari' 'it’'
 'event' 'previou']
['play' 'disney' 'jar' 'br' 'issu' 'split' 'player' 'game' '20' 'screen']
['cod' 'skill' 'campaign' 'better' 'instal' 'almost' 'rather' 'caus'
 'take' 'two']
['sound' 'br' 'turtl' 'beach' 'ear' 'charg' 'hurt' 'often' 'turn' 'wear']
['make' 'game' 'choic' 'charact' 'ultim' 'player' 'futur' 'decis' 'stori'
 'decid']
['ea' 'br' 'man' 'rate' 'yard' 'catch' 'draft' 'play' 'player' 'madden']
['ps4' 'port' 'usb' 'stand' 'control' 'fan' 'charg' 'qualiti' 'dual' 'one']


In [58]:
star_idf.head()

Row(star_rating=5, idf_features=SparseVector(2594, {10: 1.9399, 12: 2.4099, 16: 2.553, 23: 3.0811, 30: 2.8154, 113: 3.5775, 171: 4.0194, 218: 4.0194, 229: 4.4248, 439: 4.4248, 511: 4.4248, 916: 5.118, 982: 5.118, 1070: 5.118, 1128: 5.118, 2193: 5.5235, 2464: 5.5235}))

In [60]:
pd = star_idf.select('*').toPandas()

In [76]:
pd.info(memory_usage='deep')

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 500 entries, 0 to 499
Data columns (total 2 columns):
star_rating     500 non-null int32
idf_features    500 non-null object
dtypes: int32(1), object(1)
memory usage: 21.6 KB


0      (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...
1      (2.398652528539881, 0.0, 0.0, 0.0, 2.089471716...
2      (0.0, 0.0, 0.0, 1.6522579096170285, 0.0, 0.0, ...
3      (1.1993262642699405, 0.0, 0.0, 0.0, 0.0, 0.0, ...
4      (0.0, 2.7606483882667736, 0.0, 0.0, 0.0, 0.0, ...
5      (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...
6      (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...
7      (0.0, 1.3803241941333868, 0.0, 0.0, 0.0, 0.0, ...
8      (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...
9      (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...
10     (1.1993262642699405, 0.0, 0.0, 0.0, 0.0, 0.0, ...
11     (0.0, 0.0, 0.0, 0.0, 4.178943432079547, 0.0, 0...
12     (0.0, 0.0, 2.011913481693899, 0.0, 0.0, 0.0, 0...
13     (1.1993262642699405, 1.3803241941333868, 0.0, ...
14     (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.1912544103497...
15     (0.0, 0.0, 0.0, 1.6522579096170285, 0.0, 0.0, ...
16     (0.0, 1.3803241941333868, 0.0, 0.0, 0.0, 0.0, ...
17     (0.0, 2.7606483882667736