In [None]:
import os
import sys

import pandas as pd
from google.cloud import bigquery
from google.oauth2 import service_account

# Make the project's ../dags/ directory importable so `jobs.*` resolves.
sys.path.insert(0, "../dags/")
from jobs.common.bigquery import get_table_as_records

In [None]:
# Service-account key used to authenticate the BigQuery client.
# Expand "~" at runtime rather than hardcoding one user's home directory,
# so the notebook runs unchanged on other machines/accounts.
key_path = os.path.expanduser("~/.config/dbt-user-creds.json")
credentials = service_account.Credentials.from_service_account_file(key_path)

# Bill and run queries in the project that owns the service account.
client = bigquery.Client(
    credentials=credentials,
    project=credentials.project_id,
)

In [None]:
def _format_date(value, date_format):
    """Format one date-like value, tolerating missing or pre-formatted values.

    Returns None for None / pd.NaT (NULL dates), returns strings unchanged
    (assumed already formatted), and otherwise returns
    ``value.strftime(date_format)``.
    """
    if value is None or value is pd.NaT:
        return None
    if isinstance(value, str):
        return value
    return value.strftime(date_format)


def sanitize(d, date_keys=("date", "fetch_date"), date_format="%Y-%m-%d"):
    """Return a copy of record ``d`` with values made JSON-friendly for a load job.

    - dict values are replaced by ``str(value)`` (note: Python repr, not JSON).
    - values under ``date_keys`` are formatted with ``date_format``; missing
      dates (None / pd.NaT) become None and string values pass through as-is.
    - every other value is returned unchanged.

    Args:
        d: mapping of column name -> value (anything exposing ``.items()``).
        date_keys: keys whose values are formatted as dates.
        date_format: ``strftime`` format used for those values.

    Returns:
        A new dict; ``d`` itself is not modified.
    """
    clean = {}
    for key, value in d.items():
        # The dict check wins over the date check, as in the original rule order.
        if isinstance(value, dict):
            clean[key] = str(value)
        elif key in date_keys:
            clean[key] = _format_date(value, date_format)
        else:
            clean[key] = value
    return clean

# Rewrite `reddit_raw.posts` (back up, delete, reload) to change its schema

In [None]:
# Raw records of `reddit_raw.posts`, later backed up to CSV and reloaded.
# Materialize into a list: the records are iterated more than once (backup and
# reload). If get_table_as_records returned a one-shot iterator, the second pass
# would silently see nothing. Its return type is not visible here, and list()
# is harmless if it already returns a list.
data = list(
    get_table_as_records(
        client,
        "reddit_raw",
        "posts",
    )
)

In [None]:
# Export the cleaned posts to a local CSV.
# Use a dedicated name: rebinding `data` here would clobber the raw
# `reddit_raw.posts` records, which the backup and reload cells still read.
posts_clean = pd.DataFrame(
    get_table_as_records(
        client,
        "reddit_texts",
        "posts_clean",
    )
)
posts_clean.to_csv("../reddit_data.csv")

In [None]:
# Back up `data` to CSV before the destructive delete-and-reload of reddit_raw.posts.
# NOTE(review): `data` must still hold the raw `reddit_raw.posts` records at this
# point; if another cell rebound it, this backs up the wrong data.
pd.DataFrame(data).to_csv(path_or_buf="~/data/bkp_reddit.csv")

In [None]:
# Rewrite `reddit_raw.posts` from the records in `data`, letting BigQuery
# autodetect the (new) schema.
table = "reddit_raw.posts"

# Build and check the payload BEFORE the destructive delete, so that a bad or
# empty `data` aborts with the existing table still intact.
rows_to_load = [sanitize(d) for d in data]
if not rows_to_load:
    raise ValueError(f"No records to load into {table}; refusing to delete it.")

job_config = bigquery.LoadJobConfig()
job_config.autodetect = True
# NOTE(review): the table is deleted just below, so these options likely have
# no effect on the recreated table; kept as in the original.
job_config.schema_update_options = [
    "ALLOW_FIELD_ADDITION",
    "ALLOW_FIELD_RELAXATION",
]
job_config.write_disposition = "WRITE_APPEND"

