### Imports

In [2]:
import os
import re
import sys
import ast
import json
import datetime
import requests
import numpy as np
import pandas as pd 
from io import StringIO
from tqdm.auto import tqdm

import subprocess
from subprocess import Popen, PIPE

import psycopg2
from psycopg2.extras import execute_values, execute_batch

sys.path.append("C:\\Users\\SHIRAM\\Documents\\WSB\\")
from generate_urls import get_all_tickers
from reddit_stuff import get_conn, get_sqlalchemy_engine, get_reddit_client_praw

# import spacy
# from spacy import displacy
from joblib import Parallel, delayed

import plotly.express as px
import plotly.graph_objects as go

from utils import batch
from wsb import Gather

import pyarrow as pa
import pyarrow.parquet as pq

# Shared project helper: later cells use it for psycopg2 connections
# (`get_psycopg2_conn`) and for Pushshift lookups (`pmaw_api`).
gather = Gather()
# Registers `progress_apply` on pandas objects so long `.apply` calls show a tqdm bar.
tqdm.pandas()

import matplotlib.pyplot as plt
%matplotlib inline

%load_ext autoreload
%autoreload 2

### What do we have online?

In [None]:
# Peek at the staging table. The result is stored under the same name that the
# next cell assigns, so running both cells leaves only the later frame.
temp_columns = ["created_utc", "submission_id", "comment_id"]
with gather.get_psycopg2_conn() as conn, conn.cursor() as cur:
    cur.execute("SELECT * FROM wsb_comments_temp;")
    temp_rows = cur.fetchall()
    wsb_comments_analytics_df = pd.DataFrame.from_records(temp_rows, columns=temp_columns)

In [None]:
# Pull only the three columns needed to count comments per submission per day.
with gather.get_psycopg2_conn() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT created_utc, submission_id, comment_id FROM wsb_comments_analytics;")
        wsb_comments_analytics_df = pd.DataFrame.from_records(
            cur.fetchall(),
            columns=["created_utc", "submission_id", "comment_id"]
        )

# Distinct comment count for every (calendar day, submission) pair.
wsb_comments_aggs_df = (
    wsb_comments_analytics_df
    .groupby([wsb_comments_analytics_df["created_utc"].dt.date, "submission_id"])["comment_id"]
    .nunique()
    .rename("comments_nunique")
    .reset_index()
)
# Plain column assignment so the column really becomes datetime64
# (`.loc[:, col] = ...` may keep the old object dtype on recent pandas).
wsb_comments_aggs_df["created_utc"] = pd.to_datetime(wsb_comments_aggs_df["created_utc"])

# Keep, for every day, the row of the submission with the most distinct comments.
# The previous `.sort_values(...).max()` took column-wise maxima, so the reported
# submission_id was the lexicographically largest id of the day rather than the
# submission that actually owned the reported comment count.
busiest_rows = wsb_comments_aggs_df.groupby("created_utc")["comments_nunique"].idxmax()
wsb_comments_aggs_df = wsb_comments_aggs_df.loc[busiest_rows].set_index("created_utc")

# Cached so the next cell can reload it without hitting the database.
wsb_comments_aggs_df.to_csv("wsb_comments_aggs_df.csv")

In [None]:
# Fall back to the CSV cache when the aggregation cell was not run in this kernel.
if "wsb_comments_aggs_df" not in locals():
    cached_aggs = pd.read_csv("wsb_comments_aggs_df.csv", index_col=0)
    cached_aggs.index = pd.to_datetime(cached_aggs.index)
    wsb_comments_aggs_df = cached_aggs

# One point per day: distinct comment count stored in the aggregate frame.
daily_comment_counts = wsb_comments_aggs_df["comments_nunique"]
daily_comment_counts.plot(figsize=(20, 10))

In [None]:
### Where are the gaps?
all_dates_in_db = wsb_comments_aggs_df.index.tolist()

# Every calendar day between the first and last day present in the DB.
# (`bdate_range(..., freq="1D")` produced exactly this; `date_range` says so.)
all_bdates = pd.date_range(
    start=wsb_comments_aggs_df.index.min(),
    end=wsb_comments_aggs_df.index.max(),
    freq="D",
)

# Set lookup keeps the scan linear instead of list-in-list quadratic.
dates_in_db = set(all_dates_in_db)
dates_not_found_in_db = sorted(day for day in all_bdates if day not in dates_in_db)
missing_dates_arr = np.array(dates_not_found_in_db)

