Triple Generation Scratchpad
===

Testing

In [None]:
%reload_ext autoreload
%autoreload 1
%matplotlib inline

In [None]:
import os
import re
import pandas as pd
import numpy as np

from collections import Counter, defaultdict, OrderedDict
import sqlite3
from nltk import word_tokenize
from tqdm import tqdm
import random
import pickle
import json

from datetime import datetime
from dateutil.relativedelta import relativedelta
import pytz
from pprint import pprint

import matplotlib.pyplot as plt
import matplotlib.dates as md
import matplotlib
import pylab as pl
from IPython.core.display import display, HTML

In [None]:
from pathlib import Path
git_root_dir = !git rev-parse --show-toplevel
git_root_dir = Path(git_root_dir[0].strip())
git_root_dir

In [None]:
import sys
caringbridge_core_path = "/home/lana/levon003/repos/caringbridge_core"
sys.path.append(caringbridge_core_path)

In [None]:
import cbcore.data.paths as paths
import cbcore.data.dates as dates
import cbcore.data.utils as utils

In [None]:
sys.path.append(os.path.join(git_root_dir, 'src'))

In [None]:
%aimport cbrec, cbrec.triple_generation, cbrec.config

In [None]:
import cbrec.triple_generation as tg
import cbrec.config as config

In [None]:
user_site_df, ints_df, journal_df = tg.get_data()

In [None]:
# Drop journal updates flagged as trivial (short or machine-generated).
journal_df = journal_df[journal_df.is_nontrivial]

# Drop journal updates whose created_at falls outside the accepted date window.
# The window bounds are converted to epoch milliseconds before comparing.
invalid_start_date = datetime.fromisoformat('2005-01-01').replace(tzinfo=pytz.UTC)
invalid_end_date = datetime.fromisoformat('2019-02-11').replace(tzinfo=pytz.UTC)
print(f"Keeping journals between {invalid_start_date.isoformat()} and {invalid_end_date.isoformat()}.")
invalid_start_timestamp = invalid_start_date.timestamp() * 1000
invalid_end_timestamp = invalid_end_date.timestamp() * 1000

# between() is inclusive on both ends, matching the >= / <= pair it replaces
has_valid_date = journal_df.created_at.between(invalid_start_timestamp, invalid_end_timestamp)
journal_df = journal_df[has_valid_date]
len(journal_df)

In [None]:
# Apply the same (inclusive) created_at window to the interactions.
ints_in_window = ints_df.created_at.between(invalid_start_timestamp, invalid_end_timestamp)
ints_df = ints_df[ints_in_window]
len(ints_df)

In [None]:
journal_df.head()

In [None]:
# Restrict journal updates to exact (user_id, site_id) pairs present in user_site_df.
# This takes 3.5 minutes.
# It feels like it might be worth saving the result as a new pickle file,
# but on reflection we actually don't want to apply this filter at all
# (the next cell filters on user_id only).
s = datetime.now()
# one (user_id, site_id) tuple per row of user_site_df
valid_tuples = user_site_df[['user_id', 'site_id']].apply(tuple, 1)
valid_user_ids = set(user_site_df.user_id)
# cheap pre-filter on user_id before building per-row tuples
filtered_journal_df = journal_df[journal_df.user_id.isin(valid_user_ids)]
filtered_journal_tuples = filtered_journal_df[['user_id', 'site_id']].apply(tuple, 1)
filtered_journal_df = filtered_journal_df[filtered_journal_tuples.isin(valid_tuples)]
# display the surviving row count and the elapsed time
len(filtered_journal_df), str(datetime.now() - s)

In [None]:
# Keep only journal updates written by users that appear in user_site_df
# (i.e. users who will become eligible).
# This deliberately keeps updates on (user_id, site_id) pairs that are NOT in valid_usps:
# that activity is still wanted for feature extraction.
s = datetime.now()
valid_user_ids = set(user_site_df.user_id)
valid_usps = user_site_df[['user_id', 'site_id']].apply(tuple, 1)
is_valid_author = journal_df.user_id.isin(valid_user_ids)
filtered_journal_df = journal_df[is_valid_author]
len(filtered_journal_df), str(datetime.now() - s)

In [None]:
# map of created_at -> (user_id, site_id, journal_oid), in dataframe row order
# NOTE(review): the key is created_at alone, so two updates sharing a timestamp
# collapse into one entry (the later row wins) — confirm this is acceptable.
journal_dict = OrderedDict(
    (row.created_at, (row.user_id, row.site_id, row.journal_oid))
    for row in tqdm(
        filtered_journal_df.itertuples(),
        total=len(filtered_journal_df),
        desc='Populating journal dict',
    )
)

In [None]:
ints_df.head()

In [None]:
ints_df.interaction_type.value_counts()

In [None]:
user_site_df.head()

In [None]:
# Two maps of timestamp -> (user_id, site_id), each filled in ascending timestamp order:
#   ts_to_first_update_dict is keyed by user_first_update_timestamp
#   ts_to_third_update_dict is keyed by user_third_update_timestamp
# NOTE(review): pairs sharing a timestamp overwrite each other — confirm this is acceptable.
ts_to_first_update_dict = OrderedDict()
ts_to_third_update_dict = OrderedDict()
for ts_column, ts_dict in (
    ('user_first_update_timestamp', ts_to_first_update_dict),
    ('user_third_update_timestamp', ts_to_third_update_dict),
):
    ordered_user_site_df = user_site_df.sort_values(by=ts_column, ascending=True)
    for row in tqdm(ordered_user_site_df.itertuples(), total=len(user_site_df), desc='Populating user/site eligibility dict'):
        ts_dict[getattr(row, ts_column)] = (row.user_id, row.site_id)
    

