# Purpose

**2022-08-15: v0.6.0**
<br>Test `dask.delayed` to run aggregation of multiple subreddits in parallel.
With the new project we expect to aggregate posts for over 300k subreddits. For most of the process, each subreddit can be processed independently of other subreddits, so it makes sense to try to split up the work so we can speed things up.

**2022-06-29: v0.5.0**
<br>Because we embedded post & text as a single embedding and we didn't use MLflow to create those embeddings, it's easier to  run the embeddings in this notebook rather than to re-use or re-write the old `AggregateEmbeddings` class.

Provenance:
* `v0.4.1 / djb_03.01-2021-12-aggregate_v041_posts_and_comments_pandas.ipynb`

# Notebook setup

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
from collections import defaultdict
from datetime import datetime, timedelta
import gc
import os
import logging
from logging import info
from pathlib import Path
from pprint import pprint

import numpy as np
import pandas as pd
import plotly
import plotly.express as px
import seaborn as sns

import dask
from dask import dataframe as dd
from tqdm.auto import tqdm

import mlflow
import hydra

import subclu
from subclu.utils.hydra_config_loader import LoadHydraConfig
from subclu.models.aggregate_embeddings import (
    AggregateEmbeddings, AggregateEmbeddingsConfig,
    load_config_agg_jupyter, get_dask_df_shape,
)
from subclu.models import aggregate_embeddings_pd

from subclu.utils import set_working_directory, get_project_subfolder
from subclu.utils.eda import (
    setup_logging, counts_describe, value_counts_and_pcts,
    notebook_display_config, print_lib_versions,
    style_df_numeric,
    elapsed_time,
)
from subclu.utils.mlflow_logger import MlflowLogger, save_pd_df_to_parquet_in_chunks
from subclu.eda.aggregates import (
    compare_raw_v_weighted_language
)
from subclu.utils.data_irl_style import (
    get_colormap, theme_dirl
)

from subclu.i18n_topic_model_batch.subclu2.utils.data_loaders_gcs import LoadSubredditsGCS


print_lib_versions([dask, hydra, mlflow, np, pd, plotly, sns, subclu])

python		v 3.7.10
===
dask		v: 2021.06.0
hydra		v: 1.1.0
mlflow		v: 1.16.0
numpy		v: 1.19.5
pandas		v: 1.2.4
plotly		v: 4.14.3
seaborn		v: 0.11.1
subclu		v: 0.6.0


In [3]:
# plotting
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import matplotlib.dates as mdates
plt.style.use('default')

setup_logging()
notebook_display_config()

# Set Local model paths

In [4]:
manual_model_timestamp = datetime.utcnow().strftime('%Y-%m-%d_%H%M%S')
path_this_model = get_project_subfolder(
    f"data/models/aggregate_embeddings/manual_v060_{manual_model_timestamp}"
)
Path.mkdir(path_this_model, parents=True, exist_ok=True)
path_this_model

PosixPath('/home/jupyter/subreddit_clustering_i18n/data/models/aggregate_embeddings/manual_v060_2022-08-16_033651')

# Load config for embeddings aggregation

For v0.6.0 embeddings I didn't use mlflow to track the embeddings inference. We'll need to get them from these folders in GCS:

- [Subreddit metadata](https://console.cloud.google.com/storage/browser/i18n-subreddit-clustering/i18n_topic_model_batch/runs/20220629/subreddits/text/embedding/2022-06-29_084555)
    - `i18n-subreddit-clustering/i18n_topic_model_batch/runs/20220629/subreddits/text/embedding/2022-06-29_084555`
- [Post + Comment Text (already combined)](https://console.cloud.google.com/storage/browser/i18n-subreddit-clustering/i18n_topic_model_batch/runs/20220629/post_and_comment_text_combined/text_subreddit_seeds/embedding/2022-06-29_091925)
    - `i18n-subreddit-clustering/i18n_topic_model_batch/runs/20220629/post_and_comment_text_combined/text_subreddit_seeds/embedding/2022-06-29_091925`



In [69]:
cfg_agg_embeddings = LoadHydraConfig(
    config_name='aggregate_embeddings_v0.6.0.yaml',
    config_path="../config",
    overrides=[
        f"mlflow_experiment=v0.6.0_mUSE_aggregates_test",
        f"n_sample_posts_files=2",
        f"n_parallel_jobs=4",
    ],
)
print(cfg_agg_embeddings.config_dict.keys())

dict_keys(['data_text_and_metadata', 'data_embeddings_to_aggregate', 'aggregate_params', 'bucket_output', 'mlflow_tracking_uri', 'mlflow_experiment', 'n_sample_subreddits', 'n_sample_posts_files', 'n_sample_comments_files', 'n_parallel_jobs'])


In [70]:
# cfg_agg_embeddings.config_dict['data_embeddings_to_aggregate']

In [71]:
for k_, v_ in cfg_agg_embeddings.config_dict.items():
    if isinstance(v_, dict):
        print(f"{k_}:")
        for k2_, v2_ in v_.items():
            print(f"    {k2_}: {v2_}")
    else:
        print(f"{k_}: {v_}")

data_text_and_metadata:
    dataset_name: v0.6.0 inputs. ~110k seed subreddits, ~340k with 3+ posts, ~700k total subreddits
    bucket_name: i18n-subreddit-clustering
    folder_subreddits_text_and_meta: i18n_topic_model_batch/runs/20220811/subreddits/text
    folder_posts_text_and_meta: i18n_topic_model_batch/runs/20220811/posts
    folder_comments_text_and_meta: i18n_topic_model_batch/runs/20220811/comments
    folder_post_and_comment_text_and_meta: i18n_topic_model_batch/runs/20220811/post_and_comment_text_combined/text_all
data_embeddings_to_aggregate:
    bucket_embeddings: i18n-subreddit-clustering
    post_and_comments_folder_embeddings: i18n_topic_model_batch/runs/20220811/post_and_comment_text_combined/text_all/embedding/2022-08-11_084218
    subreddit_desc_folder_embeddings: i18n_topic_model_batch/runs/20220811/subreddits/text/embedding/2022-08-11_082859
    col_subreddit_id: subreddit_id
aggregate_params:
    min_post_and_comment_text_len: 3
    agg_post_post_and_comment_wei

In [72]:
# RUN_DATE = '20220629'

# BUCKET_NAME = 'i18n-subreddit-clustering'
# EMBEDDINGS_SUB_ID = '2022-06-29_084555'
# EMBEDDINGS_POST_COMMENT_ID = '2022-06-29_091925'

# Start MLflow & Log base params

In [73]:
mlf = MlflowLogger(tracking_uri=cfg_agg_embeddings.config_dict['mlflow_tracking_uri'])

In [131]:
mlflow.end_run("FAILED")

In [74]:
mlflow.end_run("FAILED")
#TODO(djb): uncomment
mlflow_experiment = cfg_agg_embeddings.config_dict['mlflow_experiment']
# 'v0.6.0_mUSE_aggregates', 'v0.6.0_mUSE_aggregates_test'


t_start_agg_embed = datetime.utcnow()
info(f"== Start run_aggregation() method ==")



info(f"MLflow tracking URI: {mlflow.get_tracking_uri()}")
mlf.set_experiment(mlflow_experiment)
mlflow.start_run()
mlf.add_git_hash_to_active_run()
mlf.set_tag_hostname(key='host_name')
mlf.log_param_hostname(key='host_name')
mlf.log_cpu_count()
mlf.log_ram_stats(param=True, only_memory_used=False)

04:32:42 | INFO | "== Start run_aggregation() method =="
04:32:42 | INFO | "MLflow tracking URI: sqlite:////home/jupyter/subreddit_clustering_i18n/mlflow_sync/djb-100-2021-04-28-djb-eda-german-subs/mlruns.db"
04:32:42 | INFO | "host_name: djb-100-2021-04-28-djb-eda-german-subs"
04:32:42 | INFO | "cpu_count: 96"
04:32:43 | INFO | "RAM stats:
{'memory_used_percent': '2.03%', 'memory_total': '1,444,961', 'memory_used': '29,348', 'memory_free': '1,407,217'}"


{'memory_total': 1444961,
 'memory_used_percent': 0.020310582776974603,
 'memory_used': 29348,
 'memory_free': 1407217}

In [75]:
# set weights
# Normalize them by dividing by 100
WEIGHT_POST_COMMENT = (
    cfg_agg_embeddings.config_dict['aggregate_params']['agg_post_post_and_comment_weight'] / 100
)
WEIGHT_SUB_META = (
    cfg_agg_embeddings.config_dict['aggregate_params']['agg_post_subreddit_desc_weight'] / 100
)
print(WEIGHT_POST_COMMENT + WEIGHT_SUB_META)
assert(1.0 == WEIGHT_POST_COMMENT + WEIGHT_SUB_META)


gcs_sub_embeddings = cfg_agg_embeddings.config_dict['data_embeddings_to_aggregate']['subreddit_desc_folder_embeddings']
print(gcs_sub_embeddings)
gcs_post_comment_embeddings = cfg_agg_embeddings.config_dict['data_embeddings_to_aggregate']['post_and_comments_folder_embeddings']
print(gcs_post_comment_embeddings)

#TODO(djb): uncomment
# mlflow.log_params(
#     {
#         'embeddings_bucket': BUCKET_NAME,
#         'embeddings_subreddit_path': gcs_sub_embeddings,
#         'embeddings_post_and_comments_path': gcs_post_comment_embeddings,
#         'weight_post_and_comments': WEIGHT_POST_COMMENT,
#         'weight_subreddit_meta': WEIGHT_SUB_META,
#     }
# )

1.0
i18n_topic_model_batch/runs/20220811/subreddits/text/embedding/2022-08-11_082859
i18n_topic_model_batch/runs/20220811/post_and_comment_text_combined/text_all/embedding/2022-08-11_084218


# Load data

In [76]:
%%time
t_start_data_load_ = datetime.utcnow()

subs_v = LoadSubredditsGCS(
    bucket_name=cfg_agg_embeddings.config_dict['data_embeddings_to_aggregate']['bucket_embeddings'],
    gcs_path=gcs_sub_embeddings,
    local_cache_path="/home/jupyter/subreddit_clustering_i18n/data/local_cache/",
    columns=None,
    col_unique_check='subreddit_id',
    df_format='pandas',
    unique_check=True,
    verbose= True,
    
    n_sample_files=None,
    n_files_slice_start=None,
    n_files_slice_end=None,
)
subs_v.local_cache()

df_v_subs = subs_v.read_as_one_df()
r_subs, c_subs = df_v_subs.shape
mlflow.log_metrics(
    {
        f"df_v_subs-rows": r_subs,
        f"df_v_subs-cols": c_subs,
    }
)
print(f"{r_subs:,.0f} rows, {c_subs:,.0f} cols")

04:32:46 | INFO | "  Local folder to download artifact(s):
  /home/jupyter/subreddit_clustering_i18n/data/local_cache/i18n-subreddit-clustering/i18n_topic_model_batch/runs/20220811/subreddits/text/embedding/2022-08-11_082859"
04:32:47 | INFO | "  7 <- Files matching prefix"
04:32:47 | INFO | "  7 <- Files to check"
04:32:47 | INFO | "    000000000000-131971_by_514.parquet <- File already exists, not downloading"
04:32:47 | INFO | "    000000000001-198630_by_514.parquet <- File already exists, not downloading"
04:32:47 | INFO | "    000000000002-441159_by_514.parquet <- File already exists, not downloading"
04:32:47 | INFO | "    2022-08-11_08-28-59_vectorize_text.log <- File already exists, not downloading"
04:32:47 | INFO | "  Files already cached: 4"
04:32:47 | INFO | "  Files already downloaded."
04:32:47 | INFO | "  df format: pandas"
04:32:51 | INFO | "  Checking ID uniqueness..."


771,760 rows, 514 cols
CPU times: user 4.23 s, sys: 4.75 s, total: 8.98 s
Wall time: 6.68 s


In [77]:
%%time

pc_v = LoadSubredditsGCS(
    bucket_name=cfg_agg_embeddings.config_dict['data_embeddings_to_aggregate']['bucket_embeddings'],
    gcs_path=gcs_post_comment_embeddings,
    local_cache_path="/home/jupyter/subreddit_clustering_i18n/data/local_cache/",
    columns=None,
    col_unique_check='post_id',
    df_format='pandas',
    unique_check=False,
    verbose= True,
    
    n_sample_files=cfg_agg_embeddings.config_dict['n_sample_posts_files'],
    n_files_slice_start=None,
    n_files_slice_end=None,
)
pc_v.local_cache()

df_v_pc = pc_v.read_as_one_df()
r_pc, c_pc = df_v_pc.shape
mlflow.log_metrics(
    {
        f"df_v_post_comments-rows": r_pc,
        f"df_v_post_comments-cols": c_pc,
    }
)
print(f"{r_pc:,.0f} rows, {c_pc:,.0f} cols")

t_data_load = elapsed_time(start_time=t_start_data_load_, log_label='Data Loading Time', verbose=True)
mlflow.log_metric('time_fxn-data_loading_time',
                  t_data_load / timedelta(minutes=1)
                  )
mlf.log_ram_stats(only_memory_used=True)

04:32:53 | INFO | "  Local folder to download artifact(s):
  /home/jupyter/subreddit_clustering_i18n/data/local_cache/i18n-subreddit-clustering/i18n_topic_model_batch/runs/20220811/post_and_comment_text_combined/text_all/embedding/2022-08-11_084218"
04:32:53 | INFO | "  2 <- Files matching prefix"
04:32:53 | INFO | "  2 <- Files to check"
04:32:53 | INFO | "    000000000000-264431_by_515.parquet <- File already exists, not downloading"
04:32:53 | INFO | "    000000000001-249532_by_515.parquet <- File already exists, not downloading"
04:32:53 | INFO | "  Files already cached: 2"
04:32:53 | INFO | "  Files already downloaded."
04:32:53 | INFO | "  df format: pandas"
04:32:57 | INFO | "  0:00:12.369568 <- Data Loading Time time elapsed"


513,963 rows, 515 cols


04:32:57 | INFO | "RAM stats:
{'memory_used_percent': '2.44%', 'memory_used': '35,188'}"


CPU times: user 3.45 s, sys: 4.16 s, total: 7.61 s
Wall time: 6.75 s


{'memory_used_percent': 0.024352214350421914, 'memory_used': 35188}

# Set weights & create copy dfs for new weights

In [78]:
l_ix_sub_level = ['subreddit_id', 'subreddit_name']
l_ix_post_level = l_ix_sub_level + ['post_id']

l_embedding_cols = [c for c in df_v_pc if c.startswith('embeddings_')]
print(len(l_embedding_cols))

512


In [110]:
%%time
#### UPDATE TO RESET TEST
df_v_pc_weighted = df_v_pc.copy()

df_v_subs_weighted = df_v_subs.copy()

# should be True b/c they're copies
print(np.allclose(df_v_pc_weighted.iloc[:1000,3:515], df_v_pc.iloc[:1000,3:515]))

print(np.allclose(df_v_subs_weighted.iloc[:1000,2:515], df_v_subs.iloc[:1000,2:515]))

True
True
CPU times: user 534 ms, sys: 459 ms, total: 993 ms
Wall time: 992 ms


In [111]:
counts_describe(df_v_pc[l_ix_post_level])

Unnamed: 0,dtype,count,unique,unique-percent,null-count,null-percent
subreddit_id,object,513963,3467,0.67%,0,0.00%
subreddit_name,object,513963,3467,0.67%,0,0.00%
post_id,object,513963,513963,100.00%,0,0.00%


In [112]:
%%time
# apply weight to all posts & subreddit meta at once (vectorized)
df_v_subs_weighted[l_embedding_cols] = df_v_subs_weighted[l_embedding_cols] * WEIGHT_SUB_META

CPU times: user 910 ms, sys: 548 ms, total: 1.46 s
Wall time: 1.45 s


In [113]:
%%time
# apply weight to all posts & subreddit meta at once (vectorized)
df_v_pc_weighted[l_embedding_cols] = df_v_pc_weighted[l_embedding_cols] * WEIGHT_POST_COMMENT

CPU times: user 624 ms, sys: 366 ms, total: 990 ms
Wall time: 988 ms


In [114]:
# NOW they shouldn't be equal (Should be False)
print(np.allclose(df_v_pc_weighted.iloc[:1000,3:515], df_v_pc.iloc[:1000,3:515]))

print(np.allclose(df_v_subs_weighted.iloc[:1000,2:515], df_v_subs.iloc[:1000,2:515]))

False
False


# Aggregate to Post-Level: Post&Comments + Subreddit Meta

It's better to let pandas handle the interations with `.groupby('subreddit_id')`. Otherwise we have to create masks for each subreddit that can take much longer (17+ hours).

- ETA with masks: +17.6 hours
- ETA with groupby ~2.5 hours

```
# mask:
0%  329/81973 [04:18<17:42:36, 1.28it/s]

# .groupby()
6% 4751/81973 [09:56<2:35:06, 8.30it/s]
```

---

Updates using `dask.delayed`:
By combining .groupby() + `dask.delayed` we can process things ~3x faster:


```
# .groupby(), no dask delayed
100% 3467/3467 [08:20<00:00, 6.97it/s]
  0:08:21.661816 <- Total Agg fxn time time elapsed


# .groupby() + dask.delayed(....to_numpy())
100% 3467/3467 [02:31<00:00, 23.08it/s]
  Wall time: 2min 38s


# masks with dask.delayed():
#  This is 2x faster than serial processing, but .groupby() + dask.delayed() is much faster
100% 3467/3467 [00:11<00:00, 299.85it/s]
05:44:20 | INFO | "Define new C1 df DAG in dask"
05:44:20 | INFO | "COMPUTE new C1 df START"
05:48:20 | INFO | "COMPUTE new C1 df DONE"
05:48:20 | INFO | "  0:04:11.393036 <- Total Agg fxn time time elapsed"
CPU times: user 4min 33s, sys: 24.3 s, total: 4min 57s
Wall time: 4min 12s


```


In [93]:
%%time

info(f"Start C1 - posts + comments + sub descriptions")
t_start_agg_post_c1 = datetime.utcnow()

l_df_c1_weights_no_delay_ = list()

for s_id, df_ in tqdm(
    df_v_pc_weighted.groupby('subreddit_id'),
    ascii=True, mininterval=5,
):
    df_.loc[:, l_embedding_cols] = np.add(
        df_v_subs_weighted[df_v_subs_weighted['subreddit_id'] == s_id][l_embedding_cols].to_numpy(),
        df_[l_embedding_cols]
    )
    l_df_c1_weights_no_delay_.append(df_)
    del df_


info(f"Create new C1 df")
df_posts_agg_c1 = pd.concat(l_df_c1_weights_no_delay_, ignore_index=True)

r_, c_ = df_posts_agg_c1.shape
mlflow.log_metrics(
    {
        f"df_posts_agg_c1-rows": r_,
        f"df_posts_agg_c1-cols": c_,
    }
)
print(f"{r_:,.0f} rows, {c_:,.0f} cols")
del r_, c_

t_agg_pc_c1 = elapsed_time(start_time=t_start_agg_post_c1, log_label='Total Agg fxn time', verbose=True)
mlflow.log_metric('time_fxn-df_posts_agg_c1_no_delay',
                  t_agg_pc_c1 / timedelta(minutes=1)
                  )
info(f"C1 - post level complete")

05:01:58 | INFO | "Start C1 - posts + comments + sub descriptions"


  0%|          | 0/3467 [00:00<?, ?it/s]

05:10:18 | INFO | "Create new C1 df"
05:10:19 | INFO | "  0:08:21.661816 <- Total Agg fxn time time elapsed"
05:10:19 | INFO | "C1 - post level complete"


513,963 rows, 515 cols


In [94]:
df_posts_agg_c1.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 513963 entries, 0 to 513962
Columns: 515 entries, subreddit_id to embeddings_511
dtypes: float32(512), object(3)
memory usage: 1015.6+ MB


In [95]:
df_posts_agg_c1.iloc[:5, :10]

Unnamed: 0,subreddit_id,subreddit_name,post_id,embeddings_0,embeddings_1,embeddings_2,embeddings_3,embeddings_4,embeddings_5,embeddings_6
0,t5_1001tl,jewel_xo,t3_w6lnkt,-0.02675,-0.03619,0.093885,0.095756,-0.029146,0.069554,-0.029806
1,t5_10029e,milkyhentai,t3_wacyh8,-0.052194,-0.001679,0.06613,-0.001884,-0.048534,0.063257,0.087531
2,t5_1006k8,badwouldyourather,t3_v9i9a9,-0.056388,0.053583,-0.036472,0.077083,0.103991,0.072819,0.060729
3,t5_100806,jojojosiah,t3_v49gii,0.050979,-0.01909,0.066897,0.010666,-0.003617,0.106058,-0.079753
4,t5_100806,jojojosiah,t3_v49tw9,-0.024774,0.023409,0.084235,0.09153,0.115938,0.019496,0.011256


In [96]:
%%time
info(f"Start C1 - posts + comments + sub descriptions")
t_start_agg_post_c1 = datetime.utcnow()

l_df_c1_weights = list()

for s_id, df_ in tqdm(
    df_v_pc_weighted.groupby('subreddit_id'),
    ascii=True, mininterval=5,
):
    df_pc_embeddings_ = dask.delayed(np.add)(
        # df_v_subs_weighted[df_v_subs_weighted['subreddit_id'] == s_id][l_embedding_cols].to_numpy(),
        dask.delayed(df_v_subs_weighted[df_v_subs_weighted['subreddit_id'] == s_id][l_embedding_cols].to_numpy()),
        dask.delayed(df_[l_embedding_cols])
    )
    l_df_c1_weights.append(
        dask.delayed(pd.concat)([dask.delayed(df_[l_ix_post_level]), df_pc_embeddings_], ignore_index=False, axis=1)
    )

info(f"Define new C1 df DAG in dask")
df_posts_agg_c1_delayed = dask.delayed(pd.concat)(l_df_c1_weights, ignore_index=True)

info(f"COMPUTE new C1 df START")
df_posts_agg_c1_dd = df_posts_agg_c1_delayed.compute()
info(f"COMPUTE new C1 df DONE")

r_, c_ = df_posts_agg_c1_dd.shape
mlflow.log_metrics(
    {
        f"df_posts_agg_c1-rows": r_,
        f"df_posts_agg_c1-cols": c_,
    }
)
print(f"{r_:,.0f} rows, {c_:,.0f} cols")
del r_, c_

t_agg_pc_c1 = elapsed_time(start_time=t_start_agg_post_c1, log_label='Total Agg fxn time', verbose=True)
mlflow.log_metric('time_fxn-df_posts_agg_c1_dask_delayed',
                  t_agg_pc_c1 / timedelta(minutes=1)
                  )
info(f"C1 - post level complete")
mlf.log_ram_stats(only_memory_used=True)

05:18:56 | INFO | "Start C1 - posts + comments + sub descriptions"


  0%|          | 0/3467 [00:00<?, ?it/s]

05:21:28 | INFO | "Define new C1 df DAG in dask"
05:21:28 | INFO | "COMPUTE new C1 df START"
05:21:34 | INFO | "COMPUTE new C1 df DONE"
05:21:34 | INFO | "  0:02:37.885677 <- Total Agg fxn time time elapsed"
05:21:34 | INFO | "C1 - post level complete"


513,963 rows, 515 cols


05:21:35 | INFO | "RAM stats:
{'memory_used_percent': '2.48%', 'memory_used': '35,872'}"


CPU times: user 2min 37s, sys: 2.71 s, total: 2min 39s
Wall time: 2min 38s


{'memory_used_percent': 0.02482558352785992, 'memory_used': 35872}

In [97]:
df_[l_ix_post_level].head()

Unnamed: 0,subreddit_id,subreddit_name,post_id
247128,t5_1387r9,amazonfc,t3_umx0ug
247129,t5_1387r9,amazonfc,t3_umx2v1
247130,t5_1387r9,amazonfc,t3_umxauu
247131,t5_1387r9,amazonfc,t3_umxiur
247132,t5_1387r9,amazonfc,t3_umxsr4


In [100]:
# df_posts_agg_c1_delayed.visualize(filename='posts_agg_test', format='svg', optimize_graph=True)

In [89]:
df_posts_agg_c1.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 513963 entries, 0 to 513962
Columns: 515 entries, 0 to 514
dtypes: float32(512), object(3)
memory usage: 1015.6+ MB


In [101]:
df_posts_agg_c1_dd.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 513963 entries, 0 to 513962
Columns: 515 entries, subreddit_id to embeddings_511
dtypes: float32(512), object(3)
memory usage: 1015.6+ MB


In [102]:
df_posts_agg_c1_dd.iloc[:5, :10]

Unnamed: 0,subreddit_id,subreddit_name,post_id,embeddings_0,embeddings_1,embeddings_2,embeddings_3,embeddings_4,embeddings_5,embeddings_6
0,t5_1001tl,jewel_xo,t3_w6lnkt,-0.02675,-0.03619,0.093885,0.095756,-0.029146,0.069554,-0.029806
1,t5_10029e,milkyhentai,t3_wacyh8,-0.052194,-0.001679,0.06613,-0.001884,-0.048534,0.063257,0.087531
2,t5_1006k8,badwouldyourather,t3_v9i9a9,-0.056388,0.053583,-0.036472,0.077083,0.103991,0.072819,0.060729
3,t5_100806,jojojosiah,t3_v49gii,0.050979,-0.01909,0.066897,0.010666,-0.003617,0.106058,-0.079753
4,t5_100806,jojojosiah,t3_v49tw9,-0.024774,0.023409,0.084235,0.09153,0.115938,0.019496,0.011256


In [116]:
%%time
info(f"Start C1 - posts + comments + sub descriptions")
t_start_agg_post_c1 = datetime.utcnow()

l_df_c1_weights_dd = list()

# instead of groupby, try running list of subreddit IDs to try to run df masks in parallel
l_subreddits_in_pc_ = df_v_pc_weighted['subreddit_id'].unique()

df_v_subs_weighted_dd = dask.delayed(df_v_subs_weighted)
df_v_pc_weighted_dd = dask.delayed(df_v_pc_weighted)

for s_id in tqdm(
    l_subreddits_in_pc_,
    ascii=True, mininterval=5,
):
    df_ = dask.delayed(df_v_pc_weighted_dd[df_v_pc_weighted_dd['subreddit_id'] == s_id])
    df_pc_embeddings_ = dask.delayed(np.add)(
        dask.delayed(df_v_subs_weighted_dd[df_v_subs_weighted_dd['subreddit_id'] == s_id][l_embedding_cols].to_numpy()),
        dask.delayed(df_[l_embedding_cols])
    )
    l_df_c1_weights_dd.append(
        dask.delayed(pd.concat)([dask.delayed(df_[l_ix_post_level]), df_pc_embeddings_], ignore_index=False, axis=1)
    )

info(f"Define new C1 df DAG in dask")
df_posts_agg_c1_delayed2 = dask.delayed(pd.concat)(l_df_c1_weights_dd, ignore_index=True)

info(f"COMPUTE new C1 df START")
df_posts_agg_c1_dd2 = df_posts_agg_c1_delayed2.compute()
info(f"COMPUTE new C1 df DONE")

r_, c_ = df_posts_agg_c1_dd2.shape
mlflow.log_metrics(
    {
        f"df_posts_agg_c1-rows": r_,
        f"df_posts_agg_c1-cols": c_,
    }
)
print(f"{r_:,.0f} rows, {c_:,.0f} cols")
del r_, c_

t_agg_pc_c1 = elapsed_time(start_time=t_start_agg_post_c1, log_label='Total Agg fxn time', verbose=True)
mlflow.log_metric('time_fxn-df_posts_agg_c1_dask_delayed',
                  t_agg_pc_c1 / timedelta(minutes=1)
                  )
info(f"C1 - post level complete")
mlf.log_ram_stats(only_memory_used=True)

05:44:08 | INFO | "Start C1 - posts + comments + sub descriptions"


  0%|          | 0/3467 [00:00<?, ?it/s]

05:44:20 | INFO | "Define new C1 df DAG in dask"
05:44:20 | INFO | "COMPUTE new C1 df START"
05:48:20 | INFO | "COMPUTE new C1 df DONE"
05:48:20 | INFO | "  0:04:11.393036 <- Total Agg fxn time time elapsed"
05:48:20 | INFO | "C1 - post level complete"


513,963 rows, 515 cols


05:48:20 | INFO | "RAM stats:
{'memory_used_percent': '2.63%', 'memory_used': '37,949'}"


CPU times: user 4min 33s, sys: 24.3 s, total: 4min 57s
Wall time: 4min 12s


{'memory_used_percent': 0.026262992565197263, 'memory_used': 37949}

### Check that delayed & sequential outputs are the same

In [103]:
np.allclose(df_posts_agg_c1_dd[l_embedding_cols], df_posts_agg_c1[l_embedding_cols])

True

In [117]:
np.allclose(df_posts_agg_c1_dd2[l_embedding_cols], df_posts_agg_c1[l_embedding_cols])

False

In [118]:
np.allclose(df_posts_agg_c1_dd[l_embedding_cols], df_posts_agg_c1_dd2[l_embedding_cols])

False

### Save post-level

In [19]:
d_dfs_to_save = defaultdict(dict)

In [19]:
# %%time
# d_dfs_to_save['df_posts_agg_c1']['local'] = (
#     path_this_model / f"df_posts_agg_c1_{datetime.utcnow().strftime('%Y-%m-%d_%H%M')}"
# )

# save_pd_df_to_parquet_in_chunks(
#     df_posts_agg_c1,
#     d_dfs_to_save['df_posts_agg_c1']['local'],
#     write_index=False
# )

# info(f"  Logging df to mlflow...")
# mlflow.log_artifacts(d_dfs_to_save['df_posts_agg_c1']['local'], artifact_path='df_posts_agg_c1')

# Aggregate to Subreddit Level

In [126]:
# first, figure out how many posts each subreddit has
c_post_embedding_count = 'posts_for_embeddings_count'

df_posts_for_embeddings = (
    df_posts_agg_c1
    .groupby(l_ix_sub_level, as_index=False)
    .agg(**{c_post_embedding_count: ('post_id', 'nunique')})
)
# fill subs that have no posts
df_posts_for_embeddings = pd.concat(
    [
        df_posts_for_embeddings, 
        df_v_subs[l_ix_sub_level].assign(**{c_post_embedding_count: 0})
    ],
    axis=0
)

# then, apply rules based on # of posts
# 3+ posts: simple mean()

# 1-2 posts: 1) append subreddit_desc, 2) apply mean of posts + subreddit_desc

# 0 posts: fill in subreddit_desc


In [127]:
df_posts_for_embeddings.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 775227 entries, 0 to 441158
Data columns (total 3 columns):
 #   Column                      Non-Null Count   Dtype 
---  ------                      --------------   ----- 
 0   subreddit_id                775227 non-null  object
 1   subreddit_name              775227 non-null  object
 2   posts_for_embeddings_count  775227 non-null  int64 
dtypes: int64(1), object(2)
memory usage: 23.7+ MB


In [130]:
df_posts_for_embeddings[df_posts_for_embeddings[c_post_embedding_count] >= 1][c_post_embedding_count].describe()

count    3467.000000
mean      148.244303
std       743.004199
min         1.000000
25%         1.000000
50%         5.000000
75%        29.000000
max      8400.000000
Name: posts_for_embeddings_count, dtype: float64

In [128]:
df_posts_for_embeddings.head()

Unnamed: 0,subreddit_id,subreddit_name,posts_for_embeddings_count
0,t5_1001tl,jewel_xo,1
1,t5_10029e,milkyhentai,1
2,t5_1006k8,badwouldyourather,1
3,t5_100806,jojojosiah,2
4,t5_1009a3,memesenespanol,380


In [129]:
df_posts_for_embeddings.tail()

Unnamed: 0,subreddit_id,subreddit_name,posts_for_embeddings_count
441154,t5_6rmvt7,shaunthebuilder,0
441155,t5_6e6udp,thedepthsoftokyo,0
441156,t5_4la7jo,spongebobcringememes,0
441157,t5_6gghfe,hardddd,0
441158,t5_6dn0c7,minecraft_heroes,0


In [134]:
df_v_subs.head()

Unnamed: 0,subreddit_id,subreddit_name,embeddings_0,embeddings_1,embeddings_2,embeddings_3,embeddings_4,embeddings_5,embeddings_6,embeddings_7,embeddings_8,embeddings_9,embeddings_10,embeddings_11,embeddings_12,embeddings_13,embeddings_14,embeddings_15,embeddings_16,embeddings_17,embeddings_18,embeddings_19,embeddings_20,embeddings_21,embeddings_22,embeddings_23,embeddings_24,embeddings_25,embeddings_26,embeddings_27,...,embeddings_482,embeddings_483,embeddings_484,embeddings_485,embeddings_486,embeddings_487,embeddings_488,embeddings_489,embeddings_490,embeddings_491,embeddings_492,embeddings_493,embeddings_494,embeddings_495,embeddings_496,embeddings_497,embeddings_498,embeddings_499,embeddings_500,embeddings_501,embeddings_502,embeddings_503,embeddings_504,embeddings_505,embeddings_506,embeddings_507,embeddings_508,embeddings_509,embeddings_510,embeddings_511
0,t5_2qh1i,askreddit,0.022789,-0.059892,-0.001451,0.046439,0.066976,0.064597,0.057965,-0.013464,-0.046748,-0.044211,-0.014546,-0.037996,0.028808,-0.060112,0.013253,-0.026466,0.061427,-0.066526,0.031853,0.000255,-0.06149,-0.054536,0.03371,-0.04571,-0.010254,-0.002935,-0.012825,-0.014563,...,-0.05294,0.060304,-0.012639,-0.054029,-0.065171,-0.065921,-0.029465,0.024692,0.021926,-0.041012,-0.061232,0.044805,-0.033858,-0.023948,-0.009575,0.058824,-0.057304,0.012438,-0.040396,0.054599,0.045959,-0.010542,0.03491,0.038669,0.00164,0.064855,0.017133,0.043527,-0.023271,0.065759
1,t5_2ubgg,mildlyinfuriating,0.035654,0.031603,0.068379,0.05736,0.037998,0.048036,0.031026,0.002395,0.017303,0.044306,0.021372,-0.012552,-0.012714,0.045416,-0.06623,-0.001657,0.011504,-0.039013,0.054168,-0.017587,-0.068444,-0.074627,0.018681,0.063553,0.003351,-0.014084,0.027576,-0.071292,...,-0.050256,-0.036606,-0.060719,-0.071781,-0.030471,-0.074615,0.062072,-0.05049,-0.010802,0.025156,0.045255,0.046165,0.042267,-0.020638,-0.059023,-0.029685,-0.021343,-0.041109,-0.010315,0.013957,-0.000502,0.023802,-0.068301,0.004283,-0.024348,0.004078,-0.009777,-0.038842,0.042836,0.052957
2,t5_2qh33,funny,0.062798,-0.063764,-0.011798,-0.01751,0.066221,0.052268,0.023606,0.062803,-0.05654,-0.014388,-0.061934,-0.040253,0.021859,-0.032234,-0.054955,0.000749,-0.029629,-0.064663,0.062429,-0.059817,-0.058802,-0.061951,0.044884,0.048427,-0.050783,-0.030948,-0.000821,0.006162,...,0.004339,0.02012,-0.066478,-0.034199,0.034718,-0.065102,-0.028542,0.052982,0.034749,-0.029207,0.045916,-0.005312,-0.048881,-0.031554,0.004063,0.051525,0.001364,0.026816,-0.01356,-0.004556,-0.029917,-0.011541,-0.06291,0.025208,0.030484,0.030693,0.018124,-0.060697,0.033602,0.054727
3,t5_2y77d,antiwork,-0.036583,0.056425,0.052511,0.001249,0.033096,0.015373,0.028894,-0.038881,0.050351,-0.014261,0.035479,0.001364,-0.035047,-0.025128,-0.067778,0.050507,0.061732,-0.045989,-0.062415,0.044178,0.062185,-0.063504,0.065321,-0.047625,-0.05271,0.003137,-0.039466,-0.067172,...,-0.043989,-0.012928,-0.064315,-0.044713,-0.036838,-0.068507,0.015802,-0.026234,-0.006225,0.060826,0.053683,-0.034692,0.032022,0.02028,-0.019914,0.020297,-0.01652,-0.021231,0.032348,-0.019186,-0.025301,0.033985,-0.054834,0.010192,-0.022206,-0.014984,-0.052996,-0.029742,-0.032605,0.062173
4,t5_2qh3l,news,-0.048809,0.061595,0.045624,0.007804,0.072698,0.035929,0.063459,0.054612,-0.011245,0.070547,-0.031765,-0.057275,0.055175,-0.003909,-0.047737,0.011671,0.017775,-0.047829,-0.044221,-0.007286,0.01621,0.017299,0.011912,0.053568,-0.057371,0.036779,0.059203,-0.058659,...,0.033511,-0.008072,-0.040351,-0.041777,0.000349,-0.074678,-0.014992,0.069233,0.01304,-0.053166,0.044095,0.015649,-0.066042,0.037967,0.050074,0.058147,-0.062874,-0.009892,-0.015297,0.029885,0.013304,-0.042156,-0.03675,-0.030743,0.064008,0.018819,0.036405,-0.073308,-0.042354,0.074897


In [142]:
%%time

# first, figure out how many posts each subreddit has
info(f"Count posts per subreddit...")
c_post_embedding_count = 'posts_for_embeddings_count'


df_posts_for_embedding_count = (
    df_posts_agg_c1
    .groupby(l_ix_sub_level, as_index=False)
    .agg(**{c_post_embedding_count: ('post_id', 'nunique')})
)
# fill subs that have no posts
df_posts_for_embedding_count = pd.concat(
    [
        df_posts_for_embedding_count, 
        df_v_subs[l_ix_sub_level].assign(**{c_post_embedding_count: 0})
    ],
    axis=0
)

# min_posts >= -> regular mean. If it's less than this, then mix in subreddit_description into average
n_min_posts_for_regular_mean = 3
subreddits_above_n_ = (
    df_posts_for_embeddings
    [df_posts_for_embeddings[c_post_embedding_count] >= n_min_posts_for_regular_mean]
    ['subreddit_id']
)
subreddits_below_n_ = set(df_v_subs['subreddit_id']) - set(subreddits_above_n_)
mask_min_posts_for_reg_mean = df_posts_agg_c1['subreddit_id'].isin(subreddits_above_n_)


info(f"SUBREDDIT-LEVEL C1 - posts + comments + sub descriptions")
t_start_agg_subs_c1 = datetime.utcnow()

# 3+ posts: simple mean()
info(f"Mean for subs above threshold: {n_min_posts_for_regular_mean}")
df_subs_agg_c1_Nplus = (
    df_posts_agg_c1[mask_min_posts_for_reg_mean]
    .groupby(l_ix_sub_level, as_index=False)
    .mean()
)

# calculate mean for all other subs: add UNWEIGHTED subreddit_description into averages
info(f"Calculating mean for subs BELOW post threshold...")
df_subs_agg_c1_Nbelow = (
    pd.concat(
        [
            df_posts_agg_c1[~mask_min_posts_for_reg_mean],
            df_v_subs[df_v_subs['subreddit_id'].isin(subreddits_below_n_)]
        ]
    )
    .groupby(l_ix_sub_level, as_index=False)
    .mean()
)
info(f"Combining all subreddits...")
df_subs_agg_c1 = (
    df_posts_for_embedding_count
    .merge(
        pd.concat([df_subs_agg_c1_Nplus, df_subs_agg_c1_Nbelow]),
        how='outer',
        on=l_ix_sub_level
    )
    .sort_values(by=l_ix_sub_level)
)

r_, c_ = df_subs_agg_c1.shape
mlflow.log_metrics(
    {
        f"df_subs_agg_c1-rows": r_,
        f"df_subs_agg_c1-cols": c_,
    }
)
print(f"{r_:,.0f} rows, {c_:,.0f} cols")
del r_, c_

t_agg_subs_c1 = elapsed_time(start_time=t_start_agg_subs_c1, log_label='Total Agg fxn time', verbose=True)
mlflow.log_metric('time_fxn-df_subs_agg_c1',
                  t_agg_subs_c1 / timedelta(minutes=1)
                  )
info(f"  <- df_subs_agg_c1.shape (posts + comments + sub description)")
mlf.log_ram_stats(only_memory_used=True)

06:58:26 | INFO | "Count posts per subreddit..."
06:58:27 | INFO | "SUBREDDIT-LEVEL C1 - posts + comments + sub descriptions"
06:58:27 | INFO | "Mean for subs above threshold: 3"
06:58:30 | INFO | "Calculating mean for subs BELOW post threshold..."
06:58:49 | INFO | "Combining all subreddits..."
06:59:00 | INFO | "  0:00:32.975905 <- Total Agg fxn time time elapsed"
06:59:00 | INFO | "  <- df_subs_agg_c1.shape (posts + comments + sub description)"


775,227 rows, 515 cols


06:59:00 | INFO | "RAM stats:
{'memory_used_percent': '3.04%', 'memory_used': '43,936'}"


CPU times: user 27.3 s, sys: 6.53 s, total: 33.8 s
Wall time: 34.2 s


{'memory_used_percent': 0.030406356988181688, 'memory_used': 43936}

In [143]:
df_subs_agg_c1.iloc[-8:, :10]

Unnamed: 0,subreddit_id,subreddit_name,posts_for_embeddings_count,embeddings_0,embeddings_1,embeddings_2,embeddings_3,embeddings_4,embeddings_5,embeddings_6
153860,t5_zzsmq,floridamanatheart,0,-0.003054,0.041708,-0.054448,0.050554,0.039986,0.026442,-0.013792
35803,t5_zzszh,circumcisiongrief,0,-0.016152,0.052365,-0.029968,0.045438,0.052287,0.040738,-0.013399
100710,t5_zzw6f,missourisingles,0,0.016408,-0.042155,-0.012844,0.036364,0.026492,0.037414,0.060839
383226,t5_zzw7y,geofssim,0,-0.042752,0.071976,-0.052646,-0.083755,-0.03228,8.6e-05,0.00368
144759,t5_zzwrs,hypnosisisbs,0,0.014406,-0.054694,-0.001471,-0.045701,0.05693,0.056256,-0.057434
339276,t5_zzyg0,creepyscarystories,0,0.017779,0.03914,0.03519,0.01229,0.056131,0.063757,0.053728
465011,t5_zzze9,demonmemes,0,-0.043147,-0.073882,-0.074882,0.004195,0.037517,0.05564,-0.048743
502502,t5_zzzyw,rachelnicki,0,-0.050092,-0.033809,-0.03328,0.019984,0.050981,0.06007,0.047383


### Save Subreddit level

This one we can save as a pandas df, no need to split it into multiple files

In [23]:
%%time
df_subs_agg_c1.to_parquet(
    path_this_model / f"df_subs_agg_c1-{datetime.utcnow().strftime('%Y-%m-%d_%H%M')}.parquet"
)

CPU times: user 2.46 s, sys: 363 ms, total: 2.82 s
Wall time: 2.53 s


In [24]:
%%time
d_dfs_to_save['df_subs_agg_c1']['local'] = (
    path_this_model / f"df_subs_agg_c1{datetime.utcnow().strftime('%Y-%m-%d_%H%M')}"
)

save_pd_df_to_parquet_in_chunks(
    df_subs_agg_c1,
    d_dfs_to_save['df_subs_agg_c1']['local'],
    write_index=False
)

info(f"  Logging df to mlflow...")
mlflow.log_artifacts(d_dfs_to_save['df_subs_agg_c1']['local'], artifact_path='df_subs_agg_c1')

14:36:54 | INFO | "Converting pandas to dask..."
14:36:54 | INFO | "   171.2 MB <- Memory usage"
14:36:54 | INFO | "       1	<- target Dask partitions	  350.0 <- target MB partition size"
14:36:57 | INFO | "  Logging df to mlflow..."


CPU times: user 2.82 s, sys: 428 ms, total: 3.25 s
Wall time: 5.22 s


In [25]:
# finish logging total time + end mlflow run
total_fxn_time = elapsed_time(start_time=t_start_agg_embed, log_label='Total Agg fxn time', verbose=True)
mlflow.log_metric('time_fxn-full_aggregation_fxn_minutes',
                  total_fxn_time / timedelta(minutes=1)
                  )
mlflow.end_run()

14:36:59 | INFO | "  2:54:43.728899 <- Total Agg fxn time time elapsed"


In [132]:
mlflow.end_run("FAILED")