In [2]:
!pip3 install names

Collecting names
  Downloading names-0.3.0.tar.gz (789 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 789.1/789.1 KB 2.5 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: names
  Building wheel for names (setup.py) ... done
  Created wheel for names: filename=names-0.3.0-py3-none-any.whl size=803699 sha256=031df6b2ea3887609cf8146403c2d3b731827a0cd8cdf97f1551ca2da0054844
  Stored in directory: /root/.cache/pip/wheels/d0/35/f7/c72132a4f3878b82018a3e61bf2a35e6b63cebe1dd9f72ec1e
Successfully built names
Installing collected packages: names
Successfully installed names-0.3.0
You should consider upgrading via the '/usr/local/bin/python -m pip install --upgrade pip' command.

In [3]:
from collections import OrderedDict
from datetime import datetime
from itertools import chain
import json
from random import choice, randint, random

import names
import numpy
import pandas as pd

from shinewave_webapp.database_connector import get_conn, get_random_key, run_query


# Pools of 500 generated first names and 500 generated last names; member
# generation samples from these instead of calling `names` per row.
FIRST_NAME_LIST = [names.get_first_name() for _ in range(500)]
LAST_NAME_LIST = [names.get_last_name() for _ in range(500)]

# Distinct workflow-node object ids, each paired with a random weight in
# [0, 1) that is drawn once when this cell runs.
NODES = run_query(
    'SELECT DISTINCT object_id FROM workflow_nodes',
    return_data_format=dict,
)
NODE_WEIGHTS = dict((object_id, random()) for object_id in NODES['object_id'])


def add_progression_event(
    event_id,
    node_master_type,
    node_parent_type,
    node_detail_type,
    recipient_id,
    current_node_time,
    added_days,
    added_minutes,
    workflow_node_id
):
    """Insert one row into ``events`` and commit it.

    The event type is ``"<node_master_type>.<node_parent_type>"`` and the
    subtype is ``node_detail_type``. The event time is ``current_node_time``
    shifted by ``added_days`` days and ``added_minutes`` minutes inside the
    SQL statement. ``custom_data`` is always stored as ``'{}'``.
    """
    insert_sql = """
            INSERT INTO events (id, event_type, event_subtype, recipient_id, event_time, workflow_node_id, custom_data)
            VALUES (?, ?, ?, ?, ?::TIMESTAMP + INTERVAL '? DAYS' + INTERVAL '? MINUTES', ?, '{}')
        """
    event_type = f'{node_master_type}.{node_parent_type}'

    # Parameters are positional: their order must match the `?` markers above.
    bound_values = [
        event_id,
        event_type,
        node_detail_type,
        recipient_id,
        current_node_time,
        added_days,
        added_minutes,
        workflow_node_id,
    ]
    run_query(insert_sql, sql_parameters=bound_values, commit=True)
    

def add_outreach_entry(
    workflow_id,
    current_id,
    recipient_id,
    workflow_node_id,
    current_node_time,
    added_days,
    pending_triggers,
    pending_outreaches,
    pending_markers,
    active
):
    """Insert one row into ``outreach_lists`` and commit it.

    ``current_id`` is stored as ``source_event_id`` and ``workflow_node_id``
    as ``current_node_id``. The row id is computed in SQL as ``MAX(id) + 1``
    and ``account_id`` is fixed to 1. The node time is ``current_node_time``
    shifted by ``added_days`` days.

    Each of the three ``pending_*`` arguments may be given either as an
    already-serialised string or as a JSON-serialisable object; non-strings
    are serialised with ``json.dumps`` before insertion.
    """
    # Serialise in the same order the columns appear: triggers, outreaches, markers.
    pending_triggers, pending_outreaches, pending_markers = [
        pending if isinstance(pending, str) else json.dumps(pending)
        for pending in (pending_triggers, pending_outreaches, pending_markers)
    ]

    insert_sql = """
            INSERT INTO outreach_lists (
                id,
                account_id,
                workflow_id,
                source_event_id,
                recipient_id,
                current_node_id,
                current_node_time,
                pending_triggers,
                pending_outreaches,
                pending_markers,
                active
            )
            VALUES (
                (SELECT MAX(id) + 1 FROM outreach_lists), -- id
                1, -- account_id
                ?, -- workflow_id
                ?, -- source_event_id
                ?, -- recipient_id
                ?, -- current_node_id
                ?::TIMESTAMP + INTERVAL '? DAYS', -- current_node_time
                ?, -- pending_triggers
                ?, -- pending_outreaches
                ?, -- pending_markers
                ? -- active
            )
        """

    # Positional parameters, in the order of the `?` markers above.
    bound_values = [
        workflow_id,
        current_id,
        recipient_id,
        workflow_node_id,
        current_node_time,
        added_days,
        pending_triggers,
        pending_outreaches,
        pending_markers,
        active,
    ]
    run_query(insert_sql, sql_parameters=bound_values, commit=True)
    

def deactivate_outreach_list(outreach_list_id):
    """Set ``active`` to ``'FALSE'`` on the outreach list with the given id and commit."""
    deactivate_sql = "UPDATE outreach_lists SET active = 'FALSE' WHERE id = ?"
    run_query(deactivate_sql, sql_parameters=[outreach_list_id], commit=True)


def get_node_data(workflow_id, object_id):
    """Look up the active workflow node identified by workflow and object id.

    Returns a 3-tuple ``(workflow_node_id, node_type, pending_nodes)`` built
    from the first matching row, where ``pending_nodes`` is the node's
    ``outputs`` column decoded from JSON.
    """
    lookup_sql = """
            SELECT
                id,
                node_type,
                outputs AS pending_nodes
            FROM workflow_nodes
            WHERE
                workflow_id = ?
                AND object_id = ?
                AND UPPER(active) = 'TRUE'
        """
    result_columns = run_query(
        lookup_sql,
        sql_parameters=[workflow_id, object_id],
        return_data_format=dict,
    )

    # The result is column-oriented; index 0 of each column is the first row.
    node_id, node_kind, raw_outputs = (
        result_columns[column][0] for column in ('id', 'node_type', 'pending_nodes')
    )
    return (node_id, node_kind, json.loads(raw_outputs))
    

def _next_event_id():
    """Return one more than the largest ``events.id`` (1 when the table is empty)."""
    return run_query("SELECT COALESCE(MAX(id), 0) FROM events")[0][0] + 1


def progress_outreach(outreach_list_id, node_weights=NODE_WEIGHTS):
    """Advance one active outreach list by a single simulated step.

    The step does the following, in order:

    1. Loads the active ``outreach_lists`` row with this id.
    2. For every pending marker node, records an event and an inactive
       outreach-list row at the current node time.
    3. Picks the next node: the first pending outreach if there is one,
       otherwise a pending trigger drawn at random using ``node_weights``.
    4. Records the event for that node, inserts a new outreach-list row
       describing the state after the step, and deactivates the old row.

    Args:
        outreach_list_id: ``outreach_lists.id`` of the row to advance.
        node_weights: mapping of node ``object_id`` to a relative weight used
            when choosing among pending triggers.

    Returns:
        ``False`` when no active row with this id could be read, ``True``
        otherwise (including when the list was simply deactivated).
    """
    current_state = run_query(
        """
            SELECT
                workflow_id,
                recipient_id,
                current_node_id IS NULL AS is_entry_node,
                current_node_time,
                pending_triggers,
                pending_outreaches,
                pending_markers
            FROM outreach_lists
            WHERE id = ? AND UPPER(active) = 'TRUE'
        """,
        sql_parameters=[outreach_list_id],
        return_data_format=dict
    )
    try:
        workflow_id = current_state['workflow_id'][0]
    except (KeyError, IndexError, TypeError):
        # No active row came back. Only lookup failures are treated as
        # "nothing to do", so KeyboardInterrupt and real errors still propagate.
        return False
    is_entry_node = current_state['is_entry_node'][0]
    recipient_id = current_state['recipient_id'][0]
    current_node_time = current_state['current_node_time'][0]
    pending_triggers = json.loads(current_state['pending_triggers'][0])
    pending_outreaches = json.loads(current_state['pending_outreaches'][0])
    pending_markers = json.loads(current_state['pending_markers'][0])

    # Markers are recorded immediately (no time offset) and never stay active.
    for node in pending_markers or []:
        workflow_node_id, node_type, _ = get_node_data(workflow_id, node)

        node_master_type, node_parent_type, node_detail_type = node_type.split('.')
        current_id = _next_event_id()

        add_progression_event(
            current_id,
            node_master_type,
            node_parent_type,
            node_detail_type,
            recipient_id,
            current_node_time,
            0,
            0,
            workflow_node_id
        )

        add_outreach_entry(
            workflow_id,
            current_id,
            recipient_id,
            workflow_node_id,
            current_node_time,
            0,
            [],
            [],
            [],
            'FALSE'
        )

    if pending_outreaches:
        # Outreaches follow their predecessor within a few minutes.
        next_node = pending_outreaches[0]
        added_days = 0
        added_minutes = randint(0, 10)

    elif pending_triggers:
        # With probability 0.2, a list that is past its entry node is
        # deactivated instead of advanced, unless one of its pending triggers
        # is a TimeElapsedTrigger.
        if (not is_entry_node) and (random() < .2):
            pending_trigger_node_types = run_query(
                """
                    SELECT node_type
                    FROM workflow_nodes
                    WHERE
                        workflow_id = ?
                        AND object_id IN ?
                        AND UPPER(active) = 'TRUE'
                """,
                sql_parameters=[workflow_id, tuple(pending_triggers)],
                return_data_format=dict
            )
            if 'nodes.trigger.TimeElapsedTrigger' not in pending_trigger_node_types['node_type']:
                deactivate_outreach_list(outreach_list_id)
                return True

        # Normalise the weights of the candidate triggers into probabilities.
        # A separate local is used so the `node_weights` mapping is not shadowed.
        trigger_weights = [node_weights[trigger] for trigger in pending_triggers]
        total_weight = sum(trigger_weights)
        probabilities = [weight / total_weight for weight in trigger_weights]
        next_node = numpy.random.choice(pending_triggers, 1, p=probabilities)[0]
        added_days = randint(1, 8)
        added_minutes = 0

    else:
        # Nothing left to advance to (for example, only markers were pending).
        # Previously this fell through and raised UnboundLocalError on
        # `next_node`. Deactivate the list so it is not picked up again.
        deactivate_outreach_list(outreach_list_id)
        return True

    workflow_node_id, node_type, pending_nodes = get_node_data(workflow_id, next_node)

    if pending_nodes:
        pending_node_data = run_query(
            """
                SELECT
                    object_id,
                    node_type
                FROM workflow_nodes
                WHERE
                    workflow_id = ?
                    AND object_id IN ?
                    AND UPPER(active) = 'TRUE'
            """,
            sql_parameters=[workflow_id, tuple(pending_nodes)],
            return_data_format=dict
        )

        # Split the next node's outputs by node category.
        typed_nodes = list(zip(pending_node_data['object_id'], pending_node_data['node_type']))
        pending_triggers = [
            object_id for object_id, kind in typed_nodes if kind.startswith('nodes.trigger')
        ]
        pending_outreaches = [
            object_id for object_id, kind in typed_nodes if kind.startswith('nodes.outreach')
        ]
        pending_markers = [
            object_id for object_id, kind in typed_nodes if kind.startswith('nodes.marker')
        ]
        active = 'TRUE'
    else:
        # Terminal node: the new row carries no pending work and is inactive.
        pending_triggers, pending_outreaches, pending_markers = [], [], []
        active = 'FALSE'

    node_master_type, node_parent_type, node_detail_type = node_type.split('.')

    current_id = _next_event_id()

    add_progression_event(
        current_id,
        node_master_type,
        node_parent_type,
        node_detail_type,
        recipient_id,
        current_node_time,
        added_days,
        added_minutes,
        workflow_node_id
    )

    add_outreach_entry(
        workflow_id,
        current_id,
        recipient_id,
        workflow_node_id,
        current_node_time,
        added_days,
        pending_triggers,
        pending_outreaches,
        pending_markers,
        active
    )

    # The row that was just advanced is superseded by the one inserted above.
    deactivate_outreach_list(outreach_list_id)

    return True


def generate_drug():
    """Return a made-up drug name: one random prefix followed by one random suffix.

    Some suffixes appear more than once in the pool, so they are drawn
    proportionally more often.
    """
    name_prefixes = (
        'Lorem', 'Ipsum', 'Dolor', 'Sit', 'Amet',
        'Elit', 'Sed', 'Do', 'Eiusmod', 'Tempor',
    )
    name_suffixes = (
        'afil', 'asone',
        'bicin', 'bital',
        'caine', 'cillin', 'cycline',
        'dazole', 'dipine', 'dronate',
        'eprazole',
        'fenac', 'floxacin',
        'gliptin', 'glitazone',
        'iramine',
        'lamide',
        'mab', 'mustine', 'mycin',
        'nacin', 'nazole',
        'olol', 'olone', 'onide', 'oprazole',
        'phylline', 'pramine', 'pril', 'profen',
        'ridone',
        'sartan', 'semide', 'setron', 'statin',
        'tadine', 'tadine', 'terol', 'thiazide', 'tinib', 'trel', 'triptan', 'tyline',
        'vir', 'vir', 'vir', 'vudine',
        'zepam', 'zodone', 'zolam', 'zosin',
    )

    # Draw the prefix first, then the suffix.
    picked_prefix = choice(name_prefixes)
    picked_suffix = choice(name_suffixes)
    return picked_prefix + picked_suffix


def generate_members_data(number_of_rows, starting_id=5, first_name_list=FIRST_NAME_LIST, last_name_list=LAST_NAME_LIST):
    """Build ``number_of_rows`` synthetic member rows as positional value lists.

    Args:
        number_of_rows: how many rows to generate.
        starting_id: id given to the first row; later rows count up by one.
        first_name_list: pool that first names are sampled from.
        last_name_list: pool that last names are sampled from.

    Returns:
        A list of rows. Each row holds, in order: member id, the constant 1,
        provider id, first name, last name, phone number, email address,
        key date (``YYYY-MM-DD``), key time (``HH:MM:00``), ``'US/Pacific'``,
        ``None``, ``None``, a JSON string with ``prescription_1`` and
        ``prescription_2``, and ``True``. The caller inserts these values
        positionally, so the order presumably mirrors the target table's
        columns (verify against the schema).
    """
    rows = []
    member_id = starting_id
    for _ in range(number_of_rows):
        provider_id = get_random_key([])[:11]
        # NOTE: the digits are converted to int, so a leading 0 is lost and
        # the stored number can have fewer than 10 digits.
        phone_number = int(''.join(str(randint(0, 9)) for _ in range(10)))
        first_name = choice(first_name_list)
        last_name = choice(last_name_list)
        email_address = f'{first_name}.{last_name}@test.com'

        prescription_1 = generate_drug()
        # The second prescription is left empty about half of the time.
        prescription_2 = [generate_drug(), ''][randint(0, 1)]
        custom_data = json.dumps(
            {'prescription_1': prescription_1, 'prescription_2': prescription_2}
        )

        # Draw year/month/day independently and retry on impossible dates
        # (e.g. 31 February). Only ValueError is retried, so Ctrl-C and real
        # errors are no longer swallowed by the loop.
        key_date = None
        while key_date is None:
            try:
                drawn = datetime(randint(2021, 2022), randint(1, 12), randint(1, 31))
            except ValueError:
                continue
            key_date = drawn.date().isoformat()

        key_hour = randint(0, 23)
        key_minute = randint(0, 59)
        key_time = f'{key_hour:02d}:{key_minute:02d}:00'

        rows.append([
            member_id,
            1,
            provider_id,
            first_name,
            last_name,
            phone_number,
            email_address,
            key_date,
            key_time,
            'US/Pacific',
            None,
            None,
            custom_data,
            True
        ])
        member_id += 1

    return rows

def generate_lists_data(starting_id=0):
    """Build a DataFrame of initial ``outreach_lists`` rows for workflow 2.

    One row is produced per active recipient of account 1 that has no
    outreach list for the workflow yet. The workflow's active nodes with
    empty ``inputs`` are attached as JSON-encoded lists in
    ``pending_triggers``, ``pending_outreaches`` and ``pending_markers``,
    split by node-type prefix. Row ids are numbered from ``starting_id``.
    """
    def _val_to_json(val):
        # STRING_AGG yields a comma-separated string, or nothing when no node matched.
        return json.dumps(val.split(',') if val else [])

    lists_sql = f"""
            WITH base_data AS (
                SELECT
                    ROW_NUMBER() OVER (PARTITION BY NULL ORDER BY NULL) - 1 + {starting_id} AS id,
                    account_id,
                    2 AS workflow_id,
                    NULL AS source_event_id,
                    id AS recipient_id,
                    NULL AS current_node_id,
                    '2021-01-01 10:00'::TIMESTAMP + RANDOM() * (INTERVAL '700 DAYS') AS current_node_time,
                    NULL AS pending_triggers,
                    TRUE as active
                FROM recipients
                WHERE
                    account_id = 1
                    AND active
            )
            SELECT
                bd.*,
                STRING_AGG(wn1.object_id, ',') AS pending_triggers,
                STRING_AGG(wn2.object_id, ',') AS pending_outreaches,
                STRING_AGG(wn3.object_id, ',') AS pending_markers
            FROM base_data bd
            LEFT JOIN workflow_nodes wn1 ON
                bd.workflow_id = wn1.workflow_id
                AND wn1.active = 'TRUE'
                AND wn1.inputs = '[]'
                AND wn1.node_type LIKE ?
            LEFT JOIN workflow_nodes wn2 ON
                bd.workflow_id = wn2.workflow_id
                AND wn2.active = 'TRUE'
                AND wn2.inputs = '[]'
                AND wn2.node_type LIKE ?
            LEFT JOIN workflow_nodes wn3 ON
                bd.workflow_id = wn3.workflow_id
                AND wn3.active = 'TRUE'
                AND wn3.inputs = '[]'
                AND wn3.node_type LIKE ?
            LEFT JOIN outreach_lists ol ON
                bd.recipient_id = ol.recipient_id
                AND bd.workflow_id = ol.workflow_id
            WHERE ol.id IS NULL
            GROUP BY
                bd.id,
                bd.account_id,
                bd.workflow_id,
                bd.source_event_id,
                bd.recipient_id,
                bd.current_node_id,
                bd.current_node_time,
                bd.pending_triggers,
                bd.active
        """

    # One LIKE pattern per joined node category: wn1, wn2, wn3.
    node_type_patterns = ['nodes.trigger%', 'nodes.outreach%', 'nodes.marker%']
    query_result = run_query(
        lists_sql,
        sql_parameters=node_type_patterns,
        return_data_format=dict,
    )

    recipients_df = pd.DataFrame(query_result)
    for pending_column in ('pending_triggers', 'pending_outreaches', 'pending_markers'):
        recipients_df[pending_column] = recipients_df[pending_column].map(_val_to_json)
    return recipients_df

# Driver: each of the 100 rounds adds 100 synthetic recipients, creates their
# initial outreach lists, then advances every active list until none remain.
for repeater in range(100):
    print(f'\nREPEATER {repeater}\n------------------------')
    table_gen_dict = OrderedDict({
        'recipients': {'gen_fn': generate_members_data, 'args': [100]}
    })

    # Generate rows for every configured table, continuing from its current max id.
    for table_name, details in table_gen_dict.items():

        starting_id = run_query(
            f"SELECT MAX(id) FROM {table_name}",
            return_data_format=list
        )[0][0] or 0

        starting_id += 1

        gen_func = details['gen_fn']
        args = details.get('args', [])
        details['results'] = gen_func(*args, starting_id=starting_id)

    # Insert each table's rows with a single multi-row INSERT.
    for table_name, details in table_gen_dict.items():
        value_groups = []
        rows = []
        for row in details['results']:
            value_groups.append('\n    (' + ', '.join('?' for _ in row) + ')')
            rows.extend(row)
        if not value_groups:
            # Nothing generated. Trimming the trailing comma used to cut the
            # final letter off "VALUES" here and produce invalid SQL.
            continue
        insert_statement = f"INSERT INTO {table_name} VALUES" + ','.join(value_groups)
        run_query(insert_statement, sql_parameters=rows, commit=True)

    # Number the new outreach lists after the current maximum id. With the
    # default starting_id of 0 the generated ids restart near zero every round
    # and collide with rows that already exist, while progress_outreach and
    # deactivate_outreach_list both address rows by id alone.
    max_list_id = run_query(
        "SELECT MAX(id) FROM outreach_lists",
        return_data_format=list
    )[0][0]
    next_list_id = 0 if max_list_id is None else max_list_id + 1

    df = generate_lists_data(starting_id=next_list_id)
    conn = get_conn()
    try:
        df.to_sql('outreach_lists', conn, if_exists='append', index=False)
    finally:
        # Release the connection even when the append fails.
        conn.close()

    # Keep stepping every active list until no active lists remain.
    iteration = 0
    while True:
        progress_id_list = list(chain(*run_query(
            "SELECT id FROM outreach_lists WHERE UPPER(active) = 'TRUE'", return_data_format=list
        )))
        if not progress_id_list:
            break
        iteration += 1
        print(f'iteration {iteration}. {len(progress_id_list)} items to process.')
        for outreach_list_id in progress_id_list:
            progress_outreach(outreach_list_id)

    # Terminate idle backends on this database (other than this session) at
    # the end of each round.
    run_query("""
        SELECT pg_terminate_backend(pid) FROM pg_stat_activity
        WHERE datname = 'shinewavebackend'
        AND pid <> pg_backend_pid()
        AND state in ('idle');
    """)


print('done')


REPEATER 0
------------------------
iteration 1. 102 items to process.


KeyboardInterrupt: 

In [387]:
import pandas as pd

# Load every outreach-list row whose current node is not a trigger node,
# together with that node's name.
outreach_data = run_query(
    """
        SELECT
            ol.id,
            ol.workflow_id,
            ol.recipient_id,
            ol.current_node_time::DATE AS current_node_date,
            ol.current_node_id,
            wn.name AS node_name
        FROM outreach_lists ol
        INNER JOIN workflow_nodes wn ON ol.current_node_id = wn.id
        WHERE wn.node_type NOT LIKE ?
    """,
    # One-element list, consistent with every other run_query call in this
    # file. A bare string is itself a sequence of characters, so it may be
    # bound as many parameters rather than one (depends on run_query and the
    # driver; verify).
    sql_parameters=["nodes.trigger%"],
    return_data_format=dict
)

outreach_df = pd.DataFrame(outreach_data)
# First row per (date, workflow, recipient, list id) combination.
pivoted_df = outreach_df.groupby(['current_node_date', 'workflow_id', 'recipient_id', 'id']).first()

In [419]:
def get_numbered_columns(df, number):
    """Return ``df``'s column labels, each with ``_<number>`` appended."""
    suffix = f'_{number}'
    numbered = []
    for label in df.columns:
        numbered.append(f'{label}{suffix}')
    return numbered

def id_range_cleaner(row, col_index):
    """Return ``row['id_<col_index>']`` only if it is greater than the previous stage's id.

    Args:
        row: mapping-like row (e.g. a pandas Series) containing
            ``id_<col_index - 1>`` and ``id_<col_index>``.
        col_index: stage number of the id being checked.

    Returns:
        The current id when both ids are present and the current one is
        strictly greater than the previous one; otherwise ``None``.
    """
    previous_id = row[f'id_{col_index - 1}']
    # pd.isnull covers both None and NaN. pandas stores missing numeric ids as
    # NaN, which an `is None` test does not detect.
    if pd.isnull(previous_id):
        return None

    current_id = row[f'id_{col_index}']
    # A missing current id used to raise TypeError when compared against a number.
    if pd.isnull(current_id):
        return None

    return current_id if current_id > previous_id else None

# Repeatedly left-join outreach_df onto itself by recipient id, adding one
# numbered set of columns (suffix _2, _3, ...) per pass, until a pass yields
# no rows.
cur_index = 2
left_df = outreach_df.copy()
left_df.columns = get_numbered_columns(left_df, 1)
merge_valid = True

while merge_valid:
    # Rows whose id for the previous stage is null are set aside before the join.
    # NOTE(review): groupby drops rows with null key values by default, which
    # is presumably why these rows are kept separately — confirm.
    holdouts = left_df[pd.isnull(left_df[f'id_{cur_index - 1}'])]
    left_df.set_index(f'recipient_id_1', inplace=True)

    # Fresh copy of the base data, with columns suffixed for the current stage.
    right_df = outreach_df.copy()
    right_df.columns = get_numbered_columns(right_df, cur_index)
    right_df.set_index(f'recipient_id_{cur_index}', inplace=True)

    # Join on the recipient-id index, then restore recipient_id_1 as a column.
    zipped_df = left_df.join(right_df, how='left')
    zipped_df.reset_index(drop=False, inplace=True)

    # validity_sorter is False when previous id < current id and True otherwise,
    # so those rows sort first, ordered by ascending current-stage id.
    zipped_df[f'validity_sorter_{cur_index}'] = zipped_df[f'id_{cur_index - 1}'] < zipped_df[f'id_{cur_index}']
    zipped_df[f'validity_sorter_{cur_index}'] = ~zipped_df[f'validity_sorter_{cur_index}']
    zipped_df.sort_values([f'validity_sorter_{cur_index}', f'id_{cur_index}'], inplace=True)

    # Group on every column that does not belong to the current stage
    # (this also excludes validity_sorter_<cur_index>).
    sort_cols = [col for col in zipped_df.columns if not col.endswith(f'_{cur_index}')]

    # Collapse to one row per combination of earlier-stage columns, taking the
    # first current-stage entry in the sort order established above.
    zipped_df = zipped_df.groupby(sort_cols).first()
    zipped_df.reset_index(inplace=True)

    # Blank out any stage id that is not greater than the id of the stage before it.
    for i in list(range(2, cur_index + 1)):
        zipped_df[f'id_{i}'] = zipped_df.apply(lambda row: id_range_cleaner(row, i), axis=1)

    if len(zipped_df) == 0:
        # The pass yielded no rows: fall back to the holdouts without the
        # previous stage's columns, and stop.
        zipped_df = holdouts[[col for col in holdouts.columns if not col.endswith(f'_{cur_index - 1}')]]
        merge_valid = False
    else:
        # Carry holdouts and the newly extended rows into the next pass.
        left_df = pd.concat([holdouts, zipped_df])
        cur_index += 1

# Map the merged columns onto the export layout:
#   node_name_<n>         -> stage<n>           (grouping column)
#   current_node_date...  -> unchanged          (grouping column)
#   workflow_id_1         -> workflow_id        (grouping column)
#   id_1                  -> value              (counted per group)
#   anything else         -> delete             (dropped)
rename_dict = {}
group_cols = []
for col in zipped_df.columns:
    if col == 'id_1':
        new_name = 'value'
        rename_dict[col] = new_name
        continue

    if col.startswith(('node_name_', 'current_node_date')):
        new_name = col.replace('node_name_', 'stage')
    elif col == 'workflow_id_1':
        new_name = col.rsplit('_', 1)[0]
    else:
        rename_dict[col] = 'delete'
        continue

    rename_dict[col] = new_name
    group_cols.append(new_name)

zipped_df = zipped_df.rename(columns=rename_dict)
del zipped_df['delete']
# Missing values become empty strings before the rows are grouped.
zipped_df.fillna('', inplace=True)
agg_df = zipped_df.groupby(group_cols).count()
agg_df.reset_index(inplace=True)


def _blank_to_none(val):
    # Turn the empty-string placeholders back into None for the export.
    if str(val):
        return val
    return None


for col in agg_df:
    agg_df[col] = agg_df[col].map(_blank_to_none)

agg_df.to_csv('sankey_outreach_activity_data.csv', index=False)