# PRMT-2059 Generate high level table of message patterns prevalance
We believe that being able to break down transfers by the set of messages that occur between the sending and receiving supplier will give us a better understanding of what the actual status of the transfer is. 

In particular, Pending transfers may have a technical issue or may be awaiting practice integration - we may be able to distinguish between these. 
For 6 months of transfers (September 2020 to Feb 2021), we wish to be able to see the list of messages in the form:
- The message creator (sending or requesting practice)
- The message type (interaction name)
- Any associate code (jdi event)

We then wish to break down all transfers by:
- The supplier pathway
- The Status
- The message chain
And order these in a table according to how common they are


In [1]:
import pandas as pd
import numpy as np
import time

In [2]:
overwrite_files = True

## Importing transfer data

In [3]:
# Import transfer files to extract whether message creator is sender or requester
# Using data generated from branch PRMT-1742-duplicates-analysis.
# This is needed to correctly handle duplicates.
# Once the upstream pipeline has a fix for duplicate EHRs, then we can go back to using the main output.
transfer_file_location = "s3://prm-gp2gp-data-sandbox-dev/transfers-duplicates-hypothesis/"
transfer_files = [
    "9-2020-transfers.parquet",
    "10-2020-transfers.parquet",
    "11-2020-transfers.parquet",
    "12-2020-transfers.parquet",
    "1-2021-transfers.parquet",
    "2-2021-transfers.parquet"
]

transfer_input_files = [transfer_file_location + f for f in transfer_files]
transfers_raw = pd.concat((
    pd.read_parquet(f)
    for f in transfer_input_files
))

# In the data from the PRMT-1742-duplicates-analysis branch, these columns have been added , but contain only empty values.
transfers_raw = transfers_raw.drop(["sending_supplier", "requesting_supplier"], axis=1)

# Given the findings in PRMT-1742 - many duplicate EHR errors are misclassified, the below reclassifies the relevant data
has_at_least_one_successful_integration_code = lambda errors: any((np.isnan(e) or e==15 for e in errors))
successful_transfers_bool = transfers_raw['request_completed_ack_codes'].apply(has_at_least_one_successful_integration_code)
transfers = transfers_raw.copy()
transfers.loc[successful_transfers_bool, "status"] = "INTEGRATED"

# Correctly interpret certain sender errors as failed.
# This is explained in PRMT-1974. Eventually this will be fixed upstream in the pipeline.
pending_sender_error_codes=[6,7,10,24,30,23,14,99]
transfers_with_pending_sender_code_bool=transfers['sender_error_code'].isin(pending_sender_error_codes)
transfers_with_pending_with_error_bool=transfers['status']=='PENDING_WITH_ERROR'
transfers_which_need_pending_to_failure_change_bool=transfers_with_pending_sender_code_bool & transfers_with_pending_with_error_bool
transfers.loc[transfers_which_need_pending_to_failure_change_bool,'status']='FAILED'

# Add integrated Late status
eight_days_in_seconds=8*24*60*60
transfers_after_sla_bool=transfers['sla_duration']>eight_days_in_seconds
transfers_with_integrated_bool=transfers['status']=='INTEGRATED'
transfers_integrated_late_bool=transfers_after_sla_bool & transfers_with_integrated_bool
transfers.loc[transfers_integrated_late_bool,'status']='INTEGRATED LATE'

# If the record integrated after 28 days, change the status back to pending.
# This is to handle each month consistently and to always reflect a transfers status 28 days after it was made.
# TBD how this is handled upstream in the pipeline
twenty_eight_days_in_seconds=28*24*60*60
transfers_after_month_bool=transfers['sla_duration']>twenty_eight_days_in_seconds
transfers_pending_at_month_bool=transfers_after_month_bool & transfers_integrated_late_bool
transfers.loc[transfers_pending_at_month_bool,'status']='PENDING'
transfers_with_early_error_bool=(~transfers.loc[:,'sender_error_code'].isna()) |(~transfers.loc[:,'intermediate_error_codes'].apply(len)>0)
transfers.loc[transfers_with_early_error_bool & transfers_pending_at_month_bool,'status']='PENDING_WITH_ERROR'