# One (start, end) epoch-second window per missing day: [midnight, next midnight).
# Pairing each missing day with the *next missing* day (the previous behaviour)
# produced windows spanning days that are already in the DB whenever the gaps
# were not adjacent, and never produced a window for the last missing day.
one_day = pd.Timedelta(days=1)
missing_start_end_date_pairs = [
    (int(day.timestamp()), int((day + one_day).timestamp()))
    for day in dates_not_found_in_db
]

In [None]:
saved_filename = "all_saved_responses_before_sleep_only_stickied.json"

if os.path.isfile(saved_filename):
    # Reuse the cached API responses instead of querying Pushshift again.
    with open(saved_filename, "r") as f:
        all_responses = json.load(f)
else:
    host = "https://api.pushshift.io/reddit/search/submission/?"
    all_responses = []
    failed_windows = []  # (start, end, reason) for every window that returned nothing usable
    for start, end in tqdm(missing_start_end_date_pairs):
        params = {"after": f"{start}", "before": f"{end}", "subreddit": "wallstreetbets", "stickied": "true"}
        try:
            # A timeout keeps one stalled request from hanging the whole loop.
            response = requests.get(host, params=params, timeout=30)
        except requests.RequestException as exc:
            failed_windows.append((start, end, str(exc)))
            continue
        if response.status_code == 200:
            all_responses += response.json()["data"]
        else:
            failed_windows.append((start, end, response.status_code))

    # Surface the skipped windows: the cache written below will not contain them.
    if failed_windows:
        print(f"{len(failed_windows)} window(s) failed, first few: {failed_windows[:5]}")

    # Persist the responses so the expensive crawl is not repeated.
    with open(saved_filename, "w") as f:
        json.dump(all_responses, f)

In [None]:
# Keep only the stickied threads posted by AutoModerator.
automod_results = [ele for ele in all_responses if ele.get("author") == "AutoModerator"]

# Build the frame from the *filtered* list; it was previously built from
# `all_responses`, which left `automod_results` unused and mixed in other authors.
automod_df = pd.DataFrame(
    [
        {"id": ele["id"], "created_utc": ele["created_utc"], "num_comments": ele["num_comments"], "url": ele["url"]}
        for ele in automod_results
    ],
    columns=["id", "created_utc", "num_comments", "url"],
)
# Epoch seconds -> datetime, vectorised.
automod_df["created_utc"] = pd.to_datetime(automod_df["created_utc"], unit="s")
automod_df = automod_df.set_index("created_utc")

# Total comment count reported for the threads of each calendar day.
automod_df_counts = automod_df.groupby(automod_df.index.date).aggregate({"num_comments": "sum"})
automod_df_counts.index = pd.to_datetime(automod_df_counts.index)
automod_df_counts.index.name = "created_utc"

# Overlay what the DB holds ("Old") with what the new threads report ("New").
fig, ax = plt.subplots(figsize=(20, 10))
wsb_comments_aggs_df["comments_nunique"].plot(ax=ax, label="Old")
automod_df_counts["num_comments"].plot(ax=ax, label="New")
ax.legend()

### Gather all comments from those new submissions and insert them

In [27]:
all_comments_filename = "../data/all_comments_arr_2021_08_17.json"
all_missing_comments_filename = "../data/all_missing_comment_ids.json"


def _load_json_cache(*candidate_paths):
    """Return the parsed JSON of the first existing path, or None when none exists."""
    for path in candidate_paths:
        if os.path.isfile(path):
            with open(path, "r") as f:
                return json.load(f)
    return None


def _save_json_cache(path, payload):
    """Write `payload` as JSON to `path`, creating the parent folder if needed."""
    folder = os.path.dirname(path)
    if folder:
        os.makedirs(folder, exist_ok=True)
    with open(path, "w") as f:
        json.dump(payload, f)


# Step 1: submission -> comment ids.
# Order of preference: already in memory, cache file, then the API.
# The second candidate path is the working-directory location earlier runs wrote to.
if "all_missing_comment_ids" not in locals():
    all_missing_comment_ids = _load_json_cache(
        all_missing_comments_filename, "all_missing_comment_ids.json"
    )
    if all_missing_comment_ids is None:
        all_missing_comment_ids = gather.get_all_comments_from_pmaw_single_thread(submissions=all_responses)
        _save_json_cache(all_missing_comments_filename, all_missing_comment_ids)

