# Data Services for demo script
More examples of using Data Services can be found at https://github.com/th2-net/th2-data-services-template

In [None]:
from pprint import pprint
import pandas as pd
from IPython.core.display import display, HTML
from datetime import datetime, timedelta
from th2_data_services.data_source import DataSource
from th2_data_services.data import Data
from th2_data_services.events_tree import EventsTree
from th2_data_services.utils import Utils
from pandas import DataFrame, Grouper
import pickle

# Widen the notebook container and relax pandas display limits so large tables are readable.
display(HTML("<style>.container { width:100% !important; }</style>"))
pd.set_option('display.max_rows', 1500)
pd.set_option('display.expand_frame_repr', False)
pd.set_option('display.max_colwidth', 1000)

# Super type implied for a direct child event, keyed by its parent's super type.
_CHILD_SUPER_TYPES = {
    "Test Run": "Test Case",
    "Recon Folder": "Recon Rule",
    "Recon Rule": "Recon Status",
    "Recon Status": "Recon Event",
}


def get_super_type(record, tree):
    """Return the "super type" of an event record.

    A non-empty ``eventType`` is returned unchanged. When ``eventType`` is the
    empty string, the type is inferred from the event name and its ancestry:

    * a name containing "Recon" -> "Recon Folder";
    * no parent id -> "Test Run";
    * otherwise the parent's super type (computed recursively) is mapped
      Test Run -> Test Case, Recon Folder -> Recon Rule,
      Recon Rule -> Recon Status, Recon Status -> Recon Event.

    If the parent is not in ``tree``, or its super type has no mapped child
    type, the empty string is returned.

    Args:
        record: Event dict (reads ``eventName``, ``parentEventId``, ``eventType``).
        tree: Mapping of event id -> event dict, used to look up parents.
    """
    super_type = record.get("eventType")
    if super_type != "":
        return super_type

    # eventName may be missing/None; treat it as an empty name instead of
    # crashing on the substring test.
    name = record.get("eventName") or ""
    if "Recon" in name:
        return "Recon Folder"

    parent_id = record.get("parentEventId")
    if not parent_id:
        return "Test Run"

    parent_event = tree.get(parent_id)
    if not parent_event:
        return super_type
    return _CHILD_SUPER_TYPES.get(get_super_type(parent_event, tree), super_type)

# Base extract (transform function)
def extract_basic(record):
    """Flatten a raw event record into the fields used by the analysis cells.

    Args:
        record: Event dict from the data provider (required).

    Returns:
        A new dict with ``super_type`` (computed by ``get_super_type`` against
        the global ``tree``), ``start_time``/``end_time`` as naive local
        datetimes, ``status`` ("SUCCESSFUL"/"FAILED"), and selected raw fields.
        Attached message ids are exposed under the key ``messages_id``.
    """

    def to_datetime(timestamp):
        # ``timestamp`` is {"epochSecond": ..., "nano": ...}. ``nano`` holds
        # nanoseconds, so divide by 1000 before using it as microseconds
        # (the same conversion extract_basic_script uses).
        timestamp = timestamp or {}
        moment = datetime.fromtimestamp(timestamp.get("epochSecond", 0))
        return moment + timedelta(microseconds=timestamp.get("nano", 0) / 1000)

    start_time = to_datetime(record.get("startTimestamp"))
    end_time = to_datetime(record.get("endTimestamp"))
    new_object = {
        "super_type": get_super_type(record, tree),
        "start_time": start_time,
        "end_time": end_time,
        "status": "SUCCESSFUL" if record.get("successful") else "FAILED",
        "eventName": record.get("eventName"),
        "eventId": record.get("eventId"),
        "parentEventId": record.get("parentEventId"),
        "body": record.get("body"),
        "messages_id": record.get("attachedMessageIds"),
    }
    # Debug aid: dump records that arrive without an event name.
    if new_object['eventName'] is None:
        pprint(record)
    return new_object

## Start and finish test time setup

Load the test start and finish times automatically from the saved pickle files.

In [None]:
# Start/finish datetimes of the test, stored as pickle files next to the notebook.
# NOTE: pickle.load executes arbitrary code from the file; only load pickles you produced yourself.
with open('start_datetime.pickle', 'rb') as start_file:
    START_TIME = pickle.load(start_file)

with open('finish_datetime.pickle', 'rb') as finish_file:
    END_TIME = pickle.load(finish_file)

