In [1]:
from proj_utils import *

import os
import pickle
import gc
import time
from os.path import join
from copy import deepcopy
from collections import defaultdict, Counter
from itertools import combinations
from datetime import datetime
from dateutil.relativedelta import relativedelta
import ipywidgets as widgets
from IPython.display import display
from collections import OrderedDict

import numpy as np
import pandas as pd
from tqdm import tqdm

import sklearn as sk
from scipy.optimize import linear_sum_assignment
from scipy.optimize import curve_fit
from sklearn.metrics.pairwise import cosine_similarity

import torch
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import mpltern
import ipywidgets as widgets
from ipywidgets import interact, interact_manual

#import bertopic
from proj_utils import *

  @numba.jit()
  @numba.jit()
  @numba.jit()
  @numba.jit()
[nltk_data] Downloading package punkt to
[nltk_data]     /home/seungwoong.ha/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [2]:
# --- Run configuration -------------------------------------------------------
# Anchor date from which the '3d' and 'week' aggregation windows are stepped.
aggregate_start_date = datetime(2012, 5, 28, 0, 0)  ## Monday

# Which collection (and its associated model) this run processes.
collection_name = 'Gatewaypundit'
model_name = MODEL_NAMES[collection_name]

# When True, the shared 'global' topic count is used instead of the
# collection-specific one.
is_global = False

start_date, end_date = DATE_RANGES[collection_name]
num_topics = NUM_TOPICS['global' if is_global else collection_name]

