# Correlation experiment


In [1]:
from pyspark.sql import SparkSession
import json
import time

In [2]:
# Build (or reuse) the SparkSession for this experiment on the standalone cluster.
#
# Every config key is set exactly once. Earlier revisions of this cell set
# `spark.dynamicAllocation.enabled` and `spark.shuffle.service.enabled` twice,
# and the later "false" values silently won. The values below keep that effective
# configuration (dynamic allocation OFF, which is what produced the recorded
# timing). Set DYNAMIC_ALLOCATION = True to turn it on.
DYNAMIC_ALLOCATION = False

spark_session = (
    SparkSession.builder
    .master("spark://sp-master:7077")
    .appName("correlation_experiment")
    .config("spark.dynamicAllocation.enabled", str(DYNAMIC_ALLOCATION).lower())
    # The next two settings are only consulted when dynamic allocation is enabled.
    # Shuffle tracking lets dynamic allocation work without an external shuffle
    # service, which is disabled below.
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
    .config("spark.dynamicAllocation.executorIdleTimeout", "300s")
    .config("spark.shuffle.service.enabled", "false")
    .config("spark.executor.memory", "2g")
    .config("spark.executor.cores", 2)
    # NOTE(review): fixed ports, presumably so the cluster can reach the driver
    # through a firewall/container port mapping — confirm.
    .config("spark.driver.port", 9998)
    .config("spark.blockManager.port", 10005)
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/08 09:34:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Load data from HDFS

In [3]:
# Raw Reddit comment dump (RC_2011-01); each line is parsed as one JSON object below.
REDDIT_COMMENTS_PATH = "hdfs://sp-master:9000/reddit_comment_data/RC_2011-01.json"
json_lines = spark_session.sparkContext.textFile(REDDIT_COMMENTS_PATH)

In [4]:
# Peek at the first raw line to check the input format.
json_lines.take(1)

                                                                                

['{"archived":true,"downs":0,"link_id":"t3_etyqc","score_hidden":false,"id":"c1b06fp","author_flair_css_class":null,"body":"They should add that to the instructions on the box :p","ups":1,"distinguished":null,"gilded":0,"edited":false,"retrieved_on":1426664469,"parent_id":"t1_c1azvxa","created_utc":"1293840000","subreddit":"sex","controversiality":0,"author_flair_text":null,"score":1,"name":"t1_c1b06fp","author":"SandRider","subreddit_id":"t5_2qh3p"}']

# Start timing


In [5]:
# Wall-clock start of the timed section. When the notebook is run cell by cell,
# the measured duration also includes any pause between cell executions.
start_time = time.time()

# Parse each line as JSON into a Python dictionary

In [6]:
# Parse every line into a dict; json.loads is passed directly as the mapper.
json_objs = json_lines.map(json.loads)

In [7]:
# Peek at the first parsed comment to see which fields are available.
json_objs.take(1)

                                                                                

[{'archived': True,
  'downs': 0,
  'link_id': 't3_etyqc',
  'score_hidden': False,
  'id': 'c1b06fp',
  'author_flair_css_class': None,
  'body': 'They should add that to the instructions on the box :p',
  'ups': 1,
  'distinguished': None,
  'gilded': 0,
  'edited': False,
  'retrieved_on': 1426664469,
  'parent_id': 't1_c1azvxa',
  'created_utc': '1293840000',
  'subreddit': 'sex',
  'controversiality': 0,
  'author_flair_text': None,
  'score': 1,
  'name': 't1_c1b06fp',
  'author': 'SandRider',
  'subreddit_id': 't5_2qh3p'}]

# Extract subreddit and author

In [8]:
# Drop comments whose author is the placeholder "[deleted]", then project each
# remaining comment down to a (subreddit, author) pair.
subreddit_and_author = (
    json_objs
    .filter(lambda comment: comment["author"] != "[deleted]")
    .map(lambda comment: (comment["subreddit"], comment["author"]))
)

In [9]:
# Preview the first 20 (subreddit, author) pairs.
subreddit_and_author.take(20)

[('sex', 'SandRider'),
 ('relationship_advice', 'throwaway-o'),
 ('DebateAChristian', 'Basilides'),
 ('scifi', 'zachm'),
 ('Seattle', 'BarbieDreamHearse'),
 ('google', 'eroq'),
 ('gaming', 'ramp_tram'),
 ('gaming', 'RevLoki'),
 ('lists', 'xsvfan'),
 ('atheism', 'Helen_A_Handbasket'),
 ('funny', 'lanedek'),
 ('politics', 'mothereffingteresa'),
 ('netsec', 'grutz'),
 ('gaming', 'MainlandX'),
 ('Art', 'fricken'),
 ('techsupport', 'megadert'),
 ('beer', 'DamnJester'),
 ('funny', 'cole1114'),
 ('funny', 'broken189'),
 ('WTF', 'pi_over_3')]

# Group authors by subreddit

In [10]:
# Group author names by subreddit: (subreddit, [author, ...]), one entry per
# comment, so an author can appear several times in a list.
# groupByKey on the existing (subreddit, author) pairs shuffles only the author
# names. groupBy(key_fn) would shuffle (subreddit, (subreddit, author)) and need
# extra passes to strip the subreddit back out of every element.
grouped = subreddit_and_author.groupByKey().mapValues(list)

In [11]:
#grouped.take(1)

# Remove duplicate authors

In [12]:
# Deduplicate each subreddit's author list; dict.fromkeys keeps first-seen order.
grouped_authors = grouped.mapValues(lambda authors: list(dict.fromkeys(authors)))

In [13]:
#grouped_authors.take(1)

# Compare subreddit authors to all other subreddits and find out which subreddits have the most authors in common

Output format: `(subreddit1, subreddit2, common_authors)`, e.g. `('AskReddit', 'pics', 1000)`.

In [14]:
def _author_count(subreddit_and_authors):
    """Return the length of the author list in a (subreddit, authors) record."""
    return len(subreddit_and_authors[1])


# Order subreddits from most to fewest authors. The pairwise comparison below
# re-sorts its own output, so the final ranking is set by that later sort.
sorted_by_popularity = grouped_authors.sortBy(_author_count, ascending=False)

                                                                                

In [15]:
#sorted_by_popularity.take(1)

In [16]:
def count_common_authors(author_list_1, author_list_2):
    """Return how many distinct authors appear in both author lists.

    The comparison is set-based, so duplicates within either list are ignored.
    """
    shared = set(author_list_1) & set(author_list_2)
    return len(shared)

In [17]:
# Steps:
# 1: cartesian with itself, gives us ((subreddit, authors), (subreddit, authors))
# 2: keep only pairs whose first subreddit name sorts strictly before the second.
#    This drops self-pairs such as (("pics", authors), ("pics", authors)). It also
#    keeps exactly one of the mirror pairs ("AskReddit", "pics") / ("pics", "AskReddit").
#    Each unordered pair is therefore compared once (half the intersections), and
#    no name-sorting map or distinct() shuffle is needed afterwards.
# 3: map to get (subreddit1, subreddit2, common_authors), with subreddit1 < subreddit2
# 4: sort by the number of common authors, descending

comparison_rdd = sorted_by_popularity.cartesian(sorted_by_popularity)\
    .filter(lambda pair: pair[0][0] < pair[1][0])\
    .map(lambda pair: (pair[0][0], pair[1][0], count_common_authors(pair[0][1], pair[1][1])))\
    .sortBy(lambda triple: triple[2], False)

                                                                                

# Results
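The subreddit pairs with the highest number of authors who commented in both subreddits of the pair. A high overlap suggests that users who comment in one subreddit of a pair are also likely to comment in the other. Note that these are raw counts: they naturally favour the largest subreddits, so a normalized measure (e.g. overlap divided by the size of the union) would be needed to compare pairs of very different sizes.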

In [18]:
# Top 10 subreddit pairs by number of shared commenters. take() is a Spark
# action, so this is where most of the pairwise computation actually runs.
comparison_rdd.take(10)

                                                                                

[('AskReddit', 'pics', 49188),
 ('AskReddit', 'reddit.com', 38486),
 ('pics', 'reddit.com', 37559),
 ('WTF', 'pics', 34574),
 ('funny', 'pics', 34421),
 ('AskReddit', 'WTF', 33569),
 ('AskReddit', 'funny', 32306),
 ('gaming', 'pics', 28104),
 ('AskReddit', 'gaming', 27741),
 ('WTF', 'reddit.com', 27499)]

# Measure end time

In [19]:
# Wall-clock seconds since `start_time`. This includes Spark job time and, when
# cells are run interactively, any pause between running the cells.
end_time = time.time()
duration = end_time - start_time
print("Experiment took", duration, "seconds")

Experiment took  424.98576164245605


In [20]:
# Stop the SparkSession (and its SparkContext) so the application releases its cluster resources.
spark_session.stop()