You can also set the times manually: uncomment the following code block and set the times of interest.

In [None]:
# START_TIME = datetime(year=2021, month=6, day=20, hour=13, minute=44, second=41, microsecond=692724)
# END_TIME = datetime(year=2021, month=6, day=20, hour=13, minute=45, second=49, microsecond=28579)

In [None]:
# Data provider endpoint (host/port of the th2-kube-demo environment).
DATA_PROVIDER_NODE_PORT = 30789
HOST = '10.64.66.66'  # th2-kube-demo
URL = F"http://{HOST}:{DATA_PROVIDER_NODE_PORT}"


data_source = DataSource(URL)
# Load full events (metadataOnly=False) with their attached message ids between
# START_TIME and END_TIME. Later cells iterate this object many times;
# presumably cache=True avoids re-fetching on each pass — TODO confirm.
events_with_attached_msgs: Data = data_source.get_events_from_data_provider(
    startTimestamp=START_TIME,
    endTimestamp=END_TIME,
    metadataOnly=False,
    attachedMessages=True,
    cache=True
)

# Build an events tree so later cells can look up parents and ancestors.
events_tree = EventsTree(events_with_attached_msgs)
# `tree` maps event id -> event dict and is used by get_super_type and the transforms.
# NOTE(review): this alias is taken *before* recover_unknown_events(); if that call
# replaces the dict instead of mutating it, `tree` will not see recovered events — confirm.
tree = events_tree.events

# Fetch events that are not inside the requested time interval (presumably
# ancestors referenced by loaded events) from the data source.
events_tree.recover_unknown_events(data_source)

### Number of Events in the tree

In [None]:
# Number of events in the tree; the cell displays this value.
len(tree)

In [None]:
# Aggregate the flattened events by their super type (presumably a count per type); displayed as the cell output.
Utils.aggregate_a_group(events_with_attached_msgs.map(extract_basic), "super_type")

## Recons analysis

### [1] Summary table
The table summarizes the work of the check2-recon th2 component:
it shows the names of the rules, their statuses, and how many events each had.

In [None]:
def transform_output(record):
    """Describe one "Recon Event" row by its recon ancestry.

    Walks three parent links up through the global ``tree``
    (event -> status -> rule -> folder) and returns their names, together with
    a count of 1 and the record's start/end times, for the summary table.
    """
    ancestry = []
    current = record
    for _ in range(3):
        current = tree.get(current.get("parentEventId"))
        ancestry.append(current)
    recon_status, recon_rule, recon_folder = ancestry

    return {
        "Recon Folder": recon_folder.get("eventName"),
        "Recon Rule": recon_rule.get("eventName"),
        "Recon Status": recon_status.get("eventName"),
        "Number of Events": 1,
        "Start Time": record.get("start_time"),
        "End Time": record.get("end_time"),
    }

# Shared aggregation spec: each "Recon Event" row counts once; keep the earliest
# start and latest end per group.
recon_aggregation = {"Number of Events": "sum", "Start Time": "min", "End Time": "max"}

# Parenthesized chain instead of backslash continuations (a trailing backslash
# silently depends on the next line being blank).
data: Data = (
    events_with_attached_msgs
    .map(extract_basic)
    .filter(lambda record: record.get("super_type") == "Recon Event")
    .map(transform_output)
)

# Functions from pandas.
df = DataFrame(data).groupby(['Recon Folder', "Recon Rule", 'Recon Status']).agg(recon_aggregation)

# Add total row(s) using the same aggregation rules.
df = Utils.append_total_rows(df, recon_aggregation)
df["duration"] = df["End Time"] - df["Start Time"]
df

### [2] Table - Types and number of messages processed by recon

In [None]:
def is_recon_ancestor(record):
    """Return True if ``record`` has an ancestor whose super type is "Recon Folder".

    Root records (``parentEventId`` missing or None) are rejected without
    consulting ``events_tree``.
    """
    if record.get("parentEventId") is None:
        return False
    return bool(events_tree.get_ancestor_by_super_type(record, "Recon Folder", get_super_type))

def is_match_or_match_failed(record):
    """Return True for records under a failed "No match" or any "Matched passed" ancestor.

    Ancestors are looked up by event name via ``events_tree.get_ancestor_by_name``.
    A "No match" ancestor qualifies only when its ``successful`` flag is falsy.
    """
    no_match = events_tree.get_ancestor_by_name(record, "No match")
    if no_match and not no_match.get("successful"):
        return True
    return bool(events_tree.get_ancestor_by_name(record, "Matched passed"))