In [None]:
def aggregate_save(tmp_df, comment_date_cutoff, is_global=False, n_topics=None,
                   embedding_dim=384, sentiment_dim=11):
    """Summarise the comments of a batch of articles per comment topic.

    For every article, only comments created fewer than ``comment_date_cutoff``
    whole days after the article's ``createdAt`` are kept. The kept comments are
    then grouped by topic id (``-1`` .. ``n_topics - 1``) to obtain, per topic,
    the number of comments, the mean embedding and the mean sentiment vector.

    Parameters
    ----------
    tmp_df : pandas.DataFrame
        One row per article. Columns read here: ``createdAt``,
        ``comment_createdAt``, ``comment_id``, ``comment_embeddings``,
        ``comment_sentiments``, ``comment_topics`` (or ``comment_topics_global``
        when ``is_global``), plus ``_id``, ``topic_num`` and ``topic_prob`` for
        the returned per-article frame. The ``comment_*`` cells are per-article
        sequences that are indexed position by position.
    comment_date_cutoff : int
        Exclusive upper bound on ``(comment_createdAt - createdAt).days``.
    is_global : bool, default False
        Read comment topics from ``comment_topics_global`` instead of
        ``comment_topics``.
    n_topics : int, optional
        Number of topics excluding the ``-1`` topic. Defaults to the
        notebook-level ``num_topics``.
    embedding_dim, sentiment_dim : int
        Length of the zero vector used for topics that received no comments
        and the number of ``e*`` / ``s*`` columns in the CSV-oriented frame.

    Returns
    -------
    summary_df : pandas.DataFrame
        ``n_topics + 1`` rows (row 0 is topic ``-1``) with columns
        ``topic_freq``, ``topic_mean_embedding``, ``topic_mean_sentiment``.
    summary_df_csv : pandas.DataFrame
        Same information with the vectors expanded into ``e0..`` / ``s0..``
        columns.
    topic_dict_tmp_df : pandas.DataFrame
        ``_id``, ``topic_num``, ``topic_prob`` and the filtered
        ``comment_topics`` of every article.

    Raises
    ------
    ValueError
        If a kept comment carries a topic id outside ``[-1, n_topics)``.
    """
    if n_topics is None:
        n_topics = num_topics  # fall back to the notebook-level configuration

    topic_attr = 'comment_topics_global' if is_global else 'comment_topics'
    n_slots = n_topics + 1  # slot 0 holds topic -1

    topic_freq = Counter({topic: 0 for topic in range(-1, n_topics)})
    topic_embedding_list = [[] for _ in range(n_slots)]
    topic_sentiment_list = [[] for _ in range(n_slots)]

    comment_createdAt_list = []
    comment_id_list = []
    comment_topics_list = []
    comment_embeddings_list = []
    comment_sentiments_list = []

    for article in tmp_df.itertuples():
        # Positions of the comments that fall inside the allowed delay.
        keep = [
            pos for pos, comment_time in enumerate(article.comment_createdAt)
            if (comment_time - article.createdAt).days < comment_date_cutoff
        ]
        source_topics = getattr(article, topic_attr)

        kept_topics = [source_topics[pos] for pos in keep]
        kept_embeddings = [article.comment_embeddings[pos] for pos in keep]
        kept_sentiments = [article.comment_sentiments[pos] for pos in keep]

        comment_createdAt_list.append([article.comment_createdAt[pos] for pos in keep])
        comment_id_list.append([article.comment_id[pos] for pos in keep])
        comment_topics_list.append(kept_topics)
        comment_embeddings_list.append(kept_embeddings)
        comment_sentiments_list.append(kept_sentiments)
        topic_freq.update(kept_topics)

        if not keep:
            continue

        # Convert once per article instead of once per (article, topic) pair.
        topics_arr = np.array(kept_topics)
        embeddings_arr = np.array(kept_embeddings)
        sentiments_arr = np.array(kept_sentiments)
        for slot in range(n_slots):
            hits = np.where(topics_arr == slot - 1)[0]
            if len(hits) > 0:
                topic_embedding_list[slot].append(embeddings_arr[hits])
                topic_sentiment_list[slot].append(sentiments_arr[hits])

    # An out-of-range topic id would add an extra Counter key and make the
    # summary columns differ in length; report it explicitly instead.
    unexpected = sorted(topic for topic in topic_freq if not -1 <= topic < n_topics)
    if unexpected:
        raise ValueError(
            f'comment topics outside [-1, {n_topics}) encountered: {unexpected}'
        )

    tmp_df = tmp_df.assign(
        comment_id=comment_id_list,
        comment_topics=comment_topics_list,
        comment_createdAt=comment_createdAt_list,
        comment_embeddings=comment_embeddings_list,
        comment_sentiments=comment_sentiments_list,
    )
    topic_dict_tmp_df = tmp_df[['_id', 'topic_num', 'topic_prob', 'comment_topics']]

    topic_mean_embedding_list = []
    topic_mean_sentiment_list = []
    for slot in range(n_slots):
        assert len(topic_embedding_list[slot]) == len(topic_sentiment_list[slot])
        if topic_embedding_list[slot]:
            topic_mean_embedding_list.append(np.vstack(topic_embedding_list[slot]).mean(axis=0))
            topic_mean_sentiment_list.append(np.vstack(topic_sentiment_list[slot]).mean(axis=0))
        else:
            # Topics without any kept comment get all-zero vectors.
            topic_mean_embedding_list.append(np.zeros(embedding_dim))
            topic_mean_sentiment_list.append(np.zeros(sentiment_dim))

    # Counts ordered by topic id, i.e. -1 first.
    topic_freq = [topic_freq[key] for key in sorted(topic_freq)]
    summary_df = pd.DataFrame({
        'topic_freq': topic_freq,
        'topic_mean_embedding': topic_mean_embedding_list,
        'topic_mean_sentiment': topic_mean_sentiment_list,
    })

    tme = pd.DataFrame(topic_mean_embedding_list, columns=['e' + str(i) for i in range(embedding_dim)])
    tms = pd.DataFrame(topic_mean_sentiment_list, columns=['s' + str(i) for i in range(sentiment_dim)])
    summary_df_csv = pd.concat([pd.DataFrame({'topic_freq': topic_freq}), tme, tms], axis=1)

    return summary_df, summary_df_csv, topic_dict_tmp_df

