# Extractor

In [None]:
from recon_lw.interpretation.field_extractor import BasicConverterExtractor
from recon_lw.reporting.known_issues import Issue

"""
Message example:
  {
     'KeyField1': 'KeyValue1',
     'KeyField2': 'KeyValue2.text',
     'Field1': '1000'
  }
"""


def stream1_extractors_mapping():
    """Return the field-extraction mapping for stream1 messages.

    Keys are the common field names used on both streams ('key1', 'key2',
    'field1'); values describe how to obtain each one from a stream1 message
    shaped like the example above.
    """
    return {
        'key1': 'KeyField1',  # Default dictionary field extraction
        # Lambda based field extraction: keep only the part before the first
        # '.', so 'KeyValue2.text' yields 'KeyValue2' (a string, not a list).
        'key2': lambda x: x['KeyField2'].split('.')[0],
        # The numeric value lives in 'Field1'; 'KeyField1' holds a text key and
        # cannot be passed to int().
        'field1': BasicConverterExtractor(
            field_name='Field1',
            converter=lambda x, _: int(x) / 10
        )
    }

"""
Messages example:
MessageType1:
    {
      'mt': 'MessageType1',
      'key_field_1': 'KeyValue1.text',
      'key_field_2': 'KeyValue2',
      'field_1': '10'
    }
MessageType2:
    {
      'mt': 'MessageType2',
      'key_field_1': 'KeyValue1',
      'key_field_2': 'KeyValue2',
      'field_1': '1000'
    }
"""
def stream2_mt1_extractors_mapping():
    """Return the field-extraction mapping for stream2 'MessageType1' messages.

    The message layout is the MessageType1 example above: the first key carries
    a '.text' suffix that has to be dropped, and the numeric value is stored
    under 'field_1'.
    """
    return {
        # Lambda based field extraction: 'KeyValue1.text' -> 'KeyValue1'.
        'key1': lambda x: x['key_field_1'].split('.')[0],
        'key2': 'key_field_2',  # Default dictionary field extraction
        # stream2 messages name the field 'field_1' (not 'Field1').
        'field1': lambda x: int(x['field_1'])
    }

def stream2_mt2_extractors_mapping():
    """Return the field-extraction mapping for stream2 'MessageType2' messages.

    The message layout is the MessageType2 example above: both keys are taken
    as-is and the numeric value under 'field_1' is integer-divided by 10.
    """
    return {
        'key1': 'key_field_1',  # Default dictionary field extraction
        'key2': 'key_field_2',  # Default dictionary field extraction
        # stream2 messages name the field 'field_1' (not 'Field1').
        'field1': lambda x: int(x['field_1']) // 10
    }

# Adapters

In [None]:
from recon_lw.interpretation.adapter import SimpleAdapterBuilder, CompoundAdapter, SimpleAdapter

# stream1 needs a single mapping, so a simple adapter is assembled through the builder.
stream1_adapter = (
    SimpleAdapterBuilder()
    .with_mapping(stream1_extractors_mapping())
    .build()
)

# stream2 carries two message types; the compound adapter pairs a predicate on
# the 'mt' field with the adapter holding the mapping for that message type.
stream2_adapter = CompoundAdapter(
    adapters=[
        (
            lambda x, _: x['mt'] == 'MessageType1',
            SimpleAdapter(stream2_mt1_extractors_mapping()),
        ),
        (
            lambda x, _: x['mt'] == 'MessageType2',
            SimpleAdapter(stream2_mt2_extractors_mapping()),
        ),
    ]
)

# Messages Comparisons

In [None]:
from recon_lw.interpretation.field_checker import SimpleFieldChecker
from recon_lw.interpretation.check_rule import EqualFieldCheckRule, FieldToCheck

# One equality rule, built from both stream adapters, is shared by every compared field.
check_rule = EqualFieldCheckRule(stream1_adapter, stream2_adapter)

# Fields that are checked on matched messages, each bound to the rule above.
fields_to_check = [
    FieldToCheck(field_name, check_rule)
    for field_name in ("key1", "key2", "field1")
]

# Field name -> checker lookup, in the form SimpleFieldChecker is given below.
checker_rules = {ftc.field: ftc.field_checker for ftc in fields_to_check}

# The field checker implementation that defines how the fields are compared.
fields_checker = SimpleFieldChecker(checker_rules)

# Key Function

