# Purpose


**2023-04-07**: 
<br> Here we'll test a simplified flow to get an ETA for how long it will take to convert subreddit-level embeddings into user-level embeddings.

We'll test pandas v. polars. The hope is that polars will boost parallel performance enough to make the embedding step feasible to run in less than one day. If we can run the embedding step fast enough, we might be able to cache the job daily so that we can keep the latest data ready to use for PN and for other models.

---



# Imports & Setup

In [1]:
%load_ext autoreload
%autoreload 2

In [108]:
from datetime import datetime
import logging
from logging import info
import os
from pathlib import Path
import json

import polars as pl
import numpy as np
import pandas as pd
import plotly
import seaborn as sns

from tqdm.auto import tqdm
import dask
import mlflow
import hydra

import subclu
from subclu.eda.aggregates import compare_raw_v_weighted_language
from subclu.utils import set_working_directory, get_project_subfolder
from subclu.utils.eda import (
    setup_logging, counts_describe, value_counts_and_pcts,
    notebook_display_config, print_lib_versions,
    style_df_numeric, reorder_array,
)
from subclu.utils.mlflow_logger import MlflowLogger
from subclu.utils.hydra_config_loader import LoadHydraConfig
from subclu.utils.data_irl_style import (
    get_colormap, theme_dirl, 
    get_color_dict, base_colors_for_manual_labels,
    check_colors_used,
)
from subclu.data.data_loaders import LoadPosts, LoadSubreddits, create_sub_level_aggregates


# ===
# imports specific to this notebook
from collections import Counter

import sklearn
from sklearn.decomposition import TruncatedSVD
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.preprocessing import normalize  # if we normalize the data, euclidean distance is approx of cosine


print_lib_versions([hydra, np, pd, pl, plotly, sklearn, subclu])

python		v 3.7.10
===
hydra		v: 1.1.0
numpy		v: 1.19.5
pandas		v: 1.2.4
polars		v: 0.17.1
plotly		v: 5.11.0
sklearn		v: 0.24.1
subclu		v: 0.6.1


In [3]:
# plotting
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import matplotlib.dates as mdates
plt.style.use('default')

setup_logging()
notebook_display_config()

# Define custom functions

In [169]:
from typing import Any, Union


def reshape_tos_for_df(
    user_id: str,
    tos_str_dict: str,
    tos_col_name: str = 'tos_pct',
) -> dict:
    """Expand one user's JSON time-on-subreddit (ToS) string into a dict of column lists.

    The output has one list per column, all with the same length (one entry per
    key in the JSON object), so it can be passed straight to a DataFrame
    constructor to get a long df where each row is a user+subreddit ToS value.

    Args:
        user_id: ID of the user; repeated once per subreddit in the output.
        tos_str_dict: JSON-encoded object mapping subreddit ID -> ToS value.
        tos_col_name: Name of the output key that holds the ToS values.

    Returns:
        Dict with keys 'user_id', 'subreddit_id' and `tos_col_name` (in that order).
        Subreddit IDs & ToS values keep the order of the keys in the JSON object.

    Raises:
        json.JSONDecodeError: if `tos_str_dict` is not valid JSON.
    """
    d_tos_in = json.loads(tos_str_dict)

    return {
        'user_id': [user_id] * len(d_tos_in),
        # keys() & values() iterate in the same (insertion) order, so the two lists stay aligned
        'subreddit_id': list(d_tos_in.keys()),
        tos_col_name: list(d_tos_in.values()),
    }


def delayed_select_for_polars(
    pl_df: pl.LazyFrame,
    select_kwargs: Any,
) -> pl.DataFrame:
    """Materialize a polars frame first, then run `.select()` on the eager result.

    Use this function as a fix for polars.lazy() that breaks when selecting from an
    unnested struct: the select is only applied after the lazy plan has been computed.
    If we apply this function with dask.delayed() we can queue the long df_tos
    computation as several jobs (e.g. one per split of users).

    Args:
        pl_df: Lazy plan to materialize. Objects that are not a `pl.LazyFrame`
            are expected to expose `.compute()` (original behavior).
        select_kwargs: Expression(s) passed unchanged to `.select()`.

    Returns:
        The materialized frame after applying the select.
    """
    if isinstance(pl_df, pl.LazyFrame):
        # polars LazyFrames are materialized with .collect(); they don't have a .compute() method
        df_eager = pl_df.collect()
    else:
        # Keep the original call for any other input that does expose .compute()
        df_eager = pl_df.compute()

    return df_eager.select(select_kwargs)
    

# Get data

## Get user IDs to embed with time-on-subreddit features

I'm caching the data in a view so that it's easier to re-use while testing.

