Build Journal Metadata File
===

This script is for building a journal metadata file from which info can be easily extracted.

A few plausible approaches:
 - Sqlite database
 - HDF5
 - CSV
 - Dataframe, feathered
 
Currently, we build an sqlite database file in this file.

In [1]:
%reload_ext autoreload
%autoreload 2
%matplotlib inline
from IPython.core.display import display, HTML

import os
import numpy as np
import pandas as pd
import itertools

import matplotlib.pyplot as plt
import matplotlib.dates as md
import matplotlib
import pylab as pl

import datetime as dt
import time

from collections import Counter

import json
import os
import re
from html.parser import HTMLParser
import itertools
import multiprocessing as mp
from nltk import word_tokenize
from IPython.core.display import display, HTML
import datetime as dt

from tqdm import tqdm

import collections

In [2]:
working_dir = "/home/srivbane/shared/caringbridge/data/projects/qual-health-journeys/extract_site_features"
os.makedirs(working_dir, exist_ok=True)

flattened_journal_json_filename = os.path.join(working_dir, "journal_flat.json")
assert os.path.exists(flattened_journal_json_filename)

In [19]:
!wc -l {flattened_journal_json_filename}

15327592 /home/srivbane/shared/caringbridge/data/projects/qual-health-journeys/extract_site_features/journal_flat.json


In [14]:
import sqlite3

def get_db():
    db_filename = os.path.join(working_dir, "journal_metadata.db")
    db = sqlite3.connect(
            db_filename,
            detect_types=sqlite3.PARSE_DECLTYPES
        )
    db.row_factory = sqlite3.Row
    return db

In [35]:
try:
    db = get_db()
    db.execute("DROP TABLE IF EXISTS journalMetadata")
    create_table_command = """
    CREATE TABLE journalMetadata (
          id INTEGER PRIMARY KEY AUTOINCREMENT,
          site_id INTEGER NOT NULL,
          journal_oid TEXT NOT NULL,
          user_id INTEGER,
          created_at INTEGER NOT NULL,
          updated_at INTEGER,
          site_index INTEGER NOT NULL
        )
    """
    db.execute(create_table_command)
finally:
    db.close()

In [None]:
# DO NOT RUN
# It turned out to make much more sense to read from the bucket files, not the flattened json file
return
try:
    db = get_db()
    with open(flattened_journal_json_filename, 'r', encoding="utf-8") as infile:
        processed_count = 0
        for line in tqdm(infile, total=15327592):
            journal = json.loads(line)
            break
            db.execute(
                'INSERT INTO journalMetadata (site_id, journal_oid, created_at, updated_at) VALUES (?, ?, ?, ?, ?)',
                (site_id, journal_oid, username, annotation_type, value)
            )
            
            processed_count += 1
            if processed_count % 1000000 == 0:
                db.commit()
            if processed_count > 1000000:
                break
finally:
    db.close()

In [28]:
def get_bucket_filename(siteId):
    # The bucket size used in bucket_journals_by_siteId, needed to recover the appropriate bucket filename
    bucket_size = 1000
    working_dir = "/home/srivbane/shared/caringbridge/data/projects/classify_health_condition/vw_experiments"
    sorted_journal_bucket_dir = os.path.join(working_dir, "sorted_journal_buckets")
    bucket_name = "Unknown"
    if siteId:
        bucket_name = siteId // bucket_size
    path = os.path.join(sorted_journal_bucket_dir, "siteId_{bucket_name}.json".format(bucket_name=bucket_name))
    return path if os.path.exists(path) else None


def get_journals(siteId):
    journal_filename = get_bucket_filename(siteId)
    journals = []
    awaiting_first_journal = True
    if journal_filename:
        with open(journal_filename, 'r', encoding="utf8") as infile:
            for line in infile:
                journal = json.loads(line.strip())
                journal_siteId = int(journal["siteId"]) if "siteId" in journal else None
                if journal_siteId == siteId:
                    journals.append(journal)
                    if awaiting_first_journal:
                        awaiting_first_journal = False
                elif not awaiting_first_journal:
                    # We have already looked at all of journals for this site
                    break
    return journals

len(os.listdir("/home/srivbane/shared/caringbridge/data/projects/classify_health_condition/vw_experiments/sorted_journal_buckets"))

1068

