In [1]:
## Load context
from IPython.core.display import display, HTML
# Inject a CSS rule so the notebook's `.container` element spans the full page width.
display(HTML("<style>.container { width:100% !important; }</style>"))
import re
import os
import pandas as pd
import numpy as np
from time import time

from pyspark.sql import functions as F
from pyspark.ml.fpm import FPGrowth
from pyspark.sql.types import ArrayType, FloatType, StringType

# Node name of the machine running this kernel; it is embedded in the Spark
# application name and printed at the end of this cell.
hostname = os.uname()[1]

from pyspark.sql import SparkSession
from pyspark import SparkConf

# BEN'S MASTER: 192.168.2.87
# OUR MASTER:   192.168.2.203

# New API
# Connect to the standalone master with two cores per executor; dynamic
# allocation and the external shuffle service are switched off explicitly.
conf = (
    SparkConf()
    .setMaster("spark://192.168.2.203:7077")
    .setAppName(f"load_local_comments; hostname: {hostname}")
    .set("spark.executor.cores", 2)
    .set("spark.dynamicAllocation.enabled", False)
    .set("spark.shuffle.service.enabled", False)
)

# Reuse an already-running session if there is one, otherwise start a new one.
spark_session = SparkSession.builder.config(conf=conf).getOrCreate()

# Old API (RDD)
spark_context = spark_session.sparkContext

# A bare expression in the middle of a cell is evaluated and thrown away, so the
# UI address has to be printed explicitly to be visible.
print(f'Spark UI: {spark_context.uiWebUrl}')
print(f'hostname for this machine: {hostname}')

hostname for this machine: host-192-168-2-247-ldsa


In [2]:
# Share of the input rows to keep for this experiment run.
load_fraction = 0.05

# Read the JSON dump from HDFS, keep a sample drawn without replacement using a
# fixed seed, and mark the result for caching.
# NOTE(review): "header" looks like a CSV-reader option; confirm whether it has
# any effect on a JSON read.
df = (
    spark_session.read
    .option("header", "true")
    .json('hdfs://192.168.2.203:9000/RC_2010-01')
    .sample(withReplacement=False, fraction=load_fraction, seed=1234)
    .cache()
)

# Number of rows in the sample; reported later in the timing messages.
sampled_count = df.count()

In [3]:
### START TIMER

# Report the run parameters, then record the wall-clock start time. The sample
# was already counted in an earlier cell, so that work is outside the timed
# interval.
print(f'Starting timer. Fraction of dataset is {load_fraction} and sampled size is {sampled_count}')
start_clock = time()

Starting timer. Fraction of dataset is 0.05 and sampled size is 144339


In [4]:
# Columns removed before the analysis.
columns_to_discard = (
    'permalink',
    'gilded',
    'author_flair_css_class',
    'can_gild',
    'author_flair_text',
    'author_cakeday',
)

df_fp = df.drop(*columns_to_discard)

# Show the columns that remain.
df_fp.printSchema()