In [None]:
class RecentActivityManager():
    """
    Owns one RecentActivityCounter per activity type and maintains the set of
    user_ids that have recent activity in any of the user-level counters.

    The counters listed in user_activity_keys are keyed by user_id. The
    'journal_site' counter is not in that set, so ids recorded there never
    count towards the active user set.
    """
    def __init__(self):
        # the look-back window (in milliseconds) is shared by every counter
        activity_count_duration_ms = config.get('ACTIVITY_COUNT_DURATION_MS')
        assert activity_count_duration_ms is not None
        self.activity_counter_dict = {
            'journal_user': RecentActivityCounter(activity_count_duration_ms),
            'journal_site': RecentActivityCounter(activity_count_duration_ms),
            'amp': RecentActivityCounter(activity_count_duration_ms),
            'comment': RecentActivityCounter(activity_count_duration_ms),
            'guestbook': RecentActivityCounter(activity_count_duration_ms),
        }
        # counters whose ids are user_ids and therefore feed active_user_ids
        self.user_activity_keys = set(['journal_user', 'amp', 'comment', 'guestbook'])
        self.active_user_ids = set()
    
    def update_counts(self, current_timestamp):
        """
        Advance every counter to current_timestamp, expiring old activity,
        and refresh the active user set if any user-level counter dropped a user.
        """
        all_removed_user_ids = set()
        for key, rac in self.activity_counter_dict.items():
            removed_user_ids = rac.update_counts(current_timestamp)
            if key in self.user_activity_keys:
                all_removed_user_ids |= removed_user_ids
        if len(all_removed_user_ids) > 0:
            # a user dropped by one counter may still be active in another,
            # so rebuild the union rather than subtracting the removed ids
            self.active_user_ids = set().union(*[
                self.activity_counter_dict[key].get_active_ids()
                for key in self.user_activity_keys
            ])

    def get_active_user_ids(self):
        """
        Return the user_ids with recent user-level activity.

        A copy is returned so callers can filter it in place (e.g. with &= or -=)
        without corrupting the manager's own bookkeeping.
        """
        return set(self.active_user_ids)
    
    def add_interaction(self, interaction_type, user_id, created_at):
        """
        Record one interaction in the counter for interaction_type.

        For counters outside user_activity_keys (i.e. 'journal_site') the id is
        not a user_id, so it must not be added to the active user set.
        """
        self.activity_counter_dict[interaction_type].add_interaction(user_id, created_at)
        if interaction_type in self.user_activity_keys:
            self.active_user_ids.add(user_id)
        
    def __repr__(self):
        summary = f"{len(self.activity_counter_dict)} activity counters ({len(self.user_activity_keys)} for users). Tracking {len(self.active_user_ids)} active users.\n"
        for int_type, rac in self.activity_counter_dict.items():
            # built-in sum keeps the total an int (np.sum of an empty list prints as 0.0)
            summary += f"{int_type} recent activity: {len(rac.activity_count_dict)} unique users with {sum(rac.activity_count_dict.values())} total interactions.\n"
        return summary
    
    def __str__(self):
        return self.__repr__()
        
            
class RecentActivityCounter():
    """
    Sliding-window activity counter for a single type of activity.

    Written with user_ids in mind, but any hashable id works,
    e.g. site_ids or (user_id, site_id) tuples.
    """
    def __init__(self, activity_count_duration_ms):
        self.activity_count_duration_ms = activity_count_duration_ms
        # created_at -> list of ids active at that timestamp, in insertion order.
        # A list (not a set) because one id may act several times at the same timestamp.
        self.ts_to_user_ids = OrderedDict()
        # id -> number of interactions currently inside the window
        self.activity_count_dict = {}
        # ids with at least one interaction currently inside the window
        self.active_ids = set()

    def update_counts(self, current_timestamp):
        """
        Move the window so that it ends at current_timestamp, discarding every
        interaction older than activity_count_duration_ms and adjusting the counts.
        Note: current_timestamp must be >= the value used in any earlier call.

        Returns the set of ids that no longer have any activity in the window.
        """
        cutoff = current_timestamp - self.activity_count_duration_ms
        newly_inactive = set()
        # the oldest inserted timestamp sits at the front of the OrderedDict
        while len(self.ts_to_user_ids) > 0 and next(iter(self.ts_to_user_ids)) < cutoff:
            _, expired_ids = self.ts_to_user_ids.popitem(last=False)
            for expired_id in expired_ids:
                remaining = self.activity_count_dict[expired_id] - 1
                if remaining == 0:
                    # nothing left in the window: forget this id entirely
                    del self.activity_count_dict[expired_id]
                    self.active_ids.remove(expired_id)
                    newly_inactive.add(expired_id)
                else:
                    self.activity_count_dict[expired_id] = remaining
        return newly_inactive

    def add_interaction(self, user_id, created_at):
        """
        Record one interaction by user_id at time created_at.
        """
        self.ts_to_user_ids.setdefault(created_at, []).append(user_id)
        first_in_window = user_id not in self.activity_count_dict
        self.activity_count_dict[user_id] = self.activity_count_dict.get(user_id, 0) + 1
        if first_in_window:
            self.active_ids.add(user_id)

    def get_count(self, user_id):
        """
        Number of interactions by user_id in the last activity_count_duration_ms
        milliseconds (0 if there are none).
        """
        return self.activity_count_dict.get(user_id, 0)

    def get_active_ids(self):
        """
        The (live, internal) set of ids with activity inside the window.
        """
        return self.active_ids

In [None]:
# Sanity check of RecentActivityCounter with a 100 ms window and a single id (1).
rac = RecentActivityCounter(100)

# each new interaction raises the count
for created_at, expected_count in ((0, 1), (50, 2)):
    rac.add_interaction(1, created_at)
    assert rac.get_count(1) == expected_count

# advancing the clock expires interactions once they are more than 100 ms old
for current_timestamp, expected_count in ((51, 2), (101, 1), (151, 0)):
    rac.update_counts(current_timestamp)
    assert rac.get_count(1) == expected_count

rac.ts_to_user_ids, rac.activity_count_dict

In [None]:
user_site_df[user_site_df.user_id == 1]

In [None]:
filtered_journal_df[filtered_journal_df.user_id == 1].created_at.sort_values(ascending=True)

In [None]:
# First 50 interactions as (type, user_id, site_id, human-readable UTC creation time).
preview_df = ints_df.head(n=50)
list(zip(
    preview_df.interaction_type,
    preview_df.user_id,
    preview_df.site_id,
    preview_df.created_at.map(lambda ca: str(datetime.utcfromtimestamp(ca / 1000)))
))

In [None]:
# Main triple-generation pass over the interactions, in dataframe row order.
# usp = user/site pair
# existing vs eligible vs active:
#  - existing: the pair's first-update timestamp has passed
#  - eligible: the pair's third-update timestamp has passed
#  - active: the user has an amp, comment, guestbook, or journal in the recent activity window
from numpy.random import default_rng
rng = default_rng()