The time-on-subreddit feature is already in gazette features so we don't need to waste time computing them.

```SQL
SELECT 
    DISTINCT user_id

FROM `data-prod-165221.fact_tables.post_consume_post_detail_view_events` 
WHERE DATE(pt) BETWEEN (CURRENT_DATE() - 7) AND (CURRENT_DATE() - 2)
    AND subreddit_name IN ('themandaloriantv')
    AND action IN ('consume', 'view')
-- LIMIT 1000
```

In [5]:
%%time
%%bigquery df_users_raw_tos --project data-science-prod-218515 

-- pull the user IDs and the time on sub, but do the calculations in python b/c 
--   it can be a pain to convert to rows in BQ

SELECT
    ui.user_id
    , t.* EXCEPT(user_id)
    -- , t.feature_value
    -- , t.json_val 
    -- , t.feature_name
FROM (
        SELECT
            entity_id AS user_id
            , feature_value
            -- , (SAFE.parse_json(feature_value)) AS json_val
        FROM `data-prod-165221.user_feature_platform.time_on_subreddit_pct_time_over_30_day_v1`

        WHERE DATE(pt) = CURRENT_DATE() - 3 -- Latest appears to be (Today-3) b/c of lag from other tables
    ) AS t
    INNER JOIN (
        SELECT *
        FROM `reddit-employee-datasets.david_bermejo.pn_test_users_for_embedding` 
        -- LIMIT 10
    ) AS ui
        ON ui.user_id = t.user_id
;

Query complete after 0.00s: 100%|██████████| 5/5 [00:00<00:00, 3921.38query/s]                        
Downloading: 100%|██████████| 856423/856423 [00:10<00:00, 79436.67rows/s] 


CPU times: user 8.91 s, sys: 4.57 s, total: 13.5 s
Wall time: 35.7 s


In [6]:
df_users_raw_tos.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 856423 entries, 0 to 856422
Data columns (total 2 columns):
 #   Column         Non-Null Count   Dtype 
---  ------         --------------   ----- 
 0   user_id        856423 non-null  object
 1   feature_value  856423 non-null  object
dtypes: object(2)
memory usage: 13.1+ MB


In [7]:
df_users_raw_tos.head()

Unnamed: 0,user_id,feature_value
0,t2_8p8h0ki0b,"{""t5_3fn31"":0.87899,""t5_2qhsa"":0.00445,""t5_2tex6"":0.00187,""t5_2xcv7"":0.00351,""t5_dyqlw"":0.05844,""t5_2qh33"":0.00406,""t5_3boqz"":0.00117,""t5_2s7tt"":0.00421,""t5_3aegn"":0.0103,""t5_2r5rp"":0.00335,""t5_2w7mz"":0.00437,""t5_3fzm9"":0.00359,""t5_2xhv..."
1,t2_8p90xabzf,"{""t5_3phq9"":0.13502,""t5_2qh1i"":0.22281,""t5_23gidu"":0.04516,""t5_2vg7t"":0.04516,""t5_2w844"":0.01567,""t5_2qkhb"":0.01912,""t5_2yrq6"":0.02788,""t5_2qh0u"":0.18687,""t5_2rqcm8"":0.07166,""t5_3boqz"":0.02304,""t5_3gcwj"":0.07742,""t5_2si92"":0.10645,""t5_3..."
2,t2_8p81pzwi9,"{""t5_2wzm6"":0.0219,""t5_2cneq"":0.00365,""t5_2tex6"":0.01825,""t5_30tmh"":0.01095,""t5_386zh"":0.04866,""t5_2w844"":0.06813,""t5_2w7mz"":0.09611,""t5_2vegg"":0.0438,""t5_3mj3w"":0.04988,""t5_2r40o"":0.13504,""t5_3i3vm"":0.0365,""t5_2qhds"":0.04136,""t5_2qh72""..."
3,t2_8p90y,"{""t5_38y725"":0.00134,""t5_27xp1u"":2e-05,""t5_7m6f8m"":1e-05,""t5_2tdzg"":0.00057,""t5_2taso"":2e-05,""t5_2sokd"":0.02098,""t5_2rjz2"":0.00395,""t5_2s4pd"":0.01769,""t5_2qh61"":0.0028,""t5_2qimj"":2e-05,""t5_r1l2z"":6e-05,""t5_2s5ti"":0.00164,""t5_2xinb"":0.00..."
4,t2_8p853,"{""t5_2s6nz"":0.00038,""t5_2y1j5"":8e-05,""t5_4b0bzv"":0.0001,""t5_2va9w"":5e-05,""t5_35n7t"":0.05491,""t5_2z8fuj"":0.00109,""t5_2xp02"":0.00107,""t5_3chmw"":0.00012,""t5_2tk0s"":1e-05,""t5_3ptyd"":0.00131,""t5_mvcq5"":0.01587,""t5_2vxxc"":0.0004,""t5_2s28b"":0...."


