# Louvain Algorithm

In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# Create spark context
conf = SparkConf().setAll([('spark.driver.memory','20g')])
sc = SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()
sc

In [2]:
import pandas as pd
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt
import pickle
from tqdm import tqdm

graph_df = spark.read.parquet("../data/export/subreddit_100_v2.parquet")
graph_df.createOrReplaceTempView("comments")

In [3]:
spark.sql("SELECT COUNT(*) FROM comments").show()

+--------+
|count(1)|
+--------+
|43487526|
+--------+



In [4]:
spark.sql("SELECT * FROM comments LIMIT 1").toPandas()

Unnamed: 0,id,link_id,parent_id,created_utc,author,author_fullname,subreddit_id,subreddit,subreddit_num_comments,subreddit_ranking,score,controversiality,stickied,t_user_sub_comments
0,epueh5f,t3_bvwsxh,t1_epue3o2,1559508424,JuliaH7931,t2_10003s,t5_2qh1i,AskReddit,7292189,1,1,0,False,1


In [5]:
# identify & remove bots - 
# could also use highest node degree

query = """

    WITH cte_distinct AS (
        SELECT DISTINCT
            author,
            author_fullname,
            subreddit
        FROM comments
    )
    
    SELECT
        author,
        author_fullname,
        COUNT(*) num_counts
    FROM cte_distinct
    GROUP BY 1,2
    ORDER BY num_counts DESC

"""

user_submissions_df = spark.sql(query).toPandas()
num_sub_bins_series = user_submissions_df['num_counts'].value_counts(ascending=True, bins=50).cumsum()
cutoff = num_sub_bins_series.quantile(0.75)
print("cut-off:", cutoff)

results = num_sub_bins_series[num_sub_bins_series > cutoff]
max_sub_count = int(np.ceil(results.index.to_tuples().max()[1]))
print("max subreddit threshold:", max_sub_count)

results

cut-off: 2248.75
max subreddit threshold: 27


(24.52, 26.48]       2457
(22.56, 24.52]       3734
(20.6, 22.56]        5710
(18.64, 20.6]        8915
(16.68, 18.64]      14229
(14.72, 16.68]      23233
(12.76, 14.72]      38719
(10.8, 12.76]       65445
(8.84, 10.8]       113242
(6.88, 8.84]       202587
(4.92, 6.88]       387306
(2.96, 4.92]       857819
(0.901, 2.96]     3225996
Name: num_counts, dtype: int64

In [6]:
user_submissions_df.head()

Unnamed: 0,author,author_fullname,num_counts
0,nkid299,t2_30k2nng,99
1,AutoModerator,t2_6l4z3,89
2,BadDadBot,t2_3vmgbuw8,87
3,WaitingToBeTriggered,t2_3qhv112i,87
4,LeEpicRedditor69,t2_3y359z0f,86


In [7]:
%%time

try: del user_submissions_df 
except: pass

min_comments = 5

query = f"""

    -- finds distinct user-subreddit associations
    WITH cte_distinct AS (  
        SELECT DISTINCT
            author,
            author_fullname,
            subreddit
        FROM comments
    ),
    
    -- finds users that have commented on less than `max_sub_count` distinct subreddits
    cte_cut AS (   
        SELECT
            author,
            author_fullname,
            COUNT(*) num_counts
        FROM cte_distinct
        GROUP BY 1,2
        HAVING num_counts < {max_sub_count}
    )
    
    -- finds distinct user-subreddit associations where they have commented at least `min_comments` times
    -- & have not posted on more than `max_sub_count` distinct subreddits
    SELECT DISTINCT
        c.author,
        c.author_fullname,
        c.subreddit,
        c.subreddit_ranking,
        c.t_user_sub_comments,
        cte_cut.num_counts subreddits_followed
    FROM comments c
    INNER JOIN cte_cut ON cte_cut.author_fullname = c.author_fullname
    WHERE c.t_user_sub_comments >= {min_comments}

-- note: subreddits_followed does not take into account min_comments

"""

subset_sdf = spark.sql(query)
# subset_sdf.write.parquet("../data/export/filtered_dataset_v2.parquet", compression='snappy')

subset_df = subset_sdf.toPandas()

# subset_df = spark.sql("SELECT * FROM comments").sample(withReplacement=False, fraction=0.01).toPandas()
# subset_df = subset_df.groupby(['author_fullname', 'subreddit'], as_index=False)['id'].count().rename(columns={'id':'num_comments'})
# subset_df = spark.sql("SELECT DISTINCT author_fullname, subreddit FROM comments WHERE t_user_sub_comments >= 3").toPandas()
print(len(subset_df))
subset_df.head()

1470229
CPU times: user 4.25 s, sys: 47.9 ms, total: 4.3 s
Wall time: 14.6 s