# Step 2: comment bodies for those ids, same order of preference.
# Previously, having `all_comments_arr` in memory fell through to the download
# branch, and freshly downloaded data was written to a different path than the
# one checked on the next run.
if "all_comments_arr" not in locals():
    all_comments_arr = _load_json_cache(
        all_comments_filename, "all_comments_arr_2021_08_17.json"
    )
    if all_comments_arr is None:
        all_missing_comments_only = []
        for ele in all_missing_comment_ids:
            all_missing_comments_only += ele["comment_ids"]

        batch_len = 600
        missing_comments_batched = batch(all_missing_comments_only, n=batch_len)
        # Ceiling division so the trailing partial batch is counted in the progress bar.
        n_batches = -(-len(all_missing_comments_only) // batch_len)

        all_comments_arr = []
        for missing_comments in tqdm(missing_comments_batched, total=n_batches):
            comments = gather.pmaw_api.search_comments(ids=missing_comments)
            all_comments_arr += comments

        _save_json_cache(all_comments_filename, all_comments_arr)

In [32]:
# Lookup table: one row per comment id, carrying the submission it belongs to.
submission_lookup = pd.DataFrame(all_missing_comment_ids).explode("comment_ids")
comment_submission_ids = (
    submission_lookup[["comment_ids", "submission_id"]]
    .rename(columns={"comment_ids": "id"})
    .set_index("id")
)

# Attach the submission id to every fetched comment by joining on the comment id,
# then move the id back into a regular column.
comments_by_id = pd.DataFrame.from_dict(all_comments_arr).set_index("id")
all_comments_df = comments_by_id.merge(
    comment_submission_ids, left_index=True, right_index=True
).reset_index()

# all_comments_df.to_csv("all_comments_with_sub_ids.csv", index=False)

In [33]:
# Keep the first occurrence of every comment id.
all_comments_df = all_comments_df.drop_duplicates(subset=["id"])

In [40]:
# Columns kept for the database load, in this order.
comments_columns = [
    "created_utc", "retrieved_on", "id", "parent_id", "link_id",
    "author", "submission_id", "body", "subreddit",
]
all_comments_df = all_comments_df.loc[:, comments_columns]

In [8]:
# gather.insert_all_comments_to_db_using_df_pmaw(df=all_comments_df)

In [41]:
print("formatting into correct format...")
comments_cols = []
to_insert_comments = ["created_utc", "retrieved_on", "id", "parent_id", "link_id", "author", "author_fullname", "submission_id", "body", "subreddit"]

# make a reasonable batch len
batch_len = 50000

# there are too many comments, batch them
batched_comments = batch(all_comments_arr, n=batch_len)

# Pre-compute the number of batches from the same collection that is batched
# (it was taken from `all_comments_df`, whose length can differ), rounding up so
# the trailing partial batch is counted.
total_batches = -(-len(all_comments_arr) // batch_len)

formatting into correct format...


In [72]:
# Drop rows that repeat the same (created_utc, id) pair.
all_comments_df = all_comments_df.drop_duplicates(subset=["created_utc", "id"])

In [76]:
# Serialise the frame to an in-memory CSV that COPY can stream from.
text_stream = StringIO()
all_comments_df.to_csv(text_stream, header=True, index=False)
text_stream.seek(0)

print("-- Copy-ing into the db...")
focused_cols = ",".join(all_comments_df.columns.tolist())
with gather.get_psycopg2_conn() as conn:
    with conn.cursor() as cur:
        try:
            cur.copy_expert(
                sql=f"""copy comments({focused_cols}) from stdin with (format csv, delimiter ',', header);""",
                file=text_stream,
            )
            conn.commit()
        # The exception class is `ProgrammingError`; the lower-case spelling does
        # not exist on the module, so the handler itself raised AttributeError.
        except psycopg2.ProgrammingError as e:
            # Leave the connection usable and make the failure visible.
            conn.rollback()
            print(e)
            print("-- Failed")
        else:
            print("-- Done")

-- Copy-ing into the db...
-- Done


#### Extras

In [2]:
# all_comments_df = pd.read_json("all_comments_arr_2021_08_17.json", lines=True, chunksize=10000)
# all_comments_df = pd.read_json("all_comments_arr_2021_08_17.json")
# gather.insert_all_comments_to_db_pmaw(df=all_comments_df)

In [None]:
# all_sub_id_with_comments = []
# for submission_comment_ids in tqdm(all_missing_comment_ids):
#     for sub_id, comments in submission_comment_ids.items():
#         comments_arr = gather.pmaw_api.search_comments(ids=comments)
#         for c in comments_arr:
#             c["submission_id"] = sub_id
#             all_sub_id_with_comments.append(c)

In [None]:
# `all_sub_id_with_comments` is only produced by the commented-out loop in the
# previous cell, so guard the dump instead of failing with NameError.
if "all_sub_id_with_comments" in locals():
    with open("all_sub_id_with_comments.json", "w") as f:
        json.dump(all_sub_id_with_comments, f)
else:
    print("all_sub_id_with_comments is not defined; nothing written.")

In [None]:
host_v2 = "https://api.pushshift.io/reddit/search/comment/?"

# Three spellings of the same company, each searched over comments older than 60 days.
# A date-bounded variant would instead pass
#   "after": int(missing_dates_arr[0].timestamp()), "before": int(missing_dates_arr[-1].timestamp())
params_v2 = dict(q="MVIS", subreddit="wallstreetbets", before="60d")
params_v21 = dict(q="Microvision", subreddit="wallstreetbets", before="60d")
params_v22 = dict(q="mvis", subreddit="wallstreetbets", before="60d")

all_responses_v2 = []
for param in tqdm([params_v2, params_v21, params_v22]):
    response_v2 = requests.get(host_v2, params=param)

    # Anything other than HTTP 200 contributes nothing.
    if response_v2.status_code != 200:
        continue

    response_v2_json = response_v2.json()
    all_responses_v2.extend(response_v2_json["data"])

### What does it say?

In [None]:
# Load the exported comments table from CSV.
wsb_com_df = pd.read_csv("polygonio_public_wsb_comments.csv")

# NOTE(review): `colname` is not assigned anywhere in the visible notebook, so this
# cell raises NameError on a fresh kernel. Presumably it should name the
# epoch-seconds timestamp column (e.g. "created_utc") — confirm before running.
# Non-null values are converted with `datetime.fromtimestamp`, which returns naive
# datetimes in the machine's local timezone rather than UTC.
wsb_com_df.loc[~pd.isna(wsb_com_df[colname]), colname] = \
    wsb_com_df.loc[~pd.isna(wsb_com_df[colname]), colname].apply(lambda x: datetime.datetime.fromtimestamp(x))
    
# Normalise the column through pandas' datetime parser.
wsb_com_df.loc[:, colname] = pd.to_datetime(wsb_com_df[colname])

### Get all tickers

In [None]:
# NOTE(review): `import spacy` is commented out in the imports cell, so this line
# raises NameError unless that import is restored.
nlp = spacy.load("en_core_web_sm")
# Ticker records from the project helper; each record is read through its "ticker" key below.
tickers = get_all_tickers(active="true")
# Lower-cased symbols, matching the lower-cased tokens produced by clean_text/tokenize.
tickers_only = [ticker["ticker"].lower() for ticker in tickers]

In [None]:
# Read timestamp, id and text of every row in the analytics table.
comments_query = "SELECT created_utc, comment_id, body FROM wsb_comments_analytics;"
with get_conn() as conn, conn.cursor() as cur:
    cur.execute(comments_query)
    comment_rows = cur.fetchall()

# Name the three selected columns; `created_utc` is exposed as "datetime" here.
wsb_comments_analytics_df = pd.DataFrame.from_records(comment_rows)
wsb_comments_analytics_df.columns = ["datetime", "comment_id", "body"]
wsb_comments_analytics_df.head()

In [None]:
def chunker(iterable, total_length, chunksize):
    """Lazily slice `iterable` into consecutive pieces of at most `chunksize` items.

    `total_length` is the number of leading items to cover; slices start at
    0, chunksize, 2*chunksize, ... below that bound.
    """
    starts = range(0, total_length, chunksize)
    return (iterable[start:start + chunksize] for start in starts)

def flatten(list_of_lists):
    """Concatenate the inner sequences, in order, into one new list."""
    combined = []
    for sublist in list_of_lists:
        combined.extend(sublist)
    return combined

# Drop any definition left over from an earlier run of this cell.
if "clean_text" in locals():
    del clean_text


def clean_text(text):
    """Split a comment into lower-cased, space-separated tokens.

    Runs of whitespace collapse to a single space and ':' / ';' are removed
    before the text is lower-cased and split on single spaces.
    """
    collapsed = re.sub(r'\s+', ' ', text)
    for mark in (":", ";"):
        collapsed = collapsed.replace(mark, "")
    return collapsed.lower().split(" ")

def tokenize(doc):
    """Map each token's lower-cased text to its `pos_` attribute.

    When the same lower-cased text occurs more than once, the last occurrence wins.
    """
    pos_by_token = {}
    for token in doc:
        pos_by_token[str(token).lower()] = token.pos_
    return pos_by_token

# Common English words that are also ticker symbols; never reported as tickers.
_TICKER_STOPWORDS = frozenset(
    {"HAS", "A", "FOR", "NEW", "ME", "CAN", "HOW", "GO", "IF", "WHEN", "THERE"}
)


def complex_tkr_condition(tkr, comment_split, comment_split_pos):
    """Decide whether `tkr` is mentioned as a ticker in one comment.

    Parameters
    ----------
    tkr : str
        Candidate ticker symbol.
    comment_split : collection of str
        Tokens of the comment (as produced by `clean_text`).
    comment_split_pos : dict
        Token text -> part-of-speech tag (as produced by `tokenize`).

    Returns
    -------
    str or None
        * `tkr` when the symbol appears in the comment tagged PROPN or NOUN;
        * the sentinel "notickers" when it was tagged but fails that test;
        * None when it is a stop word or was not tagged at all.

    Note that "notickers" is truthy: callers must compare the result with
    `tkr` instead of relying on truthiness.
    """
    # The stop-word check is case-insensitive. The list is upper-case while the
    # notebook lower-cases both tickers and tokens, so an exact comparison
    # never matched and words such as "a" or "for" slipped through.
    if tkr.upper() in _TICKER_STOPWORDS or tkr not in comment_split_pos:
        return None
    if tkr in comment_split and comment_split_pos[tkr] in ("PROPN", "NOUN"):
        return tkr
    return "notickers"
    
def process_chunk(texts, tickers):
    """Find the tickers mentioned in each text of a chunk.

    Parameters
    ----------
    texts : iterable of str
        Raw comment bodies.
    tickers : iterable of str
        Candidate ticker symbols.

    Returns
    -------
    list of list of str
        One list of matched tickers per input text, in input order.
    """
    texts = list(texts)
    processed_pipe = []
    # spaCy must receive the raw strings (it was being fed the token lists
    # returned by `clean_text`); the cleaned tokens are still needed for the
    # membership test, so both are derived from the same raw text.
    for text, doc in zip(texts, nlp.pipe(texts, batch_size=20)):
        comment_split = clean_text(text)
        comment_split_pos = tokenize(doc)
        # Compare with `tkr`: the "notickers" sentinel is truthy, and the
        # condition takes three arguments (it was called with two).
        tickers_found = [
            tkr for tkr in tickers
            if complex_tkr_condition(tkr, comment_split, comment_split_pos) == tkr
        ]
        processed_pipe.append(tickers_found)
    return processed_pipe
    
def parallel_nlp(texts, tickers, total_len, chunksize=100, n_jobs=7):
    """Run `process_chunk` over `texts` in parallel and flatten the results.

    Parameters
    ----------
    texts : sliceable sequence of str
        Comment bodies.
    tickers : iterable of str
        Candidate ticker symbols handed to every worker.
    total_len : int
        Number of leading items of `texts` to process.
    chunksize : int, default 100
        Number of texts per task.
    n_jobs : int, default 7
        Number of workers (previously hard-coded to 7).

    Returns
    -------
    list
        The per-chunk results of `process_chunk`, concatenated in order.
    """
    # Ceiling division so the progress bar knows how many chunks to expect.
    n_chunks = -(-total_len // chunksize)
    # `prefer="threads"` was dropped: it is only a soft hint for when no backend
    # is named, and contradicted the explicit 'multiprocessing' backend.
    executor = Parallel(n_jobs=n_jobs, backend='multiprocessing')
    tasks = (
        delayed(process_chunk)(chunk, tickers)
        for chunk in tqdm(chunker(texts, total_len, chunksize=chunksize), total=n_chunks)
    )
    result = executor(tasks)
    return flatten(result)

def update_wsb_analytics(conn, all_tickers_found, all_comment_ids):
    """Write the tickers found for each comment into `wsb_comments_analytics`.

    Parameters
    ----------
    conn : psycopg2 connection
        Open connection; the update is committed before returning.
    all_tickers_found : sequence
        Value for the `tickers_found` column, one entry per comment.
    all_comment_ids : sequence
        Comment ids, aligned position by position with `all_tickers_found`.

    Raises
    ------
    ValueError
        If the two sequences differ in length.
    """
    if len(all_tickers_found) != len(all_comment_ids):
        raise ValueError(
            "all_tickers_found and all_comment_ids must have the same length "
            f"({len(all_tickers_found)} != {len(all_comment_ids)})"
        )

    # execute_batch takes ONE argument list; build one mapping per row for the
    # named placeholders (the two sequences were passed as separate positional
    # arguments, which collided with `page_size`).
    argslist = [
        {"tickers_found": tickers_found, "comment_id": comment_id}
        for tickers_found, comment_id in zip(all_tickers_found, all_comment_ids)
    ]

    with conn.cursor() as cur:
        cur.execute("PREPARE updateStmt as UPDATE wsb_comments_analytics SET tickers_found=$1 WHERE comment_id=$2")
        # The closing parenthesis of the EXECUTE argument list was missing.
        execute_batch(cur,
                      "EXECUTE updateStmt (%(tickers_found)s, %(comment_id)s)",
                      argslist,
                      page_size=100)
        cur.execute("DEALLOCATE updateStmt")
        conn.commit()

In [None]:
# Skip comments whose ids are already keys of `comm_id_ticker_info`, when that
# list exists from an earlier run; otherwise process every comment.
if "comm_id_ticker_info" in locals():
    existing_keys = [list(c.keys())[0] for c in comm_id_ticker_info]
    df_ = wsb_comments_analytics_df.loc[~wsb_comments_analytics_df["comment_id"].isin(existing_keys), ["comment_id", "body"]].values
else:
    comm_id_ticker_info = []
    df_ = wsb_comments_analytics_df[["comment_id", "body"]].values

# No database connection is needed for the extraction itself, so none is opened.
prepared_statements = []
skipped_comment_ids = []  # comments whose body could not be processed
for (comment_id, comment_body) in tqdm(df_):
    try:
        comment_text = clean_text(text=comment_body)
        comment_split_pos = tokenize(doc=nlp(comment_body))
        # Compare with `tkr`: the "notickers" sentinel is truthy, so a plain
        # truthiness test accepted tickers that failed the part-of-speech check.
        tickers_found = [
            tkr for tkr in tickers_only
            if complex_tkr_condition(tkr, comment_text, comment_split_pos) == tkr
        ]
    except TypeError:
        # Typically a non-string body (NULL / NaN); remember it instead of hiding it.
        skipped_comment_ids.append(comment_id)
        continue
    if tickers_found:
        # Stored as a ", "-joined string. The lists are real Python lists here,
        # so the former `ast.literal_eval` post-processing raised ValueError.
        prepared_statements.append({"comment_id": comment_id, "tickers": ", ".join(tickers_found)})

if skipped_comment_ids:
    print(f"skipped {len(skipped_comment_ids)} comment(s) with an unusable body")

# Explicit columns keep the frame (and the CSV header) well-formed when nothing matched.
prepared_statements_df = pd.DataFrame(prepared_statements, columns=["comment_id", "tickers"])

prepared_statements_df.to_csv("prepared_statements.csv", index=False)

In [None]:
def make_prepared_statements(comment_df: pd.DataFrame):
    """Extract tickers per comment and write them to `prepared_statements.csv`.

    Parameters
    ----------
    comment_df : pd.DataFrame
        Must contain the columns "comment_id" and "body".

    Returns
    -------
    pd.DataFrame
        Columns "comment_id" and "tickers" (a ", "-joined string); one row per
        comment in which at least one ticker was found. The same frame is
        written to "prepared_statements.csv".

    Notes
    -----
    The function used to run a query for comment ids missing from the analytics
    table and discard the result. That round-trip had no effect on the output
    and was removed; pre-filter `comment_df` before calling if needed.
    """
    prepared_statements = []
    # Iterate rows; iterating the DataFrame itself yields column names.
    rows = comment_df[["comment_id", "body"]].itertuples(index=False, name=None)
    for comment_id, comment_body in tqdm(rows, total=len(comment_df)):
        try:
            comment_text = clean_text(text=comment_body)
            comment_split_pos = tokenize(doc=nlp(comment_body))
            # Compare with `tkr`: the "notickers" sentinel is truthy.
            tickers_found = [
                tkr for tkr in tickers_only
                if complex_tkr_condition(tkr, comment_text, comment_split_pos) == tkr
            ]
        except TypeError:
            # Typically a non-string body (NULL / NaN): nothing to extract.
            continue
        if tickers_found:
            # The lists are real Python lists, so they are joined directly
            # (`ast.literal_eval` on a list raises ValueError).
            prepared_statements.append({"comment_id": comment_id, "tickers": ", ".join(tickers_found)})

    # Explicit columns keep the frame well-formed when nothing matched.
    prepared_statements_df = pd.DataFrame(prepared_statements, columns=["comment_id", "tickers"])
    prepared_statements_df.to_csv("prepared_statements.csv", index=False)
    return prepared_statements_df

In [None]:
# Reload the extraction result from disk so the cells below work without re-running the NLP pass.
prepared_statements_df = pd.read_csv("prepared_statements.csv")

In [None]:
# stage 1: bulk-load the CSV into the temp table with psql's client-side \copy
print("-- Start")
filepath = r"./prepared_statements.csv"
psql = r"/usr/lib/postgresql/13/psql"
# Raw string, so single backslashes (doubling them put literal "\\" in the path).
psql_win = r"C:\Program Files\PostgreSQL\bin\psql.exe"
psql_bin = psql_win if os.name == "nt" else psql

# NOTE(review): the fallback below embeds a password in the notebook. Prefer
# exporting WSB_DB_URL and removing the literal once every environment sets it.
db_conn = os.environ.get("WSB_DB_URL", "postgresql://postgres:rogerthat@localhost:5433/polygonio")

# Raw f-string: "\c" is not a valid escape sequence in a normal string literal.
copy_sql = rf"\copy wsb_comments_analytics_smaller_temp FROM '{filepath}' WITH (FORMAT csv, header)"
command_1 = [psql_bin, '--command', copy_sql, '-d', db_conn]

# An argv list must run without a shell: with shell=True on POSIX only the first
# element is executed, and on Windows the list is re-joined and re-parsed.
process = Popen(command_1, stdout=PIPE, stderr=PIPE)

# Print whatever the shell would normally display.
raw_stdout, raw_stderr = process.communicate()
stdout = raw_stdout.decode('utf-8').strip()
print(stdout)

# stderr was captured but never shown, which hid every psql failure.
if process.returncode != 0:
    print(f"psql exited with code {process.returncode}")
    print(raw_stderr.decode('utf-8').strip())

# # Running command 2.
# process = Popen(command2, stdout = PIPE, stderr = PIPE)

# stdout = process.communicate()[0].decode('utf-8').strip()
# print(stdout)

In [None]:
# stage 2: rebuild the analytics table with the tickers joined in.
# NOTE(review): this joins `wsb_comments_analytics_tickers_found`, while the
# loading cells in this notebook fill `wsb_comments_analytics_smaller_temp` —
# confirm which table is meant to hold the extracted tickers.
# The DROP now targets the table that is re-created (was "esb_..."), and the
# literal is closed by exactly three quotes (a fourth one was a SyntaxError).
create_wsb_comments_analytics_temp = \
"""DROP TABLE IF EXISTS wsb_comments_analytics_temp;
    CREATE TABLE wsb_comments_analytics_temp AS (
        SELECT t1.created_utc, t1.submission_id, t1.parent_id, t1.comment_id, t1.author, t1.body, t2.tickers FROM wsb_comments_analytics t1
        LEFT JOIN
        wsb_comments_analytics_tickers_found t2 on t1.comment_id = t2.comment_id);"""

# stage 3: swap the rebuilt table in, turn it into a hypertable, drop the helper table.
truncate_and_drop_and_rename_wsb_comments_analytics = \
"""TRUNCATE wsb_comments_analytics;DROP TABLE wsb_comments_analytics;ALTER TABLE wsb_comments_analytics_temp RENAME TO wsb_comments_analytics;"""
create_hypertable_wsb_comments_analytics = "SELECT create_hypertable('wsb_comments_analytics', 'created_utc', 'comment_id', number_partitions := 10, migrate_data := TRUE);"
drop_wsb_comments_analytics_tickers_found = "DROP TABLE IF EXISTS wsb_comments_analytics_tickers_found;"

In [None]:
# Replace the temp table with the extraction result; the frame index is not written.
sqlalchemy_engine = get_sqlalchemy_engine()
prepared_statements_df.to_sql(
    "wsb_comments_analytics_smaller_temp",
    con=sqlalchemy_engine,
    if_exists="replace",
    index=False,
)

#### Inserting using psycopg2?

In [None]:
def batch(iterable, n=1):
    """Yield consecutive slices of `iterable`, each holding at most `n` items.

    Defining it here shadows the `batch` imported from `utils` at the top of
    the notebook.
    """
    size = len(iterable)
    for start in range(0, size, n):
        stop = min(start + n, size)
        yield iterable[start:stop]

# Template for one UPDATE per comment: the ", "-joined ticker string is split
# back into an array on the database side.
insert_statement = \
"UPDATE wsb_comments_analytics SET tickers_found = string_to_array(%s, ', ') WHERE comment_id = %s;"

# (tickers, comment_id) records, in the placeholder order of the template.
values = tuple(prepared_statements_df[["tickers", "comment_id"]].to_records(index=False))

# Render every statement with its parameters bound; nothing is executed here.
all_mogrified_statements = []
with get_conn() as conn, conn.cursor() as cur:
    for record in tqdm(values):
        bound_statement = cur.mogrify(insert_statement, (record[0], record[1]))
        all_mogrified_statements.append(bound_statement)

### Uploading more ticker details

In [None]:
companies_df = pd.read_csv("companies.csv")

# Collapse the three tag columns into one JSON array per row, keeping only the
# tags that are present. Concatenating the columns made the whole value NaN as
# soon as one tag was missing, and the NaN rows were then filled with
# json.dumps("[]") — the JSON *string* "[]" rather than an empty array.
tag_columns = ["tag 1", "tag 2", "tag 3"]
companies_df["tags"] = [
    json.dumps([str(tag) for tag in row_tags if pd.notna(tag)])
    for row_tags in companies_df[tag_columns].itertuples(index=False, name=None)
]
companies_df = companies_df.drop(columns=tag_columns)

companies_df = companies_df.drop(columns=["short name"])

# Positional rename to the `ticker_details` column names; this relies on the
# CSV column order staying as it is.
companies_df.columns = [
    "symbol",
    "name",
    "industry",
    "description",
    "url",
    "logo",
    "ceo",
    "exchange",
    "marketcap",
    "sector",
    "tags"
]

cols_str = ', '.join(companies_df.columns.tolist())

# Insert every company; rows whose symbol already exists are left untouched.
with get_conn() as conn:
    with conn.cursor() as cur:
        execute_values(cur, f"INSERT INTO ticker_details({cols_str}) VALUES %s ON CONFLICT (symbol) DO NOTHING;", tuple(companies_df.to_records(index=False)))

### Read all that again and find out

In [None]:
# with open("comm_id_ticker_info.json", "r") as f: 
#     comm_id_ticker_info = json.load(f)

In [None]:
# Flatten each single-key record into a plain row.
# NOTE(review): the key is stored as "submission_id" although the container is
# named `comm_id_ticker_info` — confirm which kind of id it actually holds.
all_clean_eles = []
for ele in tqdm(comm_id_ticker_info):
    sub_id = list(ele)[0]
    payload = ele[sub_id]
    sentence = payload["body"]
    ticker_symbols = [e["ticker"] for e in payload["tickers"]]
    all_clean_eles.append({"submission_id": sub_id, "sentence": sentence, "tickers": ticker_symbols})

In [None]:
# One row per record built above: submission_id, sentence, tickers.
clean_sents_tickers_df = pd.DataFrame.from_records(all_clean_eles)

In [None]:
# Display the frame as the cell output.
clean_sents_tickers_df

In [None]:
# Attach the extracted ticker lists to the exported comments.
# `how=""` is not a valid join type and made the call fail; pandas' default
# ("inner") is spelled out here.
# NOTE(review): switch to "left" if comments without tickers should be kept.
pd.merge(
    left=wsb_com_df,
    right=clean_sents_tickers_df[["submission_id", "tickers"]],
    how="inner",
    on="submission_id",
)