In [1]:
import argparse
import json
import os
import logging
import yaml
from kafka import KafkaProducer
from schema import Schema, SchemaError
log = logging.getLogger()
logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO"))

In [2]:
def add_value(key):
    match key:
        case 'technology':
            print(key)
            
        case 'hostedAt':
            print(key)

In [3]:
from schema import Schema, SchemaError, Optional, Hook, Or

schema_val = {
    "name": str,
    "description": str,
    "status": str,

    "consumers": {
        "name": str,
        "description": str,
        "type" : str
    },
    "containers": {
        "name": str,
        "sysnonyms": str,
        "description": str,
        Optional("technology", default= lambda : add_value('technology')): str,
        "parentSystem": str,
        "ciDataOwner": str,
        "productOwner": str,
        "applicationType": Or("Business", "Customer Facing", "External Service", "Infrastructure", "Interface", "Office", "Tool", "Unknown"),
        Optional("hostedAt", default = lambda : add_value('hostedAt')): Or("Amazon Web Services (AWS Cloud)", "AT&T", "Azure CF1", "Azure CF2", "Azure Cloud", "DXC", "Equinix", "Google Cloud Platform", "Hybric", "Inlumi", "Local server", "Multi-Cloud", "Not Applicable", "Other", "Salesforce", "ServiceNow", "Solvinity", "Unit4", "Unknown", "User device", "Azure"),
        "deploymentModel": Or("BPO", "CaaS", "IaaS", "On-Premise", "PaaS", "SaaS"),
        "personalData": bool,
        "confidentiality": str,
        "mcv": Or("Highly business critical", "Business critical", "Not business critical", "Not applicable"),
        "maxSeverityLevel": Or(1,2,3,4, "Not applicable"),
        Optional("sox", default= lambda : add_value('sox')): bool,
        Optional("icfr", default= lambda : add_value('icfr')): bool,
        "assignementGroup": str,
        "operationalStatus": Or("Pipelined", "Operational", "Non-Operational", "Submitted for decommissioning", "Decommissioned", "In decommissioning process"),
        "environments": Or("nl", "be"),
        "relationships": {
            "type": str,
            "container": {
                "name": str,
            },
        },
        "components": {
            "name": str,
            "description": str,
            "exposedAPIs": {
                "name": str,
                "description": str,
                "type": str,
                "status": str,
            },
            "consumedAPIs": {
                "name": str,
                "description": str,
                "status": str
            }
        },
    }
}

In [4]:
def validate_yaml(yaml_data):
    #schema = eval(open('./schema.yml', 'r').read())
    validator = Schema(schema_val)
    try:
        validator.validate(yaml_data)
        print('YML valid')
    except SchemaError as se:
        print(se)

In [5]:
def load_doc():
    with open('./test.yml', 'r', encoding='utf8') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as e:
            print(e)

In [6]:
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
avro_schema = avro.schema.parse(open("avro_schema.avsc", "rb").read())

writer = DataFileWriter(open("users.avro", "wb"), DatumWriter(), avro_schema)
writer.append(load_doc())
writer.close()

reader = DataFileReader(open("users.avro", "rb"), DatumReader())

for item in reader:
    print(item)

os.remove("users.avro")