root
 |-- archived: boolean (nullable = true)
 |-- author: string (nullable = true)
 |-- body: string (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: string (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- downs: long (nullable = true)
 |-- edited: string (nullable = true)
 |-- id: string (nullable = true)
 |-- link_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- removal_reason: string (nullable = true)
 |-- retrieved_on: long (nullable = true)
 |-- score: long (nullable = true)
 |-- score_hidden: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- ups: long (nullable = true)



In [5]:
# Filter functions
def to_basket_unique(comment):
    """Turn a raw comment into a list of its distinct, lower-cased word tokens.

    Every run of non-word characters (punctuation, whitespace, ...) is treated
    as a single separator. Duplicates are removed through a set, so the order of
    the returned tokens is unspecified.

    Args:
        comment: The comment text, or None for a missing value.

    Returns:
        A list of unique tokens. An empty list is returned for None, for an
        empty string, and for text that contains no word characters, so no
        empty-string token ever ends up in a basket.
    """
    if not comment:
        return []
    # split() without an argument drops empty fields, which covers both the
    # leading/trailing separators and the "no words at all" case.
    return list(set(re.sub(r'\W+', ' ', comment).lower().split()))
# Column-level wrapper around to_basket_unique, declared to return an array of strings.
udf_to_basket_unique = F.udf(to_basket_unique, ArrayType(StringType()))


# Words that are dropped from every basket: a general English stopword list
# followed by two batches of extra tokens. Built once as a frozenset so that
# each membership test is a hash lookup instead of a scan over the whole list.
_STOPWORDS = frozenset(
    ['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves', 'you', "you're", "you've", "you'll", "you'd", 'your', 'yours', 'yourself', 'yourselves', 'he', 'him', 'his', 'himself', 'she', "she's", 'her', 'hers', 'herself', 'it', "it's", 'its', 'itself', 'they', 'them', 'their', 'theirs', 'themselves', 'what', 'which', 'who', 'whom', 'this', 'that', "that'll", 'these', 'those', 'am', 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'having', 'do', 'does', 'did', 'doing', 'a', 'an', 'the', 'and', 'but', 'if', 'or', 'because', 'as', 'until', 'while', 'of', 'at', 'by', 'for', 'with', 'about', 'against', 'between', 'into', 'through', 'during', 'before', 'after', 'above', 'below', 'to', 'from', 'up', 'down', 'in', 'out', 'on', 'off', 'over', 'under', 'again', 'further', 'then', 'once', 'here', 'there', 'when', 'where', 'why', 'how', 'all', 'any', 'both', 'each', 'few', 'more', 'most', 'other', 'some', 'such', 'no', 'nor', 'not', 'only', 'own', 'same', 'so', 'than', 'too', 'very', 's', 't', 'can', 'will', 'just', 'don', "don't", 'should', "should've", 'now', 'd', 'll', 'm', 'o', 're', 've', 'y', 'ain', 'aren', "aren't", 'couldn', "couldn't", 'didn', "didn't", 'doesn', "doesn't", 'hadn', "hadn't", 'hasn', "hasn't", 'haven', "haven't", 'isn', "isn't", 'ma', 'mightn', "mightn't", 'mustn', "mustn't", 'needn', "needn't", 'shan', "shan't", 'shouldn', "shouldn't", 'wasn', "wasn't", 'weren', "weren't", 'won', "won't", 'wouldn', "wouldn't"]
    + ['https', 'www', 'one', 'would', 'come', 'really', 'also', 'com', 'gt', 'r', '737yli']
    + ['get', 'even', 'make', 'go', 'still', 'could', 'got', 'goes', '2', 'first', 'going', 'right', 'sure', 'something', 'http', 'well', 'back', 'though']
)


def filter_words(basket):
    """Remove stopwords from a basket of tokens.

    Args:
        basket: An iterable of word tokens, or None for a missing value.

    Returns:
        A new list with the tokens of ``basket`` that are not stopwords, in
        their original order. None or an empty basket gives an empty list.
    """
    if not basket:
        return []
    return [word for word in basket if word not in _STOPWORDS]
    
# Column-level wrapper around filter_words, declared to return an array of strings.
udf_filter_words = F.udf(filter_words, ArrayType(StringType()))

In [6]:
# Replace each comment body by its basket of distinct, stopword-free tokens and
# keep only that column; baskets with five or fewer tokens are discarded.
basket_column = udf_filter_words(udf_to_basket_unique('body')).alias('body')

df_fp1 = (
    df_fp
    .select(basket_column)
    .where(F.size(F.col('body')) > 5)
)

In [7]:
# FP-Growth over the token baskets held in the "body" column, with the support
# and confidence thresholds fixed for this run.
fpGrowth = FPGrowth(
    itemsCol="body",
    minSupport=0.01,
    minConfidence=0.05,
)

# Fit on the filtered baskets; the fitted model exposes the frequent itemsets
# and association rules used in the following cells.
model = fpGrowth.fit(df_fp1)

In [8]:
# Show the frequent itemsets, most frequent first.
(
    model.freqItemsets
    .orderBy(F.desc("freq"))
    .show()
)

+----------+-----+
|     items| freq|
+----------+-----+
|    [like]|14524|
|  [people]|10526|
|   [think]| 9331|
|    [time]| 7722|
|    [know]| 7474|
|    [good]| 6763|
|    [much]| 6220|
|     [way]| 5746|
|     [see]| 5560|
|    [want]| 5096|
|     [say]| 4689|
|   [never]| 4431|
|   [thing]| 4336|
|[actually]| 4185|
|  [things]| 3975|
|    [work]| 3949|
|     [use]| 3933|
|    [need]| 3838|
|    [take]| 3743|
|  [pretty]| 3740|
+----------+-----+
only showing top 20 rows



In [9]:
# Show the generated association rules whose antecedent contains the chosen
# word, ordered from highest to lowest confidence.
antecedent = 'think'

(
    model.associationRules
    .where(F.array_contains('antecedent', antecedent))
    .orderBy(F.desc('confidence'))
    .show()
)

+---------------+----------+-------------------+------------------+
|     antecedent|consequent|         confidence|              lift|
+---------------+----------+-------------------+------------------+
|[think, people]|    [like]| 0.3877214668314792|2.2877008085277657|
|  [think, like]|  [people]|0.37370929308975376| 3.042538978711061|
|        [think]|    [like]| 0.2698531775801093|1.5922340787030174|
|        [think]|  [people]|0.26010073947058193| 2.117599569676084|
|        [think]|    [know]|0.15614617940199335|1.7903745164854996|
|        [think]|    [time]|0.15346693816311222|1.7031411810106483|
|        [think]|    [good]| 0.1401779016182617|1.7762569325713697|
|        [think]|    [much]|0.13728432108027006| 1.891455701545965|
|        [think]|     [way]| 0.1336405529953917|1.9931420936383717|
|        [think]|     [see]|0.11960132890365449|1.8434307703338992|
|        [think]|     [say]|0.11081341764012431| 2.025245777672368|
|        [think]|    [want]|0.10899153359768514|

In [10]:
# transform checks each basket against all the association rules and collects
# the consequents of the matching rules in the "prediction" column.
model.transform(df_fp1).show()

+--------------------+--------------------+
|                body|          prediction|
+--------------------+--------------------+
|[cat, dog, consid...|                  []|
|[center, question...|      [people, like]|
|[someone, continu...|      [people, like]|
|[czechs, system, ...|                  []|
|[someone, handout...|[think, know, peo...|
|[level, developme...|[think, know, peo...|
|[person, least, t...|[think, point, ac...|
|[specific, trying...|      [like, people]|
|[assume, thing, w...|[know, people, ti...|
|[plate, comments,...|                  []|
|[god, new, gibbon...|              [like]|
|[interesting, cre...|                  []|
|[fine, protecting...|[people, like, th...|
|[beach, place, fo...|      [like, people]|
|[house, visit, bo...|                  []|
|[gold, fuckload, ...|                  []|
|[promote, kills, ...|[point, actually,...|
|[komedi, dizi, da...|                  []|
|[30, median, quit...|              [like]|
|[anything, joke, ...|          

In [11]:
### END TIMER

# Wall-clock time at the end of the experiment.
end_clock = time()

# Elapsed seconds since start_clock was recorded in the START TIMER cell.
runtime = end_clock - start_clock

print(f'Run finished. Experiment run on {sampled_count} comments. Runtime resulted in {runtime} seconds.')

Run finished. Experiment run on 144339 comments. Runtime resulted in 136.61919116973877 seconds.
