In [1]:
from pyspark.sql import SparkSession

# New API: build (or reuse) the SparkSession that serves as the DataFrame entry point.
spark_session = (
    SparkSession.builder
    .master("spark://192.168.2.83:7077")
    .appName("Marina_Mota_Project")
    # Dynamic allocation and the shuffle service are both switched off.
    .config("spark.dynamicAllocation.enabled", False)
    .config("spark.shuffle.service.enabled", False)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    # One core per executor.
    .config("spark.executor.cores", 1)
    .getOrCreate()
)

# Old API (RDD): the SparkContext that belongs to the session above.
spark_context = spark_session.sparkContext

In [None]:
# Load a bz2-compressed JSON dump from HDFS into a Spark DataFrame.
# NOTE(review): the file name suggests the 2009-10 comment dump — confirm.
df2 = spark_session.read.json("hdfs://192.168.2.83:9000/user/ubuntu/input/RC_2009-10.bz2")

In [4]:
#Initial analysis that we discarded: ranking authors by average post popularity
# Aggregate from the loaded frame (df2). The previous version read from df3 itself,
# which is never assigned before this cell, so it failed on a fresh kernel and was
# not safe to re-run.
df3 = (
    df2.groupBy('author')
    .agg({'score': 'mean'})                          # produces a column named 'avg(score)'
    .orderBy('avg(score)', ascending=False)          # highest average score first
    .withColumnRenamed('avg(score)', 'mean_score')
)

In [10]:
# Preview the per-author ranking (the captured output reports the top 20 rows).
df3.show()

+--------------+-----------------+
|        author|       mean_score|
+--------------+-----------------+
|TenebraeVision|             16.0|
|          Grue|             16.0|
|        fergie|             12.0|
|        bstard|             11.0|
|         sempf|9.666666666666666|
|     bolinfest|              7.0|
|      kanagawa|              7.0|
|    HiggsBoson|              7.0|
|    paulgraham|6.533333333333333|
|         arkas|              6.5|
|      dstowell|6.333333333333333|
|        kelyse|              6.0|
|     FutureAEI|              6.0|
|       gernika|              6.0|
|          chao|              6.0|
|       spolsky|              6.0|
|       AaronSw|              6.0|
|      bwringer|              6.0|
|        willfe|              6.0|
|          bosk|              5.5|
+--------------+-----------------+
only showing top 20 rows



In [2]:
import time

In [3]:
# Record the wall-clock start time; it is subtracted from a second time.time()
# reading at the end of the notebook to get the total duration of the analysis.
time_init = time.time()

In [14]:
# Load the bz2-compressed JSON dump used for the word count from HDFS into a
# Spark DataFrame. No explicit schema is passed to read.json.
df = spark_session.read.json("hdfs://192.168.2.83:9000/user/ubuntu/input/RC_2011-06.bz2")

CPU times: user 5.15 ms, sys: 6.01 ms, total: 11.2 ms
Wall time: 1min 2s


In [None]:
# Preview the first rows of the loaded DataFrame (exploration only).
df.show()

In [13]:
# Print the column names, types and nullability of the loaded DataFrame.
df.printSchema()