Unnamed: 0,author,author_fullname,subreddit,subreddit_ranking,t_user_sub_comments,subreddits_followed
0,Flaming_Dorito_,t2_1010sbi5,RoastMe,41,5,9
1,iammesowhoareyou,t2_102nty,AmItheAsshole,7,10,1
2,PsystrikeSmash,t2_102znx,AskReddit,1,8,6
3,PsystrikeSmash,t2_102znx,teenagers,3,13,6
4,bluewave41,t2_103nb8,AmItheAsshole,7,18,3


In [8]:
subreddit_rankings_df = subset_df[['subreddit', 'subreddit_ranking']].drop_duplicates(ignore_index=True).sort_values(by='subreddit_ranking')
subreddit_rankings_df

Unnamed: 0,subreddit,subreddit_ranking
2,AskReddit,1
23,nba,2
3,teenagers,3
29,politics,4
19,memes,5
...,...,...
54,iamatotalpieceofshit,96
33,de,97
87,buildapc,98
21,PS4,99


In [9]:
%%time

user_nodes = subset_df['author_fullname'].unique()
subreddit_nodes = subset_df['subreddit'].unique()
edges = [(u, s) for u, s in zip(subset_df['author_fullname'].values, subset_df['subreddit'].values)]

print("Unique users:", len(user_nodes))
print("Unique subreddits:", len(subreddit_nodes))
print("Num edges:", len(edges))

Unique users: 893043
Unique subreddits: 100
Num edges: 1470229
CPU times: user 390 ms, sys: 7.57 ms, total: 398 ms
Wall time: 396 ms


In [10]:
%%time

from networkx.algorithms import bipartite

B = nx.Graph()
B.add_nodes_from(user_nodes, bipartite='user')
B.add_nodes_from(subreddit_nodes, bipartite='subreddit')
B.add_edges_from(edges)
print(nx.info(B))

Name: 
Type: Graph
Number of nodes: 893143
Number of edges: 1470229
Average degree:   3.2923
CPU times: user 3.55 s, sys: 91.9 ms, total: 3.64 s
Wall time: 3.64 s


## Louvain Algorithm

In [11]:
%%time

import community
from networkx.algorithms.bipartite.projection import overlap_weighted_projected_graph

G = overlap_weighted_projected_graph(B, subreddit_nodes, jaccard=True)

print(nx.info(G))

Name: 
Type: Graph
Number of nodes: 100
Number of edges: 4814
Average degree:  96.2800
CPU times: user 31.7 s, sys: 3.91 ms, total: 31.7 s
Wall time: 31.7 s


In [12]:
results_list = sorted(list(G.edges(data=True)), key=lambda x: x[2]['weight'], reverse=True)
results_list[:10]

[('worldnews', 'news', {'weight': 0.15952791406975203}),
 ('memes', 'dankmemes', {'weight': 0.14537077491361164}),
 ('funny', 'pics', {'weight': 0.13671098288432298}),
 ('fo76', 'Market76', {'weight': 0.12769520619635755}),
 ('worldnews', 'politics', {'weight': 0.11230484409393723}),
 ('worldnews', 'todayilearned', {'weight': 0.10655380450989838}),
 ('teenagers', 'memes', {'weight': 0.10558927668184118}),
 ('politics', 'news', {'weight': 0.1017155451749264}),
 ('teenagers', 'dankmemes', {'weight': 0.10084801349325337}),
 ('pics', 'todayilearned', {'weight': 0.0991111587063611})]

In [13]:
partition = community.best_partition(G, weight='weight', resolution=0.5)
print("modularity:", community.modularity(partition, G, weight='weight'))
print("number of communities:", len(set(partition.values())))

modularity: 0.13487432677172495
number of communities: 22


In [14]:
# top 5
# in same community, list top 5 most popular (based on num_comments)

def get_louvain_subreddits(subreddit, partition_dict):
    
    community = partition_dict[subreddit]
    subreddits = [sub for sub, com in partition_dict.items() if com == community]
    
    return subreddits

get_louvain_subreddits('MMA', partition)

['formula1',
 'FIFA',
 'SquaredCircle',
 'nba',
 'baseball',
 'fantasybaseball',
 'barstoolsports',
 'soccer',
 'reddevils',
 'lakers',
 'hockey',
 'Cricket',
 'nfl',
 'hiphopheads',
 'MMA',
 'torontoraptors',
 'canada',
 'NYYankees']

