In [21]:
import json
import pandas as pd
import numpy as np
import os.path
import re
from os import listdir, remove
from os.path import isfile, join
from logger import log
import gc

def sum_pos(s):
    """Return the sum of the strictly positive entries of ``s``."""
    positive_mask = s > 0
    positives = s[positive_mask]
    return positives.sum()

def count_pos(s):
    """Return how many entries of ``s`` are strictly positive."""
    positive_mask = s > 0
    positives = s[positive_mask]
    return positives.count()

def base36_to_int(val):
    """Parse ``val`` (converted to ``str`` first) as a base-36 number."""
    text = str(val)
    return int(text, base=36)

def int_to_base36(integer):
    """Render ``integer`` as a lowercase base-36 string.

    The argument is cast to ``int`` first because, after ``.agg()``,
    ``id_int_last`` will be a float.
    """
    whole = int(integer)
    digits = np.base_repr(whole, base=36)
    return digits.lower()

def logMemUsage(df, deep=False):
    """Log the per-column memory usage of ``df`` and its total in MB.

    ``deep`` is forwarded to ``DataFrame.memory_usage``.
    """
    usage = df.memory_usage(deep=deep)
    log(str(usage))
    total_mb = usage.sum() / (1024**2)
    log(str(total_mb) + " MB")

input_file = 'test/1-pushshift_slim/RC_2018-11.csv'
n_rows = 100

# Explicit narrow dtypes (uint32 / int32 rather than plain 'int') save memory.
# Hard to believe a reddit score will ever exceed 2 billion; current top is 439k.
df = pd.read_csv(
    input_file,
    dtype={
        'id': 'str',
        'created_utc': 'uint32',
        'subreddit': 'category',
        'score': 'int32',
        'is_removed': 'bool',
    },
)

# When an id occurs more than once, keep only its last occurrence.
df.drop_duplicates(subset='id', keep='last', inplace=True)

# Sort the input data rather than assuming it is already sorted:
# by creation time first, then by the numeric value of the base-36 id.
df['id_int'] = df['id'].map(base36_to_int)
df.sort_values(by=['created_utc', 'id_int'], inplace=True)
df.drop(columns='id', inplace=True)

# Chunk number of each row within its subreddit: n_rows consecutive rows per chunk.
g = df.groupby('subreddit').cumcount().floordiv(n_rows)
logMemUsage(df, deep=True)

2021-07-05 19:17  Index           800000
created_utc     400000
subreddit      1022342
score           400000
is_removed      100000
id_int          800000
dtype: int64
2021-07-05 19:17  3.3591670989990234 MB


In [22]:
# Aggregate every (subreddit, chunk) group, where g is the per-subreddit chunk number:
#   score       -> sum of positive scores (sum_pos) and number of items ('count')
#   created_utc -> value of the last row in the group
#   id_int      -> value of the last row in the group
# sort=False keeps groups in order of first appearance; observed=True restricts the
# result to subreddit categories that actually occur.
df_agg = (df.groupby(['subreddit', g], sort=False, observed=True)
          .agg({'score': [sum_pos,'count'], 'created_utc':'last', 'id_int':'last'}))
log('df_agg = (df.groupby...) complete')
logMemUsage(df,True)
# created_utc is not referenced again in this cell, so drop it from df.
df.drop(['created_utc'], axis=1, inplace=True)
# Park df_agg on disk and delete the in-memory copy while the second aggregation
# runs (presumably to lower peak memory); it is read back and the file removed below.
df_agg_pickle_file = 'tmp_monthly_df_agg.pickle'
df_agg.to_pickle(df_agg_pickle_file)
del df_agg
gc.collect()
logMemUsage(df,True)
# Scores of removed items only, grouped by the same (subreddit, chunk) keys.
# NOTE(review): the filter is score > 1 whereas sum_pos counts score > 0 -- confirm intended.
# NOTE(review): observed=True is not passed here, unlike the groupby above -- confirm intended.
removed_df_score = df[(df['is_removed']) & (df['score'] > 1)].groupby(['subreddit', g], sort=False)['score']
# Drop the remaining columns from df before aggregating removed_df_score.
# NOTE(review): relies on removed_df_score holding its own filtered data -- confirm.
df.drop(['score', 'subreddit', 'is_removed'], axis=1, inplace=True)
# Per group: total score, item count, row label of the top-scoring item (idxmax)
# and that top score (max).
removed_df = removed_df_score.agg(['sum', 'count', 'idxmax', 'max'])
log('removed_df = complete')
del g
gc.collect()
# Restore df_agg from the temporary pickle and delete the file.
df_agg = pd.read_pickle(df_agg_pickle_file)
remove(df_agg_pickle_file)
# Flatten the two-level (column, aggregation) labels into single names joined by
# an underscore, e.g. 'score_sum_pos', 'score_count', 'created_utc_last', 'id_int_last'.
df_agg.columns = df_agg.columns.map('{0[0]}_{0[1]}'.format)
log('df_agg.columns = complete')

2021-07-05 19:17  df_agg = (df.groupby...) complete
2021-07-05 19:17  Index          2913576
created_utc     400000
subreddit      1022342
score           400000
is_removed      100000
id_int          800000
dtype: int64
2021-07-05 19:17  5.37483024597168 MB
2021-07-05 19:17  Index         2913576
subreddit     1022342
score          400000
is_removed     100000
id_int         800000
dtype: int64
2021-07-05 19:17  4.99336051940918 MB
2021-07-05 19:17  removed_df = complete
2021-07-05 19:17  df_agg.columns = complete


In [23]:
# rate = removed positive-score total / total positive score, aligned on the group index.
df_agg['rate'] = removed_df['sum'].div(df_agg['score_sum_pos'])

In [24]:
# Attach the removed-item statistics. Assignment aligns on the group index, so
# rows of df_agg with no matching entry in removed_df are left as NaN.
df_agg['num_pos_upvotes_removed'] = removed_df['sum']
df_agg['num_items_with_pos_upvotes_removed'] = removed_df['count']
df_agg['score_of_max_pos_removed_item'] = removed_df['max']

# removed_df['idxmax'] holds row labels of df. Look up id_int for just those rows
# and convert only them to base 36, instead of converting every row of df first.
# Missing labels stay NaN (na_action='ignore'). The looked-up ids may pass through
# float64 when NaNs are present, which is exact for values below 2**53.
df_agg['id_of_max_pos_removed_item'] = (
    removed_df['idxmax']
    .map(df['id_int'])
    .map(int_to_base36, na_action='ignore')
)

# int_to_base36 casts to int itself, so no separate astype(int) step is needed.
df_agg['id_int_last'] = df_agg['id_int_last'].map(int_to_base36)

# Output column names.
d = {'created_utc_last':'last_created_utc','id_int_last':'last_id','score_sum_pos':'total_pos_upvotes','score_count':'total_items'}
log('df_agg[] = removed_df complete')
# Discard the chunk-number index level, turn subreddit back into a column and rename.
df = df_agg.reset_index(level=1, drop=True).reset_index().rename(columns=d)

2021-07-05 19:17  df_agg[] = removed_df complete