In [None]:
# --- Aggregate the per-day article files into 3-day / weekly / monthly windows
#
# For each window the cell stores (keyed by the window's start date):
#   * a per-topic summary (parquet + csv) produced by `aggregate_save`,
#   * the per-article topic frame (pickled per granularity at the end),
#   * bookkeeping collected in `result_dict` (topic triples, frame counts, ...).
aggregate_day_list = ['3d', 'week', 'month']
comment_threshold = 10  # articles need MORE than this many comments to be kept
add_dict = {'3d':relativedelta(days=3), 'week':relativedelta(weeks=1), 'month':relativedelta(months=1)}
# Passed to aggregate_save as the exclusive bound (in days) on comment delay.
comment_date_cutoff_dict = {'3d': 1, 'week' : 2, 'month': 7}

topic_dict = {aggregate_day : {} for aggregate_day in aggregate_day_list}
topic_tuple_dict_dict = {aggregate_day : {} for aggregate_day in aggregate_day_list}
title_topic_num_list_dict = {aggregate_day : [] for aggregate_day in aggregate_day_list}
title_constant_list_dict = {aggregate_day : [] for aggregate_day in aggregate_day_list}

# Daily frames buffered for the window currently being filled, per granularity.
tmp_list_df = {aggregate_day : [] for aggregate_day in aggregate_day_list}
# Number of daily frames that went into each window, per granularity.
len_list_dict = {aggregate_day: {} for aggregate_day in aggregate_day_list}

# make folders under /data/collmind/article

article_folder_name = 'article_global' if is_global else 'article'
output_root = join('/data', 'collmind', article_folder_name, collection_name.lower())
daily_dir = join('article', collection_name.lower(), model_name, 'articles_by_day')

for aggregate_day in aggregate_day_list:
    os.makedirs(join(output_root, aggregate_day), exist_ok=True)
    os.makedirs(join(output_root, aggregate_day, 'csv'), exist_ok=True)
    os.makedirs(join(output_root, aggregate_day, 'df'), exist_ok=True)

final_day = (end_date - relativedelta(days=30)).strftime('%Y-%m-%d')
print(final_day)

# Block until the daily file for `final_day` exists, polling once per hour.
while True:
    file_path = join(daily_dir, f'{final_day}.parquet')
    if os.path.exists(file_path):
        print('aggregate start')
        break
    print(datetime.now(), 'still waiting')
    time.sleep(60 * 60)  # Sleep for 60 minutes


file_names = os.listdir(daily_dir)
# Only the daily parquet files are date-named; anything else in the directory
# (e.g. a checkpoint folder) would make strptime fail.
keys = sorted(
    [file_name.split('.')[0] for file_name in file_names if file_name.endswith('.parquet')],
    key=lambda x: datetime.strptime(x, '%Y-%m-%d'),
)
start = datetime.strptime(keys[0], '%Y-%m-%d')

# Initial (window_start, window_end) per granularity. Dedicated names are used
# so the configuration values `start_date` / `end_date` are not overwritten and
# the cell can be re-run.
start_end_dict = {}
for aggregate_day in aggregate_day_list:
    if aggregate_day == 'month':
        # Monthly windows run from the first day of a month to the next.
        window_start = start.replace(day=1)
        window_end = (window_start + relativedelta(months=1)).replace(day=1)
    else:
        # Step forward from the anchor date until the window contains `start`.
        step = add_dict[aggregate_day]
        window_start = aggregate_start_date
        while window_start + step <= start:
            window_start += step
        window_end = window_start + step

    start_end_dict[aggregate_day] = (window_start, window_end)


