Skip to content

Script: basic actions(Python)

alexander-zhilov edited this page Oct 6, 2021 · 1 revision

1. CommonFactory and Stubs

To work with external th2 box from script we should call a method from Stub object. All stub objects can be generated using special class in th2-common - CommonFactory which describes connections between script and th2 boxes.

We should provide to CommonFactory path to following configurations:

  • grpc.json with information about host and ports of act and check1 boxes. Data source: Kubernetes Dashboard → Services or kubectl get services

Example:

{
  "services": {
    "Act": {
      "service-class": "ActService",
      "endpoints": {
        "act": {
          "host": "<kubernetes cluster hostname>",
          "port": <external port of act pod>
        }
      },
      "strategy": {
        "name": "robin",
        "endpoints": ["act"]
      }
    },
    "Check1": {
      "service-class": "Check1Service",
      "endpoints": {
        "check1": {
          "host": "<kubernetes cluster hostname>",
          "port": <external port of check1 pod>
        }
      },
      "strategy": {
        "name": "robin",
        "endpoints": ["check1"]
      }
    }
  }
}
  • mq.json with information about routing-key from script-entry-point to estore. Data source: Kubernetes Dashboard → Config Maps → script-entry-point-app-config or RabbitMQ estore-pin queue.

Example:

{
  "queues": {
    "event-store-pin": {
      "attributes": [
        "event",
        "publish"
      ],
      "exchange": "<queue exchange. demo_exchange by default>",
      "filters": [],
      "name": "<routing-key from script-entry-point to estore>",
      "queue": "not_necessary"
    }
  }
}

Example:

{
  "host": "<kubernetes cluster hostname>",
  "vHost": "<vHost from RabbitMQ. Equal to namespace name by default.>",
  "port": "<external port of RabbitMQ>",
  "username": "<RabbitMQ username>",
  "password": "<RabbitMQ password>",
  "exchangeName": "<queue exchange. demo_exchange by default>"
}

Or you can get these configs automatically using kubernetes API: https://github.com/th2-net/th2-documentation/wiki/Connecting-external-box-to-cluster-using-kubectl

Then we should generate Stub objects for act, check1 and estore from this CommonFactory.

Python example:

from th2_common.schema.factory.common_factory import CommonFactory
from th2_grpc_act_template.act_template_service import ActService
from th2_grpc_check1.check1_service import Check1Service

factory = CommonFactory(config_path="./configs")
grpc_router = factory.grpc_router
act = grpc_router.get_service(ActService)
check = grpc_router.get_service(Check1Service)
estore = factory.event_batch_router

act, check and estore are stubs used for direct communication with th2 boxes.

2. Message Creation

In this part, we look at the creation of a message object for subsequent sending to the system. For future use, we must wrap our message in a Message object. Follow these steps to create a Message:

  1. Create instance of class Message from th2-grpc-common
  2. Fill field metadata with instance of class MessageMetadata from th2-grpc-common. Be sure to fill in the field message_type, because a connectivity encode our message based on the dictionary and on message_type.
  3. Fill fields with a dictionary where the keys are strings with the name of the field, and the values are instances of class Value.
from th2_grpc_common.common_pb2 import Message, MessageMetadata, ConnectionID

message = Message(
  metadata=MessageMetadata(message_type='NewOrderSingle', 
  id=MessageID(connection_id=ConnectionID(session_alias=session_alias))), 
  fields=message_fields)

Let’s take a closer look at the dictionary message_fields:

from th2_grpc_common.common_pb2 import Value

{  'ClOrdID': Value(simple_value="NewOrder1"),
'SecurityID': Value(simple_value="5221002222"),
'SecurityIDSource': Value(simple_value="8"),
'OrdType': Value(simple_value="2"),
'Side': Value(simple_value="1")}

The dictionary contains pairs, where the keys are strings with the name of the field, and the values are grpc objects named Value. In the example above, we see simple values without any repeating groups. Below, we can take a closer look at repeating groups in the grpc message:

from th2_grpc_common.common_pb2 import Value, ListValue, Message, MessageMetadata

