In [1]:
# IPython magic: render matplotlib figures inline in the cell output.
%matplotlib inline

In [2]:
# Let Spark SQL plan Cartesian-product joins instead of rejecting them.
# NOTE(review): the only join visible in this notebook is an equi-join, so this
# setting may be unnecessary — confirm before relying on it.
spark.conf.set("spark.sql.crossJoin.enabled", True)

In [3]:
# IMPORTS
import json
import pyspark.sql.functions as f
from pyspark.sql.window import Window
import numpy as np
import re
import os

from pyspark.sql.functions import udf
from pyspark.ml.linalg import VectorUDT
from pyspark.ml.linalg import DenseVector

In [4]:
# Number of top subreddits to pick.
k = 100

# Length of every GloVe vector; the file loaded below is the 50-dimensional one.
glove_dim = 50

# Pair RDD of (token, vector): each line is split on single spaces, the first
# field is the token and the remaining fields are parsed as floats.
glove = (sc.textFile("/mnt/blobmount/glove.6B.50d.txt")
         .map(lambda line: line.split(" "))
         .map(lambda fields: (fields[0], [float(component) for component in fields[1:]])))

In [5]:
# Keys that extract_dict copies out of every raw comment record.
features_bc = sc.broadcast([
  'body',
  'controversiality',
  'created_utc',
  'id',
  'parent_id',
  'edited',
  'gilded',
  'subreddit',
])
# Key holding the prediction target; extract_dict keeps it as well.
label = 'score'

# token -> vector lookup, collected on the driver and shipped to the executors.
glove_map_broadcast = sc.broadcast(glove.collectAsMap())

# Tokens that process_body skips when averaging word vectors.
stop_words_broadcast = sc.broadcast({
  'ourselves', 'hers', 'between', 'yourself', 'but', 'again', 'there', 'about', 'once', 'during',
  'out', 'very', 'having', 'with', 'they', 'own', 'an', 'be', 'some', 'for',
  'do', 'its', 'yours', 'such', 'into', 'of', 'most', 'itself', 'other', 'off',
  'is', 's', 'am', 'or', 'who', 'as', 'from', 'him', 'each', 'the',
  'themselves', 'until', 'below', 'are', 'we', 'these', 'your', 'his', 'through', 'don',
  'nor', 'me', 'were', 'her', 'more', 'himself', 'this', 'down', 'should', 'our',
  'their', 'while', 'above', 'both', 'up', 'to', 'ours', 'had', 'she', 'all',
  'no', 'when', 'at', 'any', 'before', 'them', 'same', 'and', 'been', 'have',
  'in', 'will', 'on', 'does', 'yourselves', 'then', 'that', 'because', 'what', 'over',
  'why', 'so', 'can', 'did', 'not', 'now', 'under', 'he', 'you', 'herself',
  'has', 'just', 'where', 'too', 'only', 'myself', 'which', 'those', 'i', 'after',
  'few', 'whom', 't', 'being', 'if', 'theirs', 'my', 'against', 'a', 'by',
  'doing', 'it', 'how', 'further', 'was', 'here', 'than',
})

# Constructor applied to each listed key by extract_dict; keys that are not
# listed here (e.g. 'id', 'parent_id') are copied without conversion.
types_bc = sc.broadcast(dict(
  body=str,
  controversiality=int,
  created_utc=int,
  edited=bool,
  gilded=int,
  score=int,
  subreddit=str,
))

In [6]:
def filter_dict(json_dict):
  """Tell whether a parsed comment record is usable.

  A record is kept only when both 'score' and 'id' are present and not None.
  A missing 'score' key is treated like a None score (the record is rejected)
  instead of raising KeyError, which matches how a missing 'id' is handled.

  Args:
    json_dict: dict parsed from one JSON line.

  Returns:
    True to keep the record, False to drop it.
  """
  for required_key in ('score', 'id'):
    # .get() covers both "key absent" and "key present but null".
    if json_dict.get(required_key) is None:
      return False
  return True
  