# Supplier name mapping
supplier_renaming = {
    "EGTON MEDICAL INFORMATION SYSTEMS LTD (EMIS)":"EMIS",
    "IN PRACTICE SYSTEMS LTD":"Vision",
    "MICROTEST LTD":"Microtest",
    "THE PHOENIX PARTNERSHIP":"TPP",
    None: "Unknown"
}

asid_lookup_file = "s3://prm-gp2gp-data-sandbox-dev/asid-lookup/asidLookup-Mar-2021.csv.gz"
asid_lookup = pd.read_csv(asid_lookup_file)
lookup = asid_lookup[["ASID", "MName", "NACS","OrgName"]]

transfers = transfers.merge(lookup, left_on='requesting_practice_asid',right_on='ASID',how='left')
transfers = transfers.rename({'MName': 'requesting_supplier', 'ASID': 'requesting_supplier_asid', 'NACS': 'requesting_ods_code','OrgName':'requesting_practice_name'}, axis=1)
transfers = transfers.merge(lookup, left_on='sending_practice_asid',right_on='ASID',how='left')
transfers = transfers.rename({'MName': 'sending_supplier', 'ASID': 'sending_supplier_asid', 'NACS': 'sending_ods_code','OrgName':'sending_practice_name'}, axis=1)

transfers["sending_supplier"] = transfers["sending_supplier"].replace(supplier_renaming.keys(), supplier_renaming.values())
transfers["requesting_supplier"] = transfers["requesting_supplier"].replace(supplier_renaming.keys(), supplier_renaming.values())

## Stage 1

For each of the message from the Spine raw data, we extract the following:
- messageCreator: "sender", "requestor"
- messageType: interactionName (e.g. "request completed")
- errorCode: None / int
To produce for each message the following list: e.g. `["sender", "interactionName", None]`

Then for each transfer, we merged with the above to produce the following:
- conversationID: str
- messages: list of messages in the transfer e.g. `[["sender", "interactionName", None], ["sender", "interactionName", None]]`

We then save this as a parquet file to the following location: s3://prm-gp2gp-data-sandbox-dev/extra-fields-data-from-splunk/Sept_20_Feb_21_conversations_extended_interaction_messages.parquet

In [4]:
# conversation_id, asid, supplier_type
requesting_supplier_type_map = transfers_raw[["conversation_id", "requesting_practice_asid", "date_requested"]].drop_duplicates()
sending_supplier_type_map = transfers_raw[["conversation_id", "sending_practice_asid", "date_requested"]].drop_duplicates()

requesting_supplier_type_map["supplier_type"] = "requestor"
sending_supplier_type_map["supplier_type"] = "sender"

requesting_supplier_type_map = requesting_supplier_type_map.rename({"requesting_practice_asid": "practice_asid"}, axis=1)
sending_supplier_type_map = sending_supplier_type_map.rename({"sending_practice_asid": "practice_asid"}, axis=1)

supplier_type_mapping = pd.concat([requesting_supplier_type_map, sending_supplier_type_map])
supplier_type_mapping["practice_asid"] = supplier_type_mapping["practice_asid"].astype(int)

In [5]:
# Define a list of files to be loaded in
folder="s3://prm-gp2gp-data-sandbox-dev/spine-gp2gp-data/"
files=["Sept-2020","Oct-2020"]
# ,"Nov-2020","Dec-2020","Jan-2021","Feb-2021","Mar-2021"
full_filenames=[folder + file + ".csv.gz" for file in files]

In [6]:
# Rename message types to be human readable
interaction_name_mapping={"urn:nhs:names:services:gp2gp/RCMR_IN010000UK05":"req start",
"urn:nhs:names:services:gp2gp/RCMR_IN030000UK06":"req complete",
"urn:nhs:names:services:gp2gp/COPC_IN000001UK01":"COPC",
"urn:nhs:names:services:gp2gp/MCCI_IN010000UK13":"app ack"}