In [None]:
from typing import Dict, Any, Set, List, Optional
from recon_lw.interpretation.adapter import Adapter
from recon_lw.interpretation.filter import FilterChain, SessionAliasFilter
from recon_lw.matching.key_functions import BasicOriginalKeyFunctionProvider

# Key fields: the fields used to pair up messages between the two streams.
key_fields = {'key1', 'key2'}

# Key function provider from the common library for stream1.
stream1_key_function = BasicOriginalKeyFunctionProvider(
    # Filtering only messages with a session alias from the whitelist.
    filter_chain=FilterChain().add_filter(
        SessionAliasFilter(whitelisted_aliases={'stream1_session_alias'})
    ),
    key_fields=key_fields,
)

# Defining key function for stream2

def stream2_key_extractor(adapter: Adapter, message: Dict[str, Any], fields: Set[str]) -> Optional[List[str]]:
    """Build the matching key for a stream2 message.

    Args:
        adapter: object whose ``get(message, field)`` returns the field value.
        message: message dict; its 'sessionId' decides whether it is handled.
        fields: names of the key fields to collect.

    Returns:
        ``None`` when the message's 'sessionId' is not 'stream2_session_alias';
        otherwise a single-element list holding the collected values joined
        with ':' (in the iteration order of ``fields``).
    """
    if message['sessionId'] == 'stream2_session_alias':
        key_parts = [adapter.get(message, name) for name in fields]
        return [":".join(key_parts)]
    return None


# Interpretation Function

In [None]:
from recon_lw.core.type.types import Message
from recon_lw.interpretation.interpretation_functions.event_enhancement import ReconEventChainEnhancement
from recon_lw.interpretation.interpretation_functions import BasicInterpretationFunctionProvider

def dummy_enhancement(event: dict, _: Message, __: Adapter):
    """Example enhancement: stamp a marker entry into the event body.

    Mutates ``event['body']`` in place by setting the 'dummy_enhancement' key;
    the two remaining positional arguments are ignored. Returns nothing.
    """
    body = event['body']
    body['dummy_enhancement'] = 'dummy_enhancement'

# The default interpretation function takes matched messages and categorises them into 4 types:
#   - miss left - there is a message in stream2 with a certain key and no message with the same key in stream1 within the horizon delay
#   - miss right - there is a message in stream1 with a certain key and no message with the same key in stream2 within the horizon delay
#   - match - there are messages in stream1 and stream2 with the same key and all compared fields are equal
#   - match_diff - there are messages in stream1 and stream2 with the same key and some compared fields are not equal
# As a result, a pickle with events (dict objects) will be published.
# stream1 is passed as the original stream and stream2 as the copy stream.
interpretation_function = BasicInterpretationFunctionProvider(
    recon_name="stream1_vs_stream2",
    original_stream_adapter=stream1_adapter,
    copy_stream_adapter=stream2_adapter,
    fields_checker=fields_checker,
    enrich_event_with_messages=False,
    recon_event_chain_enhancement=ReconEventChainEnhancement()
        .add_enhancement(dummy_enhancement)
)

# Matching Rule Definition

In [None]:
from recon_lw.matching.stream_matcher.one_many import OneManyMatcher
from recon_lw.core.rule.one_many import OneManyRuleConfig

# Defining OneManyRule with OneManyMatcher, key functions and interpretation functions defined above.
stream1_vs_stream2_rule = OneManyRuleConfig.from_defaults(
    name="stream1_vs_stream2",
    horizon_delay=30,
    match_function=OneManyMatcher(),
    intepretation_function=interpretation_function.provide(),
    first_key_func=stream1_key_function.provide(stream1_adapter),
    # stream2_key_extractor takes (adapter, message, fields), while the key
    # function provided for stream1 is called with the message alone, so the
    # adapter and key fields are bound here to give both the same call shape.
    second_key_func=lambda message: stream2_key_extractor(stream2_adapter, message, key_fields)
)

# Reading data

In [None]:
from th2_data_services.data import Data

# stream1 messages are loaded from a JSON file, stream2 messages from a pickle cache file.
stream1_data = Data.from_json("data/stream1.json")
stream2_data = Data.from_cache_file("data/stream2.pickle")

# Clearing recon events

In [None]:
from pathlib import Path
from recon_lw.recon_lw_entrypoint import execute_standalone