# Keep only the feature and label keys of a record, converting values on the way.
def extract_dict(json_dict):
  """Project a raw comment record onto the feature/label keys.

  Every key that is listed in features_bc or equals `label` is copied. When
  types_bc has an entry for the key, that constructor is applied to the value;
  otherwise the value is copied unchanged. Finally 't1_' is prepended to 'id'
  so that ids use the same format as parent_id.

  NOTE(review): conversion is applied to None as well, so str(None) yields the
  text 'None' and int(None) raises TypeError — confirm the input never carries
  nulls for typed keys.

  Args:
    json_dict: dict parsed from one JSON line; must yield an 'id' entry.

  Returns:
    A new dict holding the selected (and converted) entries.
  """
  wanted = features_bc.value
  converters = types_bc.value
  selected = {}

  for name, raw_value in json_dict.items():
    if name != label and name not in wanted:
      continue
    selected[name] = converters[name](raw_value) if name in converters else raw_value

  # make id and parent_id format match
  selected['id'] = 't1_' + selected['id']
  return selected

In [7]:
# pre-process comment. Return embedding and # of words
def process_body(body):
  """Turn a comment body into the mean GloVe vector of its known words.

  The text is lower-cased, run through an ordered chain of regex rewrites,
  and split on whitespace. Tokens that are in the broadcast GloVe map and are
  not stop words contribute their vectors to the mean.

  Args:
    body: comment text; a falsy value (None, '') means "no text".

  Returns:
    (embedding, n_words): the element-wise mean of the matched vectors and how
    many tokens matched, or (np.zeros(glove_dim), 0) when nothing matched.
  """
  if body:
    text = body.lower()

    # Applied strictly in this order — later patterns rely on earlier output
    # (e.g. "e - mail" only exists after "-" has been padded with spaces).
    rewrites = (
      # Blank out every character outside this class (tabs/newlines included).
      # "+-=" inside the class is a range, so ':', ';' and '<' survive as well.
      (r"[^A-Za-z0-9^,!.\/'+-=]", " "),
      (r"what's", "what is "),
      (r"\'s", " "),
      (r"\'ve", " have "),
      (r"n't", "n not "),
      (r"i'm", "i am "),
      (r"\'re", " are "),
      (r"\'d", " would "),
      (r"\'ll", " will "),
      (r",", " "),
      (r"\.", " "),
      (r"!", " ! "),
      (r"\/", " "),
      (r"\^", " ^ "),
      (r"\+", " + "),
      (r"\-", " - "),
      (r"\=", " = "),
      (r"'", " "),
      (r"(\d+)(k)", r"\g<1>000"),
      (r":", " : "),
      (r" e g ", " eg "),
      (r" b g ", " bg "),
      (r" u s ", " american "),
      # NOTE(review): "\0" is an octal escape for NUL, and NUL was already
      # blanked by the first rewrite, so this pattern never matches. Left
      # untouched because the intended pattern is ambiguous.
      (r"\0s", "0"),
      # Keep the surrounding spaces: replacing them too would fuse "911" with
      # the neighbouring words into a single token unknown to GloVe.
      (r" 9 11 ", " 911 "),
      (r"e - mail", "email"),
      (r"j k", "jk"),
      (r"\s{2,}", " "),
    )
    for pattern, replacement in rewrites:
      text = re.sub(pattern, replacement, text)

    # Resolve the broadcast values once instead of once per token.
    embeddings = glove_map_broadcast.value
    stop_words = stop_words_broadcast.value

    vecs = [embeddings[word] for word in text.split()
            if word in embeddings and word not in stop_words]
    if vecs:
      return np.mean(vecs, axis=0), len(vecs)
  return np.zeros(glove_dim), 0