In [7]:
def generate_single_frame(file):
    a=time.perf_counter()
    print("Now Processing " + file)
    df=pd.read_csv(file, compression='gzip',error_bad_lines=False)
    df=df.sort_values(by='_time')

    # filter out conversations that are not september transfers & maps whether supplier who sent message is the requesting or sending practice
    df = df.merge(supplier_type_mapping, left_on=["conversationID", "messageSender"], right_on=["conversation_id", "practice_asid"], how="left")
    
    # filter out conversations that took place more than 28 days after the date requested
    in_time_message_bool = (pd.to_datetime(df["_time"]).dt.tz_localize(None) - df["date_requested"]).dt.seconds <= twenty_eight_days_in_seconds
    df = df.loc[in_time_message_bool]
    
    # overview of supplier type mapping
    df['interaction_name']=df['interactionID'].replace(interaction_name_mapping)
    df["jdiEvent"] = df["jdiEvent"].replace("NONE", "")
    df["messages"] = list(zip(df["supplier_type"], df["interaction_name"], df["jdiEvent"]))
    df["messages"] = df["messages"].apply(list)
    df=df[["conversation_id", "messages"]]
    print(time.perf_counter()-a)
    return df

In [8]:
field_data=[generate_single_frame(file) for file in full_filenames]

Now Processing s3://prm-gp2gp-data-sandbox-dev/spine-gp2gp-data/Sept-2020.csv.gz
64.21175881700037
Now Processing s3://prm-gp2gp-data-sandbox-dev/spine-gp2gp-data/Oct-2020.csv.gz
56.95174786300049


In [9]:
print('Now Concatenating')
a=time.perf_counter()
full_field_data=pd.concat(field_data,axis=0)
print(time.perf_counter()-a)

print('Now identifying and removing NaN conversation IDs')
a=time.perf_counter()
valid_conversation_bool=~full_field_data['conversation_id'].isna()
full_field_data=full_field_data.loc[valid_conversation_bool]
print(time.perf_counter()-a)

print('Now Grouping by conversation')
a=time.perf_counter()
full_field_data=full_field_data.groupby('conversation_id')['messages'].apply(list)
print(time.perf_counter()-a)

Now Concatenating
0.3748552999968524
Now identifying and removing NaN conversation IDs
1.0954096030000073
Now Grouping by conversation
25.15250265099894


In [10]:
print('Now Saving Data')

if overwrite_files:
    pd.DataFrame(full_field_data).to_parquet('s3://prm-gp2gp-data-sandbox-dev/extra-fields-data-from-splunk/Sept_20_Feb_21_conversations_extended_interaction_messages.parquet')

Now Saving Data


## Stage 2.

a. We load in the original transfer data and for each transfer, and for each transfer we will have a list all messages associated (by joining in the message list dataset from above)

b. We group by supplier pathway, status and message
     - We count for each one, order it, add percentages and this is the final output

In [11]:
conversations_extended_interaction_messages=pd.read_parquet('s3://prm-gp2gp-data-sandbox-dev/extra-fields-data-from-splunk/Sept_20_Feb_21_conversations_extended_interaction_messages.parquet')
conversations_extended_interaction_messages["messages"]=conversations_extended_interaction_messages["messages"].apply(lambda message_list: tuple([tuple(message) for message in message_list]))

In [12]:
transfers_with_message_list = transfers.merge(conversations_extended_interaction_messages, left_on="conversation_id", right_index=True)

