# In [2]:
import pyodbc
import json
import datetime

from kafka import KafkaProducer
from kafka.vendor.six import b

import argparse
from uuid import uuid4

#from six.moves import input

from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer


# JSON Schema (draft-04) text for the message value. It describes an object
# with seven properties, all of which are listed as required; `amount` is a
# number, `ext_acc` is an integer and the remaining fields are strings.
# The schema's title is "User", matching the User class defined in this file.
schema_str = """
{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "title": "User",
  "properties": {
    "amount": {
      "type": "number"
    },
    "currency": {
      "type": "string"
    },
    "ext_acc": {
      "type": "integer"
    },
    "file_path": {
      "type": "string"
    },
    "modified_by": {
      "type": "string"
    },
    "trade_date": {
      "type": "string"
    },
    "modified_date": {
      "type": "string"
    }
  },
  "required": [
    "trade_date",
    "ext_acc",
    "modified_by",
    "currency",
    "amount",
    "file_path",
    "modified_date"
  ],
  "type": "object"
}
"""




class User:
    """
    Plain value holder for one record that is serialized and sent to Kafka.

    The attribute names are the same as the property names in the JSON
    schema declared in this file; the types below are the ones that schema
    declares for each property.
    Args:
        ext_acc (int): Value for the ``ext_acc`` property.
        modified_by (str): Value for the ``modified_by`` property.
        currency (str): Value for the ``currency`` property.
        amount (number): Value for the ``amount`` property.
        file_path (str): Value for the ``file_path`` property.
        trade_date (str): Value for the ``trade_date`` property.
        modified_date (str): Value for the ``modified_date`` property.
    """
    def __init__(self, ext_acc, modified_by, currency, amount, file_path, trade_date, modified_date):
        # Arguments are stored unchanged; no validation or conversion is done here.
        self.ext_acc = ext_acc
        self.modified_by = modified_by
        self.currency = currency
        self.amount = amount
        self.file_path = file_path
        self.trade_date = trade_date
        self.modified_date = modified_date

        
def user_to_dict(user, ctx):
    """
    Converts a User instance into a plain dict ready for serialization.
    Args:
        user (User): Object whose attributes are copied into the dict.
        ctx (SerializationContext): Metadata pertaining to the serialization
            operation. It is accepted but not used by this function.
    Returns:
        dict: One entry per field, keyed by the attribute name.
    """
    # Order matters only for readability of the output; it mirrors the
    # constructor's parameter order.
    field_names = (
        "ext_acc",
        "modified_by",
        "currency",
        "amount",
        "file_path",
        "trade_date",
        "modified_date",
    )
    return {field: getattr(user, field) for field in field_names}


def delivery_report(err, msg):
    """
    Prints the outcome of a single message delivery.
    Args:
        err (KafkaError): The error that occurred, or None on success.
        msg (Message): The message that was produced or failed.
    Note:
        Inside this callback, Message.key() and Message.value() hold the
        serialized bytes produced by the configured serializers, not the
        original objects handed to produce(). To get the original objects
        here, bind them into the callback (for example with a lambda).
    """
    if err is None:
        # Success: report where the broker stored the message.
        report = 'User record {} successfully produced to {} [{}] at offset {}'.format(
            msg.key(), msg.topic(), msg.partition(), msg.offset())
    else:
        report = "Delivery failed for User record {}: {}".format(msg.key(), err)
    print(report)



# Client for the Schema Registry at the local URL below.
schema_registry_conf = {'url': 'http://localhost:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# Value serializer built from schema_str; user_to_dict converts each object
# to a dict before it is serialized.
json_serializer = JSONSerializer(
    schema_str,
    schema_registry_client,
    to_dict=user_to_dict,
)

# Message keys are plain strings encoded as UTF-8.
key_serializer = StringSerializer('utf_8')

producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'key.serializer': key_serializer,
    'value.serializer': json_serializer,
}

producer = SerializingProducer(producer_conf)

# Connection settings for the source SQL Server database.
# NOTE(review): credentials are hard-coded; consider loading them from the
# environment or a secrets store before this runs anywhere but a sandbox.
server = 'localhost'
database = 'TestDB'
username = 'sa'
password = 'Password1#'


class DateTimeEncoder(json.JSONEncoder):
    """JSON encoder that renders ``datetime.datetime`` values with ``str()``.

    Not referenced by the code visible here; kept (defined once, instead of
    being redefined on every loop iteration) in case other cells rely on it.
    """

    def default(self, z):
        if isinstance(z, datetime.datetime):
            return str(z)
        return super().default(z)


cnxn = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};SERVER=' + server
    + ';DATABASE=' + database
    + ';UID=' + username
    + ';PWD=' + password)
try:
    cursor = cnxn.cursor()

    # modified_date is selected because the loop below reads
    # item['modified_date'] (and the JSON schema lists it as required);
    # without it every row fails with KeyError.
    # NOTE(review): assumes the citadel table has a modified_date column — confirm.
    rows = cursor.execute(
        'SELECT trade_date,ext_acc,modified_by,currency,amount,file_path,modified_date FROM citadel ')

    # Read every row exactly once. The previous nested iteration over the
    # same cursor consumed the first row in the outer loop and silently
    # dropped it.
    column_names = [column[0] for column in cursor.description]
    items = [dict(zip(column_names, row)) for row in rows]
finally:
    # Release the database connection as soon as the rows are in memory,
    # including when the query itself fails.
    cnxn.close()

for item in items:
    # item_key is only used for the progress print below; the Kafka message
    # key is a fresh UUID per record.
    item_key = str(item["ext_acc"])

    user = User(ext_acc=item['ext_acc'],
                modified_by=item['modified_by'],
                currency=item['currency'],
                amount=item['amount'],
                file_path=item['file_path'],
                trade_date=item['trade_date'].strftime("%Y-%m-%d %H:%M:%S"),
                modified_date=item['modified_date'].strftime("%Y-%m-%d %H:%M:%S"))

    # Serve delivery callbacks from earlier produce() calls so reports are
    # printed as we go and the local queue is drained during long runs.
    producer.poll(0)
    producer.produce('excesscash_topic_new', key=str(uuid4()), value=user, on_delivery=delivery_report)
    print(item_key, item)

print("\nStarted Flushing records..............\n")
producer.flush()
print("\nCompelted Flushing records..............\n")


# --- Captured output from running the cell above (not part of the script) ---
# %3|1630120727.177|FAIL|rdkafka#producer-2| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT)
# %3|1630120728.104|FAIL|rdkafka#producer-2| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) suppressed)
#
#
# OperationalError: ('HYT00', '[HYT00] [Microsoft][ODBC Driver 17 for SQL Server]Login timeout expired (0) (SQLDriverConnect)')
#
# %3|1630120758.106|FAIL|rdkafka#producer-2| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT, 30 identical error(s) suppressed)
# %3|1630120788.107|FAIL|rdkafka#producer-2| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT, 30 identical error(s) suppressed)
# %3|1630120818.109|FAIL|rdkafka#producer-2| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused (after 0ms in state CONNECT, 30 identical error(s) suppressed)