{'name': 'poc-git-to-cmdb', 'description': 'POC to send information about the app to Kafka', 'status': 'pipelined', 'consumers': {'name': 'Developer', 'description': 'A developer who keeps properties file up to date', 'type': 'PERSON'}, 'containers': {'name': 'poc-git-tocmdb', 'sysnonyms': 'poc-git-to-kafka-cmdb-sync', 'description': 'POC', 'technology': 'Kafka', 'parentSystem': 'CMDB', 'ciDataOwner': 'Aede van der Weij', 'productOwner': 'Thomas de Vries', 'applicationType': 'Tool', 'hostedAt': 'Azure', 'deploymentModel': 'On-Premise', 'personalData': False, 'confidentiality': 'Internal use', 'mcv': 'Not business critical', 'maxSeverityLevel': 4, 'sox': False, 'icfr': False, 'assignementGroup': 'AMS_ITOnline_L3_SRE_Infra', 'operationalStatus': 'Pipelined', 'environments': 'nl', 'relationships': {'type': 'relBack', 'container': {'name': 'Container Name'}}, 'components': {'name': 'Component name', 'description': 'what the system does', 'exposedAPIs': {'name': 'Unique API name', 'descript

In [7]:
doc = load_doc()
validate_yaml(doc)

YML valid


In [25]:
schema = {
  "type" : "record",
  "namespace" : "com.test.avro",
  "name" : "SystemModel",
  "fields" : [ {
    "name" : "name",
    "type" : "string"
  }, {
    "name" : "description",
    "type" : "string"
  }, {
    "name" : "status",
    "type" : "string"
  }, {
    "name" : "consumers",
    "type" : {
      "type" : "record",
      "name" : "consumers",
      "fields" : [ {
        "name" : "name",
        "type" : "string"
      }, {
        "name" : "description",
        "type" : "string"
      }, {
        "name" : "type",
        "type" : "string"
      } ]
    }
  }, {
    "name" : "containers",
    "type" : {
      "type" : "record",
      "name" : "containers",
      "fields" : [ {
        "name" : "name",
        "type" : "string"
      }, {
        "name" : "sysnonyms",
        "type" : "string"
      }, {
        "name" : "description",
        "type" : "string"
      }, {
        "name" : "technology",
        "type" : "string"
      }, {
        "name" : "parentSystem",
        "type" : "string"
      }, {
        "name" : "ciDataOwner",
        "type" : "string"
      }, {
        "name" : "productOwner",
        "type" : "string"
      }, {
        "name" : "applicationType",
        "type" : "string"
      }, {
        "name" : "hostedAt",
        "type" : "string"
      }, {
        "name" : "deploymentModel",
        "type" : "string"
      }, {
        "name" : "personalData",
        "type" : "boolean"
      }, {
        "name" : "confidentiality",
        "type" : "string"
      }, {
        "name" : "mcv",
        "type" : "string"
      }, {
        "name" : "maxSeverityLevel",
        "type" : "long"
      }, {
        "name" : "sox",
        "type" : "boolean"
      }, {
        "name" : "icfr",
        "type" : "boolean"
      }, {
        "name" : "assignementGroup",
        "type" : "string"
      }, {
        "name" : "operationalStatus",
        "type" : "string"
      }, {
        "name" : "environments",
        "type" : "string"
      }, {
        "name" : "relationships",
        "type" : {
          "type" : "record",
          "name" : "relationships",
          "fields" : [ {
            "name" : "type",
            "type" : "string"
          }, {
            "name" : "container",
            "type" : {
              "type" : "record",
              "name" : "container",
              "fields" : [ {
                "name" : "name",
                "type" : "string"
              } ]
            }
          } ]
        }
      }, {
        "name" : "components",
        "type" : {
          "type" : "record",
          "name" : "components",
          "fields" : [ {
            "name" : "name",
            "type" : "string"
          }, {
            "name" : "description",
            "type" : "string"
          }, {
            "name" : "exposedAPIs",
            "type" : {
              "type" : "record",
              "name" : "exposedAPIs",
              "fields" : [ { "name" : "name", "type" : "string" }, 
              { "name" : "description", "type" : "string" }, 
              { "name" : "type", "type" : "string" }, 
              { "name" : "status", "type" : "string" } ]
            }
          }, {
            "name" : "consumedAPIs",
            "type" : {
              "type" : "record",
              "name" : "consumedAPIs",
              "fields" : [ {
                "name" : "name",
                "type" : "string"
              }, {
                "name" : "description",
                "type" : "string"
              }, {
                "name" : "status",
                "type" : "string"
              } ]
            }
          } ]
        }
      } ]
    }
  } 
  ]
}

In [27]:
schema_str = """
{
  "type" : "record",
  "namespace" : "com.test.avro",
  "name" : "SystemModel",
  "fields" : [ {
    "name" : "name",
    "type" : "string"
  }, {
    "name" : "description",
    "type" : "string"
  }, {
    "name" : "status",
    "type" : "string"
  }, {
    "name" : "consumers",
    "type" : {
      "type" : "record",
      "name" : "consumers",
      "fields" : [ {
        "name" : "name",
        "type" : "string"
      }, {
        "name" : "description",
        "type" : "string"
      }, {
        "name" : "type",
        "type" : "string"
      } ]
    }
  }, {
    "name" : "containers",
    "type" : {
      "type" : "record",
      "name" : "containers",
      "fields" : [ {
        "name" : "name",
        "type" : "string"
      }, {
        "name" : "sysnonyms",
        "type" : "string"
      }, {
        "name" : "description",
        "type" : "string"
      }, {
        "name" : "technology",
        "type" : "string"
      }, {
        "name" : "parentSystem",
        "type" : "string"
      }, {
        "name" : "ciDataOwner",
        "type" : "string"
      }, {
        "name" : "productOwner",
        "type" : "string"
      }, {
        "name" : "applicationType",
        "type" : "string"
      }, {
        "name" : "hostedAt",
        "type" : "string"
      }, {
        "name" : "deploymentModel",
        "type" : "string"
      }, {
        "name" : "personalData",
        "type" : "boolean"
      }, {
        "name" : "confidentiality",
        "type" : "string"
      }, {
        "name" : "mcv",
        "type" : "string"
      }, {
        "name" : "maxSeverityLevel",
        "type" : "long"
      }, {
        "name" : "sox",
        "type" : "boolean"
      }, {
        "name" : "icfr",
        "type" : "boolean"
      }, {
        "name" : "assignementGroup",
        "type" : "string"
      }, {
        "name" : "operationalStatus",
        "type" : "string"
      }, {
        "name" : "environments",
        "type" : "string"
      }, {
        "name" : "relationships",
        "type" : {
          "type" : "record",
          "name" : "relationships",
          "fields" : [ {
            "name" : "type",
            "type" : "string"
          }, {
            "name" : "container",
            "type" : {
              "type" : "record",
              "name" : "container",
              "fields" : [ {
                "name" : "name",
                "type" : "string"
              } ]
            }
          } ]
        }
      }, {
        "name" : "components",
        "type" : {
          "type" : "record",
          "name" : "components",
          "fields" : [ {
            "name" : "name",
            "type" : "string"
          }, {
            "name" : "description",
            "type" : "string"
          }, {
            "name" : "exposedAPIs",
            "type" : {
              "type" : "record",
              "name" : "exposedAPIs",
              "fields" : [ { "name" : "name", "type" : "string" }, 
              { "name" : "description", "type" : "string" }, 
              { "name" : "type", "type" : "string" }, 
              { "name" : "status", "type" : "string" } ]
            }
          }, {
            "name" : "consumedAPIs",
            "type" : {
              "type" : "record",
              "name" : "consumedAPIs",
              "fields" : [ {
                "name" : "name",
                "type" : "string"
              }, {
                "name" : "description",
                "type" : "string"
              }, {
                "name" : "status",
                "type" : "string"
              } ]
            }
          } ]
        }
      } ]
    }
  } 
  ]
}
"""

In [54]:
schema_key_str = """{
    "type": "record",
    "name": "TestObject",
    "namespace": "System-key",
    "fields": [{
        "name": "key",
        "type": "string"
    }]
}"""

In [9]:
data = load_doc()

In [60]:
from confluent_kafka import Producer
from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

In [68]:
topic = "topic5"

with open("avro_schema.avsc") as f:
    schema_str = f.read()

schema_registry_client = SchemaRegistryClient({'url': 'http://10.152.183.242:8081'})

avro_serializer = AvroSerializer(schema_registry_client, schema_str)

string_serializer = StringSerializer('utf_8')

producer = Producer({'bootstrap.servers': '10.152.183.181:9094'})

producer.produce(topic=topic, key=string_serializer('testkey', None), value=avro_serializer(data, SerializationContext(topic, MessageField.VALUE)))

%4|1667317266.144|TERMINATE|rdkafka#producer-24| [thrd:app]: Producer terminating with 1 message (522 bytes) still in queue or transit: use flush() to wait for outstanding message delivery


In [59]:
from confluent_kafka.schema_registry import SchemaRegistryClient, Schema

avro_schema = Schema(schema_str, 'AVRO')

client = SchemaRegistryClient("http://10.152.183.242:8081")

schema_id = client.register_schema('test', avro_schema)

AttributeError: 'str' object has no attribute 'copy'

In [58]:
from confluent_kafka.avro import AvroProducer

producer = AvroProducer({'bootstrap.servers': '10.152.183.181:9094', 'schema.registry.url': 'http://10.152.183.242:8081'})

producer.produce(topic="topic4", value=data, value_schema=schema_str, key_schema=schema_key_str, key="testkey")

AttributeError: 'str' object has no attribute 'get'

In [11]:
producer = KafkaProducer(
                             value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                             bootstrap_servers="10.152.183.181:9094")
producer.send('topic2', value=data)

INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=10.152.183.181:9094 <connecting> [IPv4 ('10.152.183.181', 9094)]>: connecting to 10.152.183.181:9094 [('10.152.183.181', 9094) IPv4]
INFO:kafka.conn:Probing node bootstrap-0 broker version
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=10.152.183.181:9094 <connecting> [IPv4 ('10.152.183.181', 9094)]>: Connection complete.
INFO:kafka.conn:Broker version identified as 2.5.0
INFO:kafka.conn:Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup
INFO:kafka.conn:<BrokerConnection node_id=0 host=192.168.0.121:30000 <connected> [IPv4 ('192.168.0.121', 30000)]>: Closing connection. 
INFO:kafka.conn:<BrokerConnection node_id=0 host=192.168.0.121:30000 <connecting> [IPv4 ('192.168.0.121', 30000)]>: connecting to 192.168.0.121:30000 [('192.168.0.121', 30000) IPv4]


<kafka.producer.future.FutureRecordMetadata at 0x7fef62ebbac0>

INFO:kafka.conn:<BrokerConnection node_id=0 host=192.168.0.121:30000 <connecting> [IPv4 ('192.168.0.121', 30000)]>: Connection complete.
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=10.152.183.181:9094 <connected> [IPv4 ('10.152.183.181', 9094)]>: Closing connection. 


In [21]:
from kafka_schema_registry import prepare_producer

producer = prepare_producer(bootstrap_servers=["10.152.183.181:9094"], avro_schema_registry="http://10.152.183.242:8081", topic_name="topic1", value_schema=schema, num_partitions=1, replication_factor=1)

producer.send("topic1",data)

INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=10.152.183.181:9094 <connecting> [IPv4 ('10.152.183.181', 9094)]>: connecting to 10.152.183.181:9094 [('10.152.183.181', 9094) IPv4]
INFO:kafka.conn:Probing node bootstrap-0 broker version
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=10.152.183.181:9094 <connecting> [IPv4 ('10.152.183.181', 9094)]>: Connection complete.
INFO:kafka.conn:Broker version identified as 2.5.0
INFO:kafka.conn:Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup
INFO:kafka.conn:Probing node bootstrap-0 broker version
INFO:kafka.conn:Broker version identified as 2.5.0
INFO:kafka.conn:Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup
INFO:kafka.conn:<BrokerConnection node_id=0 host=192.168.0.121:30000 <connecting> [IPv4 ('192.168.0.121', 30000)]>: connecting to 192.168.0.121:30000 [('192.168.0.121', 30000) IPv4]
INFO:kafka.conn:Probing node 0 broker version
INFO:kafka.co

<kafka.producer.future.FutureRecordMetadata at 0x7f1d80d845b0>

INFO:kafka.conn:<BrokerConnection node_id=0 host=192.168.0.121:30000 <connecting> [IPv4 ('192.168.0.121', 30000)]>: connecting to 192.168.0.121:30000 [('192.168.0.121', 30000) IPv4]
INFO:kafka.conn:<BrokerConnection node_id=0 host=192.168.0.121:30000 <connecting> [IPv4 ('192.168.0.121', 30000)]>: Connection complete.
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=10.152.183.181:9094 <connected> [IPv4 ('10.152.183.181', 9094)]>: Closing connection. 
INFO:kafka.conn:<BrokerConnection node_id=0 host=192.168.0.121:30000 <connecting> [IPv4 ('192.168.0.121', 30000)]>: connecting to 192.168.0.121:30000 [('192.168.0.121', 30000) IPv4]
INFO:kafka.conn:<BrokerConnection node_id=0 host=192.168.0.121:30000 <connecting> [IPv4 ('192.168.0.121', 30000)]>: Connection complete.
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=10.152.183.181:9094 <connected> [IPv4 ('10.152.183.181', 9094)]>: Closing connection. 