root
 |-- archived: boolean (nullable = true)
 |-- author: string (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- body: string (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: string (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- downs: long (nullable = true)
 |-- edited: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- id: string (nullable = true)
 |-- link_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- removal_reason: string (nullable = true)
 |-- retrieved_on: long (nullable = true)
 |-- score: long (nullable = true)
 |-- score_hidden: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- ups: long (nullable = true)



In [7]:
# Exploration only: this cell and the inspection cells above it were not run in
# the final analysis; they are kept for data visualization.
# Count the rows of the DataFrame.
df.count()

9766511

In [5]:
# Convert to an RDD of plain values: keep only the "body" column and unwrap each
# single-field Row into the value it holds.
reddit_rdd = df.select("body").rdd.map(lambda row: row[0])
# Peek at the first 10 comment bodies.
reddit_rdd.take(10)

['[deleted]',
 "It thought the same thing, searched hard when it seemed like it wasn't going anywhere and now I have four interviews in the next two weeks!\n\nKeep on keepin' on broseph!",
 "He has tons of counters in the laning phase:\n\n- Vindicator\n- Demented Shaman\n- Pharaoh\n- Silhouette (but honestly, who does she not counter?)\n- Jeraziah\n- Tundra\n- Torturer\n- Succubus\n\netc.\n\nLater it's just hard disables and good initiation, Hellflower is incredibly good at shutting him down, any true disabler too.",
 "I know exactly how you feel man.  It's amazing how entry-level positions now require 2-3 years experience and a couple of internships aren't enough.",
 "&gt;Then play soldier or scout.\n\nCan't take on a heavy/medic.",
 "Hmm...that's quite a catch of a man you have there. Well done...could you possibly name your house The USS Enterprise or would that be copyright infringement? =P",
 "Ironically I get a lot of my 'major' Euro news from Al-Jazeera.\n\nAfter that it's all l

In [6]:
# Drop every element that is exactly the "[deleted]" placeholder.
# Elements of reddit_rdd are the comment bodies themselves, not Rows.
reddit_rdd2 = reddit_rdd.filter(lambda body: body != "[deleted]")
# Peek at the first 10 remaining bodies.
reddit_rdd2.take(10)

["It thought the same thing, searched hard when it seemed like it wasn't going anywhere and now I have four interviews in the next two weeks!\n\nKeep on keepin' on broseph!",
 "He has tons of counters in the laning phase:\n\n- Vindicator\n- Demented Shaman\n- Pharaoh\n- Silhouette (but honestly, who does she not counter?)\n- Jeraziah\n- Tundra\n- Torturer\n- Succubus\n\netc.\n\nLater it's just hard disables and good initiation, Hellflower is incredibly good at shutting him down, any true disabler too.",
 "I know exactly how you feel man.  It's amazing how entry-level positions now require 2-3 years experience and a couple of internships aren't enough.",
 "&gt;Then play soldier or scout.\n\nCan't take on a heavy/medic.",
 "Hmm...that's quite a catch of a man you have there. Well done...could you possibly name your house The USS Enterprise or would that be copyright infringement? =P",
 "Ironically I get a lot of my 'major' Euro news from Al-Jazeera.\n\nAfter that it's all local or region

In [7]:
#Generate the wordlist
import re
def lower_rdd(lines):
    """Split one comment body into lowercase alphabetic words.

    Args:
        lines: Text of a single comment, or None. The "body" column is
            nullable, so a missing body is treated like an empty one.

    Returns:
        A list of the maximal runs of ASCII letters in the lowercased text,
        in order of appearance. Everything else (digits, punctuation,
        whitespace, non-ASCII letters) acts as a separator, so "wasn't"
        yields ['wasn', 't']. None or "" yields [].
    """
    if not lines:
        # None would raise AttributeError on .lower() and abort the Spark task.
        return []
    # The text is already lowercased, so matching a-z alone is sufficient.
    return re.findall(r'[a-z]+', lines.lower())
# Tokenize every remaining comment: each RDD element becomes the list returned by lower_rdd.
reddit_words = reddit_rdd2.map(lower_rdd)
# Inspect the first 10 tokenized comments.
reddit_words.take(10)

[['it',
  'thought',
  'the',
  'same',
  'thing',
  'searched',
  'hard',
  'when',
  'it',
  'seemed',
  'like',
  'it',
  'wasn',
  't',
  'going',
  'anywhere',
  'and',
  'now',
  'i',
  'have',
  'four',
  'interviews',
  'in',
  'the',
  'next',
  'two',
  'weeks',
  'keep',
  'on',
  'keepin',
  'on',
  'broseph'],
 ['he',
  'has',
  'tons',
  'of',
  'counters',
  'in',
  'the',
  'laning',
  'phase',
  'vindicator',
  'demented',
  'shaman',
  'pharaoh',
  'silhouette',
  'but',
  'honestly',
  'who',
  'does',
  'she',
  'not',
  'counter',
  'jeraziah',
  'tundra',
  'torturer',
  'succubus',
  'etc',
  'later',
  'it',
  's',
  'just',
  'hard',
  'disables',
  'and',
  'good',
  'initiation',
  'hellflower',
  'is',
  'incredibly',
  'good',
  'at',
  'shutting',
  'him',
  'down',
  'any',
  'true',
  'disabler',
  'too'],
 ['i',
  'know',
  'exactly',
  'how',
  'you',
  'feel',
  'man',
  'it',
  's',
  'amazing',
  'how',
  'entry',
  'level',
  'positions',
  'now',


In [8]:
#Generate a list containing stop words
from nltk.corpus import stopwords
# NLTK's English stop words plus the tokens 'http' and 'com'
# (presumably leftovers from URLs after tokenization — confirm).
stopword_list = stopwords.words('english')
stopword_list.extend(['http', 'com'])

In [9]:
#Remove stop words from the general list of words
def removeStopWordsFunct(x):
    """Return the words of `x` that do not appear in the module-level `stopword_list`.

    `x` is an iterable of words; order and duplicates of the surviving words
    are preserved. The result is always a new list.
    """
    kept = []
    for token in x:
        if token not in stopword_list:
            kept.append(token)
    return kept
# Apply the stop-word filter to every tokenized comment.
stopword_reddit = reddit_words.map(removeStopWordsFunct)

In [10]:
# Inspect the first 10 comments after stop-word removal.
stopword_reddit.take(10)

[['thought',
  'thing',
  'searched',
  'hard',
  'seemed',
  'like',
  'going',
  'anywhere',
  'four',
  'interviews',
  'next',
  'two',
  'weeks',
  'keep',
  'keepin',
  'broseph'],
 ['tons',
  'counters',
  'laning',
  'phase',
  'vindicator',
  'demented',
  'shaman',
  'pharaoh',
  'silhouette',
  'honestly',
  'counter',
  'jeraziah',
  'tundra',
  'torturer',
  'succubus',
  'etc',
  'later',
  'hard',
  'disables',
  'good',
  'initiation',
  'hellflower',
  'incredibly',
  'good',
  'shutting',
  'true',
  'disabler'],
 ['know',
  'exactly',
  'feel',
  'man',
  'amazing',
  'entry',
  'level',
  'positions',
  'require',
  'years',
  'experience',
  'couple',
  'internships',
  'enough'],
 ['gt', 'play', 'soldier', 'scout', 'take', 'heavy', 'medic'],
 ['hmm',
  'quite',
  'catch',
  'man',
  'well',
  'done',
  'could',
  'possibly',
  'name',
  'house',
  'uss',
  'enterprise',
  'would',
  'copyright',
  'infringement',
  'p'],
 ['ironically',
  'get',
  'lot',
  'major'

In [11]:
#Count the number of occurrences of every word
def count_words(lines):
    """Tally how often each word occurs, most frequent first.

    `lines` must offer the RDD-style methods used below (flatMap, map,
    reduceByKey, sortBy); each of its elements is an iterable of words.
    Returns whatever sortBy returns: (word, count) pairs ordered by
    descending count.
    """
    def to_pair(word):
        return (word, 1)

    def add(left, right):
        return left + right

    def by_count(pair):
        return pair[1]

    # Flatten the per-comment word lists into one stream of words.
    words = lines.flatMap(lambda tokens: tokens)
    totals = words.map(to_pair).reduceByKey(add)
    return totals.sortBy(by_count, ascending=False)

In [12]:
#Compute the time of the final word count step
%%time
word_counts = count_words(stopword_reddit)
word_counts.collect()

[('like', 1336082),
 ('would', 1007222),
 ('one', 972899),
 ('people', 968006),
 ('get', 919777),
 ('think', 770083),
 ('know', 635489),
 ('time', 630918),
 ('really', 615540),
 ('good', 588170),
 ('gt', 513810),
 ('also', 496955),
 ('much', 494461),
 ('make', 487558),
 ('even', 480145),
 ('see', 463930),
 ('go', 454549),
 ('well', 453212),
 ('way', 452084),
 ('want', 445451),
 ('www', 433445),
 ('could', 427470),
 ('going', 394079),
 ('something', 390411),
 ('right', 390294),
 ('say', 377506),
 ('still', 375270),
 ('never', 350001),
 ('first', 328446),
 ('got', 323671),
 ('thing', 321154),
 ('actually', 320406),
 ('work', 318130),
 ('amp', 317455),
 ('back', 317261),
 ('need', 309452),
 ('use', 306063),
 ('things', 304527),
 ('pretty', 303039),
 ('though', 298917),
 ('sure', 296629),
 ('better', 288585),
 ('take', 288355),
 ('someone', 286832),
 ('lot', 280337),
 ('us', 279557),
 ('game', 270954),
 ('said', 256334),
 ('many', 253553),
 ('day', 250207),
 ('years', 250041),
 ('every', 2

In [13]:
# Elapsed wall-clock time, in seconds, between the time_init reading and now.
time_final = time.time()
time_final-time_init

258.3187446594238

In [15]:
# Stop the SparkContext so the cores it holds are released for other applications.
spark_context.stop()