# Cleaning recon events folder
local_events_folder_path = Path("events")
print("Cleaning Local Events Folder")
# Only regular files directly inside the folder are removed; sub-directories are
# left alone. A plain loop is used because unlink() is called purely for its
# side effect, so no throwaway list of None values is built or displayed.
for stale_event_file in local_events_folder_path.glob("*"):
    if stale_event_file.is_file():
        stale_event_file.unlink()

# Recon entrypoint

In [None]:
# Executing the previously defined rule on the stream1 and stream2 data objects.
# The messages are supplied through `data_objects`; `message_pickle_path` and
# `sessions_list` are passed as None. Resulting events are written under
# `local_events_folder_path`, and rules are keyed by their own name.

execute_standalone(
    message_pickle_path=None,
    sessions_list=None,
    result_events_path=local_events_folder_path,
    rules={stream1_vs_stream2_rule.name: stream1_vs_stream2_rule},
    data_objects=[stream1_data, stream2_data]
)
print('DONE')

# Recon Context

In [None]:
from th2_data_services.config import options
from recon_lw.reporting.recon_context.context import ReconContext

# Defining the recon context, which knows how to extract fields from events and messages and how to get recon events.
# It reads events from the same folder the recon run above wrote to.
# NOTE(review): `options.mfr` / `options.efr` are presumably the message and event
# field resolvers configured in th2_data_services — confirm they are set before this cell runs.
recon_context = ReconContext(
    local_events_folder_path,
    options.mfr,
    options.efr
)

# General events statistics

In [None]:
from recon_lw.reporting.stats.stats import EventStatisticsTableReport

# General statistics report generator, built on the recon context; the report
# tables are requested from this object.
stats = EventStatisticsTableReport(recon_context)

In [None]:
# Events table per event type
"""
+--------+--------------------+----------+---------+
|        | Event Type         | Status   |   count |
+========+====================+==========+=========+
|        | BasicReconMatch    | True     |     120 |
+--------+--------------------+----------+---------+
|        | BasicReconMissLeft | False    |      12 |
+--------+--------------------+----------+---------+
|        | LwReconRule        | True     |       1 |
+--------+--------------------+----------+---------+
|        | Microservice       | True     |       1 |
+--------+--------------------+----------+---------+
| count  |                    |          |       4 |
+--------+--------------------+----------+---------+
| totals |                    | 3/1      |     134 |
+--------+--------------------+----------+---------+
"""
# Final expression of the cell: its return value is what gets shown; the string
# literal above is a sample of the expected table (events grouped by event type).
stats.get_event_type_report_table()

In [None]:
# Events table per event name
"""
+--------+-------------------------------------+----------+---------+
|        | Event Name                          | Status   |   count |
+========+=====================================+==========+=========+
|        | stream1_vs_stream2                  | True     |       1 |
+--------+-------------------------------------+----------+---------+
|        | stream1_vs_stream2[match]           | True     |     120 |
+--------+-------------------------------------+----------+---------+
|        | stream1_vs_stream2[miss_original]   | False    |      12 |
+--------+-------------------------------------+----------+---------+
|        | recon_lw 2024-03-19T16:08:04.991471 | True     |       1 |
+--------+-------------------------------------+----------+---------+
| count  |                                     |          |       4 |
+--------+-------------------------------------+----------+---------+
| totals |                                     | 3/1      |     134 |
+--------+-------------------------------------+----------+---------+
"""
# Final expression of the cell: its return value is what gets shown; the string
# literal above is a sample of the expected table (events grouped by event name).
stats.get_event_names_report_table()

# Match Diff Categorisation and Report

In [None]:
from recon_lw.reporting.known_issues import IssueStatus
from recon_lw.reporting.match_diff.categorizer.basic import BasicErrorCategoriser
from recon_lw.reporting.match_diff.categorizer.event_category.base import ErrorCategoryStrategy
from recon_lw.reporting.match_diff.categorizer.event_category.basic import BasicDiffCategoryExtractor, \
    BasicEventCategoryExtractor

# Diff strategy which returns the match diff category name based on the event, the matched messages and the known issues.
# The `known_issues` dict is keyed by category text; the value is the issue attached to that category.
# NOTE(review): the key below has an unbalanced quote ('100 has no closing ') — presumably it
# should read '10' != '100'; confirm against the category text the extractor actually produces.
# NOTE(review): `status_update_date` uses 'DD.MM.YYYY' here but 'YYYY.MM.DD' in the misses
# categoriser further down — confirm which format is expected.
diff_strategy = BasicDiffCategoryExtractor(
    known_issues={
        "stream1_vs_stream2 | field 'field1' '10' != '100": Issue(
            code='ISSUE-121',
            description='Invalid field1 value for mt2 in stream2.',
            status=IssueStatus.APPROVED,
            status_update_date='19.03.2024'
        )
    }
)