def make_feature_vec(json_dict, subreddit_indexer_bc):
  """Assemble the dense feature vector for one joined comment record.

  Layout (length 1 + 3 + glove_dim + 4):
    [0]                      subreddit index from the broadcast indexer
    [1..3]                   controversiality, edited, gilded
    [4 .. 4+glove_dim)       embedding of the comment body
    next 4 slots             reply delay, cosine similarity with the parent,
                             parent score, number of embedded words

  NOTE(review): 'edited' is compared with the string 'true' and the numeric
  fields go through int(), so values are presumably strings here — confirm
  against how the features column is built.
  NOTE(review): a subreddit missing from the indexer leaves slot 0 at 0, which
  is indistinguishable from a subreddit whose index is 0.

  Args:
    json_dict: mapping with the comment fields plus p_body, p_score and p_utc.
    subreddit_indexer_bc: broadcast of a subreddit -> index mapping.

  Returns:
    DenseVector with the layout described above.
  """
  n_subreddit, n_raw, n_engineered = 1, 3, 4
  vec = np.zeros(n_raw + n_subreddit + glove_dim + n_engineered)

  # Slot 0: subreddit index (stays 0 when the subreddit is not indexed).
  subreddit = json_dict['subreddit']
  indexer = subreddit_indexer_bc.value
  if subreddit in indexer:
    vec[0] = indexer[subreddit]
  slot = 1

  ## Raw features
  for name in ('controversiality', 'edited', 'gilded'):
    raw = json_dict[name]
    vec[slot] = int(raw == 'true') if name == 'edited' else int(raw)
    slot += 1

  ## Glove features
  body_vec, num_words = process_body(json_dict['body'])
  vec[slot:slot + glove_dim] = body_vec
  slot += glove_dim

  ## Engineered features
  delay_slot, similarity_slot, parent_score_slot, num_words_slot = range(slot, slot + 4)

  # time between this comment and its parent
  utc, p_utc = json_dict['created_utc'], json_dict['p_utc']
  if utc and p_utc:
    vec[delay_slot] = int(utc) - int(p_utc)

  # cosine similarity between this comment and its parent; skipped when either
  # embedding is all zeros, which would make the denominator zero
  parent_vec, _ = process_body(json_dict['p_body'])
  if (body_vec != 0).any() and (parent_vec != 0).any():
    norms = np.linalg.norm(body_vec) * np.linalg.norm(parent_vec)
    vec[similarity_slot] = np.dot(body_vec, parent_vec) / norms

  # parent score
  p_score = json_dict['p_score']
  if p_score:
    vec[parent_score_slot] = p_score

  # num words
  vec[num_words_slot] = num_words

  return DenseVector(vec)

In [8]:
# Output prefix used by save_csv.
# NOTE(review): this is a relative path, while the input below is read from the
# absolute "/mnt/blobmount/" — confirm both resolve to the same mount.
path = 'mnt/blobmount/'
rdds = []

for date in ['2017-11']:
  file = 'RC_' + date + '.bz2'
  print('loading...' + file)
  # One JSON document per line: parse, drop unusable records, keep wanted keys.
  rdd = sc.textFile("/mnt/blobmount/"+file, minPartitions=96)
  rdd = rdd.map(json.loads)
  rdd = rdd.filter(filter_dict)
  rdd = rdd.map(extract_dict)
  rdds.append(rdd)

# create raw total dataset
total_rdd = sc.union(rdds)
total_df = total_rdd.toDF()
total_df = total_df.cache()

### Extract top subreddits

In [10]:
# Row count per subreddit, largest first, with a running total and the running
# share of all rows. Only the ordering is used for top_k below; the two
# cumulative columns are not read by it.
df = (total_df
      .groupBy(total_df.subreddit)
      .count()
      .orderBy("count", ascending=False)
      .withColumn('cumul_sum',
                  f.sum('count').over(Window.orderBy(f.col('count').desc())
                                      .rowsBetween(Window.unboundedPreceding, 0)))
      .withColumn('cumul_percent',
                  f.col('cumul_sum')/f.sum('count').over(Window.partitionBy())))