def exctract_block(record):
    """Return ``[{"MsgType": ...}, ...]`` for the messages attached to ``record``.

    Returns None when the record has no ``messages_id``. Otherwise the messages
    are fetched through ``data_source`` and each one's
    ``body.metadata.messageType`` is reported (None where absent).
    """
    message_ids = record.get("messages_id")
    if not message_ids:
        return None

    rows = []
    for message in data_source.find_messages_by_id_from_data_provider(message_ids):
        metadata = message.get("body", {}).get("metadata", {})
        rows.append({"MsgType": metadata.get("messageType")})
    return rows

data = (
    events_with_attached_msgs
    .map(extract_basic)
    .filter(is_recon_ancestor)
    .filter(is_match_or_match_failed)
    .map(exctract_block)
)

df = DataFrame(data)
# Number of processed messages per message type (displayed as the cell output).
df.groupby("MsgType").size().reset_index(name="count")

## Script analysis

In [None]:
def extract_basic_script(record):
    """Flatten a raw event record for the "Script analysis" cells.

    Returns a new dict with naive local ``start_time``/``end_time`` (``nano``
    divided by 1000 to get microseconds), ``super_type`` from
    ``get_super_type`` with the global ``tree``, a "SUCCESSFUL"/"FAILED"
    ``status``, and selected raw fields (``attachedMessageIds`` keeps its name).
    """

    def to_datetime(key):
        stamp = record.get(key, {})
        moment = datetime.fromtimestamp(stamp.get("epochSecond", 0))
        return moment + timedelta(microseconds=stamp.get("nano", 0)/1000)

    start_time = to_datetime("startTimestamp")
    end_time = to_datetime("endTimestamp")
    return {
        "start_time": start_time,
        "end_time": end_time,
        "super_type": get_super_type(record, tree),
        "eventName": record.get("eventName"),
        "parentEventId": record.get("parentEventId"),
        "status": "SUCCESSFUL" if record.get("successful") else "FAILED",
        "body": record.get("body"),
        "attachedMessageIds": record.get("attachedMessageIds"),
    }

### [1] Basic statistics by test cases
Shows how many test cases failed and how many passed.

In [None]:
def transform_output(record):
    """Turn a Test Case record into a counting row: one case with its status."""
    status = record.get("status")
    return {"Test Case": 1, "Status": status}

data = (
    events_with_attached_msgs
    .map(extract_basic_script)
    .filter(lambda record: record.get("super_type") == "Test Case")
    .map(transform_output)
)

# Number of test cases per status, plus each status's share in percent.
df = DataFrame(data=data).groupby(["Status"]).sum()
total_cases = df["Test Case"].sum()
df["Percent"] = df["Test Case"] / total_cases * 100
df

### [2] Detail statistics by test cases
Shows each test case's name, status, time and duration.

In [None]:
def ancestor_is_test_case(record):
    """Return True if ``record`` has an ancestor whose super type is "Test Case".

    Records with a falsy ``parentEventId`` (missing, None, empty) are rejected
    without consulting ``events_tree``.
    """
    has_parent = bool(record.get("parentEventId"))
    return has_parent and bool(
        events_tree.get_ancestor_by_super_type(record, "Test Case", get_super_type)
    )

def _utc_timestamp_to_local(value):
    """Parse a UTC ``YYYY-MM-DDTHH:MM:SS[.fraction]Z`` string into naive local time.

    Accepts 0-9 fractional-second digits; digits beyond microseconds are
    truncated. The result is naive local time so it is comparable with
    ``datetime.fromtimestamp`` values, which are also naive local time.
    """
    from datetime import timezone

    whole, _, fraction = value.rstrip("Z").partition(".")
    parsed = datetime.strptime(whole, "%Y-%m-%dT%H:%M:%S")
    if fraction:
        parsed += timedelta(microseconds=int(fraction[:6].ljust(6, "0")))
    return parsed.replace(tzinfo=timezone.utc).astimezone().replace(tzinfo=None)


