## MicroLearning -> [VMLI02](https://confluence.gft.com/display/APACD/MicroLearnings)


> Description: Create a PIB with the client_id and request_id combination set to "some_unique_value". Can the same request_id and client_id combination be reused for another PIB? What happens if you try?
> 
> The purpose of this MicroLearning is to learn that:
> 1. The CreatePostingInstructionBatchAsync RESTful API endpoint is not idempotent and will create multiple AsyncOperation resources on duplicate calls. (TM does not support this RESTful API endpoint in production. Use the Kafka API to post PIBs.)
> 2. The Kafka API `vault.core.postings.requests.v1` does not prevent duplicates on publish and will allow you to send duplicate requests.
> 3. In Vault, a request_id has to be unique within a client_id namespace. The client_id is the same as the [Postings API Client](https://docs.thoughtmachine.net/vault-core/4-6/EN/api/core_api/#Postings_API_clients-PostingsAPIClient).
> 4. Duplicate PIB requests sent to Vault can be found on `vault.core.postings.requests.v1`, the Kafka topic on which Vault receives PIB requests from all Postings API Clients.
> 5. The Postings API is idempotent with respect to duplicate PostingInstructionBatchRequest messages: it responds identically to messages with the same `client_id` and `request_id` combination. Points 6 and 7 describe what that means in practice.
> 6. Once the first PIB has been processed (ACCEPTED or REJECTED by Vault), Vault writes an event to the `vault.api.v1.postings.posting_instruction_batch.created` topic. Vault does not process the duplicate PIB again, but it streams the same event to this topic again, as it did the first time.
> 7. Vault also produces an event on the response topic associated with the client_id in the PIB request. This event is also produced twice: once for the first PIB and once for the duplicate PIB.
> 8. BEWARE: if the client_id is invalid or empty, or the PIB request cannot be parsed successfully, Vault does not process the PIB and routes it to a DLQ topic, `vault.core.postings.requests.dlq.v1`.
> 9. Only ACCEPTED or REJECTED PIBs find their way to the Vault Postings Ledger, where they can be viewed through the Ops Dashboard.
> 
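
The behaviour listed above can be modelled with a toy, in-memory processor. This is only a sketch to make the idea concrete, not Vault code: the class `ToyPostingsProcessor`, its attributes, the shape of the response event and the hard-coded `ACCEPTED` status are all hypothetical. It shows the outcome being cached per `(client_id, request_id)` so that a duplicate is not processed again but still gets the same response, and unparseable requests or requests with an empty client_id being set aside in a DLQ list.

```python
import json
from typing import Dict, List, Tuple


class ToyPostingsProcessor:
    """Toy in-memory model of idempotent PIB handling (hypothetical, not Vault code)."""

    def __init__(self) -> None:
        # Outcome cached per (client_id, request_id) combination
        self._outcomes: Dict[Tuple[str, str], str] = {}
        self.processed_count = 0                 # PIBs that were actually processed
        self.response_events: List[dict] = []    # stands in for the response topic
        self.dlq: List[bytes] = []               # stands in for the DLQ topic

    def handle(self, raw: bytes) -> None:
        try:
            request = json.loads(raw)
            client_id = request["posting_instruction_batch"]["client_id"]
            request_id = request["request_id"]
        except (ValueError, KeyError, TypeError):
            # Request cannot be parsed: route it to the DLQ
            self.dlq.append(raw)
            return
        if not client_id:
            # Empty client_id: route it to the DLQ
            self.dlq.append(raw)
            return

        key = (client_id, request_id)
        if key not in self._outcomes:
            # First time this combination is seen: process it once
            self.processed_count += 1
            self._outcomes[key] = "ACCEPTED"  # assumed outcome, for illustration only
        # The same response event is emitted for the original and for every duplicate
        self.response_events.append(
            {"client_id": client_id, "request_id": request_id, "status": self._outcomes[key]}
        )


processor = ToyPostingsProcessor()
pib = json.dumps({
    "request_id": "some_unique_value",
    "posting_instruction_batch": {"client_id": "DlscPostingClientID"},
}).encode("utf-8")

processor.handle(pib)          # first request: processed
processor.handle(pib)          # duplicate: not processed again, same response
processor.handle(b"not json")  # unparseable: goes to the DLQ list

print(processor.processed_count, len(processor.response_events), len(processor.dlq))
# prints: 1 2 1
```
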

#### Set up a Kafka Producer

In [9]:
import json
import os
from dataclasses import dataclass
from typing import Dict, Callable, Any
from kafka import KafkaConsumer, KafkaProducer
import requests
import logging

CONTENT_TYPE = 'application/json'

logging.basicConfig(level=logging.INFO)
KAFKA_ENDPOINT = 'bootstrap.kafka.partner-eph-6.tmachine.io:443'
CORE_API_URL = "https://core-api.partner-eph-6.tmachine.io"
WORKFLOW_API_URL = "https://workflows-api.partner-eph-6.tmachine.io"
X_AUTH_TOKEN = "A0006786022557907328897!rI1MCYa35TdEFt3kka7xh5edAoXEHfXGzntcA4vSxAseR+Cu+rseyz+j9Ql4WffZD8IsAZ9DUKDttPlqvSNsrfZd6To="

PIB_URL_BATCH_GET = "/v1/posting-instruction-batches:batchGet"
PIB_TOPIC = 'vault.api.v1.postings.posting_instruction_batch.created'
PIB_REQUEST_TOPIC = 'vault.core.postings.requests.v1'
GET_PAC = "/v1/postings-api-clients"

class TMApiClient:
    @dataclass
    class TMConnectionDetails:
        core_api_url: str
        workflow_api_url: str
        token: str
        kafka_url: str
        kafka_security_protocol: str = "SSL"
        content_type: str = CONTENT_TYPE

    def __init__(self, conn: TMConnectionDetails):
        self.connection_details = conn
        self.default_headers = {
            'Content-Type': conn.content_type,
            'X-Auth-Token': conn.token
        }

    def call_core_api(self, endpoint: str, query_params: Dict[str, str], method: str = 'get', body: Any = None,
                      headers: Dict[str, str] = {}):
        return self.__call_api(url=self.connection_details.core_api_url + endpoint,
                               query_params=query_params,
                               method=method,
                               body=body,
                               extra_headers=headers)

    def call_workflow_api(self, endpoint: str, query_params: Dict[str, str], method: str = 'get', body: Any = None,
                          headers: Dict[str, str] = {}):
        return self.__call_api(url=self.connection_details.workflow_api_url + endpoint,
                               query_params=query_params,
                               method=method,
                               body=body,
                               extra_headers=headers)

    def __call_api(self, url: str, query_params: Dict[str, str], method: str, body: Any, extra_headers: Dict[str, str]):
        response = requests.request(method=method,
                                    url=url,
                                    params=query_params,
                                    data=body,
                                    headers={**self.default_headers, **extra_headers}
                                    )
        if not response.ok:
            raise Exception('Failed getting response for request %s, %s %s' % (
                str(response.request), response.status_code, response.text))
        return response.json()

    def publish_kafka_message(self, topic: str, key: str, value: Any) -> None:
        """
        Publish a message to a particular Kafka topic.
        :param topic: Topic to send the message to
        :param key: Key of the message; it is encoded to UTF-8 bytes
        :param value: Value of the message; it is serialised to JSON and then encoded to UTF-8 bytes
        :return: None
        """
        producer = KafkaProducer(bootstrap_servers=[self.connection_details.kafka_url],
                                 security_protocol=self.connection_details.kafka_security_protocol,
                                 api_version=(0, 8, 2))

        producer.send(topic, key=key.encode('utf-8'), value=json.dumps(value).encode('utf-8'))
        # send() is asynchronous: flush so the message is delivered before the producer is discarded
        producer.flush()
        producer.close()

    def subscribe_to_kafka_topic(self,
                                 topic: str,
                                 group_id: str = str(os.getpid()),
                                 offset: str = 'earliest',
                                 callback: Callable[[str, str], None] = lambda *args: None,
                                 ) -> None:
        """
        Subscribe to a particular Kafka topic to receive JSON messages encoded in UTF-8.
        :param topic: topic name to subscribe the consumer to
        :param group_id: consumer group id. Default value is the process id.
        :param offset: where to start when the group has no committed offset: 'earliest' or 'latest'.
        Default value is earliest.
        :param callback: function called with the key and value of each message.
        :return: None
        """
        consumer = KafkaConsumer(topic,
                                 group_id=group_id, auto_offset_reset=offset,
                                 bootstrap_servers=[self.connection_details.kafka_url],
                                 security_protocol=self.connection_details.kafka_security_protocol,
                                 api_version=(0, 8, 2))
        logging.debug("Consumer for topic {} has started".format(topic))
        for record in consumer:
            logging.debug("%s:%d:%d: key=%s value=%s" % (record.topic, record.partition,
                                                         record.offset, record.key,
                                                         record.value))
            callback(record.key.decode('utf8'), json.loads(record.value.decode('utf8').replace("'", '"')))

details = TMApiClient.TMConnectionDetails(core_api_url=CORE_API_URL,
                                             workflow_api_url=WORKFLOW_API_URL,
                                             token=X_AUTH_TOKEN,
                                             kafka_url=KAFKA_ENDPOINT)

#### Publish a posting

> client_id = DlscPostingClientID, and request_id = c1f70e14-9199-4370-a57d-9512ce02b107

In [10]:
client = TMApiClient(details)

request = json.loads("""{
  "request_id": "c1f70e14-9199-4370-a57d-9512ce02b107",
  "posting_instruction_batch": {
    "client_id": "DlscPostingClientID",
    "client_batch_id": "c1f70e14-9199-4370-a57d-9512ce02b107",
    "posting_instructions": [{
      "client_transaction_id": "c1f70e14-9199-4370-a57d-9512ce02b107",
      "instruction_details": null,
      "override": null,
      "transaction_code": null,
      "outbound_authorisation": null,
      "inbound_authorisation": null,
      "authorisation_adjustment": null,
      "settlement": null,
      "release": null,
      "inbound_hard_settlement": {
        "amount": "40.00",
        "denomination": "GBP",
        "target_account": {
          "account_id": "36744190-d5be-2f8b-9490-8f9189464706"
        },
        "internal_account_id": "e2e_A_1",
        "advice": true
      },

      "outbound_hard_settlement": null,
      "custom_instruction": null
    }],

    "batch_details": {
      "force_override": "true"
    },
    "value_timestamp": null,
    "api_type": "POSTINGS_API_TYPE_HIGH_PRIORITY"
  }
}
""")

client.publish_kafka_message(PIB_REQUEST_TOPIC, "40d78ae3-24f8-5393-251d-6ac5811c0433", request)

INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=bootstrap.kafka.partner-eph-6.tmachine.io:443 <connecting> [IPv4 ('172.16.0.1', 443)]>: connecting to bootstrap.kafka.partner-eph-6.tmachine.io:443 [('172.16.0.1', 443) IPv4]
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=bootstrap.kafka.partner-eph-6.tmachine.io:443 <handshake> [IPv4 ('172.16.0.1', 443)]>: Loading system default SSL CAs from DefaultVerifyPaths(cafile='/Library/Frameworks/Python.framework/Versions/3.9/etc/openssl/cert.pem', capath=None, openssl_cafile_env='SSL_CERT_FILE', openssl_cafile='/Library/Frameworks/Python.framework/Versions/3.9/etc/openssl/cert.pem', openssl_capath_env='SSL_CERT_DIR', openssl_capath='/Library/Frameworks/Python.framework/Versions/3.9/etc/openssl/certs')
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=bootstrap.kafka.partner-eph-6.tmachine.io:443 <handshake> [IPv4 ('172.16.0.1', 443)]>: Connection complete.
INFO:kafka.conn:<BrokerConnection node_id=0 host=broker-0.

#### Publish the same posting again

In [None]:
client = TMApiClient(details)
request = json.loads("""{
  "request_id": "c1f70e14-9199-4370-a57d-9512ce02b107",
  "posting_instruction_batch": {
    "client_id": "DlscPostingClientID",
    "client_batch_id": "c1f70e14-9199-4370-a57d-9512ce02b107",
    "posting_instructions": [{
      "client_transaction_id": "c1f70e14-9199-4370-a57d-9512ce02b107",
      "instruction_details": null,
      "override": null,
      "transaction_code": null,
      "outbound_authorisation": null,
      "inbound_authorisation": null,
      "authorisation_adjustment": null,
      "settlement": null,
      "release": null,
      "inbound_hard_settlement": {
        "amount": "40.00",
        "denomination": "GBP",
        "target_account": {
          "account_id": "36744190-d5be-2f8b-9490-8f9189464706"
        },
        "internal_account_id": "e2e_A_1",
        "advice": true
      },

      "outbound_hard_settlement": null,
      "custom_instruction": null
    }],

    "batch_details": {
      "force_override": "true"
    },
    "value_timestamp": null,
    "api_type": "POSTINGS_API_TYPE_HIGH_PRIORITY"
  }
}
""")

client.publish_kafka_message(PIB_REQUEST_TOPIC, "40d78ae3-24f8-5393-251d-6ac5811c0433", request)
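
The two cells above send the same JSON literal twice. A small helper can make the reuse of `request_id` explicit. The sketch below is only an illustration: the function name `build_inbound_hard_settlement_request` and its parameters are hypothetical, and it assumes that fields which are `null` in the payload above can simply be omitted. When no `request_id` is passed, it generates a fresh one with `uuid.uuid4()`, which is one way to keep request_ids unique within a client_id namespace.

```python
import uuid
from typing import Any, Dict, Optional


def build_inbound_hard_settlement_request(
    client_id: str,
    account_id: str,
    amount: str,
    denomination: str = "GBP",
    internal_account_id: str = "e2e_A_1",
    request_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Build a PIB request dict shaped like the payload used above (hypothetical helper)."""
    # Generate a fresh request_id unless the caller deliberately reuses one
    request_id = request_id or str(uuid.uuid4())
    return {
        "request_id": request_id,
        "posting_instruction_batch": {
            "client_id": client_id,
            "client_batch_id": request_id,
            "posting_instructions": [{
                "client_transaction_id": request_id,
                "inbound_hard_settlement": {
                    "amount": amount,
                    "denomination": denomination,
                    "target_account": {"account_id": account_id},
                    "internal_account_id": internal_account_id,
                    "advice": True,
                },
            }],
            "batch_details": {"force_override": "true"},
            "api_type": "POSTINGS_API_TYPE_HIGH_PRIORITY",
        },
    }


# Reusing the request_id yields an identical payload, i.e. a duplicate request
first = build_inbound_hard_settlement_request(
    client_id="DlscPostingClientID",
    account_id="36744190-d5be-2f8b-9490-8f9189464706",
    amount="40.00",
    request_id="c1f70e14-9199-4370-a57d-9512ce02b107",
)
duplicate = build_inbound_hard_settlement_request(
    client_id="DlscPostingClientID",
    account_id="36744190-d5be-2f8b-9490-8f9189464706",
    amount="40.00",
    request_id="c1f70e14-9199-4370-a57d-9512ce02b107",
)
# Leaving request_id out yields a new, unique request
fresh = build_inbound_hard_settlement_request(
    client_id="DlscPostingClientID",
    account_id="36744190-d5be-2f8b-9490-8f9189464706",
    amount="40.00",
)

print(first == duplicate)
print(fresh["request_id"] != first["request_id"])
```

Either dict could then be passed as the `value` argument of `client.publish_kafka_message`, in the same way as the `request` variable in the cells above.
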

#### Confirm that the PIB posted above was added to the postings requests topic, i.e. `vault.core.postings.requests.v1`

In [7]:
from pymongo import MongoClient
from pprint import pprint

# Connect to MongoDB (a separate name avoids shadowing the TMApiClient instance called `client`)
mongo_client = MongoClient('mongodb://localhost:27017')
db = mongo_client['vault_events']
collection = db['postings_requests']

# Define the query
query = { "posting_instruction_batch.posting_instructions.client_transaction_id": "c1f70e14-9199-4370-a57d-9512ce02b107" }

# Fetch the documents
result = collection.find(query)

# Iterate over the fetched documents
for document in result:
    pprint(document)

# Close the MongoDB connection
mongo_client.close()


{'_id': ObjectId('64a7b2b6612eee1335bdfef1'),
 'posting_instruction_batch': {'api_type': 'POSTINGS_API_TYPE_HIGH_PRIORITY',
                               'batch_details': {'force_override': 'true'},
                               'client_batch_id': 'c1f70e14-9199-4370-a57d-9512ce02b107',
                               'client_id': 'DlscPostingClientID',
                               'posting_instructions': [{'authorisation_adjustment': None,
                                                         'client_transaction_id': 'c1f70e14-9199-4370-a57d-9512ce02b107',
                                                         'custom_instruction': None,
                                                         'inbound_authorisation': None,
                                                         'inbound_hard_settlement': {'advice': True,
                                                                                     'amount': '40.00',
                                                    

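To see at a glance whether the same PIB request was recorded more than once, the fetched documents can be grouped by their `client_id` and `request_id` combination. The sketch below is a hedged illustration that runs on plain dicts: the function name `count_by_idempotency_key` is hypothetical, and it assumes each stored document keeps the shape of the published request, with a top-level `request_id` and a nested `posting_instruction_batch.client_id`. The output above is truncated, so check that assumption against the real documents. The second request_id in the sample data is made up.

```python
from collections import Counter
from typing import Any, Dict, Iterable


def count_by_idempotency_key(documents: Iterable[Dict[str, Any]]) -> Counter:
    """Count documents per (client_id, request_id) combination (hypothetical helper)."""
    counts: Counter = Counter()
    for doc in documents:
        # Tolerate documents where the batch is missing or null
        batch = doc.get("posting_instruction_batch") or {}
        counts[(batch.get("client_id"), doc.get("request_id"))] += 1
    return counts


# Sample documents standing in for the result of collection.find(query)
sample_documents = [
    {"request_id": "c1f70e14-9199-4370-a57d-9512ce02b107",
     "posting_instruction_batch": {"client_id": "DlscPostingClientID"}},
    {"request_id": "c1f70e14-9199-4370-a57d-9512ce02b107",
     "posting_instruction_batch": {"client_id": "DlscPostingClientID"}},
    {"request_id": "some-other-request-id",  # made-up value
     "posting_instruction_batch": {"client_id": "DlscPostingClientID"}},
]

counts = count_by_idempotency_key(sample_documents)
# Keep only the combinations that appear more than once
duplicates = {key: n for key, n in counts.items() if n > 1}
print(duplicates)
# prints: {('DlscPostingClientID', 'c1f70e14-9199-4370-a57d-9512ce02b107'): 2}
```
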
#### Confirm that the PIB posted above was added to the response topic mapped to the Postings API Client