# Subreddit Explorer

This project was originally completed using 97 million comments from September 2018. However, GitHub rejects files larger than 100 MB, so I created a sample set of approximately 1 million comments and used it below.

The results from the sample set are decent, though not as good as the results from the full data set. On the plus side, the entire notebook finishes running in under 2 minutes on my machine.

### Initialize Spark
- I used a 4-core CPU with 16 GB of memory.
- These settings may need to be adjusted for your machine.

```python
import pyspark as ps
conf = ps.SparkConf().setAll([('spark.executor.memory', '6g'),
                              ('spark.executor.cores', '3'),
                              ('spark.driver.memory', '6g')])
spark = ps.sql.SparkSession.builder.config(conf=conf).getOrCreate()
```

### Load sample comments

```python
sample_sept = spark.read.csv('sample_sept.csv/', header=True)
```

```python
sample_sept.count()
```

```
964524
```

```python
sample_sept.printSchema()
```

```
root
 |-- author_fullname: string (nullable = true)
 |-- author: string (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
```
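### Drop rows where author_fullname is null
- These are comments whose author account was deleted after the comment was posted.
- The sample comments file should not contain any of these rows.
- However, this step is important if you are using your own source data.

```python
# Uncomment when using your own data. This drops rows holding the literal
# string 'null' as well as true nulls: comparing a null with != yields null,
# which filter() treats as false.
# sample_sept = sample_sept.filter(sample_sept.author_fullname != 'null')
```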

### Convert subreddit_id to base 10
- Original format is base 36 (0-9 and a-z)
- New column is 'sr_id'
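As a quick sanity check of the conversion, the same logic can be sketched in plain Python: strip the `t5_` prefix and parse the rest with `int(..., 36)`. The helper name `subreddit_id_to_int` is hypothetical; the expected values are the `sr_id`s shown in the table further below.

```python
def subreddit_id_to_int(subreddit_id: str) -> int:
    """Convert a subreddit id such as 't5_2qh33' to a base-10 integer.

    Mirrors the Spark expression: drop the 3-character 't5_' prefix,
    then read the remainder as a base-36 number (digits 0-9, letters a-z).
    """
    prefix = "t5_"
    if not subreddit_id.startswith(prefix):
        raise ValueError(f"expected a 't5_' prefix: {subreddit_id!r}")
    # int() supports bases up to 36
    return int(subreddit_id[len(prefix):], 36)


print(subreddit_id_to_int("t5_2qh33"))  # 4594431 (funny)
print(subreddit_id_to_int("t5_2qh1i"))  # 4594374 (AskReddit)
```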

```python
from pyspark.sql.functions import conv
from pyspark.sql.functions import substring
from pyspark.sql.types import IntegerType
```

```python
# substring() is 1-indexed: start at position 4 to skip the 't5_' prefix,
# take up to 8 base-36 characters, convert to base 10 and cast to int
base10_df = sample_sept.withColumn(
    'sr_id',
    conv(substring(sample_sept.subreddit_id, 4, 8), 36, 10).cast(IntegerType()))
```

```python
base10_df.show(5)
```

```
+---------------+---------------+-----------------+------------+-------+
|author_fullname|         author|        subreddit|subreddit_id|  sr_id|
+---------------+---------------+-----------------+------------+-------+
|       t2_dca4l|Doctah_Whoopass|             cars|    t5_2qhl2|4595078|
|       t2_m9hec|     buggiegirl|            funny|    t5_2qh33|4594431|
|       t2_qq3rl|     Aguas-chan|        AskReddit|    t5_2qh1i|4594374|
|    t2_1qm8eh7n|  DenverNuggetz|interestingasfuck|    t5_2qhsa|4595338|
|       t2_y99ws|        progwok|        AskReddit|    t5_2qh1i|4594374|
+---------------+---------------+-----------------+------------+-------+
only showing top 5 rows
```

```python
base10_df.printSchema()
```

```
root
 |-- author_fullname: string (nullable = true)
 |-- author: string (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- sr_id: integer (nullable = true)
```
### Group rows to create rating column
- Each row represents a single comment
- Multiple comments by the same author in the same subreddit need to be grouped into one row
- The rating column holds the number of comments that author made in that subreddit
- PySpark ALS expects this format: one row per (user, item) pair, with a rating

```python
from pyspark.sql.functions import count
from pyspark.sql.functions import lit
grouped_df = base10_df.groupBy(base10_df.columns).agg(count(lit(1)).alias('rating'))
```
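The grouping above can be sketched in plain Python with `collections.Counter`, which counts how many times each (author, subreddit) pair appears. The `comments` list is hypothetical toy data, not taken from the data set.

```python
from collections import Counter

# Hypothetical toy input: one (author, subreddit) pair per comment
comments = [
    ("alice", "nba"),
    ("alice", "nba"),
    ("alice", "nfl"),
    ("bob", "nba"),
]

# Same idea as groupBy(...).agg(count(lit(1)).alias('rating')):
# each distinct (author, subreddit) pair becomes one entry whose
# rating is the number of comments behind it
ratings = Counter(comments)

for (author, subreddit), rating in sorted(ratings.items()):
    print(author, subreddit, rating)
```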

```python
grouped_df.count()
```

```
198312
```

```python
grouped_df.show(5)
```

```
+---------------+-----------------+---------+------------+-------+------+
|author_fullname|           author|subreddit|subreddit_id|  sr_id|rating|
+---------------+-----------------+---------+------------+-------+------+
|       t2_5uyfq|         tman916x|      nba|    t5_2qo4s|4603564|    61|
|       t2_99q54|         jmeshvrd|     tifu|    t5_2to41|4743505|     2|
|       t2_frjqq|      GhostCheese|pokemongo|    t5_34jka|5250826|    32|
|      t2_103ceu|         Shodan30| Konosuba|    t5_3c02n|5598815|     5|
|    t2_1l3cv8h9|International_Way|AskReddit|    t5_2qh1i|4594374|   162|
+---------------+-----------------+---------+------------+-------+------+
only showing top 5 rows
```
### Create new author identifier column
- The original base 36 author identifier will not work
    - Many of them exceed the 32-bit integer range that PySpark ALS requires for ids
- Instead, we create our own author identifier
- I tried using monotonically_increasing_id() and zipWithUniqueId()
    - Both created ids larger than the maximum integer size for PySpark ALS
    - (when using the full data set of 97 million rows)
- zipWithIndex() works, but it requires converting to an RDD, which adds extra steps
- StringIndexer() is the easiest solution that I found
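To see why the author identifiers overflow, the sketch below parses two author fullnames from the table above as base 36 (assuming, as with subreddit ids, a 3-character prefix followed by a base-36 number) and compares them with the largest 32-bit signed integer. The helper `base36_id` is hypothetical.

```python
INT32_MAX = 2**31 - 1  # largest value a Java Int (the ALS id type) can hold


def base36_id(fullname: str) -> int:
    """Drop the 3-character type prefix ('t2_' or 't5_') and parse base 36."""
    return int(fullname[3:], 36)


for fullname in ["t2_dca4l", "t2_1qm8eh7n"]:
    value = base36_id(fullname)
    status = "fits" if value <= INT32_MAX else "too large"
    print(fullname, value, status)
```

An 8-character base-36 id is at least 36**7 (about 7.8e10), so ids of that length can never fit.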

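```python
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col
stringIndexer = StringIndexer(inputCol="author", outputCol="au_id")
```

```python
# StringIndexer maps each distinct author to 0..n-1 (most frequent first by
# default) and outputs a double, so cast the new column to int for ALS
si_model = stringIndexer.fit(grouped_df)
auth_df = si_model.transform(grouped_df).withColumn("au_id",
                                     col("au_id").cast("int"))
```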

```python
# Cache here if you are doing further data exploration.
auth_df.cache()
```

```
DataFrame[author_fullname: string, author: string, subreddit: string, subreddit_id: string, sr_id: int, rating: bigint, au_id: int]
```

```python
auth_df.show(5)
```

```
+---------------+-----------------+---------+------------+-------+------+-----+
|author_fullname|           author|subreddit|subreddit_id|  sr_id|rating|au_id|
+---------------+-----------------+---------+------------+-------+------+-----+
|       t2_5uyfq|         tman916x|      nba|    t5_2qo4s|4603564|    61| 6488|
|       t2_99q54|         jmeshvrd|     tifu|    t5_2to41|4743505|     2| 1397|
|       t2_frjqq|      GhostCheese|pokemongo|    t5_34jka|5250826|    32| 1782|
|      t2_103ceu|         Shodan30| Konosuba|    t5_3c02n|5598815|     5| 3508|
|    t2_1l3cv8h9|International_Way|AskReddit|    t5_2qh1i|4594374|   162|  617|
+---------------+-----------------+---------+------------+-------+------+-----+
only showing top 5 rows
```
### Create dataframe for ALS input
- ALS requires (user, item, rating) format
- The user and item fields must be integers; the rating is converted to a float

```python
als_df = auth_df.select(['au_id', 'sr_id', 'rating'])
```

```python
als_df.printSchema()
```

```
root
 |-- au_id: integer (nullable = true)
 |-- sr_id: integer (nullable = true)
 |-- rating: long (nullable = false)
```
---
## Run ALS implicit recommendation model
https://spark.apache.org/docs/2.2.0/mllib-collaborative-filtering.html

```python
from pyspark.mllib.recommendation import ALS
als = ALS()
# Setting a checkpoint directory may help people with less memory
# spark.sparkContext.setCheckpointDir('/tmp')
```

```python
# Implicit-feedback ALS with 75 latent factors and 5 iterations
als_fitted = als.trainImplicit(als_df.rdd, rank=75, iterations=5,
                               lambda_=0.01, alpha=0.01, seed=10)
```
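With implicit feedback, the comment counts are not predicted directly. My understanding is that Spark's implicit ALS follows Hu, Koren and Volinsky's formulation: each rating becomes a binary preference (1 if the rating is positive) plus a confidence weight of 1 + alpha * |rating|. The sketch below illustrates that transform under this assumption; the helper name is hypothetical and this is not Spark's code.

```python
def implicit_preference_and_confidence(rating, alpha):
    """Turn a raw comment count into (preference, confidence).

    Assumed to mirror the implicit-feedback transform in Spark's ALS:
      preference = 1 if rating > 0 else 0
      confidence = 1 + alpha * |rating|
    """
    preference = 1.0 if rating > 0 else 0.0
    confidence = 1.0 + alpha * abs(rating)
    return preference, confidence


# Ratings from the grouped_df sample above, with alpha=0.01 as in trainImplicit
for rating in (2, 61, 162):
    preference, confidence = implicit_preference_and_confidence(rating, alpha=0.01)
    print(rating, preference, round(confidence, 2))
```

Under this model, alpha=0.01 gives a single comment a confidence of 1.01 and 162 comments a confidence of 2.62, so heavy commenters are weighted only moderately more than occasional ones.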

### Get items (and users) matrices
- PySpark ALS returns these as RDDs of (id, feature array) pairs
- First, we convert them into a Spark dataframe in which each feature vector is a nested array

```python
items_rdd = als_fitted.productFeatures()
# users_rdd = als_fitted.userFeatures()
```

```python
from pyspark.sql.types import Row

def f(x):
    # x is an (id, feature array) pair; turn it into {'0': id, '1': features}
    # so each pair becomes a Row with columns '0' and '1'
    d = {}
    for i in range(len(x)):
        d[str(i)] = x[i]
    return d

items_df = items_rdd.map(lambda x: Row(**f(x))).toDF()
# users_df = users_rdd.map(lambda x: Row(**f(x))).toDF()
```

```python
items_df.show(5)
```

```
+-------+--------------------+
|      0|                   1|
+-------+--------------------+
|4594300|[0.00937710888683...|
|4594600|[-0.0023872260935...|
|4594700|[-3.4862846951000...|
|4596900|[-3.3788694418035...|
|4598300|[-8.3739380352199...|
+-------+--------------------+
only showing top 5 rows
```
### Convert into a pandas dataframe as a true matrix
- The matrix in the dataframe is nested within a single column
- It's much easier to do matrix operations if we convert to a matrix format

```python
import numpy as np
import pandas as pd

# Stack the collected feature arrays into an (n_items x rank) matrix
items_pd = pd.DataFrame(np.vstack(items_df.select('1').collect()))
# users_pd = pd.DataFrame(np.vstack(users_df.select('1').collect()))
```

```python
items_pd.head(5)
```

|   | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | ... | 65 | 66 | 67 | 68 | 69 | 70 | 71 | 72 | 73 | 74 |
|---|---|---|---|---|---|---|---|---|---|---|-----|----|----|----|----|----|----|----|----|----|----|
| 0 | 0.009377 | 0.001945 | 0.021777 | -0.004703 | 0.001998 | -0.000648 | -0.002421 | -0.009784 | 0.003545 | 0.005625 | ... | 0.0055 | 0.007964 | -0.00973 | -0.004119 | -0.007043 | -0.002126 | -0.005411 | -0.009777 | -0.016776 | 0.007655 |
| 1 | -0.002387 | -0.00247 | 0.002993 | 4.7e-05 | -1e-06 | -0.001413 | -0.00249 | 0.004532 | -0.0046 | 0.004172 | ... | -0.003803 | 0.005502 | 0.001468 | 0.002383 | -0.002151 | 0.002215 | 0.005457 | 0.0056 | 0.004212 | -0.002091 |
| 2 | -0.000349 | -0.004091 | -0.002694 | 0.000687 | -0.00088 | -0.001034 | -0.002247 | -0.002289 | -0.001002 | 0.0001 | ... | 2.2e-05 | -0.002538 | 0.000222 | 0.00263 | 0.001108 | 0.003386 | 0.000761 | -0.000951 | 0.003046 | 0.003184 |
| 3 | -0.000338 | -0.002806 | -0.002333 | 0.000484 | -0.002035 | 0.001499 | 0.000388 | 0.000348 | 0.000585 | 0.001262 | ... | 0.000467 | -0.003581 | -0.001602 | -0.000692 | -0.000991 | 0.000854 | 0.000137 | 0.000685 | 0.000358 | -0.00238 |
| 4 | -0.000837 | -0.000333 | 0.000532 | -0.000519 | 0.000791 | 3e-05 | 0.000158 | 0.000155 | -0.000722 | 0.001307 | ... | -0.001045 | -0.000203 | 0.000142 | 0.000764 | 0.000128 | -0.001107 | -0.00026 | -0.000582 | -0.000172 | -3.2e-05 |


### Create sub_list to translate sr_id to row number

```python
# Row i of sub_list holds the sr_id whose features are in row i of items_pd
sub_list = pd.DataFrame(items_df.select('0').collect())
```
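`items_pd` and `sub_list` come from two separate `collect()` calls, so the lookup relies on Spark returning the rows in the same order both times. A minimal sketch that avoids that assumption collects the (id, vector) pairs once and splits them locally. Here `pairs` is a hypothetical stand-in for `items_rdd.collect()`, shortened to 3 factors instead of 75, and `row_of` is a hypothetical replacement for the `sub_list` lookup.

```python
import numpy as np

# Hypothetical stand-in for items_rdd.collect(): (sr_id, feature vector) pairs
pairs = [
    (4594300, [0.01, 0.02, -0.01]),
    (4594600, [-0.02, 0.00, 0.03]),
    (4594700, [0.00, -0.01, 0.02]),
]

# Split once, so row i of the matrix always belongs to sr_ids[i]
sr_ids = [sr_id for sr_id, _ in pairs]
item_matrix = np.vstack([np.asarray(vec, dtype=float) for _, vec in pairs])

# sr_id -> row number, a dictionary version of sub_list[sub_list[0] == search_id]
row_of = {sr_id: row for row, sr_id in enumerate(sr_ids)}

print(item_matrix.shape)
print(row_of[4594600])
```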

### Create table to translate sr_id to subreddit name
- We will need this to generate final outputs

```python
sr_lookup_df = auth_df.select(['subreddit', 'sr_id']).distinct()
sr_lookup = sr_lookup_df.toPandas()
```

---
## Finally, find subreddits using item-item similarity
- This function returns the 10 subreddits most similar to a given input
- Similarity is the dot product between rows of the item-feature matrix

```python
subreddit_set = set(sr_lookup.iloc[:,0])

def item_item_sim(search_name):
    if str(search_name) not in subreddit_set:
        return "Error: " + str(search_name) + " is not an active subreddit"

    # Get subreddit id number and translate to row number
    search_id = sr_lookup[sr_lookup['subreddit'] == str(search_name)].iloc[0,1]
    search_row_num = sub_list[sub_list[0] == search_id].index[0]

    # Get item-feature row and calculate its dot product with every item
    item_feature_np = np.asarray(items_pd.iloc[search_row_num, :])
    item_dp = np.dot(item_feature_np, items_pd.T)

    # Get top ten similar rows, translate to subreddit name.
    # The [-11:-1] slice drops the single highest score, assuming it belongs
    # to the search subreddit itself.
    sim_rows = list(item_dp.argsort())[-11:-1][::-1]
    out_ids = sub_list.iloc[sim_rows]
    out_names = list(out_ids.join(sr_lookup.set_index('sr_id'), on=0).loc[:,'subreddit'])

    return out_names
```
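`item_item_sim` drops the top score with its `[-11:-1]` slice, which assumes the search subreddit always scores highest against itself. With a raw dot product that is not guaranteed: an item vector with a larger norm can outscore the query's own vector. The sketch below excludes the query row explicitly and can optionally normalize rows first (cosine similarity, a swapped-in alternative rather than what the notebook does). The function `top_k_similar` and the toy matrix are hypothetical.

```python
import numpy as np


def top_k_similar(item_matrix, query_row, k=10, cosine=False):
    """Return the row numbers of the k items most similar to query_row.

    cosine=False scores with the raw dot product (as item_item_sim does);
    cosine=True scales each row to unit length before scoring.
    """
    m = np.asarray(item_matrix, dtype=float)
    if cosine:
        norms = np.linalg.norm(m, axis=1, keepdims=True)
        m = m / np.where(norms == 0, 1.0, norms)  # avoid dividing by zero
    scores = m @ m[query_row]
    scores[query_row] = -np.inf  # exclude the query item explicitly
    # argsort is ascending, so reverse it and keep the first k rows
    return [int(i) for i in np.argsort(scores)[::-1][:k]]


# Hypothetical 4-item, 2-factor matrix; row 3 has a much larger norm
items = np.array([[1.0, 0.0], [0.9, 0.1], [0.0, 1.0], [5.0, 5.0]])
print(top_k_similar(items, 0, k=2))
print(top_k_similar(items, 0, k=2, cosine=True))
```

Here row 3 scores 5.0 against row 0 while row 0 scores only 1.0 against itself, so the `[-11:-1]` slice would have dropped row 3 instead of the query. Cosine similarity ranks row 1 first because it points in almost the same direction as row 0.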

```python
item_item_sim('nfl')
```

```
['fantasyfootball',
 'CFB',
 'nba',
 'baseball',
 'AskMen',
 'hockey',
 'Browns',
 'quityourbullshit',
 'food',
 'hiphopheads']
```