eligible_usps = set()  # this set isn't really used...
existing_user_site_map = defaultdict(set)  # map of user_id -> set(site_id), where user_id is an existing author on the set of sites
eligible_user_site_map = defaultdict(set)  # map of user_id -> set(site_id), where user_id is an eligible author on the set of sites
existing_site_user_map = defaultdict(set)  # map of site_id -> set(user_id), where site_id has the set of existing authors
eligible_site_user_map = defaultdict(set)  # map of site_id -> set(user_id), where site_id has the set of eligible authors

site_user_int_dict = defaultdict(set)  # map of site_id -> set(user_id), where each user_id has interacted with this site_id

# map of (user_id, site_id) -> journal_oid of most recent update
# this will be the user's third update once they become eligible
# note we COULD save the THREE most recent journal_oids easily, but it seems like they may not be needed
usp_most_recent_update = {}

activity_manager = RecentActivityManager()

# burnin_start_timestamp is when we should start tracking activity data
# note: it may be that there's no computational benefit to using this
# as the expensive stuff is the graph creation anyway. Seems like it will though
burnin_start_timestamp = datetime.fromisoformat('2013-12-01').replace(tzinfo=pytz.UTC).timestamp() * 1000
generation_start_timestamp = datetime.fromisoformat('2014-01-01').replace(tzinfo=pytz.UTC).timestamp() * 1000
generation_stop_timestamp = datetime.fromisoformat('2019-01-01').replace(tzinfo=pytz.UTC).timestamp() * 1000

for row in tqdm(ints_df.itertuples(), total=len(ints_df), desc='Computing interactions'):
    int_created_at = row.created_at
    int_user_id = row.user_id
    int_site_id = row.site_id
    interaction_type = row.interaction_type
    
    if int_created_at > generation_stop_timestamp:
        # no reason to continue past the generation period
        break
    
    # is_initiation if this is the first time this user_id has interacted with this site_id
    is_initiation = int_user_id not in site_user_int_dict[int_site_id]
    
    # update existing users
    while len(ts_to_first_update_dict) > 0:
        if next(iter(ts_to_first_update_dict)) < int_created_at:
            # this is a new existing user_id
            _, user_site_tup = ts_to_first_update_dict.popitem(last=False)
            user_id, site_id = user_site_tup
            existing_user_site_map[user_id].add(site_id)
            existing_site_user_map[site_id].add(user_id)
            # need to update the graph, as a new user is eligible on this site
            # so, get all users who have previously interacted with this site
            prev_user_ids = site_user_int_dict[site_id]
            for prev_user_id in prev_user_ids:
                # add edge prev_user_id -> user_id
                pass
        else:
            break
    
    # update eligible users
    while len(ts_to_third_update_dict) > 0:
        if next(iter(ts_to_third_update_dict)) < int_created_at:
            # this is a new ELIGIBLE (previously existing) user_id
            _, user_site_tup = ts_to_third_update_dict.popitem(last=False)
            user_id, site_id = user_site_tup
            eligible_usps.add(user_site_tup)
            eligible_user_site_map[user_id].add(site_id)
            eligible_site_user_map[site_id].add(user_id)
        else:
            break
            
    # update journal activity
    while len(journal_dict) > 0:
        if next(iter(journal_dict)) < int_created_at:
            # this is a journal update that happened before this interaction
            journal_created_at, user_site_oid_tup = journal_dict.popitem(last=False)
            journal_user_id, journal_site_id, journal_oid = user_site_oid_tup
            activity_manager.add_interaction('journal_user', journal_user_id, journal_created_at)
            activity_manager.add_interaction('journal_site', journal_site_id, journal_created_at)
            usp_most_recent_update[(journal_user_id, journal_site_id)] = journal_oid
        else:
            break
    
    # update activity counters to the current moment
    activity_manager.update_counts(int_created_at)
        
    # for initiations after some time period, we generate features
    if int_created_at >= generation_start_timestamp and is_initiation:
        # identify usp sources and targets
        # NOTE: use .get() rather than indexing. Indexing a defaultdict inserts an
        # empty entry, which would make a non-eligible user/site show up as a key
        # of the eligible maps (and so pass the eligibility filter below).
        # sources is every usp that has this user_id
        source_site_ids = eligible_user_site_map.get(int_user_id, ())
        sources = [(int_user_id, site_id) for site_id in source_site_ids]
        # targets is every usp that has this site_id
        target_user_ids = eligible_site_user_map.get(int_site_id, ())
        targets = [(user_id, int_site_id) for user_id in target_user_ids]
        
        # invalid users for this source user_id include:
        #  - any target user_id
        #  - any user_id that int_user_id has previously connected with
        # TODO get previous connections from graph
        invalid_user_ids = set(target_user_ids) # | graph.get_edge_targets_for_user_id(int_user_id)
        
        # to select alternatives, we need to identify ACTIVE eligible users
        # active means "amp, comment, guestbook, or journal" in last X milliseconds
        # A NEW set is built here: filtering the manager's set in place (&=, -=)
        # would silently remove users from the manager's own active-user bookkeeping.
        active_user_ids = {
            active_user_id
            for active_user_id in activity_manager.get_active_user_ids()
            if active_user_id in eligible_user_site_map
            and active_user_id not in invalid_user_ids
        }
        
        # generate the usps from these active users
        # we use an array to make sampling faster
        active_usps = np.array([
            (active_user_id, site_id)
            for active_user_id in active_user_ids
            for site_id in eligible_user_site_map[active_user_id]
        ])
        if len(active_usps) == 0:
            raise ValueError("No active usps! That means no alternatives, which should never happen.")
            
        # create all combinations of source and target usps
        for source in sources:
            for target in targets:
                # select an alternative (alt) usp
                alt = rng.choice(active_usps)
                
                # generate and save features for this usp triple
                # feature_writer.save_triple(source, target, alt)

    
    
    # update network
    if is_initiation:
        site_user_int_dict[int_site_id].add(int_user_id)
        # who has int_user_id interacted with in the graph?
        # any user who has previously authored an update on this site
        if int_site_id in existing_site_user_map:
            curr_existing_users_on_site = existing_site_user_map[int_site_id]
            for curr_user_id in curr_existing_users_on_site:
                # add edge int_user_id -> curr_user_id
                pass
            
    # update activity counters
    activity_manager.add_interaction(interaction_type, int_user_id, int_created_at)
    
    

