# Scrape data from the Pushshift API

In [1]:
# Directory prefix used for every scraped / derived file in this notebook.
ROOT_DIR = "reddit"
# Base URL of the Pushshift Reddit API; fetchObjects appends "/<data_type>/search/".
PUSHSHIFT_REDDIT_URL = "http://api.pushshift.io/reddit"
# MongoDB connection settings used as defaults by get_collection.
MONGO_URI = "mongodb://35.230.135.125:27017/"
MONGO_DB = "reddit"
MONGO_SUBS_COLLECTION = "submissions"
# File names (inside ROOT_DIR) for the scraped submissions and comments.
SUBS_FILE = "submission.json"
COMMENTS_FILE = "comment.json"
# Output of the comment-cleaning task and input of the emotion aggregation task.
CLEAN_COMMENTS_FILE = "clean_comments.json"
# File with one emotion vector per comment, read by load_comment_emotions.
COMMENT_EMOTIONS_FILE = "comment_emotions.json"
# NOTE(review): not referenced by any cell visible in this notebook.
COMMENT_EMOTION_THRESHOLD = 0.2
# Default per-class thresholds (ten entries of 0.5) for load_comment_emotions.
DEFAULT_THRESHOLDS = [0.5] * 10

In [3]:
import requests
import json
import re
import time
from datetime import datetime

def fetchObjects(data_type="submission", subreddit="Art", after=1104537600):
    """
    Request one page of objects from the Pushshift search endpoint.

    The query asks for up to 1000 objects of kind ``data_type`` from
    ``subreddit`` created after the epoch timestamp ``after``, sorted by
    ``created_utc`` in ascending order.

    Returns the ``data`` entry of the decoded JSON response when the HTTP
    status is 200, and ``None`` for any other status.
    """
    endpoint = PUSHSHIFT_REDDIT_URL + "/" + data_type + "/search/"
    query = dict(
        sort_type="created_utc",
        sort="asc",
        size=1000,
        type=data_type,
        subreddit=subreddit,
        after=after,
    )

    reply = requests.get(endpoint, params=query, timeout=30)
    if reply.status_code != 200:
        return None
    return json.loads(reply.text)['data']

def scrape_reddit(data_type="submission", subreddit="Art"):
    """
    Scrape all objects of ``data_type`` from ``subreddit`` into a JSON-lines file.

    Results are appended to ``{ROOT_DIR}/{data_type}.json``, one JSON object per
    line. When the file already exists, scraping resumes from the
    ``created_utc`` of its last non-empty line; otherwise it starts from
    1104537600 (2005-01-01 UTC).

    The existing file is never truncated. Scraping stops when fetchObjects
    returns nothing (an empty page or a non-200 response), and the output file
    is flushed after each page and closed on exit, including on interruption.

    Raises:
        ValueError: if the last non-empty line of an existing file is not
            valid JSON. The file is left untouched so it can be repaired.
    """
    path = f"{ROOT_DIR}/{data_type}.json"
    default_after = 1104537600
    after = default_after

    # Find the last non-empty line of a previous run, if there is one.
    last_line = None
    try:
        with open(path, "r") as existing:
            for line in existing:
                if line.strip():
                    last_line = line
    except FileNotFoundError:
        pass  # first run: nothing to resume from

    if last_line is not None:
        try:
            after = json.loads(last_line).get("created_utc", default_after)
        except ValueError as exc:
            # Refuse to guess: restarting from scratch here used to wipe the file.
            raise ValueError(
                f"last line of {path} is not valid JSON; repair or remove it before resuming"
            ) from exc

    with open(path, "a") as file:
        # Keep the JSON-lines layout intact if the previous run ended mid-line.
        if last_line is not None and not last_line.endswith("\n"):
            file.write("\n")

        while True:
            new_data = fetchObjects(data_type, subreddit, after)
            if not new_data:
                # None (non-200 response) or an empty page: nothing left to write.
                print("no more results, stopping at after =", after)
                break
            for item in new_data:
                file.write(json.dumps(item) + "\n")
            file.flush()
            after = new_data[-1]["created_utc"]
            dt_object = datetime.fromtimestamp(after)
            print("after =", dt_object)
            print("results =", len(new_data))
            time.sleep(0.5)  # stay polite towards the API