# Library-provided category extractor; used below for both match and match-diff events.
event_strategy = BasicEventCategoryExtractor()

# Custom category extractor used for miss events.
def miss_category_extractor(recon_name: str, orig, copy, event: dict):
    """Return the miss category for the 'stream1_vs_stream2' recon.

    Only ``recon_name`` is inspected; ``orig``, ``copy`` and ``event`` are
    accepted but unused. Any other recon name yields ``None``.
    """
    if recon_name != 'stream1_vs_stream2':
        return None
    return f'stream1_vs_stream2 | {recon_name}'

# Strategy wiring: the same event extractor handles match and match-diff events,
# the custom extractor handles both kinds of misses, and the known-issues based
# strategy names the diff categories.
categorizer_strategy = ErrorCategoryStrategy(
    match_extractor=event_strategy,
    match_diff_extractor=event_strategy,
    miss_left_extractor=miss_category_extractor,
    miss_right_extractor=miss_category_extractor,
    diff_category_extractor=diff_strategy
)

# Categoriser which categorises events based on the strategies for the different types of events.
# NOTE: the name `categorizer` is rebound to a misses categoriser in a later cell.
categorizer = BasicErrorCategoriser(
    error_extractor_strategy=categorizer_strategy,
    recon_context=recon_context
)


In [None]:
from recon_lw.reporting.match_diff.viewer.content_provider.default import DefaultExampleContentProvider
from recon_lw.reporting.match_diff.viewer.style_provider.default import DefaultErrorExamplesStyleProvider
from recon_lw.reporting.match_diff.viewer.color_provider.default import DefaultCategoryColorProvider
from recon_lw.reporting.match_diff.viewer.category_displayer import ErrorExampleDisplayer, MatchDiffViewer

# Default color provider shipped with the library; this is the one handed to the displayer below.
color_provider = DefaultCategoryColorProvider()
# Non default color provider
def not_default_color_provider(category):
    """Return black for categories mentioning 'field1', white for all others."""
    if 'field1' in category:
        return "#000000"
    return "#ffffff"


# Defining the object which displays error examples collected by the error categoriser, with default styles.
# `color_provider` is used here; `not_default_color_provider` is defined only as an alternative and is not passed.
example_displayer = ErrorExampleDisplayer(
    category_color_provider=color_provider,
    error_examples_styles_provider=DefaultErrorExamplesStyleProvider()
)

# Running events categorisation over the recon events exposed by the recon context.
# NOTE: `categorizer` must still be the BasicErrorCategoriser at this point; a later
# cell rebinds the same name, which matters when cells are re-run out of order.
stats_context = categorizer.process_events(recon_context.get_recon_events())

# Message key extractor
def id(x):
    """Return the business key(s) of message ``x`` using its stream's key function.

    Messages whose 'sessionId' is 'stream1_session_alias' go through the key
    function provided for stream1; every other message goes through
    ``stream2_key_extractor`` with the stream2 adapter and the key fields.

    NOTE: this name shadows the builtin ``id`` for the rest of the notebook.
    """
    if x['sessionId'] != 'stream1_session_alias':
        return stream2_key_extractor(stream2_adapter, x, key_fields)
    stream1_key_func = stream1_key_function.provide(stream1_adapter)
    return stream1_key_func(x)


# Defining the match diff html report displayer to display categories found by the categoriser.
diff_viewer = MatchDiffViewer(
    recon_stats_context=stats_context,
    # `messages` takes the combined message data of both streams (same
    # `stream1_data + stream2_data` form used for the misses report); the
    # adapters are field extractors, not message collections.
    messages=stream1_data + stream2_data,
    data_objects=[stream1_data, stream2_data],
    message_business_ids_provider=id,
    message_content_provider=DefaultExampleContentProvider(),
    recon_context=recon_context,
    error_example_displayer=example_displayer
)
diff_viewer.display_report()

# Static misses categorisation

In [None]:
from recon_lw.reporting.missing_messages.viewer.missing_message import MissingMessagesDisplayer
from recon_lw.reporting.missing_messages.categorizer.matchers_impl import SimpleMatcherFlat
from recon_lw.reporting.known_issues import Issue, IssueStatus
from recon_lw.reporting.missing_messages.categorizer.rule import MissCategorizationRule
from recon_lw.reporting.missing_messages.categorizer.categorizer_impl import SimpleMissesCategorizer
from recon_lw.reporting.missing_messages.utils import MissedMessageHandler