def _flush_window(aggregate_day):
    """Summarise, record and save the frames buffered for the current window.

    Uses the start date of ``start_end_dict[aggregate_day]`` as the key, always
    records the number of buffered daily frames in ``len_list_dict`` and empties
    the buffer. Nothing else is produced when the window holds no articles.
    """
    window_key = start_end_dict[aggregate_day][0].strftime('%Y-%m-%d')
    daily_frames = tmp_list_df[aggregate_day]
    tmp_list_df[aggregate_day] = []
    len_list_dict[aggregate_day][window_key] = len(daily_frames)

    if not daily_frames:  # pd.concat raises on an empty list
        return
    tmp_df = pd.concat(daily_frames)
    if len(tmp_df) == 0:
        return

    # Article-level topic column matching the configured topic model.
    topic_column = 'topic_num_global' if is_global else 'topic_num'

    # Group articles by the sorted triple of their first three topic ids.
    topic_tuple_dict = defaultdict(list)
    for article_id, article_topics, created_at in zip(tmp_df['_id'], tmp_df[topic_column], tmp_df['createdAt']):
        topic_tuple = tuple(sorted(article_topics[:3]))
        topic_tuple_dict[topic_tuple].append((article_id, created_at.strftime('%Y-%m-%d')))
    topic_tuple_dict_dict[aggregate_day][window_key] = topic_tuple_dict

    # counts[t][i] = number of articles whose i-th listed topic is t.
    stacked_topics = np.vstack(tmp_df[topic_column].tolist())
    topic_rank_counts = np.zeros((num_topics, 3))
    for t in range(num_topics):
        for i in range(3):
            topic_rank_counts[t][i] = np.count_nonzero(stacked_topics[:, i] == t)
    title_constant_list_dict[aggregate_day].append(len(tmp_df))
    title_topic_num_list_dict[aggregate_day].append(topic_rank_counts)

    summary_df, summary_df_csv, topic_dict_tmp_df = aggregate_save(tmp_df, comment_date_cutoff_dict[aggregate_day], is_global)
    topic_dict[aggregate_day][window_key] = topic_dict_tmp_df

    # save summary_df
    summary_df.to_parquet(join(output_root, aggregate_day, 'df', window_key + '.parquet'), compression='gzip')
    summary_df_csv.to_csv(join(output_root, aggregate_day, 'csv', window_key + '.csv'))


for key in keys:
    print(key)
    key_date = datetime.strptime(key, '%Y-%m-%d')
    articles = pd.read_parquet(join(daily_dir, key + '.parquet'))
    articles = articles[articles['comment_id'].apply(len) > comment_threshold]

    for aggregate_day in aggregate_day_list:
        # Close every window that ends on or before this day. Looping (rather
        # than advancing once) keeps the window label correct when daily files
        # are missing for longer than one window; skipped windows are recorded
        # with a frame count of 0.
        while key_date >= start_end_dict[aggregate_day][1]:
            _flush_window(aggregate_day)
            window_end = start_end_dict[aggregate_day][1]
            start_end_dict[aggregate_day] = (window_end, window_end + add_dict[aggregate_day])

        tmp_list_df[aggregate_day].append(articles)

# process leftovers: the last, still-open window of every granularity
for aggregate_day in aggregate_day_list:
    _flush_window(aggregate_day)

result_dict = {'topic_tuple_dict_dict': topic_tuple_dict_dict, 'len_list_dict': len_list_dict, 'title_constant_list_dict': title_constant_list_dict, 'title_topic_num_list_dict': title_topic_num_list_dict}

# save num_elements
with open(join(output_root, 'result_dict.pkl'), 'wb') as f:
    pickle.dump(result_dict, f)

for aggregate_day in aggregate_day_list:
    with open(join(output_root, aggregate_day, f'topic_dict_{aggregate_day}.pkl'), 'wb') as f:
        pickle.dump(topic_dict[aggregate_day], f)

In [None]:
# Scratch check: list the columns stored in the 2015-12-03 daily article file.
pd.read_parquet(
    join('article', collection_name.lower(), model_name, 'articles_by_day', '2015-12-03' + '.parquet')
).columns

In [None]:
# Scratch check: list the columns stored in the 2022-12-12 daily article file.
pd.read_parquet(
    join('article', collection_name.lower(), model_name, 'articles_by_day', '2022-12-12' + '.parquet')
).columns

In [None]:
# Load the 2015-02-02 daily article file for ad-hoc inspection.
articles = pd.read_parquet(
    join('article', collection_name.lower(), model_name, 'articles_by_day', '2015-02-02' + '.parquet')
)

In [None]:
# Show the first 20 rows of the frame currently bound to `articles`.
articles.head(n=20)