## Get subreddit-level embeddings

In [8]:
%%time
%%bigquery df_sub_emb --project data-science-prod-218515 

SELECT
  subreddit_id
  , subreddit_name
  , ARRAY_CONCAT(embedding) AS embedding
FROM `data-prod-165221.ml_content.subreddit_embeddings_ft2`
WHERE DATE(pt) = '2023-04-04'

Query complete after 0.00s: 100%|██████████| 1/1 [00:00<00:00, 1375.18query/s]
Downloading: 100%|██████████| 242345/242345 [00:03<00:00, 76432.72rows/s] 


CPU times: user 613 ms, sys: 414 ms, total: 1.03 s
Wall time: 4.24 s


In [9]:
df_sub_emb.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 242345 entries, 0 to 242344
Data columns (total 3 columns):
 #   Column          Non-Null Count   Dtype 
---  ------          --------------   ----- 
 0   subreddit_id    242345 non-null  object
 1   subreddit_name  242345 non-null  object
 2   embedding       242345 non-null  object
dtypes: object(3)
memory usage: 5.5+ MB


In [10]:
df_sub_emb.head()

Unnamed: 0,subreddit_id,subreddit_name,embedding
0,t5_4f57cs,fuckangelhernandez,"[-0.3996698260307312, 0.20269957184791565, -0.3055022060871124, -0.005287058185786009, 0.121136873960495, 0.06231916323304176, -0.37654876708984375, 0.48437756299972534, -0.2151433378458023, 0.31947633624076843, 0.4782879948616028, 0.23..."
1,t5_2s2gt,fuckapple,"[-0.0029481418896466494, 0.053129445761442184, -0.17849114537239075, -0.1555848866701126, -0.20715922117233276, 0.2648947238922119, 0.24177874624729156, 0.23757821321487427, 0.011948454193770885, 0.3712220788002014, 0.3585907518863678, ..."
2,t5_7s1lcv,fuckaround_n_findout,"[-0.25010135769844055, -0.05121876671910286, -0.4104999303817749, 0.17199638485908508, -0.2111128717660904, -0.01367927622050047, -0.41924068331718445, 0.15966467559337616, 0.1576160341501236, 0.398811936378479, 0.3563167154788971, 0.07..."
3,t5_2mpk27,fuckautismspeaks,"[-0.643264889717102, -0.3077889084815979, -0.8766250610351562, 0.34814009070396423, -0.3688446581363678, -0.09676993638277054, -0.01420139241963625, 0.11278355121612549, 0.2917800843715668, 0.48437413573265076, -0.09958196431398392, -0...."
4,t5_5rkktn,fuckbagin,"[0.09801478683948517, -0.26190200448036194, 0.09046468883752823, -0.18976961076259613, -0.06223154440522194, 0.34227171540260315, -0.07571414113044739, 0.3605010509490967, 0.013768891803920269, 0.4096928536891937, 0.27020058035850525, -..."


# Transform - Count of subreddits per user


In [11]:
%%time

dfp_users_tos = pl.DataFrame(df_users_raw_tos)

CPU times: user 1.09 s, sys: 842 ms, total: 1.93 s
Wall time: 1.89 s


In [12]:
%%time

df_users_tos = df_users_raw_tos.copy()

CPU times: user 28.2 ms, sys: 7.95 ms, total: 36.1 ms
Wall time: 35.5 ms


## Pandas
Reshape user data to get:
- count the number of subreddits for each user



In [19]:
%%time

# Parse each user's JSON string and count its keys (one key per subreddit)
df_users_tos['subreddit_count'] = df_users_tos['feature_value'].map(
    lambda tos_json: len(json.loads(tos_json))
)

CPU times: user 38.7 s, sys: 894 ms, total: 39.6 s
Wall time: 39.6 s


In [20]:
df_users_tos.describe()

Unnamed: 0,subreddit_count
count,856423.0
mean,152.496149
std,141.629998
min,1.0
25%,47.0
50%,118.0
75%,215.0
max,2085.0


## Polars

In [15]:
dfp_users_tos.describe()