# Defining the misses categoriser: a dict of category key -> list of rules, where each rule
# pairs a known issue (ticket) with a predicate (handler) applied to the event.
# NOTE: this rebinds `categorizer`, which earlier referred to the match-diff categoriser.
categorizer = SimpleMissesCategorizer(
    {
        'no_copy stream1_vs_stream2': [MissCategorizationRule(
            ticket=Issue(
                code="TEST-335",
                description="stream2 missing msgs with field 'field1' value greater than 1000 due {some reason}.",
                status=IssueStatus.APPROVED,
                status_update_date='2024.01.21'
            ),
            # NOTE(review): the handler reads key 'field', which none of the example messages
            # contain; `.get` then returns None and int(None) raises TypeError. Presumably the
            # intended key is the message's numeric field ('Field1' / 'field_1') — confirm.
            handler=lambda x: int(x['body']['messages'][0].get('field')) > 1000 
        )]
    }
)

# Defining the handler that ties the recon context to the misses categoriser.
miss_messages_handler = MissedMessageHandler(
    recon_context=recon_context,
    miss_categoriser=categorizer
)

# Extracting missing messages and categories using the categoriser, over the combined data of both streams.
messages, classes = miss_messages_handler.categorise_and_filter(stream1_data + stream2_data)

# Showing categories in table view.
report = MissingMessagesDisplayer(classes)
report.display(messages)

# Dynamic misses and match diffs categorisation

In [None]:
def miss_orig_event_category(event) -> str:
    """Build the category text for an event where the stream1 message is missing.

    Reads the first message attached to the event (``event['body']['messages'][0]``)
    and reports its raw 'key_field_1' and 'field_1' values. The raw stream2
    field name is 'field_1'; 'field1' is only the label shown in the text.
    """
    message = event['body']['messages'][0]
    return f'stream1 missing messages with key field {message["key_field_1"]} and "field1" {message["field_1"]}'

def miss_copy_event_category(event) -> str:
    """Placeholder categoriser for miss-copy events.

    The event is not inspected yet; every event gets the same category.
    """
    # your logic goes here
    category = 'UNCATEGORISED'
    return category

def match_diff_event_category(event) -> str:
    """Placeholder categoriser for match-diff events.

    The first two messages attached to the event are looked up (so an event
    with fewer than two messages raises), but they are not used yet; every
    event gets the same category.
    """
    attached_messages = event['body']['messages']
    stream1_msg = attached_messages[0]
    stream2_msg = attached_messages[1]
    # your logic goes here
    return 'UNCATEGORIZED'

In [None]:
# Displaying miss orig categories
from th2_data_services.utils.event_utils.totals import get_category_totals2
from th2_data_services.utils.category import Category
# Each Category pairs a column label with a function computing its value from an event.
categories = [
    Category("recon_name", lambda e: e['recon_name']),
    Category("type", lambda e: e['eventType']),
    Category("miss type", miss_orig_event_category)
]
# Totals are computed over 'BasicReconMissLeft' events only.
get_category_totals2(recon_context.get_recon_events().filter(lambda e: e['eventType'] == 'BasicReconMissLeft'), categories)

In [None]:
# Displaying miss copy categories
# Each Category pairs a column label with a function computing its value from an event.
categories = [
    Category("recon_name", lambda e: e['recon_name']),
    Category("type", lambda e: e['eventType']),
    Category("miss type", miss_copy_event_category)
]
# Totals are computed over 'BasicReconMissRight' events only.
get_category_totals2(recon_context.get_recon_events().filter(lambda e: e['eventType'] == 'BasicReconMissRight'), categories)

In [None]:
# Displaying match diff categories
# Each Category pairs a column label with a function computing its value from an event.
categories = [
    Category("recon_name", lambda e: e['recon_name']),
    Category("type", lambda e: e['eventType']),
    # Match-diff events are categorised by the match-diff function, not the miss-copy one.
    Category("diff type", match_diff_event_category)
]
# Totals are computed over unsuccessful 'BasicReconMatch' events, i.e. matches with differences.
get_category_totals2(recon_context.get_recon_events().filter(lambda e: e['eventType'] == 'BasicReconMatch' and not e['successful']), categories)