def transform_output(record):
    """Build one detailed row for the test case that owns ``record``.

    The row holds the test run and test case names, the test case status, the
    test case start time, and the timestamp of the record's first attached
    message as "End Time".

    Returns None when the record has no attached message ids, nothing comes
    back from the data provider, or the message has no body or timestamp.
    """
    message_ids = record.get("attachedMessageIds")
    if not message_ids:
        return None

    test_case = events_tree.get_ancestor_by_super_type(record, "Test Case", get_super_type)
    test_run = tree.get(test_case.get("parentEventId"))

    start_time = datetime.fromtimestamp(test_case.get("startTimestamp", {}).get("epochSecond", 0))
    start_time += timedelta(microseconds=test_case.get("startTimestamp", {}).get("nano", 0)/1000)

    found = data_source.find_messages_by_id_from_data_provider(message_ids)
    # NOTE(review): the provider's return shape is not visible here. Accept a
    # single message dict or an iterable of them, and treat an empty result as
    # "no message". A bare next() would raise StopIteration, which becomes a
    # RuntimeError when raised inside a generator pipeline.
    message = found if isinstance(found, dict) else next(iter(found or []), None)
    if not message:
        return None

    body = message.get("body", {})
    if not body:
        return None

    timestamp = body.get("metadata", {}).get("timestamp")
    if not timestamp:
        return None
    # The message timestamp is UTC ("Z"); convert it to the machine's local time
    # so it matches start_time, rather than assuming a fixed +3h offset.
    end_time = _utc_timestamp_to_local(timestamp)

    return {
        "Test Run": test_run.get("eventName"),
        "Test Case": test_case.get("eventName"),
        "Status": "SUCCESSFUL" if test_case.get("successful") else "FAILED",
        'Start Time': start_time,
        'End Time': end_time,
    }

data = (
    events_with_attached_msgs
    .map(extract_basic_script)
    .filter(ancestor_is_test_case)
    .filter(lambda record: record.get("super_type") in ["Verification", "message"])
    .map(transform_output)
)

# Collapse per-event rows to one row per test case: earliest start, latest end.
per_case = DataFrame(data=data).groupby(["Test Run", "Test Case", "Status"])
df = per_case.agg({"Start Time": "min", "End Time": "max"}).reset_index()
df["duration"] = df["End Time"] - df["Start Time"]
df.sort_values(by=["Start Time"])

### [3] Failed Verifications

In [None]:
def is_test_case_ancestor(record):
    """Return True if ``record`` has an ancestor whose super type is "Test Case".

    Only a missing/None ``parentEventId`` short-circuits to False.
    """
    if record.get("parentEventId") is None:
        return False
    return bool(events_tree.get_ancestor_by_super_type(record, "Test Case", get_super_type))

def extract_failed_tags(record):
    """Summarize a failed verification record together with its failed tags.

    The returned dict holds the event id and name, the names of its "Test Run"
    and "Test Case" ancestors, and a ``tags`` list. That list is filled by
    ``Utils.get_failed_tags`` for every entry of the record's ``body``.
    """

    def ancestor_name(super_type):
        ancestor = events_tree.get_ancestor_by_super_type(record, super_type, get_super_type)
        return ancestor.get("eventName")

    summary = {
        "Event Id": record.get("eventId"),
        "Event Name": record.get("eventName"),
        "Test Run": ancestor_name("Test Run"),
        "Test Case": ancestor_name("Test Case"),
    }
    failed_tags = []
    for content in record.get("body"):
        Utils.get_failed_tags(content, failed_tags)
    summary["tags"] = failed_tags
    return summary

data = (
    events_with_attached_msgs
    .map(extract_basic)
    .filter(is_test_case_ancestor)
    .filter(lambda record: record.get("super_type") == "Verification")
    .filter(lambda record: record.get("status") == "FAILED")
    .map(extract_failed_tags)
)

# One output row per failed tag, prefixed with the owning event's identity columns.
transform_data = []
for failed_event in data:
    event_columns = {
        "Event Name": failed_event.get("Event Name"),
        "Test Run": failed_event.get("Test Run"),
        "Test Case": failed_event.get("Test Case"),
        "Event_Id": failed_event.get("Event Id"),
    }
    transform_data.extend({**event_columns, **tag} for tag in failed_event["tags"])

# A pandas DataFrame for a more convenient view.
failed_verifications = DataFrame(data=transform_data)
failed_verifications

### [4] Plot all data aggregated by supertype into a single chart with filters