describe,user_id,feature_value
str,str,str
"""count""","""856423""","""856423"""
"""null_count""","""0""","""0"""
"""mean""",,
"""std""",,
"""min""","""t2_10009m""","""{""t5_100kj8"":4…"
"""max""","""t2_zzzz9""","""{""t5_zzwxp"":0.…"
"""median""",,


In [21]:
%%time

# Same count as in the pandas section: number of keys in each user's JSON string
dfp_users_tos = dfp_users_tos.with_columns(
    pl.col('feature_value')
    .apply(lambda tos_json: len(json.loads(tos_json)))
    .alias('subreddit_count')
)

CPU times: user 40 s, sys: 1.13 s, total: 41.1 s
Wall time: 41.1 s


In [22]:
dfp_users_tos.describe()

describe,user_id,feature_value,subreddit_count
str,str,str,f64
"""count""","""856423""","""856423""",856423.0
"""null_count""","""0""","""0""",0.0
"""mean""",,,152.496149
"""std""",,,141.629998
"""min""","""t2_10009m""","""{""t5_100kj8"":4…",1.0
"""max""","""t2_zzzz9""","""{""t5_zzwxp"":0.…",2085.0
"""median""",,,118.0


In [23]:
dfp_users_tos.head()

user_id,feature_value,subreddit_count
str,str,i64
"""t2_8p8h0ki0b""","""{""t5_3fn31"":0.…",14
"""t2_8p90xabzf""","""{""t5_3phq9"":0.…",14
"""t2_8p81pzwi9""","""{""t5_2wzm6"":0.…",28
"""t2_8p90y""","""{""t5_38y725"":0…",84
"""t2_8p853""","""{""t5_2s6nz"":0.…",369


# Transform - get long form 
Each row = user+subreddit

## Example on a single user

In [24]:
# convert 1 user's row to a df where each row is user+subreddit time

df_users_tos[df_users_tos['user_id'] == 't2_8fe238nke']['feature_value']

632536    {"t5_3nxko":0.02033,"t5_2r40o":0.02439,"t5_2t7no":0.00813,"t5_2thzw":0.01626,"t5_2qvp9":0.04472,"t5_2yrq6":0.03252,"t5_45hae8":0.01626,"t5_3ajbp":0.11382,"t5_2qi58":0.03659,"t5_2qhpn":0.04878,"t5_34a0h":0.49187,"t5_35mye":0.03252,"t5_2t...
Name: feature_value, dtype: object

In [25]:
# reshape_tos_for_df(
#     't2_8fe238nke',
#     df_users_tos[df_users_tos['user_id'] == 't2_8fe238nke']['feature_value'].values[0],
# )

pd.DataFrame(reshape_tos_for_df(
    't2_8fe238nke',
    df_users_tos[df_users_tos['user_id'] == 't2_8fe238nke']['feature_value'].values[0],
))

Unnamed: 0,user_id,subreddit_id,tos_pct
0,t2_8fe238nke,t5_3nxko,0.02033
1,t2_8fe238nke,t5_2r40o,0.02439
2,t2_8fe238nke,t5_2t7no,0.00813
3,t2_8fe238nke,t5_2thzw,0.01626
4,t2_8fe238nke,t5_2qvp9,0.04472
5,t2_8fe238nke,t5_2yrq6,0.03252
6,t2_8fe238nke,t5_45hae8,0.01626
7,t2_8fe238nke,t5_3ajbp,0.11382
8,t2_8fe238nke,t5_2qi58,0.03659
9,t2_8fe238nke,t5_2qhpn,0.04878


## Time estimates on just converting the data to the right format

with 10k users, Pandas seems WAY faster in this step:

```bash
# Pandas
803 ms ± 3.74 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# Polars
2,020 ms ± 17.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

##  2.02 s ± 17.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
```

In [28]:
%%timeit
(
    df_users_tos.head(int(1e4)).apply(lambda x: reshape_tos_for_df(x['user_id'], x['feature_value']), axis='columns')
)

803 ms ± 3.74 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [30]:
%%timeit

(
    dfp_users_tos.head(int(1e4)).select(
        pl.struct(['user_id', 'feature_value']).apply(
            lambda x: reshape_tos_for_df(x['user_id'], x['feature_value'])
        ).alias('subreddit_tos_expanded')
    )
)

2.02 s ± 17.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


## Time estimates on converting the data to long df (on a sample of users)

We need to take into account the full time to convert to the output df

We also compare the output of the two methods so that each one checks the other's output.

In [49]:
n_long_test = int(5e4)
n_long_test

50000

In [51]:
%%time

# Single chain (no tmp df):
#  1) build a struct column with each user's expanded ToS dict
#  2) unnest the struct into one list-column per field
#  3) explode the lists so that each row is one user+subreddit pair
dfp_users_tos_long_test = (
    dfp_users_tos
    .head(n_long_test)
    .select(
        pl.struct(['user_id', 'feature_value'])
        .apply(lambda row: reshape_tos_for_df(row['user_id'], row['feature_value']))
        .alias('subreddit_tos_expanded')
    )
    ['subreddit_tos_expanded']
    .struct.unnest()
    .select([
        pl.col(col_).arr.explode()
        for col_ in ('user_id', 'subreddit_id', 'tos_pct')
    ])
)
print(dfp_users_tos_long_test.shape)

(7054807, 3)
CPU times: user 11.5 s, sys: 1.46 s, total: 12.9 s
Wall time: 12.6 s


In [64]:
%%time

# Build one dict of column-lists per user, turn each dict into its own df,
#  and stack them into a single long df with a fresh index
df_users_tos_long_test = pd.concat(
    [
        pd.DataFrame(d_user_)
        for d_user_ in df_users_tos.head(n_long_test).apply(
            lambda row: reshape_tos_for_df(row['user_id'], row['feature_value']),
            axis='columns',
        ).to_list()
    ],
    ignore_index=True,
)
print(df_users_tos_long_test.shape)

(7054807, 3)
CPU times: user 37.9 s, sys: 1.01 s, total: 38.9 s
Wall time: 38.6 s


In [53]:
38.5 / 12.6

3.055555555555556

### Make sure that the df outputs are the same!

Note that we need to reset & drop the index for the pandas df because it keeps the index for each individual df unless we ignore index at concat time.

_NOTE_: When we use `DataFrame.equals()` it also checks the dataframe's index.

In [76]:
%%time

# Bring both outputs to pandas with a clean 0..n-1 index before comparing them
df_from_polars_ = dfp_users_tos_long_test.to_pandas()
df_from_pandas_ = df_users_tos_long_test.reset_index(drop=True)

pa_eq_pl = df_from_polars_.equals(df_from_pandas_)
print(pa_eq_pl)
assert pa_eq_pl
del pa_eq_pl, df_from_polars_, df_from_pandas_

True
CPU times: user 1.71 s, sys: 104 ms, total: 1.81 s
Wall time: 1.79 s


In [72]:
np.array_equal(dfp_users_tos_long_test.to_pandas().index, df_users_tos_long_test.index)

True

In [77]:
# Compare the shapes directly: polars & pandas both expose `.shape` as a (rows, columns) tuple,
#  so we don't need to convert the ~7M-row polars df to pandas just to read its shape
shape_pol_test_ = dfp_users_tos_long_test.shape
shape_pan_test_ = df_users_tos_long_test.shape
print(shape_pol_test_)
print(shape_pan_test_)
print(shape_pan_test_ == shape_pol_test_)
del shape_pol_test_, shape_pan_test_

(7054807, 3)
(7054807, 3)
True


### Run timeit on test data [optional]
We don't really need to run `%%timeit` because it's clear that polars is ~3x faster when we measure the end-to-end result for the final output (df_long).

In [54]:
# %%timeit

# df_users_tos_long_test = pd.concat(
#     [pd.DataFrame(_) for _ in 
#         (
#             df_users_tos.head(n_long_test).apply(lambda x: reshape_tos_for_df(x['user_id'], x['feature_value']), axis='columns')
#         ).to_list()
#     ]
# )

In [55]:
# %%timeit

# dfp_users_tos_long_test = (
#     dfp_users_tos.head(n_long_test).select(
#         pl.struct(['user_id', 'feature_value'])
#         .apply(
#             lambda x: reshape_tos_for_df(x['user_id'], x['feature_value'])
#         )
#         .alias('subreddit_tos_expanded')
#     )
#     ['subreddit_tos_expanded'].struct.unnest().select(
#         [
#             pl.col('user_id').arr.explode(),
#             pl.col('subreddit_id').arr.explode(),
#             pl.col('tos_pct').arr.explode(),
#         ]
#     )
# )

## Convert full data
with pandas, we have a lot of overhead so the whole process takes 13+ minutes (creating individual dataframes & then concat into a final df)

```bash
# Pandas
CPU times: user 12min 39s, sys: 1min 12s, total: 13min 51s
Wall time: 13min 46s


# Polars
CPU times: user 3min 49s, sys: 40.4 s, total: 4min 30s
Wall time: 4min 25s
```

In [81]:
(13 + 46/60) / (4 + 25/60)

3.116981132075472

In [78]:
%%time

# Same chain as in the sample test, but on ALL users (single job, no tmp df):
#  struct with expanded ToS dict -> unnest into list-columns -> explode into long format
dfp_users_tos_long_single = (
    dfp_users_tos
    .select(
        pl.struct(['user_id', 'feature_value'])
        .apply(lambda row: reshape_tos_for_df(row['user_id'], row['feature_value']))
        .alias('subreddit_tos_expanded')
    )
    ['subreddit_tos_expanded']
    .struct.unnest()
    .select([
        pl.col(col_).arr.explode()
        for col_ in ('user_id', 'subreddit_id', 'tos_pct')
    ])
)
print(dfp_users_tos_long_single.shape)

(130601209, 3)
CPU times: user 3min 49s, sys: 40.4 s, total: 4min 30s
Wall time: 4min 25s


In [69]:
# %%time

# df_tos_long = pd.concat(
#     [pd.DataFrame(_) for _ in 
#         (
#             df_users_tos.apply(lambda x: reshape_tos_for_df(x['user_id'], x['feature_value']), axis='columns')
#         ).to_list()
#     ]
# )
# print(df_tos_long)

CPU times: user 12min 39s, sys: 1min 12s, total: 13min 51s
Wall time: 13min 46s


### Can we speed it up even more by running a few calculations explicitly in parallel?

with `dask.delayed()` we should be able to queue and run a few jobs in parallel which _should_ speed up the `polars` calculation even more



In [None]:
# NOTE(review): `BREAK` is not defined anywhere in the visible part of this notebook, so
#  evaluating it raises a NameError. Presumably it's here on purpose to stop "Run All"
#  before the experimental cells below -- TODO confirm.
BREAK

# NOTE(review): the rest of this cell looks like a reference snippet for the dask.delayed()
#  pattern (queue many small jobs, concat them in one final delayed step, then compute).
#  It uses names that are not defined in the visible part of this notebook
#  (df_v_pc_weighted, df_v_subs_weighted, l_embedding_cols, l_ix_post_level, l_df_c1_weights),
#  so it can't run here as-is -- TODO confirm it's only kept as a template.
for s_id, df_ in tqdm(
        df_v_pc_weighted.groupby('subreddit_id'),
        ascii=True, mininterval=5,
    ):
        # For each post in a subreddit, get new embedding: combine subreddit_meta + post(and_comment)
        df_pc_embeddings_ = dask.delayed(np.add)(
            dask.delayed(df_v_subs_weighted[df_v_subs_weighted['subreddit_id'] == s_id][l_embedding_cols].to_numpy()),
            dask.delayed(df_[l_embedding_cols])
        )
        # Queue a delayed column-wise concat of the post-level index columns + the new embeddings
        l_df_c1_weights.append(
            dask.delayed(pd.concat)([dask.delayed(df_[l_ix_post_level]), df_pc_embeddings_], ignore_index=False, axis=1)
        )

info(f"Define new posts C1 df DAG in dask")
# Final delayed step: stack all the per-subreddit dfs into one df
df_posts_agg_c1_delayed = dask.delayed(pd.concat)(l_df_c1_weights, ignore_index=True)

info(f"COMPUTE new C1 df START")
# Nothing is computed until .compute() is called on the final delayed object
df_posts_agg_c1 = df_posts_agg_c1_delayed.compute()
info(f"COMPUTE new C1 df DONE")

In [104]:
# dfp_users_tos.filter(dfp_users_tos['user_id'].is_in(a_user_splits[0]))

In [112]:
%%time

# NOTE: as recorded in this cell's output, this attempt FAILS at compute time with
#  `ColumnNotFoundError: user_id`. The error originates just after the select that
#  applies `x.struct.unnest()` to 'subreddit_tos_expanded'.

# Number of chunks of users that we want to process as separate jobs
n_parallel_splits = 10

# Split the user IDs into n roughly equal chunks (one chunk per job)
a_user_splits = np.array_split(df_users_raw_tos['user_id'], n_parallel_splits)
print(len(a_user_splits))

l_df_long_parallel = list()

info(f"Create DAG to calculate user<>subreddit long df in parallel...")
# NOTE: `users_` keeps the LAST chunk of user IDs after this loop ends;
#  later cells in this notebook reuse it to filter dfp_users_tos.
for users_ in tqdm(a_user_splits):
    l_df_long_parallel.append(
        # NOTE(review): dask.delayed() is wrapping an already-built polars lazy plan here,
        #  not a function call -- TODO confirm whether this schedules any work in parallel
        dask.delayed(
            dfp_users_tos
            .lazy()
            .filter(dfp_users_tos['user_id'].is_in(users_))
            .select(
                # Calculate the new dict to expand the user<>subreddit ToS
                pl.struct(['user_id', 'feature_value'])
                .apply(
                    lambda x: reshape_tos_for_df(x['user_id'], x['feature_value'])
                )
                .alias('subreddit_tos_expanded')
            )
            .select(
                # Pick only the column with the expanded ToS dictionary (struct)
                # NOTE(review): per the recorded error, the next select can't find 'user_id',
                #  so this step presumably does NOT expand the struct fields into
                #  top-level columns
                pl.col('subreddit_tos_expanded')
                .apply(lambda x: x.struct.unnest())
            )
            # The explode step fails on a lazy-df, so do it as a separate step
            .select(
                # Extract each value into its own column
                [
                    pl.col('user_id').arr.explode(),
                    pl.col('subreddit_id').arr.explode(),
                    pl.col('tos_pct').arr.explode(),
                ]
            )
            # NOTE(review): the chain already starts with .lazy() -- TODO confirm this 2nd call is needed
            .lazy()
        )
    )

info(f"Define df_tos_long DAG in dask")
# Final delayed step: stack the per-chunk frames vertically
dfp_users_tos_long_delayed = dask.delayed(pl.concat)(l_df_long_parallel, how='vertical', parallel=True)

info(f"COMPUTE df_tos_long START")
# .compute() runs the dask graph; .collect() is then called on its result
dfp_users_tos_long = dfp_users_tos_long_delayed.compute().collect()
info(f"COMPUTE df_tos_long DONE")

01:59:48 | INFO | "Create DAG to calculate user<>subreddit long df in parallel..."


10


  0%|          | 0/10 [00:00<?, ?it/s]

01:59:50 | INFO | "Define df_tos_long DAG in dask"
01:59:50 | INFO | "COMPUTE df_tos_long START"


ColumnNotFoundError: user_id

Error originated just after this operation:
 SELECT [col("subreddit_tos_expanded").map_list()] FROM
   SELECT [col("user_id").as_struct([col("feature_value")]).map_list().alias("subreddit_tos_expanded")] FROM
    DF ["user_id", "feature_value", "subreddit_count"]; PROJECT */3 COLUMNS; SELECTION: "None"

In [143]:
dfp_users_tos_long

In [153]:
%%time

# NOTE: as recorded in this cell's output, this attempt FAILS with
#  `PanicException: internal error: entered unreachable code`.
#  Only the "Start lazy df DAG..." message was logged before the exception, so the
#  failure happens while the plans below are being built, before .collect() is reached.
info(f"Start lazy df DAG...")
# Filter & pick columns eagerly, then switch to lazy mode.
#  `users_` is the loop variable left over from the earlier cell that iterates over
#  a_user_splits, so that cell needs to run before this one.
df_lazy1 = (
    dfp_users_tos
    .filter(dfp_users_tos['user_id'].is_in(users_))
    [['user_id', 'feature_value']]
    .lazy()
)

df_lazy_final = ( 
            df_lazy1
            .select(
                # Build a struct column with each user's expanded ToS dict
                pl.struct(['user_id', 'feature_value'])
                .apply(
                    lambda x: reshape_tos_for_df(x['user_id'], x['feature_value'])
                )
                .alias('subreddit_tos_expanded')
            )
            # select and unnest the struct (nested dict)
            .unnest('subreddit_tos_expanded')
            .select(
                # Extract each value into its own column
                [
                    pl.col('user_id').arr.explode(),
                    pl.col('subreddit_id').arr.explode(),
                    pl.col('tos_pct').arr.explode(),
                ]
            )
            # .lazy()
)

info(f"DAG plan")
display(df_lazy_final)

info(f"Compute df")
df_lazy_final.collect()

02:41:27 | INFO | "Start lazy df DAG..."


PanicException: internal error: entered unreachable code

In [165]:
%%time

# This version of the lazy plan runs (see recorded output), but only because the final
#  explode/select is commented out: the result still has one LIST per user in each column.
info(f"Start lazy df DAG...")
# `users_` is the loop variable left over from the earlier cell that iterates over
#  a_user_splits, so that cell needs to run before this one.
df_lazy1 = (
    dfp_users_tos
    .lazy()
    .filter(dfp_users_tos['user_id'].is_in(users_))
    # Only test on a sample of rows
    .head(10000)
    .select(
        # Build a struct column with each user's expanded ToS dict
        pl.struct(['user_id', 'feature_value'])
        .apply(
            lambda x: reshape_tos_for_df(x['user_id'], x['feature_value'])
        )
        .alias('subreddit_tos_expanded')
    )
    # Expand the struct fields into their own (list) columns
    .unnest('subreddit_tos_expanded')
#     .explode('user_id')
    # .explode(['user_id', 'subreddit_id', 'tos_pct'])
    # This final select is what breaks when using .lazy()
    #  So maybe I can push this until the end?
#     .select(
#         # Extract each value into its own column
#         [
#             pl.col('user_id').arr.explode(),
#             pl.col('subreddit_id').arr.explode(),
#             pl.col('tos_pct').arr.explode(),
#         ]
#     )
    
)

info(f"DAG plan [except for .explode()]")
display(df_lazy1)

# NOTE: the log message mentions .explode(), but the explode/select below is commented out,
#  so this only collects the lazy plan
info(f"Compute df AND .explode()")
(
    df_lazy1.collect()
#     .select(
#         # Extract each value into its own column
#         [
#             pl.col('user_id').arr.explode(),
#             pl.col('subreddit_id').arr.explode(),
#             pl.col('tos_pct').arr.explode(),
#         ]
#     )
)

02:57:51 | INFO | "Start lazy df DAG..."
02:57:51 | INFO | "DAG plan [except for .explode()]"


02:57:52 | INFO | "Compute df AND .explode()"


CPU times: user 2.66 s, sys: 561 ms, total: 3.22 s
Wall time: 3.15 s


subreddit_id,tos_pct,user_id
list[str],list[f64],list[str]
"[""t5_2r8tu"", ""t5_2ti4h"", … ""t5_2y77d""]","[0.00012, 0.00037, … 0.0003]","[""t2_539ww"", ""t2_539ww"", … ""t2_539ww""]"
"[""t5_39ne7"", ""t5_2qi4s"", … ""t5_2urg0""]","[0.00528, 0.00176, … 0.0056]","[""t2_53auiz1hy"", ""t2_53auiz1hy"", … ""t2_53auiz1hy""]"
"[""t5_2qh0s"", ""t5_2usxq"", … ""t5_2qh1i""]","[0.00242, 0.00121, … 0.00606]","[""t2_53ex1mv5a"", ""t2_53ex1mv5a"", … ""t2_53ex1mv5a""]"
"[""t5_43zwn1"", ""t5_2ti4h"", … ""t5_2s5oq""]","[0.00229, 0.00019, … 0.01252]","[""t2_53ido95y"", ""t2_53ido95y"", … ""t2_53ido95y""]"
"[""t5_2tpjq"", ""t5_2qpxi"", … ""t5_2swe2""]","[0.05736, 0.00748, … 0.03741]","[""t2_5388qrup"", ""t2_5388qrup"", … ""t2_5388qrup""]"
"[""t5_2rwal"", ""t5_5qpzgw"", … ""t5_2r5rp""]","[0.00008, 0.0061, … 0.03447]","[""t2_534q7hmy"", ""t2_534q7hmy"", … ""t2_534q7hmy""]"
"[""t5_7zqmpv"", ""t5_2yo6b"", … ""t5_3g0svs""]","[0.00002, 0.00039, … 0.00022]","[""t2_53mr0uhii"", ""t2_53mr0uhii"", … ""t2_53mr0uhii""]"
"[""t5_39ne7"", ""t5_2qi0t"", … ""t5_2qqoq""]","[0.02375, 0.0001, … 0.00016]","[""t2_53dp9pfy"", ""t2_53dp9pfy"", … ""t2_53dp9pfy""]"
"[""t5_31s00"", ""t5_2qhae"", … ""t5_3exv5""]","[0.00005, 0.00014, … 0.00025]","[""t2_531e0iet"", ""t2_531e0iet"", … ""t2_531e0iet""]"
"[""t5_3lqlo"", ""t5_2qh16"", … ""t5_321rv""]","[0.0, 0.00239, … 0.00013]","[""t2_53djmu1m"", ""t2_53djmu1m"", … ""t2_53djmu1m""]"


In [130]:
%%time

(
    dfp_users_tos
    .filter(dfp_users_tos['user_id'].is_in(users_))
)

CPU times: user 298 ms, sys: 124 ms, total: 422 ms
Wall time: 250 ms


user_id,feature_value,subreddit_count
str,str,i64
"""t2_539ww""","""{""t5_2r8tu"":0.…",377
"""t2_53auiz1hy""","""{""t5_39ne7"":0.…",18
"""t2_53ex1mv5a""","""{""t5_2qh0s"":0.…",45
"""t2_53ido95y""","""{""t5_43zwn1"":0…",199
"""t2_5388qrup""","""{""t5_2tpjq"":0.…",6
"""t2_534q7hmy""","""{""t5_2rwal"":8e…",75
"""t2_53mr0uhii""","""{""t5_7zqmpv"":2…",740
"""t2_53dp9pfy""","""{""t5_39ne7"":0.…",214
"""t2_531e0iet""","""{""t5_31s00"":5e…",230
"""t2_53djmu1m""","""{""t5_3lqlo"":0,…",361
