In [1]:
%load_ext autoreload
%autoreload 2

%cd '/workspaces/polarization_reddit'

/workspaces/polarization_reddit


In [2]:
import time

import pandas as pd
import numpy as np

# from load.utils import load_comments
from load.utils import (
    load_users,
    load_user_party,
    load_user_party_parquet,
    load_subreddits,
    load_comments,
    save_df_as_json,
    save_df_as_parquet,
    load_comments_dask,
)
from preprocessing.utils import (
    tokenize_comment,
    calculate_user_party,
    load_event_comments,
    save_event_comments,
    build_vocab,
    save_event_vocab,
)

from preprocessing.constants import (
    EVENTS_DIR,
    ELECTIONS_REGEX,
    MIN_OCCURENCE_FOR_VOCAB,
)

ModuleNotFoundError: No module named 'preprocessing.utils'; 'preprocessing' is not a package

Bad pipe message: %s [b'\xb2$\xb4\xa7\x18E\x06\xfe\xcaF\x1e\xdc\x8a\xc0\xce\x9f\xec\x17 \x92\x9bV\x04\x1ee/\x174\xf1\xd2>\xa2\x15\xe0\xec\xef', b'\xba\x19\xf8jY\x9b\xd0\xf0\xb2\xe9\xd8\xde\x10\x00\x08\x13\x02\x13\x03\x13\x01\x00\xff\x01\x00\x00\x8f\x00\x00\x00\x0e\x00\x0c\x00\x00\t127.0.0.1\x00\x0b\x00\x04\x03\x00\x01\x02\x00\n\x00\x0c\x00\n\x00\x1d\x00\x17\x00\x1e\x00\x19\x00\x18\x00#\x00\x00\x00\x16\x00\x00\x00\x17\x00\x00\x00\r\x00\x1e\x00\x1c\x04\x03\x05\x03\x06\x03\x08\x07', b'\x08\t\x08\n\x08\x0b\x08']
Bad pipe message: %s [b'\x05\x08\x06']
Bad pipe message: %s [b'\x05\x01\x06', b'']
Bad pipe message: %s [b'\x03\x02\x03\x04\x00-\x00\x02\x01\x01\x003\x00&\x00$\x00\x1d\x00 `\xe2\xda\xee\x11\x1c\xec\xd6\xa8X\x1c\xf3\xed\xb9\xa8\x13f>\xee\x14\xa5E']
Bad pipe message: %s [b"\xfc\x9c\xa3@\xec\xc3\xc4\xac\x0e\xb3\xf7\x1cMF\x84O\xfad\x00\x00|\xc0,\xc00\x00\xa3\x00\x9f\xcc\xa9\xcc\xa8\xcc\xaa\xc0\xaf\xc0\xad\xc0\xa3\xc0\x9f\xc0]\xc0a\xc0W\xc0S\xc0+\xc0/\x00\xa2\x00\x9e\xc0\xae\xc0\xac\xc0

In [None]:
from dask.distributed import Client, LocalCluster

In [None]:
cluster = LocalCluster(processes=False)
client = Client(cluster)

2023-01-14 17:27:18,514 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-t7nxvam1', purging


In [None]:
print(client)

<Client: 'tcp://127.0.0.1:33499' processes=4 threads=4, memory=7.77 GiB>


In [None]:
YEAR = 2016
START_MONTH = 1
STOP_MONTH = 12

EVENT_NAME = f"us_election_{YEAR}"

### User Affiliation

In [None]:
subreddits = load_subreddits()

In [None]:
# TODO: take into account network structure to find other partisan subreddits
# which are not labeled
# Filter partisan subreddits
subreddits = subreddits[subreddits["party"].isin({"dem", "rep"})]

In [None]:
subreddits.groupby("party")["subreddit"].count()


party
dem    43
rep    19
Name: subreddit, dtype: int64

In [None]:
print("Load comments...")

comments = load_comments_dask(
    year=YEAR,
    start_month=START_MONTH,
    stop_month=START_MONTH,
)[["author", "subreddit"]]


Load comments...




TypeError: KilledWorker.__init__() missing 2 required positional arguments: 'last_worker' and 'allowed_failures'

In [None]:
print("Merge party information to comments...")
comments_party = comments.merge(subreddits, on="subreddit", how="inner")

Merge party information to comments...


In [None]:
print(len(comments_party))

25


In [None]:
user_party = comments_party.groupby(by="author").apply(
    calculate_user_party,
    meta={
        "dem_cnt": "int",
        "rep_cnt": "int",
        "score": "int",
        "party": "string",
    },
)
user_party = user_party[user_party["score"] != 0].compute()

In [None]:
user_party = user_party.reset_index()
user_party["author"] = user_party["author"].astype("string")
user_party["party"] = user_party["party"].astype("string")

In [None]:
print(f"Nr of users: {len(user_party)}")

print(user_party.groupby(by="party").count())

Nr of users: 18
dem    18
Name: party, dtype: Int64


In [None]:
save_df_as_parquet(
    data=user_party,
    target_file=f"user_party_{YEAR}.parquet",
)

## Filter event comments

In [None]:
# print("Load user party")
# user_party = load_user_party_parquet(year=YEAR)

In [None]:
comments = load_comments_dask(
    year=YEAR,
    start_month=START_MONTH,
    stop_month=STOP_MONTH,
)

user_comments = comments.merge(
    user_party,
    right_on="author",
    left_on="author",
    how="inner",
)

# Filter event data based on keywords
event_comments = user_comments[
    user_comments["body_cleaned"].str.contains(
        ELECTIONS_REGEX[YEAR],
        regex=True,
    )
]

## Tokenize and stem comments

In [None]:
event_comments["tokens"] = event_comments["body_cleaned"].apply(
    tokenize_comment,
    meta=("tokens", "string"),
).persist()

In [None]:
print(f"Nr of event comments: {len(event_comments)}")

Nr of event comments: 5964


In [None]:
save_event_comments(event_comments, EVENT_NAME, file_type="parquet")

## Build event vocabulary

In [None]:
# Read event data
# events_comments= load_event_comments(event_comments, EVENT_NAME, file_type="parquet")

event_vocab = build_vocab(
    event_comments["tokens"],
    min_words=MIN_OCCURENCE_FOR_VOCAB,
)

In [None]:
print(len(event_vocab))

2443


In [None]:
save_event_vocab(event_vocab, EVENT_NAME)