In [None]:
def time_calc(record: dict):
    """Return a shallow copy of ``record`` with ``time`` set to its ``start_time``.

    The input dict is left untouched; a missing ``start_time`` raises KeyError.
    """
    return {**record, 'time': record['start_time']}
        
# Events keyed by start time, bucketed into 10-second intervals per super type.
events_with_time = events_with_attached_msgs.map(extract_basic).map(time_calc)
df = Utils.aggregate_a_group_by_intervals(
    events_with_time,
    "super_type",
    "10s",
    filter=["Recon Event", "Verification"],
)

Utils.create_tick_diagram(df)  # The plot may not be shown if you have not restarted the notebook.

### [5] Latency density
Finds pairs of NewOrderSingle and ExecutionReport messages, calculates the latency between them, and shows it on a plot.

In [None]:
def is_new_single_order_or_execution_report(record):
    """Return True if the message's ``body.metadata.messageType`` is NewOrderSingle or ExecutionReport.

    Messages with a missing or empty ``body`` are rejected.
    """
    body = record.get("body")
    if not body:
        return False
    message_type = body.get("metadata", {}).get("messageType")
    return message_type in ("NewOrderSingle", "ExecutionReport")

def clear_unnecessery_fields(record):
    """Reduce a message to the fields needed for latency matching.

    Returns None when ``body`` is missing or empty. Otherwise returns a dict
    with the ClOrdID/OrdStatus simple values from ``body.fields`` and the
    message type, session alias and timestamp from ``body.metadata``.
    """
    body = record.get("body")
    if not body:
        return None

    fields = body.get("fields", {})
    metadata = body.get("metadata", {})
    return {
        "clOrdID": fields.get("ClOrdID", {}).get("simpleValue"),
        "OrdStatus": fields.get("OrdStatus", {}).get("simpleValue"),
        "MessageType": metadata.get("messageType"),
        "sessionAlias": metadata.get("id", {}).get("connectionId", {}).get("sessionAlias"),
        "time": metadata.get("timestamp"),
    }

def _parse_message_time(value):
    """Parse a message timestamp such as ``2021-06-20T10:44:41.692724Z`` (UTC, naive result).

    Accepts 0-9 fractional-second digits; digits beyond microseconds are truncated.
    """
    whole, _, fraction = value.rstrip("Z").partition(".")
    parsed = datetime.strptime(whole, "%Y-%m-%dT%H:%M:%S")
    if fraction:
        parsed += timedelta(microseconds=int(fraction[:6].ljust(6, "0")))
    return parsed


# Stream names to query: the part of each attached message id before the first ':'.
# Events without attached messages (None) are skipped instead of crashing the loop.
streams = set()
for record in events_tree.events.values():
    for msg_id in record.get("attachedMessageIds") or []:
        streams.add(msg_id.split(":")[0])

messages = data_source.get_messages_from_data_provider(
    startTimestamp=START_TIME,
    endTimestamp=END_TIME,
    stream=list(streams)
)
data = (
    messages
    .filter(is_new_single_order_or_execution_report)
    .map(clear_unnecessery_fields)
)

# First NewOrderSingle timestamp per ClOrdID.
roundtrips = {}
latency = []
# Latencies are plotted as datetimes on 1900-01-01 (the date strptime assumes for
# "%H:%M:%S"). Adding the timedelta to that anchor keeps the 10 ms bucketing below,
# and, unlike formatting the timedelta with str() and re-parsing it, does not fail
# on whole-second, negative or >= 1 day latencies.
latency_anchor = datetime(1900, 1, 1)

for record in data:
    msg_type = record.get("MessageType")
    clOrdID = record.get("clOrdID")

    if msg_type == "NewOrderSingle":
        if clOrdID not in roundtrips:
            roundtrips[clOrdID] = record.get("time")
    elif msg_type == "ExecutionReport":
        # OrdStatus '0' — presumably FIX OrdStatus=New.
        if record.get("OrdStatus") == '0' and clOrdID in roundtrips:
            current_latency = _parse_message_time(record.get("time")) - _parse_message_time(roundtrips[clOrdID])
            latency.append({"latency": 1, "time": latency_anchor + current_latency})

df = DataFrame(data=latency).set_index("time").groupby(Grouper(freq="10ms")).sum()
df.index = df.index.strftime("%S.%f")

Utils.create_tick_diagram(df)  # The plot may not be shown if you have not restarted the notebook.