In [15]:
def get_louvain_preds(subreddit_tuple_list, partition, subreddit_rankings_df):
    
    """subreddit_tuple_list: list of (subreddit, num_comments by user) tuples"""
    
    already_commented = [x[0] for x in subreddit_tuple_list]
    fav_subreddit = max(subreddit_tuple_list, key=lambda x: x[1])[0]
    subreddits = get_louvain_subreddits(fav_subreddit, partition)
    filtered_subreddits = list(set(subreddits).difference(set(already_commented)))
    ranked_subreddits = subreddit_rankings_df[subreddit_rankings_df['subreddit'].isin(filtered_subreddits)].sort_values(by='subreddit_ranking')

    return list(ranked_subreddits['subreddit'][:10])
    
get_louvain_preds([('nba', 34), ('apexlegends', 12)], partition, subreddit_rankings_df)

['soccer',
 'SquaredCircle',
 'Cricket',
 'hockey',
 'nfl',
 'MMA',
 'formula1',
 'reddevils',
 'baseball',
 'FIFA']

In [16]:
def get_louvain_preds_all(subreddit_tuple_list, partition, subreddit_rankings_df):
    
    """subreddit_tuple_list: list of (subreddit, num_comments by user) tuples"""
    
    already_commented = [x[0] for x in subreddit_tuple_list]
    fav_subreddit = max(subreddit_tuple_list, key=lambda x: x[1])[0]
    subreddits = get_louvain_subreddits(fav_subreddit, partition)
    filtered_subreddits = list(set(subreddits).difference(set(already_commented)))
    ranked_subreddits = subreddit_rankings_df[subreddit_rankings_df['subreddit'].isin(filtered_subreddits)].sort_values(by='subreddit_ranking')

    return list(ranked_subreddits['subreddit'])
    
get_louvain_preds([('nba', 34), ('apexlegends', 12)], partition, subreddit_rankings_df)

['soccer',
 'SquaredCircle',
 'Cricket',
 'hockey',
 'nfl',
 'MMA',
 'formula1',
 'reddevils',
 'baseball',
 'FIFA']

## Evaluation

In [17]:
masked_test = pd.read_csv("../data/evaluation/v2/test_2k_mask_answer_v2.csv")[['author_fullname', 'subreddit']]
masked_dict = dict(zip(masked_test['author_fullname'], masked_test['subreddit']))

test_data = pd.read_csv("../data/evaluation/v2/test_2k_unmasked.csv")

user_history = test_data.groupby(
    'author_fullname'
)[['subreddit', 't_user_sub_comments']].apply(
    lambda x: list(map(tuple, x.values))
).to_dict()

In [18]:
%%time

results = pd.DataFrame()
results['author_fullname'] = list(user_history.keys())
results['history'] = results['author_fullname'].apply(lambda x: user_history[x])
results['pred_list'] = results['history'].apply(
    lambda x: get_louvain_preds(x, partition, subreddit_rankings_df)
)
results['masked'] = results['author_fullname'].apply(lambda x: masked_dict[x])
results['top1'] = results.apply(lambda x: x['masked'] in x['pred_list'][:1], axis=1)
results['top5'] = results.apply(lambda x: x['masked'] in x['pred_list'][:5], axis=1)
results['top10'] = results.apply(lambda x: x['masked'] in x['pred_list'][:10], axis=1)

results['community_preds'] = results['history'].apply(
    lambda x: get_louvain_preds_all(x, partition, subreddit_rankings_df)
)
results['in_community'] = results.apply(lambda x: x['masked'] in x['community_preds'], axis=1)

results = results[results['pred_list'].str.len() > 0]

results.head()

CPU times: user 2.42 s, sys: 24.2 ms, total: 2.45 s
Wall time: 2.42 s


Unnamed: 0,author_fullname,history,pred_list,masked,top1,top5,top10,community_preds,in_community
0,t2_106bpk58,"[(iamatotalpieceofshit, 5)]","[trashy, PublicFreakout, insanepeoplefacebook]",Animemes,False,False,False,"[trashy, PublicFreakout, insanepeoplefacebook]",False
1,t2_10aj9l3k,"[(Showerthoughts, 80), (AskReddit, 6)]",[mildlyinteresting],conspiracy,False,False,False,[mildlyinteresting],False
3,t2_10ckl1le,"[(tifu, 5)]",[personalfinance],PS4,False,False,False,[personalfinance],False
4,t2_10fhvx,"[(funny, 5)]","[pics, aww]",AmItheAsshole,False,False,False,"[pics, aww]",False
7,t2_10o07a,"[(SquaredCircle, 59), (AskReddit, 6)]","[nba, soccer, Cricket, hockey, nfl, MMA, formu...",Games,False,False,False,"[nba, soccer, Cricket, hockey, nfl, MMA, formu...",False


In [19]:
results['top1'].mean()

0.1220216606498195

In [20]:
results['top5'].mean()

0.30180505415162456

In [21]:
results['top10'].mean()

0.3703971119133574

In [22]:
results['in_community'].mean()

0.4043321299638989