In [9]:
import pymongo


def get_collection(mongo_uri=MONGO_URI, db=MONGO_DB, col=MONGO_SUBS_COLLECTION):
    """Open a MongoClient on ``mongo_uri`` and return collection ``col`` of database ``db``."""
    return pymongo.MongoClient(mongo_uri)[db][col]

In [5]:
import jsonlines
import pymongo
from tqdm.auto import tqdm


def build_mongo_submissions_collection(filename=SUBS_FILE, db=MONGO_DB, collection=MONGO_SUBS_COLLECTION, mongo_uri=MONGO_URI):
    """
    Load the scraped submissions file into a MongoDB collection.

    Creates a unique index on ``id`` plus indexes on ``downloadTried`` and
    ``local_image_path``, then inserts every object of
    ``{ROOT_DIR}/{filename}`` (JSON lines) as one document.

    Documents whose ``id`` is already present are skipped silently, which
    makes the load re-runnable. Any other per-document failure is reported
    and skipped. A keyboard interrupt stops the load cleanly.
    """
    subs = get_collection(mongo_uri, db, collection)
    subs.create_index("id", unique=True)
    subs.create_index("downloadTried")
    subs.create_index("local_image_path")

    with jsonlines.open(f"{ROOT_DIR}/{filename}") as reader:
        for obj in tqdm(reader):
            try:
                # insert_one replaces Collection.insert, which PyMongo 4 removed.
                subs.insert_one(obj)
            except KeyboardInterrupt:
                break
            except pymongo.errors.DuplicateKeyError:
                # Already loaded by a previous run (unique index on "id").
                continue
            except Exception as exc:
                # Best effort: keep loading, but do not hide the failure.
                print("insert failed for", obj.get("id") if isinstance(obj, dict) else obj, "-", exc)
                continue
            

In [6]:
# build_mongo_submissions_collection()