In [None]:
str(datetime.utcfromtimestamp(next(iter(journal_dict)) / 1000)), \
str(datetime.utcfromtimestamp(int_created_at / 1000)),

In [None]:
str(datetime.utcfromtimestamp(next(iter(ts_to_first_update_dict)) / 1000))

In [None]:
# Print the next 10 keys (created_at, in ms) still queued in journal_dict as UTC datetimes.
# Only iterates; nothing is removed from journal_dict.
it = iter(journal_dict)
for i in range(10):
    print(str(datetime.utcfromtimestamp(next(it) / 1000)))

In [None]:
len(eligible_usps), len(existing_user_site_map), len(existing_site_user_map)

In [None]:
len(active_user_ids), len(active_usps)

In [None]:
# Per-counter summary of recent activity.
# The counters live on the activity manager; there is no module-level
# activity_counter_dict, so iterate the manager's dict.
for int_type, rac in activity_manager.activity_counter_dict.items():
    print(f"{int_type} recent activity: {len(rac.activity_count_dict)} unique users with {np.sum(list(rac.activity_count_dict.values()))} total interactions.")

In [None]:
# Build the activity manager summary: one header line, then one line per counter.
summary_parts = [
    f"{len(activity_manager.activity_counter_dict)} activity counters ({len(activity_manager.user_activity_keys)} for users). Tracking {len(activity_manager.active_user_ids)} active users.\n"
]
for int_type, rac in activity_manager.activity_counter_dict.items():
    summary_parts.append(
        f"{int_type} recent activity: {len(rac.activity_count_dict)} unique users with {np.sum(list(rac.activity_count_dict.values()))} total interactions.\n"
    )
summary = "".join(summary_parts)
print(summary)

In [None]:
# Load the per-user health condition assignments.
health_cond_dir = "/home/lana/shared/caringbridge/data/projects/sna-social-support/user_metadata"
health_cond_filepath = os.path.join(health_cond_dir, "assigned_health_conditions.feather")
user_health_conds_df = pd.read_feather(health_cond_filepath)
print(len(user_health_conds_df))
user_health_conds_df.head()

In [None]:
# Read the journal metadata dataframe and report how long the read took.
s = datetime.now()
journal_metadata_dir = "/home/lana/shared/caringbridge/data/derived/journal_metadata"
journal_metadata_filepath = os.path.join(
    journal_metadata_dir,
    "journal_metadata.df",
)
journal_df = pd.read_feather(journal_metadata_filepath)
print(f"Read {len(journal_df)} rows in {datetime.now() - s}.")

In [None]:
# Quick fix for invalid journal dates: wherever created_at is 0 (or negative),
# fall back to the row's updated_at.
# note that only 41 updates have this issue
invalid_created_at = journal_df.created_at <= 0
journal_df['created_at'] = journal_df.created_at.mask(invalid_created_at, journal_df.updated_at)

In [None]:
# Load the variant of the journal metadata that carries author type info.
s = datetime.now()
author_type_dir = "/home/lana/shared/caringbridge/data/projects/sna-social-support/author_type"
journal_metadata_filepath = os.path.join(
    author_type_dir,
    "journal_metadata_with_author_type.df",
)
at_journal_df = pd.read_feather(journal_metadata_filepath)
print(datetime.now() - s)
len(at_journal_df)

In [None]:
# as a quick fix for invalid dates in journals, when created_at is 0 we use the updated_at instead
# note that only 41 updates have this issue
#invalid_created_at = at_journal_df.created_at <= 0
#at_journal_df.loc[invalid_created_at, 'created_at'] = at_journal_df.loc[invalid_created_at, 'updated_at']

In [None]:
# Read the valid user/site pairs and derive the sets of valid user and site ids.
s = datetime.now()
model_data_dir = '/home/lana/shared/caringbridge/data/projects/recsys-peer-match/model_data'
user_site_csv_path = os.path.join(model_data_dir, 'user_site_df.csv')
user_site_df = pd.read_csv(user_site_csv_path)
valid_user_ids, valid_site_ids = set(user_site_df.user_id), set(user_site_df.site_id)
print(f"Read {len(user_site_df)} rows ({len(valid_user_ids)} unique users, {len(valid_site_ids)} unique sites) in {datetime.now() - s}.")

In [None]:
user_site_df.head()

In [None]:
np.min(user_site_df.user_third_update_timestamp)

In [None]:
# map of (user_id, site_id) -> set of created_at values among that pair's journal updates
journal_creation_times = defaultdict(set)
journal_creation_times.update(
    (usp, set(usp_journals.created_at))
    for usp, usp_journals in tqdm(journal_df.groupby(['user_id', 'site_id'], sort=False))
)

In [None]:
# map of (user_id, site_id) -> set of created_at values, from the author-type journal dataframe
at_journal_creation_times = defaultdict(set)
at_journal_creation_times.update(
    (usp, set(usp_journals.created_at))
    for usp, usp_journals in tqdm(at_journal_df.groupby(['user_id', 'site_id'], sort=False))
)

In [None]:
# For every valid user/site pair, check whether its first- and third-update timestamps
# appear among the created_at values of each journal dataframe, and count the misses.
journal_missing_pred_count = 0
at_journal_missing_pred_count = 0
for usp_row in tqdm(user_site_df.itertuples(index=False), total=len(user_site_df)):
    usp = (usp_row.user_id, usp_row.site_id)
    milestone_timestamps = (
        usp_row.user_first_update_timestamp,
        usp_row.user_third_update_timestamp,
    )

    known_times = journal_creation_times[usp]
    journal_missing_pred_count += sum(ts not in known_times for ts in milestone_timestamps)

    known_times = at_journal_creation_times[usp]
    at_journal_missing_pred_count += sum(ts not in known_times for ts in milestone_timestamps)
journal_missing_pred_count, at_journal_missing_pred_count

In [None]:
raw_data_dir = paths.raw_data_2019_filepath
raw_data_dir

In [None]:
working_dir = "/home/lana/shared/caringbridge/data/projects/recsys-peer-match/model_data"
assert os.path.exists(working_dir)
working_dir