In [13]:
message_list_prevelance_table = transfers_with_message_list.groupby(["requesting_supplier", "sending_supplier", "status", "messages"]).agg({"conversation_id": "count"})
message_list_prevelance_table = message_list_prevelance_table.rename({"conversation_id": "Total Number of transfers"}, axis=1).sort_values(by="Total Number of transfers", ascending=False)
message_list_prevelance_table

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,Unnamed: 3_level_0,Total Number of transfers
requesting_supplier,sending_supplier,status,messages,Unnamed: 4_level_1
EMIS,EMIS,INTEGRATED,"((requestor, req start, ), (sender, req complete, ), (sender, app ack, ), (requestor, app ack, ))",153686
TPP,EMIS,INTEGRATED,"((requestor, req start, ), (sender, req complete, ), (sender, app ack, ), (requestor, app ack, ))",50165
EMIS,TPP,INTEGRATED,"((requestor, req start, ), (sender, req complete, ), (sender, app ack, ), (requestor, app ack, ))",16743
EMIS,EMIS,INTEGRATED,"((requestor, req start, ), (sender, req complete, ), (sender, app ack, ), (requestor, COPC, ), (sender, COPC, ), (requestor, app ack, ), (requestor, app ack, ))",13244
EMIS,EMIS,INTEGRATED,"((requestor, req start, ), (sender, req complete, ), (sender, app ack, ), (requestor, app ack, 15))",10296
EMIS,EMIS,INTEGRATED,...,...
EMIS,EMIS,INTEGRATED,"((requestor, req start, ), (sender, req complete, ), (sender, app ack, ), (requestor, COPC, ), (sender, COPC, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (requestor, app ack, ), (sender, COPC, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (requestor, app ack, ), (sender, COPC, ), (sender, COPC, ), (sender, COPC, ), (requestor, app ack, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (sender, COPC, ), (requestor, app ack, ), (requestor, app ack, ), (requestor, app ack, ))",1
EMIS,EMIS,INTEGRATED,"((requestor, req start, ), (sender, req complete, ), (sender, app ack, ), (requestor, COPC, ), (sender, COPC, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (requestor, app ack, ), (sender, COPC, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, req complete, ), (sender, app ack, ), (requestor, COPC, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, 12), (requestor, app ack, ), (requestor, app ack, ))",1
EMIS,EMIS,INTEGRATED,"((requestor, req start, ), (sender, req complete, ), (sender, app ack, ), (requestor, COPC, ), (sender, COPC, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (requestor, app ack, ), (sender, COPC, ), (sender, COPC, ), (requestor, app ack, ), (requestor, app ack, ), (sender, COPC, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (sender, COPC, ), (requestor, app ack, ), (requestor, app ack, ), (sender, COPC, ), (sender, COPC, ), (requestor, app ack, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (requestor, app ack, ), (requestor, app ack, ), (requestor, app ack, ), (requestor, app ack, ))",1
EMIS,EMIS,INTEGRATED,"((requestor, req start, ), (sender, req complete, ), (sender, app ack, ), (requestor, COPC, ), (sender, COPC, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (requestor, app ack, ), (sender, COPC, ), (sender, COPC, ), (requestor, app ack, ), (requestor, app ack, ), (sender, COPC, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (sender, COPC, ), (requestor, app ack, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (requestor, app ack, ), (requestor, app ack, ), (sender, COPC, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (sender, COPC, ), (requestor, app ack, ), (requestor, app ack, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), (sender, COPC, ), (requestor, app ack, ), (sender, COPC, ), ...)",1


In [14]:
total_transfer_count = message_list_prevelance_table["Total Number of transfers"].sum()
total_transfer_count

508081

In [15]:
message_list_prevelance_table.shape

(95978, 1)

In [16]:
message_list_prevelance_table["% Transfers"] = (message_list_prevelance_table["Total Number of transfers"] / total_transfer_count).multiply(100).round(2)

In [17]:
reduced_message_list_prevelance_table_bool = message_list_prevelance_table["Total Number of transfers"] > 1
reduced_message_list_prevelance_table = message_list_prevelance_table[reduced_message_list_prevelance_table_bool]
reduced_message_list_prevelance_table.shape

(5494, 2)

In [18]:
if overwrite_files:
    pd.DataFrame(reduced_message_list_prevelance_table).to_csv('s3://prm-gp2gp-data-sandbox-dev/notebook-outputs/36--PRMT-2059-high-level-table-of-message-patterns-reduced.csv')