# Names of the first k subreddits, collected to the driver as a plain list.
top_k = (df.select(df.subreddit)
          .limit(k)
          .rdd.map(lambda record: record['subreddit'])
          .collect())

In [11]:
# Broadcast lookup: subreddit name -> its position in top_k (0-based).
subreddit_indexer_bc = sc.broadcast(dict(zip(top_k, range(len(top_k)))))

### Join to get parent-level features

In [13]:
# Parent-side view of every comment: the four columns needed from a parent are
# renamed with a p_ prefix so they stay distinguishable from the child's own
# columns after the join. Partitioned by the key it will be joined on.
parent_df = (total_df
             .select(total_df.id.alias('p_id'), 
                     total_df.body.alias('p_body'), 
                     total_df.score.alias('p_score'),
                     total_df.created_utc.alias('p_utc'))
             .repartition('p_id'))

# Partition the child side by its join key as well.
# Note: this rebinds total_df, so re-running the cell repartitions again.
total_df = total_df.repartition(total_df.parent_id)

# Left outer join: a comment whose parent_id matches no p_id is kept, with the
# p_* columns left null.
joined_df = total_df.join(parent_df, total_df.parent_id == parent_df.p_id, 'left_outer')

### Create actual features (vector representation)

In [15]:
# Splits a row into a (label, remaining-fields dict) tuple
def helper(row):
  """Separate the label from the other fields of a row.

  Args:
    row: object exposing asDict(); it must contain the `label` field.

  Returns:
    (label value, dict of every other field).
  """
  fields = row.asDict()
  target = fields.pop(label)
  return (target, fields)

def transform(rdd):
  """Convert an RDD of joined rows into a cached (label, features) DataFrame.

  Each row is split by helper into its label and a dict of the remaining
  fields; a UDF then maps that dict to a vector through make_feature_vec,
  using the module-level subreddit_indexer_bc broadcast.

  Args:
    rdd: RDD of rows accepted by helper.

  Returns:
    Cached DataFrame with columns 'label' and 'features'.
  """
  labelled = rdd.map(helper).toDF(['label', 'features'])
  to_vector = udf(lambda feats: make_feature_vec(feats, subreddit_indexer_bc), VectorUDT())
  vectorised = labelled.select(labelled.label, to_vector(labelled.features).alias('features'))
  return vectorised.cache()

### Make splits and then save

In [17]:
# Seed shared by the subsample and the split so that a re-run can reproduce the
# same train/val/test assignment (for an unchanged partition layout) instead of
# drawing a fresh random one every time.
SEED = 42

# subsample so that our resulting data isn't too large
joined_rdd = (joined_df.sample(False, 0.4, seed=SEED)
              .rdd
              .repartition(96)
              .cache())

features_df = transform(joined_rdd)

# 80% train, 10% validation, 10% test.
train_df, val_df, test_df = features_df.randomSplit([0.8, 0.1, 0.1], seed=SEED)

In [18]:
def save_csv(df, fname):
  """Write a (label, features) DataFrame as comma-separated text.

  Each row becomes one line: the label followed by every feature value, all
  converted with str() and joined by commas. The output goes to
  os.path.join(path, fname), where `path` is the module-level prefix.

  Args:
    df: DataFrame whose rows expose 'label' and an iterable 'features'.
    fname: name of the output directory under `path`.
  """
  def to_line(row):
    label_text = str(row['label'])
    feature_text = ','.join(map(str, row['features']))
    return label_text + ',' + feature_text

  df.rdd.map(to_line).saveAsTextFile(os.path.join(path, fname))
  
# Persist the three splits, in the same order as before: train, test, val.
for split_df, split_name in ((train_df, 'full_processed_train'),
                             (test_df, 'full_processed_test'),
                             (val_df, 'full_processed_val')):
  save_csv(split_df, split_name)