# Drop the old table so the load recreates it with the autodetected schema.
# not_found_ok=True tolerates an already-missing table. Other failures
# (auth, permissions, network) still raise instead of being swallowed.
client.delete_table(table, not_found_ok=True)
client.load_table_from_json(rows_to_load, table, job_config=job_config).result()

# Relabel rows in `model_perfs.textcat` (drop `models` rows, set `model` and `model_type`)

In [None]:
# Fetch all rows of model_perfs.textcat.
perfs = pd.DataFrame(
    get_table_as_records(
        client,
        "model_perfs",
        "textcat",
    )
)

# Drop rows whose `model` is "models"; relabel every other row with
# model="textcat_bow" and model_type="subreddit_classif".
# The loop variable is deliberately not `data`, so the raw posts records
# held in that global are not clobbered.
new_data = []
for perf_row in perfs.to_dict(orient="records"):
    if perf_row["model"] == "models":
        continue
    perf_row["model"] = "textcat_bow"
    perf_row["model_type"] = "subreddit_classif"
    new_data.append(sanitize(perf_row))

new_data

In [None]:
# Rewrite `model_perfs.textcat` with the relabeled rows in `new_data`.
table = "model_perfs.textcat"

# Refuse to run the destructive delete when there is nothing to reload;
# otherwise the table would simply be wiped.
if not new_data:
    raise ValueError(f"No records to load into {table}; refusing to delete it.")

job_config = bigquery.LoadJobConfig()
job_config.autodetect = True
# NOTE(review): the table is deleted just below, so these options likely have
# no effect on the recreated table; kept as in the original.
job_config.schema_update_options = [
    "ALLOW_FIELD_ADDITION",
    "ALLOW_FIELD_RELAXATION",
]
job_config.write_disposition = "WRITE_APPEND"

# not_found_ok=True tolerates an already-missing table. Other failures
# (auth, permissions, network) still raise instead of being swallowed.
client.delete_table(table, not_found_ok=True)
client.load_table_from_json(new_data, table, job_config=job_config).result()

In [None]:
from textacy import preprocessing

import html
import re


# Markdown inline link "[text](target)". The link text is matched lazily
# (.*?) so several links on one line are unwrapped separately instead of
# being merged into a single greedy match.
_MD_LINK_RE = re.compile(r"\[(.*?)\]\(([^\(\)\[\]]*)\)")


def replace_md_url(text):
    """Unwrap markdown links: "[text](target)" becomes "text target".

    Args:
        text: input string (may contain zero or more markdown links).

    Returns:
        The string with every matched link replaced by its text and target
        separated by a space; all other content is unchanged.
    """
    return _MD_LINK_RE.sub(r"\g<1> \g<2>", text)


# Text-cleaning steps for post bodies, chained by textacy's make_pipeline
# in exactly the order listed.
PREPROC_STEPS = (
    # NOTE(review): unescaping twice presumably decodes text that was
    # HTML-escaped twice upstream (e.g. "&amp;gt;" -> "&gt;" -> ">"). Confirm.
    html.unescape,
    html.unescape,
    preprocessing.normalize.whitespace,
    preprocessing.normalize.bullet_points,
    preprocessing.normalize.hyphenated_words,
    # Unwrap "[text](url)" before URL replacement and bracket removal,
    # presumably so the link text is kept rather than stripped.
    replace_md_url,
    preprocessing.replace.urls,
    preprocessing.remove.brackets,
    preprocessing.normalize.unicode,
    preprocessing.remove.accents,
    preprocessing.replace.emojis,
    preprocessing.replace.numbers,
    preprocessing.replace.user_handles,
)
preproc_pipe = preprocessing.make_pipeline(*PREPROC_STEPS)

In [None]:
# Preview the cleaning pipeline on a few posts from reddit_texts.post_contents.
# NOTE: LIMIT without ORDER BY returns an arbitrary, not necessarily stable, sample.
query = """
SELECT * from `reddit_texts.post_contents` LIMIT 20
"""
query_job = client.query(query)

rows = []
for record in query_job:
    row = dict(record)
    # Replace the raw `selftext` column with its cleaned version under `text`.
    text = row.pop("selftext")
    # A NULL selftext would reach html.unescape (the first pipeline step) as
    # None and raise TypeError, so keep it as None instead.
    row["text"] = preproc_pipe(text) if text is not None else None
    rows.append(row)

rows