In [None]:
# Read the valid user/site pairs and derive the sets of valid user and site ids.
s = datetime.now()
model_data_dir = '/home/lana/shared/caringbridge/data/projects/recsys-peer-match/model_data'
user_site_csv_path = os.path.join(model_data_dir, 'user_site_df.csv')
user_site_df = pd.read_csv(user_site_csv_path)
valid_user_ids, valid_site_ids = set(user_site_df.user_id), set(user_site_df.site_id)
print(f"Read {len(user_site_df)} rows ({len(valid_user_ids)} unique users, {len(valid_site_ids)} unique sites) in {datetime.now() - s}.")
user_site_df.head()

In [None]:
# Stream the raw guestbook export and write one CSV row per guestbook entry (and per
# amp on a guestbook entry) whose user AND site are both in the valid sets.
# The four counters tally how many candidate rows fell into each validity category.
guestbook_filepath = os.path.join(raw_data_dir, 'guestbook_scrubbed.json')
output_filepath = os.path.join(working_dir, "guestbook_filtered.csv")
both_valid_count = 0
neither_valid_count = 0
author_valid_count = 0
site_valid_count = 0
with open(output_filepath, 'w') as outfile:
    with open(guestbook_filepath, encoding='utf-8') as infile:
        processed_count = 0
        for i, line in tqdm(enumerate(infile), total=82858710):
            # the first 4002 lines of the file are skipped
            if i < 4002:
                continue
            try:
                gb = json.loads(line)
            except json.JSONDecodeError:
                # skip lines that are not valid JSON; anything else (e.g. KeyboardInterrupt)
                # should propagate rather than be silently swallowed
                continue
            gb_oid = gb['_id']['$oid']
            site_id = utils.extract_long(gb['siteId'])
            user_id = utils.extract_long(gb['userId'])
            created_at = dates.get_date_from_json_value(gb['createdAt']) if 'createdAt' in gb else 0
            updated_at = dates.get_date_from_json_value(gb['updatedAt']) if 'updatedAt' in gb else 0
            # site validity is the same for the entry and all of its amps
            is_site_valid = site_id in valid_site_ids
            
            if 'amps' in gb and isinstance(gb['amps'], list):
                # we write out any amps as separate lines
                for amp in gb['amps']:
                    amp_user_id = utils.extract_long(amp)
                    is_user_valid = amp_user_id in valid_user_ids
                    if is_user_valid and is_site_valid:
                        outfile.write(f"{amp_user_id},{site_id},amp,{gb_oid}|{amp_user_id},guestbook,{gb_oid},guestbook,{gb_oid},{created_at},{updated_at}\n")
                        both_valid_count += 1
                    elif is_user_valid and not is_site_valid:
                        author_valid_count += 1
                    elif not is_user_valid and is_site_valid:
                        site_valid_count += 1
                    else:
                        neither_valid_count += 1
            is_user_valid = user_id in valid_user_ids
            if is_user_valid and is_site_valid:
                # columns: user_id, site_id, interaction_type, interaction_oid, parent_type, parent_id, ancestor_type, ancestor_id, created_at, updated_at
                outfile.write(f"{user_id},{site_id},guestbook,{gb_oid},None,None,None,None,{created_at},{updated_at}\n")
                both_valid_count += 1
            elif is_user_valid and not is_site_valid:
                author_valid_count += 1
            elif not is_user_valid and is_site_valid:
                site_valid_count += 1
            else:
                neither_valid_count += 1
            processed_count += 1
processed_count, both_valid_count, neither_valid_count, author_valid_count, site_valid_count

In [None]:
# Stream the raw comment export and write one CSV row per comment (and per amp on a
# comment) whose user AND site are both in the valid sets.
# Every candidate row is tallied by (is_user_valid, is_site_valid).
comments_filepath = os.path.join(raw_data_dir, 'comment_scrubbed.json')
output_filepath = os.path.join(working_dir, "comment_filtered.csv")
validity_tally = Counter()
with open(output_filepath, 'w') as outfile, open(comments_filepath, encoding='utf-8') as infile:
    for line in tqdm(infile, total=31052715):
        comment = json.loads(line)
        comment_oid = comment['_id']['$oid']
        parent_type = comment['parentType']  # either 'journal' or 'comment'
        parent_oid = comment['parentId']
        journal_oid = comment['ancestorId']  # ancestorType is never guestbook; we seemingly don't have any of the guestbook comment data
        site_id = utils.extract_long(comment['siteId'])
        is_site_valid = site_id in valid_site_ids
        user_id = utils.extract_long(comment['userId'])
        created_at = dates.get_date_from_json_value(comment['createdAt'])
        updated_at = dates.get_date_from_json_value(comment['updatedAt'])

        # amps on the comment are written out as separate rows
        amps = comment['amps'] if 'amps' in comment else None
        if isinstance(amps, list):
            for amp in amps:
                amp_user_id = utils.extract_long(amp)
                is_user_valid = amp_user_id in valid_user_ids
                if is_user_valid and is_site_valid:
                    outfile.write(f"{amp_user_id},{site_id},amp,{comment_oid}|{amp_user_id},comment,{comment_oid},journal,{journal_oid},{created_at},{updated_at}\n")
                validity_tally[(is_user_valid, is_site_valid)] += 1

        # the comment itself
        is_user_valid = user_id in valid_user_ids
        if is_user_valid and is_site_valid:
            # columns: user_id, site_id, interaction_type, interaction_oid, parent_type, parent_id, ancestor_type, ancestor_id, created_at, updated_at
            outfile.write(f"{user_id},{site_id},comment,{comment_oid},{parent_type},{parent_oid},journal,{journal_oid},{created_at},{updated_at}\n")
        validity_tally[(is_user_valid, is_site_valid)] += 1

# unpack the tally, keyed by (is_user_valid, is_site_valid)
both_valid_count = validity_tally[(True, True)]
neither_valid_count = validity_tally[(False, False)]
author_valid_count = validity_tally[(True, False)]
site_valid_count = validity_tally[(False, True)]
both_valid_count, neither_valid_count, author_valid_count, site_valid_count