{'TradingParty': Value(
  message_value=Message(
    fields={'NoPartyIDs': Value(
      list_value=ListValue(
        values=[
Value(message_value=Message(
          metadata=MessageMetadata(messageType='TradingParty_NoPartyIDs'), 
          fields={
            'PartyID': Value(simple_value='Trader1'),
            'PartyIDSource': Value(simple_value="D"),
            'PartyRole': Value(simple_value="76")
            })), 
Value(message_value=Message(
          metadata=MessageMetadata(messageType='TradingParty_NoPartyIDs'), 
          fields={
            'PartyID': Value(simple_value='0'),
            'PartyIDSource': Value(simple_value="P"),
            'PartyRole': Value(simple_value="3")
            })),   
# and other repeating groups 
        ]))}))}

This part is quite complex, but let’s figure it out and start from the inner wrap. We can see a dictionary that contains simple values of one of the repeating groups in NoPartIDs. We must wrap this dictionary in a Message with the type TradingParty_NoPartyIDs, pass it to a Value object and pass this object to a list. The list should contain all repeating groups wrapped this way. After that, we should pass the list to the ListValue object and then pass this object to a Value object as a list_value. This Value object should be added to another dictionary as a value pair with the 'NoPartyIDs' key and pass the dictionary as a Message without a message type to a Value object.

After we create a grpc message, we can send it to act.

3. Act Component and Message Sending

To send messages in the system, we should send a grpc request to the act component. act sends our message to the connectivity queue and waits for a response message based on the act rule that we use in our request. For example: if we send a NewOrderSingle message and use the corresponding act rule, then act will wait for the ExecutionReport message as a response and return it as a result of rule execution. We can also send any message to the connectivity queue without waiting for a response using the sendMessage method. Follow these steps to send a Message:

  1. Create an instance of class PlaceMessageRequest from th2-grpc-act-template.
  2. Create an instance of class ActStub using the CommonFactory.
  3. Call a suitable method of ActStub and pass the PlaceMessageRequest to it.

Let's look closer at these steps on the example:

from th2_grpc_act_template.act_template_pb2 import PlaceMessageRequest

request_to_act = PlaceMessageRequest(
  message=message, 
  connection_id=connectivity, 
  parent_event_id=event_id, 
  description=send_description)

Parameters from the example above:

  1. message - in this parameter we should pass our message created in section 2.
  2. connection_id(DEPRECATED) - we should pass the instance of ConnectionID class from th2-grpc-common: ConnectionID(session_alias=connectivity) where connectivity is a string with the name of the connectivity component that we want to use.
  3. parent_event_id - in this parameter, we should pass the instance of EventID class from th2-grpc-common with a uuid.EventID(id=str(uuid.uuid1())) EventID is an important part of report creation, and we should use the same EventID for the actions that we want to put together in a report.
  4. description - in this parameter, we should pass a string with the description of this action for the report. It is recommended that business logic should be described.

Then we should call a method that corresponds to the act rule. In the example above, we call the placeOrderFIX rule that should wait for the ExecutionReport message and return it to us.

act_response = act.placeOrderFIX(request_to_act)

As a result of placeOrderFIX, we get a response from act. This response can contain a message from the system that matched the rule used. If we used the sendMessage rule, then the response will contain the information on the message we sent being added to the queue.

4. Message Filter Creation

In this part, we will review message filters. Filter creation for the check1 and message creation for act are similar. But with filters, we should use other objects.

Follow these steps to create a RootMessageFilter:

  1. Create an instance of class RootMessageFilter.
  2. Fill field messageType with string value of expected message type.
  3. Fill field message_filter with instance of class MessageFilter
  4. Fill fields fields in the instance of class MessageFilter with a dictionary where the keys are strings with the name of the expected field, and the values are instances of class ValueFilter.
from th2_grpc_common.common_pb2 import RootMessageFilter
from th2_grpc_common.common_pb2 import MessageFilter