In [36]:
try:
    db = get_db()
    bucket_files_dir = "/home/srivbane/shared/caringbridge/data/projects/classify_health_condition/vw_experiments/sorted_journal_buckets"
    bucket_files = os.listdir(bucket_files_dir)
    for bucket_file in tqdm(bucket_files, desc="Bucket Files Processed"):
        with open(os.path.join(bucket_files_dir, bucket_file), 'r', encoding="utf-8") as infile:
            # load into a different list for each siteId in this file
            site_journal_lists = collections.defaultdict(list)
            for line in infile:
                journal = json.loads(line)
                site_id = int(journal["siteId"])
                site_journal_lists[site_id].append(journal)
            for site_id in site_journal_lists.keys():
                journal_list = site_journal_lists[site_id]
                for i, journal in enumerate(journal_list):
                    journal_index = i
                    site_id = int(journal["siteId"])
                    journal_oid = journal["_id"]["$oid"]
                    created_at = journal["createdAt"]["$date"]
                    updated_at = journal["updatedAt"]["$date"]
                    if "userId" in journal:
                        user_id = int(journal["userId"])
                        db.execute(
                        'INSERT INTO journalMetadata (site_id, journal_oid, created_at, updated_at, site_index, user_id) VALUES (?, ?, ?, ?, ?, ?)',
                        (site_id, journal_oid, created_at, updated_at, journal_index, user_id))
                    else:
                        db.execute(
                        'INSERT INTO journalMetadata (site_id, journal_oid, created_at, updated_at, site_index) VALUES (?, ?, ?, ?, ?)',
                        (site_id, journal_oid, created_at, updated_at, journal_index))
        db.commit()  # commit after each file is processed
finally:
    db.close()

Bucket Files Processed: 100%|██████████| 1068/1068 [21:42<00:00,  1.22s/it]


In [38]:
# I also manually created the following indexes:
"""
CREATE INDEX journalMetadata_siteId ON journalMetadata (site_id);
CREATE INDEX journalMetadata_journalOid ON journalMetadata (journal_oid);
CREATE INDEX journalMetadata_siteId_journalOid ON journalMetadata (site_id, journal_oid);
"""

'\nCREATE INDEX journalMetadata_siteId ON journalMetadata (site_id);\nCREATE INDEX journalMetadata_journalOid ON journalMetadata (journal_oid);\nCREATE INDEX journalMetadata_siteId_journalOid ON journalMetadata (site_id, journal_oid);\n'

Full Journal Text File
===

In [3]:
#Build a separate database containing the journal text

import sqlite3

def get_db():
    db_filename = os.path.join(working_dir, "journal_text.db")
    db = sqlite3.connect(
            db_filename,
            detect_types=sqlite3.PARSE_DECLTYPES
        )
    db.row_factory = sqlite3.Row
    return db

try:
    db = get_db()
    db.execute("DROP TABLE IF EXISTS journalText")
    create_table_command = """
    CREATE TABLE journalText (
          id INTEGER PRIMARY KEY AUTOINCREMENT,
          site_id INTEGER NOT NULL,
          journal_oid TEXT NOT NULL,
          body TEXT
        )
    """
    db.execute(create_table_command)
finally:
    db.close()

In [5]:
# load the journal json, writing out the body text of the journals to the database
try:
    db = get_db()
    bucket_files_dir = "/home/srivbane/shared/caringbridge/data/projects/classify_health_condition/vw_experiments/sorted_journal_buckets"
    bucket_files = os.listdir(bucket_files_dir)
    for bucket_file in tqdm(bucket_files, desc="Bucket Files Processed"):
        with open(os.path.join(bucket_files_dir, bucket_file), 'r', encoding="utf-8") as infile:
            # load into a different list for each siteId in this file
            site_journal_lists = collections.defaultdict(list)
            for line in infile:
                journal = json.loads(line)
                site_id = int(journal["siteId"])
                site_journal_lists[site_id].append(journal)
            for site_id in site_journal_lists.keys():
                journal_list = site_journal_lists[site_id]
                for i, journal in enumerate(journal_list):
                    journal_index = i
                    site_id = int(journal["siteId"])
                    journal_oid = journal["_id"]["$oid"]
                    body = journal["body"] if "body" in journal else None
                    db.execute(
                    'INSERT INTO journalText (site_id, journal_oid, body) VALUES (?, ?, ?)',
                    (site_id, journal_oid, body))
        db.commit()  # commit after each file is processed
finally:
    db.close()

Bucket Files Processed: 100%|██████████| 1068/1068 [22:43<00:00,  1.28s/it]


In [6]:
try:
    db = get_db()
    index_command = """
    CREATE INDEX journalText_siteId_journalOid ON journalText (site_id, journal_oid);
    """
    db.execute(index_command)
finally:
    db.close()