In [None]:
# Stream the raw journal export and write one CSV row per amp on a journal update
# where the amping user AND the site are both in the valid sets.
# Every amp is tallied by (is_user_valid, is_site_valid).
journal_filepath = os.path.join(raw_data_dir, 'journal.json')
output_filepath = os.path.join(working_dir, "journal_filtered.csv")
validity_tally = Counter()
with open(output_filepath, 'w') as outfile, open(journal_filepath, encoding='utf-8') as infile:
    for line in tqdm(infile, total=19137078):
        journal = json.loads(line)

        # only journals carrying a list of amps are of interest
        if 'amps' not in journal:
            continue
        amps = journal['amps']
        if not isinstance(amps, list):
            continue

        journal_oid = journal['_id']['$oid']
        site_id = utils.extract_long(journal['siteId'])
        is_site_valid = site_id in valid_site_ids
        user_id = utils.extract_long(journal['userId'])

        created_at = dates.get_date_from_json_value(journal['createdAt'])
        updated_at = dates.get_date_from_json_value(journal['updatedAt'])

        for amp in amps:
            amp_user_id = utils.extract_long(amp)
            is_user_valid = amp_user_id in valid_user_ids
            if is_user_valid and is_site_valid:
                outfile.write(f"{amp_user_id},{site_id},amp,{journal_oid}|{amp_user_id},journal,{journal_oid},journal,{journal_oid},{created_at},{updated_at}\n")
            validity_tally[(is_user_valid, is_site_valid)] += 1

        # Writing a row for the journal update itself is disabled:
        #is_user_valid = user_id in valid_user_ids
        #if is_user_valid and is_site_valid:
        #    # columns: user_id, site_id, interaction_type, interaction_oid, parent_type, parent_id, ancestor_type, ancestor_id, created_at, updated_at
        #    outfile.write(f"{user_id},{site_id},journal,{journal_oid},None,None,None,None,{created_at},{updated_at}\n")
        #    both_valid_count += 1
        #elif is_user_valid and not is_site_valid:
        #    author_valid_count += 1
        #elif not is_user_valid and is_site_valid:
        #    site_valid_count += 1
        #else:
        #    neither_valid_count += 1

# unpack the tally, keyed by (is_user_valid, is_site_valid)
both_valid_count = validity_tally[(True, True)]
neither_valid_count = validity_tally[(False, False)]
author_valid_count = validity_tally[(True, False)]
site_valid_count = validity_tally[(False, True)]
both_valid_count, neither_valid_count, author_valid_count, site_valid_count

In [None]:
# Read the three filtered interaction CSVs (written without a header row),
# printing how long each read takes.
cols = ['user_id', 'site_id', 'interaction_type', 'interaction_oid', 'parent_type', 'parent_oid', 'ancestor_type', 'ancestor_oid', 'created_at', 'updated_at']
gb_filepath = os.path.join(working_dir, "guestbook_filtered.csv")
comment_filepath = os.path.join(working_dir, "comment_filtered.csv")
journal_filepath = os.path.join(working_dir, "journal_filtered.csv")

loaded_dfs = []
for filtered_filepath in (gb_filepath, comment_filepath, journal_filepath):
    s = datetime.now()
    loaded_dfs.append(pd.read_csv(filtered_filepath, header=None, names=cols))
    print(datetime.now() - s)
gb_df, comment_df, journal_df = loaded_dfs

len(gb_df), len(comment_df), len(journal_df)

In [None]:
# Stack guestbook, comment and journal-amp interactions into one dataframe
# with a fresh 0..n-1 index.
ints_df = pd.concat([gb_df, comment_df, journal_df], sort=False, ignore_index=True)
print(len(ints_df))
ints_df.head()

In [None]:
# Order the interactions by creation time and report how long the sort took.
s = datetime.now()
ints_df = ints_df.sort_values(by='created_at')
sort_elapsed = datetime.now() - s
print(sort_elapsed)

In [None]:
len(ints_df)

In [None]:
# Persist the interactions as both feather and CSV, timing each write.
ints_feather_path = os.path.join(working_dir, 'ints_df.feather')
ints_csv_path = os.path.join(working_dir, 'ints_df.csv')

s = datetime.now()
# feather is written from a copy with a fresh default index
ints_df.reset_index(drop=True).to_feather(ints_feather_path)
print(datetime.now() - s)

s = datetime.now()
ints_df.to_csv(ints_csv_path, index=False)
print(datetime.now() - s)

In [None]:
# Load the saved interactions dataframe and report its size and load time.
s = datetime.now()
model_data_dir = '/home/lana/shared/caringbridge/data/projects/recsys-peer-match/model_data'
ints_feather_path = os.path.join(model_data_dir, 'ints_df.feather')
ints_df = pd.read_feather(ints_feather_path)
print(f"Read {len(ints_df)} rows ({len(set(ints_df.user_id))} unique users) in {datetime.now() - s}.")
ints_df.head()

In [None]:
# Frequency of each (interaction_type, parent_type, ancestor_type) combination.
type_cols = ['interaction_type', 'parent_type', 'ancestor_type']
ints_df[type_cols].value_counts()

In [None]:
# Monthly interaction counts on a log scale, one line per interaction type.
fig, ax = plt.subplots(1, 1, figsize=(10,4))

# Month-start bin edges (UTC, epoch seconds) from 2005-01 through 2020-01.
bins = []
for month_index in range((2020 - 2005) * 12 + 1):
    year = 2005 + month_index // 12
    month = month_index % 12 + 1
    month_start = datetime.fromisoformat(f"{year}-{month:02}-01").replace(tzinfo=pytz.UTC)
    bins.append(month_start.timestamp())

# created_at is divided by 1000 before binning so it is on the same scale as the edges.
series_to_plot = (
    ('amp', 'Amps'),
    ('guestbook', 'Guestbooks'),
    ('comment', 'Comments'),
)
for interaction_type, series_label in series_to_plot:
    created_secs = ints_df[ints_df.interaction_type == interaction_type].created_at / 1000
    total_counts, bin_edges = np.histogram(created_secs, bins=bins)
    plt.plot(bin_edges[:-1], total_counts, linestyle='-', linewidth=2, label=series_label)
ax.set_yscale('log')

plt.legend()
# Dashed vertical markers at the start of 2014 and of 2019.
for boundary in ("2014-01-01", "2019-01-01"):
    boundary_ts = datetime.fromisoformat(boundary).replace(tzinfo=pytz.UTC).timestamp()
    plt.axvline(boundary_ts, color='black', alpha=0.8, linestyle='--', linewidth=1)