filter = RootMessageFilter(
  messageType='ExecutionReport',
  message_filter=MessageFilter(
    fields=filter_fields)

This part is similar and filter_fields is a dictionary, but now we should use the ValueFilter component instead of Value. Let's move on to the dictionary.

from th2_grpc_common.common_pb2 import ValueFilter, FilterOperation

{'ClOrdID': ValueFilter(simple_filter='clordid', key=True),
'Side': ValueFilter(simple_filter='1'),
'Price': ValueFilter(operation=FilterOperation.NOT_EMPTY),
'LeavesQty': ValueFilter(simple_filter='0', operation=FilterOperation.NOT_EQUAL),
'OrderID': ValueFilter(simple_filter=act_response.response_message.fields['OrderID'].simple_value),
'AbstractEmptyList': ValueFilter(list_filter=ListValueFilter(values=[ValueFilter(operation=FilterOperation.EMPTY)]))}

In the example above we can see some variation of the field checks.

  1. We mark the field 'ClOrdID' as key, that means that check1 will make a comparison only if the 'ClOrdID' field values are the same. All other fields are not keys, that means if the value differs from what is expected, we can look at the comparison table in the report.
  2. We expect a ‘Side’ field with value '1'.
  3. We expect non-empty value in the Price field. (similar with Sailfish *)
  4. We expect that the LeavesQty field is not '0'.
  5. We expect that the OrderID field is equal to the OrderID field in the message that comes as a response from act. Please note that using this method for same message verification (if the message from act response is the same message that we need to verify) is unacceptable.
  6. We expect that the AbstractEmptyList is a empty list.

Let’s look at the repeating group description in the filter.

from th2_grpc_common.common_pb2 import ValueFilter, MessageFilter, ListValueFilter, FilterOperation

{'TradingParty': ValueFilter(
  message_filter=MessageFilter(
    fields={'NoPartyIDs': ValueFilter(
      list_filter=ListValueFilter(
        values=[
ValueFilter(
          message_filter=MessageFilter(
            fields={
              'PartyID': ValueFilter(simple_filter='Trader1'),
              'PartyIDSource': ValueFilter(simple_filter="D"),
              'PartyRole': ValueFilter(simple_filter="76")
              })), 
ValueFilter(
          message_filter=MessageFilter(
            fields={
              'PartyID': ValueFilter(simple_filter='0'),
              'PartyIDSource': ValueFilter(simple_filter="P"),
              'PartyRole': ValueFilter(simple_filter="3")
              })),
 # and other repeating groups 
        ]))}))}

It looks similar to a message object, but we don't need to set a message type for repeating groups. After creating the filter object, we can use it as part of the request to check1.

5. Check1 and Several Ways to Verify Messages

check1 receives requests from the script and performs verification of messages in the queue based on the filter and checkpoints we send. Follow these steps to send the verification request:

  1. Create an instance of class CheckRuleRequest/CheckSequenceRuleRequest from th2-grpc-check1.
  2. Create an instance of class VerifierStub using the CommonFactory.
  3. Call a suitable method of VerifierStub and pass the CheckRuleRequest/CheckSequenceRuleRequest to it.

submitCheckRule is a simple message verification method of VerifierStub. We should create CheckRuleRequest for this method.

from th2_grpc_check1.check1_pb2 import CheckRuleRequest

check_request = CheckRuleRequest(
  connectivity_id=connectivity,
  root_filter=filter,
  checkpoint=act_response.checkpoint_id,
  timeout=3000, 
  parent_event_id=event_id,
  description=verify_description)

Let’s look closer at the parameters from the example above.

  1. root_filter - in this parameter, we should pass our filter created in section 4.
  2. connectivity_id - we should pass ConnectionID object: ConnectionID(session_alias=connectivity) where connectivity is a string with the name of the connectivity component that we want to use.
  3. checkpoint - we should pass the checkpoint, starting from which we will search and verify messages. We can get this checkpoint from the act response.
  4. timeout - time in milliseconds, during which we will look for our message.
  5. parent_event_id - in this parameter, we should pass the instance of EventID class from th2-grpc-common with a uuid. EventID(id=str(uuid.uuid1())) EventID is an important part of report creation, and we should use the same EventID for the actions that we want to put together in a report.
  6. description - in this parameter, we should pass a string with the description of this action for the report. It is recommended that business logic should be described.

submitCheckSequenceRule is a method that combines simple message verification and verification of the message order with the ability to check for extra messages. Please note that submitCheckSequenceRule already performs the functions of submitCheckRule, so using both functions at once is not needed. We should create CheckSequenceRuleRequest for this method.

from th2_grpc_check1.check1_pb2 import CheckSequenceRuleRequest

check_sequence_request = CheckSequenceRuleRequest(
  pre_filter=preFilter,
  root_message_filters=messageFilters,
  checkpoint=act_response.checkpoint_id,
  timeout=timeout,
  connectivity_id=connectivity,
  parent_event_id=parentEventId,
  description=verify_description,
  check_order=checkorder)

Some parameters have already been mentioned previously.

  1. pre_filter - in this parameter, we should pass the PreFilter object with a dictionary of fields and expected values.
from th2_grpc_check1.check1_pb2 import PreFilter
from th2_grpc_common.common_pb2 import ValueFilter, MessageFilter, FilterOperation

PreFilter(
  fields={
    'header': ValueFilter(
      message_filter=MessageFilter(
        fields={'MsgType': ValueFilter(
          simple_filter='0',operation=FilterOperation.NOT_EQUAL)}))})
#In the example above, we pass the condition, under which we expect that message type is not Heartbeat.
  1. root_message_filters - in this parameter, we should pass a list of filters that we examined in section 4. If we use ordering, then the message filters in this list should be in the same order as the one in which we expect the messages from the system. Any message that passes PreFilter and is not described in this list will fail the verification.
  2. check_order - in this parameter, we should pass the boolean expression True, if we want to check the message order, or False if not.

After we examined the components of the CheckSequenceRule request, let's take a look at the logic of this rule. CheckSequenceRule looks at the queue that we indicated in the connectivity_id, from the checkpoint to the end of the timeout. All messages in this interval are filtered through the pre_filter and then the filtered message selection is compared with messages in message_filters. If check_order is True, then the order of messages in the selection (the order in which they came from the system) must match the order of filters in message_filters.

Then we should call a method from Check1Stub that corresponds to the check1 rule. submitCheckRule for CheckRuleRequest and submitCheckSequenceRule for CheckSequenceRuleRequest.

check1_response = check.submitCheckRule(check_request)
#or
check1_response = check.submitCheckSequenceRule (check_sequence_request)

We can find in the response the status of the request (not the verification status). The verification status will be displayed in the GUI. Also we can get chain_id from response.

6. estore and report creation

estore is a th2 component, with one of its functions be the report creation. Using this component, we can create reports with act and check1 events and divide these events into sub-reports. To create a report or a sub-report, follow these steps:

  1. Create instance of class Event.
  2. Create instance of class EventBatch using Event.
  3. Create instance of class EventBatchRouter using CommonFactory.
  4. Call a send method from EventBatchRouter and pass EventBatch to it.
from th2_grpc_common.common_pb2 import Event, EventBatch

event = Event(
        id=event_id,
        name=report_name,
        status=status,
        body=body,
        start_timestamp=start_timestamp,
        end_timestamp=current_timestamp,
        parent_id=parent_id)
event_batch = EventBatch(parent_event_id=event_id, events=[event])

Let’s take a closer look at each parameter of Event.

  1. id - in this parameter, we should pass the EventID object with a uuid. EventID(id=str(uuid.uuid1())) We have already used this value for act and check1. Now, using this id, we can store events from act and check1 in the report.
  2. name - we should pass a string with a name of an event that will store all actions with the same id.
  3. start_timestamp - we should pass a grpc Timestamp object with the time of the script execution start in the epoch format.
  4. end_timestamp - we should pass a grpc Timestamp object which contains the time of the end of the script execution in the epoch format.
  5. parent_id - it is an optional parameter. If we want to create a sub-report, then we should use an object like the one we used in the id parameter.

And parameters of EventBatch.

  1. events - is a list of instances of class Event.
  2. parent_event_id - is an instance of EventID class.

When creating an EventBatch, remember that it must match one of the next formats:

  • The EventBatch hasn’t got a parent_event_id and has one Event. This event will be stored as a single event, root, or intermediate depending on the presence of the field.
  • The EventBatch has a parent_event_id and one or several events. These events will be stored as a batch event. EventBatch refs to outside event_id via parent_event_id value. Every Event in that batch must have a parent_id which must ref to an Event in this batch or a parent_event_id in the EventBatch . At least one event in the batch must refer to the parent_event_id of the EventBatch.

We should pass a EventBatch object with the Event objects with parameters to a send method.

estore.send(event_batch)

The EventBatch will be sent using MQ, which means that the estore does not imply a reply to the request.

Clone this wiki locally