HBox(children=(HTML(value=''), FloatProgress(value=1.0, bar_style='info', layout=Layout(width='20px'), max=1.0…

  





# Prepare the comments for emotion classification

In [6]:
import findspark
# Initialise findspark before importing pyspark (presumably so the local Spark
# installation can be located — confirm against the environment setup).
findspark.init()

import pyspark
from pyspark.sql import SparkSession
# Shared Spark session for the notebook; later cells read files through `spark.read`.
spark = SparkSession.builder.appName("reddit").getOrCreate()

In [126]:
import os
import numpy as np


from pyspark.sql.functions import  collect_list, struct, col, udf, lit, concat, lower
from pyspark.sql.types import StructType, StringType, ArrayType, FloatType, BooleanType, IntegerType


# Schema of the comment-emotions file: a comment id and its emotion vector.
emot_schema = (
    StructType()
    .add("id", StringType(), True)
    .add("emotion_vec", ArrayType(FloatType()), True)
)


# Schema of the scraped comments file; every field is read as a nullable string.
com_schema = (
    StructType()
    .add("id", StringType(), True)
    .add("author", StringType(), True)
    .add("link_id", StringType(), True)
    .add("parent_id", StringType(), True)
    .add("body", StringType(), True)
    .add("subreddit", StringType(), True)
    .add("created_utc", StringType(), True)
    .add("score", StringType(), True)
)


def prepare_comments_df(path=COMMENTS_FILE):
    """
        Read the scraped comments file ``{ROOT_DIR}/{path}`` into a Spark
        dataframe using ``com_schema``.

        Keeps only rows whose ``link_id`` equals ``parent_id`` and drops rows
        authored by "AutoModerator" or "art_moderator_bot" as well as rows
        whose body is "[deleted]".
    """
    comments = spark.read.schema(com_schema).json(f"{ROOT_DIR}/{path}")
    return (
        comments
        .filter(col("link_id") == col("parent_id"))
        .filter(col("author") != "AutoModerator")
        .filter(col("author") != "art_moderator_bot")
        .filter(col("body") != "[deleted]")
    )


def df_save_json(df, filename=CLEAN_COMMENTS_FILE):
    """
    Write a Spark dataframe as a single JSON-lines file ``{ROOT_DIR}/{filename}``.

    Spark writes its part files into the temporary directory
    ``{ROOT_DIR}/{filename}_dir``. Every ``*.json`` part is then concatenated
    (in sorted name order) into the target file and the temporary directory
    is removed.

    The merge is done in Python instead of through ``cat`` / ``rm`` shell
    commands, so paths containing spaces or shell metacharacters are handled
    and I/O errors raise instead of being ignored.
    """
    import glob
    import shutil

    target = f"{ROOT_DIR}/{filename}"
    parts_dir = f"{target}_dir"

    df.repartition(1).write.json(parts_dir)

    # glob.escape keeps "[", "*" or "?" in the directory name literal.
    part_files = sorted(glob.glob(os.path.join(glob.escape(parts_dir), "*.json")))
    with open(target, "wb") as merged:
        for part in part_files:
            with open(part, "rb") as chunk:
                shutil.copyfileobj(chunk, merged)

    shutil.rmtree(parts_dir, ignore_errors=True)
    
    
def task_create_clean_comments_file(path=COMMENTS_FILE):
    """Filter the comments in ``path``, display a preview and save them as CLEAN_COMMENTS_FILE."""
    cleaned = prepare_comments_df(path)
    cleaned.show()
    df_save_json(cleaned, filename=CLEAN_COMMENTS_FILE)

In [19]:
# task_create_clean_comments_file()

+-------+------------+--------+---------+--------------------+---------+-----------+-----+
|     id|      author| link_id|parent_id|                body|subreddit|created_utc|score|
+-------+------------+--------+---------+--------------------+---------+-----------+-----+
|c03furo|    nerdlife|t3_6a2bf| t3_6a2bf|Im not going to d...|      Art| 1205457635|    2|
|c03h794|      OMouse|t3_6cjdw| t3_6cjdw|Post your art som...|      Art| 1205865252|    3|
|c03hd6h|dsg123456789|t3_6cjdw| t3_6cjdw|It would help if ...|      Art| 1205899588|    1|
|c03hjzh|      OMouse|t3_6copr| t3_6copr|What's really nic...|      Art| 1205954100|    1|
|c03hkdv|   kickstand|t3_6cosf| t3_6cosf|I knew a Lucia on...|      Art| 1205956082|    2|
|c03ij9g|      OMouse|t3_6d1ql| t3_6d1ql|We need more subm...|      Art| 1206236946|    2|
|c03imvr|   [deleted]|t3_6d30x| t3_6d30x|**copy, paste all...|      Art| 1206285242|    1|
|c03iu7e|    nerdlife|t3_6d1wp| t3_6d1wp|Yeh, Andy Kehoe r...|      Art| 1206347403|    1|

# Train comment emotion detection model

The model is trained on the unified emotion dataset: https://github.com/sarnthil/unify-emotion-datasets

In [None]:

# Emotion pairs used by data(): for each pair, the first emotion maps to the
# label 1.0, the second to -1.0, and 0.0 is used when neither is set.
all_emotions = [
    ("joy", "sadness"),
    ("anger", "fear"),
    ("trust", "disgust"),
    ("surprise", "anticipation")
]

def data():
    """
    Stream ``(text, labels)`` training pairs from the unified emotion dataset.

    Each line of the remote JSON-lines file is decoded. For every pair in
    ``all_emotions`` one label is produced: 1.0 when the first emotion of the
    pair is set, -1.0 when only the second is set, 0.0 otherwise.

    NOTE(review): ``smart_open`` is not imported in any visible cell.
    """
    source = "https://storage.googleapis.com/pushshift_reddit/unified-dataset.jsonl"
    with smart_open.smart_open(source) as reader:
        for raw in tqdm(reader, total=221439):
            record = json.loads(raw)
            text = record['text']
            emotions = record['emotions']
            labels = [
                1.0 if emotions.get(pos, 0) else (-1.0 if emotions.get(neg, 0) else 0.0)
                for pos, neg in all_emotions
            ]
            yield text, labels


# Aggregate comment emotions to get submission vectors

In [275]:
from tqdm.auto import tqdm 
import numpy as np


def threshold(emotion_vec, t):
    """
    Binarise an emotion vector against threshold(s) ``t``.

    Both arguments are converted to float32 arrays. The result is a list
    holding 1 where ``emotion_vec - t`` is strictly positive and 0 elsewhere.
    """
    scores = np.asarray(emotion_vec).astype(np.float32)
    cutoffs = np.asarray(t).astype(np.float32)
    margin = scores - cutoffs
    return np.where(margin > 0, 1, 0).tolist()


def load_comment_emotions(path=COMMENT_EMOTIONS_FILE, thresholds=DEFAULT_THRESHOLDS):
    """
        Loads the emotion vectors predicted on the comments (per the original
        note, by the ktrain bert model) from ``{ROOT_DIR}/{path}``.

        Adds a ``class`` column holding the emotion vector binarised with
        ``threshold`` against ``thresholds``.

        ``thresholds`` is either a single number or a sequence of per-position
        cut-offs. It is converted to plain floats up front so that an invalid
        value fails here instead of inside the Spark UDF.
    """
    if isinstance(thresholds, (int, float)):
        cutoffs = float(thresholds)
    else:
        cutoffs = [float(value) for value in thresholds]

    # Use the argument itself: the UDF previously closed over an unrelated global `t`.
    threshold_udf = udf(lambda z: threshold(z, cutoffs), ArrayType(IntegerType()))
    df = spark.read.schema(emot_schema).json(f"{ROOT_DIR}/{path}")
    df = df.withColumn("class" , threshold_udf(df.emotion_vec))
    return df


def aggregate_emotions(emotions):
    """
    Weighted average of emotion vectors.

    ``emotions`` is an iterable of ``(vector, weight)`` pairs. Only pairs with
    a non-null vector and a weight whose integer value is strictly positive
    take part in the average.

    Returns the averaged vector as a list of floats, or ``None`` when no pair
    qualifies. ``None`` (rather than the integer 0) matches the array return
    type this function is registered with as a Spark UDF.
    """
    kept = [
        (vec, int(w))
        for vec, w in emotions
        if vec is not None and w is not None and int(w) > 0
    ]
    if not kept:
        return None

    vectors = np.asarray([vec for vec, _ in kept]).astype(np.float32)
    weights = np.asarray([w for _, w in kept]).astype(np.float32)
    return np.average(vectors, weights=weights, axis=0).tolist()
  

def aggregate_emotion_vectors(comments_df, emotions_df):
    """
        Build one emotion vector per submission.

        Comments are inner-joined with their emotion vectors on ``id`` and
        grouped by ``link_id``; each group's ``(emotion_vec, score)`` pairs are
        reduced with ``aggregate_emotions``. The result has the columns
        ``link_id`` and ``aggregated_emotion``.
    """
    weighted_mean_udf = udf(lambda pairs: aggregate_emotions(pairs), ArrayType(FloatType()))

    joined = comments_df.join(emotions_df, emotions_df.id == comments_df.id , "inner")
    per_submission = joined.groupby("link_id").agg(
        collect_list(struct("emotion_vec", "score" )).alias('emotion_vectors')
    )
    return (
        per_submission
        .withColumn("aggregated_emotion", weighted_mean_udf("emotion_vectors"))
        .select("link_id", "aggregated_emotion")
    )


def update_mongo_submission_emotions(aggregated_emotions_df, mongo_uri, db, col):
    """
    Store the aggregated emotion vectors on the matching MongoDB submissions.

    The Spark dataframe is collected through pandas into a list of records.
    For each record the ``t3_`` marker is removed from ``link_id`` and the
    submission with that ``id`` gets its ``emotion_vec`` field set to the
    record's ``aggregated_emotion``.
    """
    collection = get_collection(mongo_uri, db, col)
    data = aggregated_emotions_df.toPandas().to_dict(orient='records')
    for sub in tqdm(data):
        _id = sub["link_id"].replace("t3_", "")
        emotion = sub["aggregated_emotion"]
        # update_one replaces Collection.update, which PyMongo 4 removed;
        # without multi=True the old call also touched a single document.
        collection.update_one({ "id": _id}, { "$set": { "emotion_vec": emotion}})


def task_aggregate_emotions(comments_path, comment_emotions_path, thresholds, mongo_uri, db, collection):
    """
    Run the aggregation pipeline end to end: load the comments and their
    emotion vectors, aggregate them per submission, display a preview and
    write the result to the MongoDB collection.
    """
    comments = prepare_comments_df(comments_path)
    comment_emotions = load_comment_emotions(comment_emotions_path, thresholds=thresholds)
    per_submission = aggregate_emotion_vectors(comments, comment_emotions)
    per_submission.show()
    update_mongo_submission_emotions(per_submission, mongo_uri, db, collection)

In [276]:
# Thresholds keyed by emotion label (ten entries, the same count as DEFAULT_THRESHOLDS).
# NOTE(review): presumably one cut-off per position of `emotion_vec`; the
# position-to-label order is not visible in this notebook — confirm.
thresholds = {
    "joy": 0.18,
    "sadness": 0.09,
    "anger": 0.04,
    "fear": 0.09,
    "trust": 0.02,
    "disgust": 0.02,
    "surprise": 0.04,
    "noemo": 0.43,
    "other": 0.01,
    "missing": 0.08
}

In [277]:
# NOTE(review): `t` is not defined in any visible cell, so this call fails on a
# fresh kernel; presumably it was derived from the `thresholds` dict — confirm.
task_aggregate_emotions(CLEAN_COMMENTS_FILE, COMMENT_EMOTIONS_FILE, t, MONGO_URI, MONGO_DB, MONGO_SUBS_COLLECTION)

+---------+--------------------+
|  link_id|  aggregated_emotion|
+---------+--------------------+
|t3_102aax|[0.20011702, 0.05...|
|t3_10gub6|[0.12210401, 0.05...|
|t3_10qhd2|[0.43869275, 0.02...|
|t3_10ty9j|[0.04781838, 0.00...|
|t3_10y64t|[0.005038334, 0.1...|
|t3_111zxi|[0.19062994, 0.05...|
|t3_11230w|[0.35857823, 0.01...|
|t3_11are5|[0.31641218, 0.01...|
|t3_11odq4|[0.062355034, 0.0...|
|t3_1218z6|[0.01670531, 2.56...|
|t3_121age|[8.4310834E-4, 2....|
|t3_128d69|[0.015222427, 0.0...|
|t3_12j87i|[0.31269696, 0.00...|
|t3_12ru2t|[0.31035754, 0.01...|
|t3_12rxlk|[0.7862867, 0.002...|
|t3_132oct|[0.0252282, 0.019...|
|t3_136902|[0.48327294, 0.01...|
|t3_13o6su|[0.66143954, 0.32...|
|t3_13s2ce|[0.20395271, 0.01...|
|t3_145iy8|[0.02086342, 0.10...|
+---------+--------------------+
only showing top 20 rows



HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=423506.0), HTML(value='')))






# Scrape the images

In [None]:
import fire
import pymongo
import requests


def download_image(_id, url, local_path ,timeout=5):
    """
    Download ``url`` and store it as ``{local_path}/{_id}.{extension}``.

    The extension is taken from the path component of the URL, so query
    strings and fragments never end up in the file name. URLs without a file
    extension are not saved.

    Returns the path of the written file, or ``None`` when the status code is
    outside 200-208, the URL has no extension, or the request or the write
    fails. Keyboard interrupts are not swallowed, so a download loop can be
    stopped.
    """
    import os
    from urllib.parse import urlparse

    try:
        r = requests.get(url, timeout=timeout)
        try:
            if r.status_code not in range(200, 209):
                return None
            extension = os.path.splitext(urlparse(url).path)[1].lstrip(".")
            if not extension:
                return None
            path = f"{local_path}/{_id}.{extension}"
            with open(path, 'wb') as f:
                f.write(r.content)
            return path
        finally:
            # Release the connection on every exit path, not only on success.
            r.close()
    except Exception:
        # Best effort: any network or file-system failure counts as "not downloaded".
        return None


def work(collection, local_path):
    """
    Claim one submission that has not been tried yet and download its image.

    The submission is claimed by setting ``downloadTried`` in the same
    operation that selects it. On a successful download the document gets
    ``local_image_path`` and ``image_was_downloaded`` set.

    Returns ``True`` when a submission was claimed (whether or not the
    download succeeded) and ``False`` when none is left.
    """
    sample = collection.find_one_and_update(
        {"downloadTried": { "$exists": False }},
        {"$set": { "downloadTried": True }})
    if not sample:
        return False

    # .get: a document without a url is treated as a failed download, not a crash.
    url = sample.get("url")
    _id = sample["id"]
    path = download_image(_id, url, local_path) if url else None
    if path:
        collection.update_one(
            { "id": _id },
            {"$set": { "local_image_path": path, "image_was_downloaded": True }}
        )
    print(_id, url, path)
    return True


def run(db="reddit", col="submissions", local_path="reddit/mongo_images", mongo_uri="mongodb://35.230.135.125:27017/"):
    """
    Download images for the submissions of a MongoDB collection.

    Calls ``work`` repeatedly and returns once no untried submission remains,
    instead of polling the database forever.
    """
    collection = get_collection(mongo_uri, db, col)

    while work(collection, local_path):
        pass

# Script-style entry point that hands `run` to python-fire.
# NOTE(review): in a notebook kernel `__name__` is normally '__main__' as well,
# so executing this cell would call fire.Fire(run) — confirm this is intended.
if __name__ == '__main__':
    fire.Fire(run)

# Preprocess images and save as numpy data

In [317]:
%matplotlib inline
from matplotlib import pyplot as plt
import pandas as pd
from tensorflow.keras.preprocessing.image import ImageDataGenerator


def get_images_and_emotions_df(threshold ,mongo_uri=MONGO_URI, db=MONGO_DB, col=MONGO_SUBS_COLLECTION):
    """
    Collect downloaded submissions with a strong enough emotion into a pandas dataframe.

    Queries documents that have ``emotion_vec`` and ``local_image_path`` and
    whose ``image_was_downloaded`` is true, projecting those two fields.
    Documents with an empty/falsy ``emotion_vec`` are dropped, as are those
    whose largest component does not exceed ``threshold``.
    """
    submissions = get_collection(mongo_uri, db, col)
    query = {
        "emotion_vec": {"$exists": True },
        "local_image_path": {"$exists": True },
        "image_was_downloaded": True,
    }
    projection = { "local_image_path": 1, "emotion_vec": 1 }
    cursor = submissions.find(query, projection)
    rows = (
        doc for doc in cursor
        if doc["emotion_vec"] and max(doc["emotion_vec"]) > threshold
    )
    return pd.DataFrame(rows)

def plot_images(img_gen, batch_size=32, n=4):
    """
    Show the first batch produced by ``img_gen`` as a grid with ``n`` columns.

    The grid has ``batch_size // n`` rows (at least one). Only the images
    actually present in the batch are drawn, so a batch shorter than
    ``batch_size`` leaves the remaining cells empty instead of raising. An
    exhausted generator draws an empty grid.
    """
    m = max(1, batch_size // n)
    # squeeze=False keeps `ax` two-dimensional even for a single row or column.
    fig, ax = plt.subplots(m, n, figsize=(10, 2*m), squeeze=False)
    plt.setp(ax.ravel(), xticks=[], yticks=[])
    plt.tight_layout(rect=[0, 0.03, 1, 0.95])

    first_batch = next(iter(img_gen), None)
    if first_batch is None:
        return
    imgs = first_batch[0]

    for idx in range(min(m * n, batch_size, len(imgs))):
        ax[idx // n][idx % n].imshow(imgs[idx])

def fixed_datagen(datagen, batches=452):
    """
    Yield at most ``batches`` ``(x, y)`` pairs from ``datagen`` with ``y`` stacked.

    Each ``y`` is passed through ``np.vstack``. Iteration stops on
    ``StopIteration`` or ``KeyboardInterrupt``; any other exception raised
    while fetching or stacking a batch is printed and that batch is skipped
    (it still counts towards ``batches``).
    """
    for _ in range(batches):
        try:
            images, targets = next(datagen)
            stacked = np.vstack(targets)
        except (KeyboardInterrupt, StopIteration):
            break
        except Exception as err:
            print(err)
            continue
        else:
            yield images, stacked
        
def generate_numpy_dataset(df, save_dir, batch_size=256, drop_remainder=True):
    """
    Convert the images listed in ``df`` into numbered ``.npy`` batch files.

    Images are read from the ``local_image_path`` column (rescaled by 1/255,
    not shuffled) with ``emotion_vec`` as raw targets. Batch ``i`` is written
    to ``{save_dir}/batch_{i}_x.npy`` and ``{save_dir}/batch_{i}_y.npy``,
    with ``i`` zero-padded to at least three digits.

    Args:
        df: dataframe with ``local_image_path`` and ``emotion_vec`` columns.
        save_dir: existing directory that receives the batch files.
        batch_size: number of images per batch (default 256, as before).
        drop_remainder: when true (the default, matching the previous
            behaviour) a trailing partial batch is not written; when false
            the partial batch is written as well.
    """
    import math

    imgen = ImageDataGenerator(rescale=1.0/255.0)
    datagen = imgen.flow_from_dataframe(
        df,
        shuffle=False,
        x_col="local_image_path",
        y_col="emotion_vec",
        class_mode="raw",
        batch_size=batch_size,
        validate_filenames=False)

    n_images = df["local_image_path"].size
    if drop_remainder:
        n_batches = n_images // batch_size
    else:
        n_batches = math.ceil(n_images / batch_size)

    # Keep the historical three-digit names, widening only past 999 batches.
    width = max(3, len(str(max(n_batches - 1, 0))))

    batches = fixed_datagen(datagen, batches=n_batches)
    for i, (x, y) in enumerate(tqdm(batches, total=n_batches)):
        batch_id = str(i).zfill(width)
        with open(f"{save_dir}/batch_{batch_id}_x.npy", "wb") as f_x:
            np.save(f_x, x)
        with open(f"{save_dir}/batch_{batch_id}_y.npy", "wb") as f_y:
            np.save(f_y, y)

In [318]:
def task_generate_numpy_dataset(t=0.5):
    """
    Build the numpy batch files for submissions selected with threshold ``t``.

    The batches are written to ``{ROOT_DIR}/numpy_batches_{t}``, which is
    created when missing.
    """
    selected = get_images_and_emotions_df(t)
    print("Number of images:", selected.shape)
    target_dir = f"{ROOT_DIR}/numpy_batches_{t}"
    os.makedirs(target_dir, exist_ok=True)
    generate_numpy_dataset(selected, target_dir)

In [None]:
# Build the numpy batches, keeping submissions whose largest emotion component exceeds 0.8.
task_generate_numpy_dataset(0.8)

Number of images: (11386, 3)
Found 11386 non-validated image filenames.


HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=44.0), HTML(value='')))

cannot identify image file <_io.BytesIO object at 0x7feaa02cbfc0>
cannot identify image file <_io.BytesIO object at 0x7feb19bc6360>




cannot identify image file <_io.BytesIO object at 0x7feb19bc60f8>