plt.ylabel("Interactions per month")
plt.title(f"{len(ints_df):,} interactions from {len(set(ints_df.user_id)):,} unique users on {len(set(ints_df.site_id)):,} unique sites")

newline = '\n'
# One tick per January 1st, labelled with its four-digit year.
xticks = [
    datetime.fromisoformat(f"{2005 + i}-01-01").replace(tzinfo=pytz.UTC).timestamp()
    for i in range((2020 - 2005) + 2)
]
year_labels = [datetime.utcfromtimestamp(tick).strftime('%Y') for tick in xticks]
plt.xticks(xticks, year_labels)

plt.show()

In [None]:
# Keep interactions whose created_at falls between 2014-01-01 and 2019-01-01
# (UTC, both ends inclusive); the bounds are scaled by 1000 to match created_at.
start_timestamp, end_timestamp = (
    datetime.fromisoformat(boundary).replace(tzinfo=pytz.UTC).timestamp() * 1000
    for boundary in ("2014-01-01", "2019-01-01")
)
in_window = ints_df.created_at.between(start_timestamp, end_timestamp)
sdf = ints_df[in_window]
len(sdf)

In [None]:
# Same type-combination breakdown as for ints_df, restricted to the sdf subset.
type_cols = ['interaction_type', 'parent_type', 'ancestor_type']
sdf[type_cols].value_counts()

In [None]:
# Load the journal metadata dataframe (the new journal data) from its Feather
# file, print the load time, and show the row count.
journal_metadata_dir = "/home/lana/shared/caringbridge/data/derived/journal_metadata"
journal_metadata_filepath = os.path.join(journal_metadata_dir, "journal_metadata.df")

s = datetime.now()
journal_df = pd.read_feather(journal_metadata_filepath)
elapsed = datetime.now() - s
print(elapsed)
journal_df.shape[0]

In [None]:
# Eyeball ten randomly chosen journal rows.
journal_df.sample(10)

In [None]:
# ISO-formatted UTC datetimes at the extreme low and high quantiles of
# created_at (divided by 1000 before conversion), shown as a 2-tuple.
tuple(
    datetime.utcfromtimestamp(journal_df.created_at.quantile(q) / 1000).isoformat()
    for q in (0.0001, 0.999999)
)

In [None]:
# journal updates over time, by month

start_date = "2002-04-01"
end_date = "2019-03-01"
# Pin the range to UTC. Naive datetimes would make .timestamp() depend on the
# machine's local timezone, while the tick labels below are rendered in UTC.
sdate = datetime.fromisoformat(start_date).replace(tzinfo=pytz.UTC)
edate = datetime.fromisoformat(end_date).replace(tzinfo=pytz.UTC)
delta = relativedelta(edate, sdate)

# One bin edge per month start (epoch seconds), from start_date through end_date inclusive.
n_month_edges = (delta.years * 12) + delta.months + 1
bins = [(sdate + relativedelta(months=i)).timestamp() for i in range(n_month_edges)]

fig, ax = plt.subplots(1, 1, figsize=(10, 4))

# created_at is divided by 1000 so it is on the same scale as the bin edges.
total_counts, bin_edges = np.histogram(journal_df.created_at / 1000, bins=bins)
ax.axhline(0, color='gray', alpha=0.4)
ax.plot(bin_edges[:-1], total_counts, linestyle='-', linewidth=2)

# 5 year analysis period of relative normality, 2014-2019
for boundary in ("2014-01-01", "2019-01-01"):
    boundary_ts = datetime.fromisoformat(boundary).replace(tzinfo=pytz.UTC).timestamp()
    ax.axvline(boundary_ts, color='gray', linestyle='--', alpha=0.4)

# Tick positions: matplotlib's automatic locations when use_autoloc is set,
# otherwise one tick per bin edge. Labels look like "Jan14" (UTC).
use_autoloc = True
locs = bins
if use_autoloc:
    locs = ax.get_xticks()
labels = [datetime.fromtimestamp(xtick, tz=pytz.UTC).strftime('%b%y') for xtick in locs]
ax.set_xticks(locs)
ax.set_xticklabels(labels)

ax.set_yscale('log')

plt.show()

## Visualizing createdAt of guestbooks

`new_guestbook_createdAt.txt` was created via `cut -f4 -d, new_guestbook_metadata_raw.csv > new_guestbook_createdAt.txt`.

In [None]:
# Parse one integer createdAt value per line into a preallocated array.
# Lines that cannot be stored leave their slot at 0 and are counted in error_count.
ca_arr = np.zeros(82854708)
error_count = 0
with open(os.path.join(working_dir, "new_guestbook_createdAt.txt"), 'r') as infile:
    for i, line in tqdm(enumerate(infile), total=len(ca_arr)):
        try:
            ca_arr[i] = int(line.strip())
        except (ValueError, IndexError):
            # ValueError: blank or non-integer line.
            # IndexError: the file has more lines than preallocated slots.
            # Anything else (notably KeyboardInterrupt) is allowed to propagate
            # so this long-running loop can still be interrupted.
            error_count += 1
error_count

In [None]:
# Divide the parsed createdAt values by 1000 (presumably milliseconds -> seconds;
# confirm against the export) and preview the first ten entries.
ca_arr = np.divide(ca_arr, 1000)
ca_arr[:10]

In [None]:
# Smallest value in ca_arr.
ca_arr.min()

In [None]:
# Drop non-positive entries, printing the array shape before and after.
print(ca_arr.shape)
is_positive = ca_arr > 0
ca_arr = ca_arr[is_positive]
print(ca_arr.shape)

In [None]:
# Parse one integer createdAt value per line from the old guestbook export into
# a preallocated array. Lines that cannot be stored leave their slot at 0 and
# are counted in error_count.
ca_arr_old = np.zeros(82980359)
error_count = 0
with open(os.path.join(working_dir, "old_guestbook_createdAt.txt"), 'r') as infile:
    # The progress total follows the array length; the previous hard-coded
    # total was the size of the *new* export and did not match this array.
    for i, line in tqdm(enumerate(infile), total=len(ca_arr_old)):
        try:
            ca_arr_old[i] = int(line.strip())
        except (ValueError, IndexError):
            # ValueError: blank or non-integer line.
            # IndexError: the file has more lines than preallocated slots.
            # Anything else (notably KeyboardInterrupt) is allowed to propagate
            # so this long-running loop can still be interrupted.
            error_count += 1
error_count

In [None]:
# Apply the same divide-by-1000 scaling to the old export's createdAt values.
ca_arr_old = np.divide(ca_arr_old, 1000)

In [None]:
# Monthly guestbook counts for the two exports (ca_arr and ca_arr_old), overlaid.
fig, ax = plt.subplots(1, 1, figsize=(10,4))

# Month-start bin edges from 2005-01 through 2020-01, as epoch seconds.
# Edges are pinned to UTC so they agree with the UTC-rendered tick labels
# instead of shifting with the machine's local timezone.
bins = []
for month_index in range((2020 - 2005) * 12 + 1):
    year = 2005 + month_index // 12
    month = month_index % 12 + 1
    bins.append(datetime(year, month, 1, tzinfo=pytz.UTC).timestamp())

total_counts, bin_edges = np.histogram(ca_arr, bins=bins)
plt.plot(bin_edges[:-1], total_counts, linestyle='-', linewidth=2, label='Guestbooks (2019 data)')

total_counts, bin_edges = np.histogram(ca_arr_old, bins=bins)
plt.plot(bin_edges[:-1], total_counts, linestyle='-', linewidth=2, label='Guestbooks (2016 data)')

# Without a legend the two labelled lines cannot be told apart.
plt.legend()

# Dashed vertical marker at 2016-06-01.
plt.axvline(datetime(2016, 6, 1, tzinfo=pytz.UTC).timestamp(), color='black', alpha=0.8, linestyle='--', linewidth=1)

plt.ylabel("Guestbook count")

# One tick per January 1st (UTC), labelled with its four-digit year.
xticks = [datetime(2005 + i, 1, 1, tzinfo=pytz.UTC).timestamp() for i in range((2020 - 2005) + 2)]
plt.xticks(
    xticks,
    [datetime.fromtimestamp(tick, tz=pytz.UTC).strftime('%Y') for tick in xticks])

#plt.tight_layout(pad=0)
#plt.margins(0,0)
#plt.savefig(os.path.join(figures_dir, 'initiation_types_timeline.pdf'), dpi=200, pad_inches=0)

plt.show()

In [None]:
# Per-month difference in guestbook counts between the two exports
# (ca_arr minus ca_arr_old).
fig, ax = plt.subplots(1, 1, figsize=(10,4))

# Month-start bin edges from 2005-01 through 2020-01, as epoch seconds.
# Edges are pinned to UTC so they agree with the UTC-rendered tick labels
# instead of shifting with the machine's local timezone.
bins = []
for month_index in range((2020 - 2005) * 12 + 1):
    year = 2005 + month_index // 12
    month = month_index % 12 + 1
    bins.append(datetime(year, month, 1, tzinfo=pytz.UTC).timestamp())

# Both histograms share the same edges, so the counts subtract bin-for-bin.
total_counts, bin_edges = np.histogram(ca_arr, bins=bins)
total_counts_old, bin_edges = np.histogram(ca_arr_old, bins=bins)
plt.plot(bin_edges[:-1], total_counts - total_counts_old, linestyle='-', linewidth=2, label='Guestbooks (2019 - 2016 data)')

# Show the line's label; it was previously passed to plot() but never displayed.
plt.legend()

# Dashed vertical marker at 2016-06-01.
plt.axvline(datetime(2016, 6, 1, tzinfo=pytz.UTC).timestamp(), color='black', alpha=0.8, linestyle='--', linewidth=1)

plt.ylabel("Guestbook count")

# One tick per January 1st (UTC), labelled with its four-digit year.
xticks = [datetime(2005 + i, 1, 1, tzinfo=pytz.UTC).timestamp() for i in range((2020 - 2005) + 2)]
plt.xticks(
    xticks,
    [datetime.fromtimestamp(tick, tz=pytz.UTC).strftime('%Y') for tick in xticks])

#plt.tight_layout(pad=0)
#plt.margins(0,0)
#plt.savefig(os.path.join(figures_dir, 'initiation_types_timeline.pdf'), dpi=200, pad_inches=0)

plt.show()

In [None]:
# Monthly guestbook counts for the two exports (ca_arr and ca_arr_old),
# zoomed in to 2016 onward.
fig, ax = plt.subplots(1, 1, figsize=(10,4))

# Month-start bin edges from 2016-01 through 2020-01, as epoch seconds.
# Edges are pinned to UTC so they agree with the UTC-rendered tick labels
# instead of shifting with the machine's local timezone.
bins = []
for month_index in range((2020 - 2016) * 12 + 1):
    year = 2016 + month_index // 12
    month = month_index % 12 + 1
    bins.append(datetime(year, month, 1, tzinfo=pytz.UTC).timestamp())

total_counts, bin_edges = np.histogram(ca_arr, bins=bins)
plt.plot(bin_edges[:-1], total_counts, linestyle='-', linewidth=2, label='Guestbooks (2019 data)')

total_counts, bin_edges = np.histogram(ca_arr_old, bins=bins)
plt.plot(bin_edges[:-1], total_counts, linestyle='-', linewidth=2, label='Guestbooks (2016 data)')

# Without a legend the two labelled lines cannot be told apart.
plt.legend()

# Dashed vertical marker at 2016-06-01.
plt.axvline(datetime(2016, 6, 1, tzinfo=pytz.UTC).timestamp(), color='black', alpha=0.8, linestyle='--', linewidth=1)

plt.ylabel("Guestbook count")

# One tick per January 1st (UTC), labelled with its four-digit year.
xticks = [datetime(2016 + i, 1, 1, tzinfo=pytz.UTC).timestamp() for i in range((2020 - 2016) + 2)]
plt.xticks(
    xticks,
    [datetime.fromtimestamp(tick, tz=pytz.UTC).strftime('%Y') for tick in xticks])

#plt.tight_layout(pad=0)
#plt.margins(0,0)
#plt.savefig(os.path.join(figures_dir, 'initiation_types_timeline.pdf'), dpi=200, pad_inches=0)

plt.show()

In [None]:
# TODO look for match on guestbook_oid, site_id, and created_at