In [13]:
import logging
import random
import string
import uuid
import time

from producer import KafkaJsonProducer
import json

import os

# Kafka connection settings read from the process environment.
# The values below are placeholders; substitute real ones locally and never commit them.
os.environ.update({
    'Kafka_host': "url:9902",
    'Cluster_key': 'key_num',
    'Cluster_secret': 'secret',
})

ModuleNotFoundError: No module named 'producer'

In [None]:
class DataClass:
    """Holds a parsed JSON payload and overwrites selected fields in place.

    The payload is stored by reference (not copied), so every update made
    through this class is visible on the object passed to the constructor.
    """

    def __init__(self, json_data):
        """Keep a reference to ``json_data`` as the payload to edit."""
        self.data = json_data

    def update_fields(self, first_name, cx_status, cx_substatus, loan_number):
        """Overwrite the per-message fields of the payload.

        Args:
            first_name: New value for every ``FirstName`` key.
            cx_status: New ``numericValue`` for every dict whose ``fieldName``
                is ``'CX.STATUS'`` and that already has a ``numericValue`` key.
            cx_substatus: New value for every ``CX.SUBSTATUS`` key.
            loan_number: New value for every ``loanNumber`` key.

        Every ``encompassId`` key is additionally set to a fresh UUID4 string.
        """
        self.update_variable('FirstName', first_name)

        def update_numeric_value(node):
            # Fix: the original only handled dicts, so a CX.STATUS entry stored
            # inside a list was never reached even though lists were recursed into.
            if isinstance(node, dict):
                if node.get('fieldName') == 'CX.STATUS' and 'numericValue' in node:
                    node['numericValue'] = cx_status
                children = node.values()
            elif isinstance(node, list):
                children = node
            else:
                return

            for child in children:
                if isinstance(child, (dict, list)):
                    update_numeric_value(child)

        update_numeric_value(self.data)

        self.update_variable('CX.SUBSTATUS', cx_substatus)
        self.update_variable('encompassId', str(uuid.uuid4()))
        self.update_variable('loanNumber', loan_number)

    def update_variable(self, variable_name, new_value, data=None):
        """Recursively set every key named ``variable_name`` to ``new_value``.

        Args:
            variable_name: Dictionary key to look for at any nesting depth.
            new_value: Value assigned to each matching key.
            data: Sub-tree to walk; defaults to the whole payload.

        A matching key's old value is replaced and not descended into.
        """
        if data is None:
            data = self.data

        if isinstance(data, dict):
            # Assigning to an existing key does not resize the dict, so it is
            # safe to do while iterating over items().
            for key, value in data.items():
                if key == variable_name:
                    data[key] = new_value
                elif isinstance(value, (dict, list)):
                    self.update_variable(variable_name, new_value, value)

        elif isinstance(data, list):
            for item in data:
                self.update_variable(variable_name, new_value, item)


class JsonProducer:
    """Publishes ``DataClass`` payloads through a ``KafkaJsonProducer``."""

    def __init__(self, producer_client_id='python-sample'):
        """Create the underlying Kafka producer.

        Args:
            producer_client_id: Client id handed to ``KafkaJsonProducer``;
                defaults to the previously hard-coded ``'python-sample'``.
        """
        self.json_producer = KafkaJsonProducer(producer_client_id=producer_client_id)

    def produce_json_message(self, data_class, topic_name='example'):
        """Send ``data_class.data`` to ``topic_name`` under a random UUID4 key.

        Args:
            data_class: Object exposing the payload as a ``data`` attribute.
            topic_name: Destination topic; defaults to the previously
                hard-coded ``'example'``.
        """
        key = str(uuid.uuid4())
        # NOTE(review): the payload is passed as a plain dict; confirm that the
        # imported KafkaJsonProducer.send accepts a dict value.
        value = data_class.data
        self.json_producer.send(topic_name=topic_name, key=key, value=value)


json_producer = JsonProducer()

# Specify the path to the JSON file
file_path = 'path_to_your_json_file.json'

# Number of messages to generate
message_count = 500

start_time = time.time()

# Read the template once instead of re-opening the file for every message;
# it is re-parsed per iteration so each message starts from a pristine copy.
with open(file_path, 'r') as file:
    template_text = file.read()

# Generate unique messages with random values for "CX.STATUS", "CX.SUBSTATUS", "encompassId", and "loanNumber"
for _ in range(message_count):
    first_name = ''.join(random.choices(string.ascii_letters, k=8))
    cx_status = random.randint(1, 9)
    cx_substatus = random.choice(string.ascii_uppercase)
    loan_number = str(random.randint(10**8, 10**9-1))

    json_data = json.loads(template_text)

    data = DataClass(json_data)
    data.update_fields(first_name, cx_status, cx_substatus, loan_number)

    json_producer.produce_json_message(data)

end_time = time.time()
execution_time = end_time - start_time

# Get the desired values
# NOTE(review): nothing visible in this notebook sets these environment
# variables, so they print as None unless exported externally - confirm the source.
min_insync_replicas = os.environ.get('min_insync_replicas')
cleanup_policy = os.environ.get('cleanup_policy')
retention_ms = os.environ.get('retention_ms')
max_message_bytes = os.environ.get('max_message_bytes')

print(f"min_insync_replicas: {min_insync_replicas}")
print(f"cleanup_policy: {cleanup_policy}")
print(f"retention_ms: {retention_ms}")
# Previously fetched/computed but never reported.
print(f"max_message_bytes: {max_message_bytes}")
print(f"execution_time: {execution_time:.2f}s for {message_count} messages")


In [None]:
from confluent_kafka.cimpl import TopicPartition

from mrckafka.commons.exceptions.MrcKafkaErrors import MrcKafkaSerializationException, MrcKafkaProducerException
from .KafkaProducer import KafkaProducer
import logging
from confluent_kafka import Producer
import json

class KafkaJsonProducer(KafkaProducer):
    """Kafka producer that serializes a payload to JSON and sends it synchronously.

    ``send`` blocks on ``flush()`` and reports the outcome through the
    ``err`` / ``topic_partition`` attributes filled in by ``delivery_report``.
    """

    def __init__(self, producer_client_id):
        """Build the underlying confluent-kafka ``Producer``.

        Args:
            producer_client_id: Client id forwarded to ``KafkaProducer.__init__``.
        """
        self.err = None
        self.topic_partition = None
        super(KafkaJsonProducer, self).__init__(producer_client_id=producer_client_id)

        json_producer_logger = logging.getLogger('json_producer')
        json_producer_logger.setLevel(logging.DEBUG)

        logging.info("Setting up Kafka Json Producer")
        # self.producer_conf is presumably populated by the base class - not visible here.
        self.json_producer = Producer(self.producer_conf, logger=json_producer_logger)
        logging.info("kafka Json Producer Configured")

    def delivery_report(self, err, msg):
        """Record the failure or success of a message delivery.

        Args:
            err (KafkaError): The error that occurred, or None on success.
            msg (Message): The message that was produced or failed.
        """
        if err is not None:
            # Fix: the original wrote ``.format{...}`` (braces), a syntax error.
            logging.error("Delivery failed for User record {}: {}".format(msg.key(), err))
            self.err = err
        else:
            logging.info(
                'Record successfully delivered -> TOPIC: {} - Partition [{}] - offset {}'.format(
                    msg.topic(), msg.partition(), msg.offset()))
            self.topic_partition = TopicPartition(
                topic=msg.topic(), partition=msg.partition(), offset=msg.offset())

    def validatePayload(self, topic_name, key, value):
        """Reject falsy topic names, keys, or values.

        Raises:
            MrcKafkaProducerException: If any of the three arguments is falsy.
        """
        if not topic_name:
            raise MrcKafkaProducerException(message="Topic Name cannot be null or empty")
        if not key:
            raise MrcKafkaProducerException(message="Key cannot be null or empty")
        if not value:
            raise MrcKafkaProducerException(message="Value cannot be null or empty")

    def send(self, topic_name, key, value, headers=None):
        """Serialize ``value`` to JSON, produce it, and wait for delivery.

        Args:
            topic_name: Destination topic.
            key: Message key.
            value: A plain ``dict``, or an object whose ``__dict__`` is serialized.
            headers: Optional message headers; when a dict containing
                ``traceId`` is given, the trace id is logged on success.

        Returns:
            The ``TopicPartition`` recorded by ``delivery_report`` for this message.

        Raises:
            MrcKafkaProducerException: On invalid arguments or failed delivery.
            MrcKafkaSerializationException: If ``value`` cannot be converted to JSON.
        """
        self.validatePayload(topic_name, key, value)

        # Fix: clear the previous outcome, otherwise one failed delivery made
        # every later send raise and a stale TopicPartition could be returned.
        self.err = None
        self.topic_partition = None

        ##Convert to JSON String
        try:
            # Plain dicts have no __dict__; accept them directly.
            payload = value if isinstance(value, dict) else value.__dict__
            jsonValue = json.dumps(payload)
        except AttributeError as err:
            raise MrcKafkaSerializationException("Invalid class object sent. Please send a class. \n {}".format(err))
        except (TypeError, ValueError) as err:
            # json.dumps raises these for unserializable or circular payloads.
            raise MrcKafkaSerializationException("Value is not JSON serializable. \n {}".format(err))

        self.json_producer.produce(topic=topic_name, key=key, value=jsonValue, headers=headers,
                                   on_delivery=self.delivery_report)

        # Block until the delivery callback has run.
        self.json_producer.flush()

        if self.err is not None:
            raise MrcKafkaProducerException(error=self.err, message=self.err.str())

        traceId = headers.get('traceId', None) if isinstance(headers, dict) else None
        if traceId is not None:
            logging.info('TraceId: {} | Message sent to topic {}'.format(traceId, topic_name))
        return self.topic_partition

In [None]:
import os
from time import sleep
from confluent_kafka import Consumer
import logging

from confluent_kafka.serialization import StringDeserializer

from mrckafka.commons.exceptions.MrcKafkaErrors import MrcKafkaConsumerException, MrcKafkaDeserializationException

from .ByteProducerFactory import ByteProducerFactory
from .KafkaConsumer import KafkaConsumer

from .utils.CommitCode import CommitCode
from .utils.CommitResult import CommitResult
from .utils.ConsumerRecord import ConsumerRecord
from .utils.JsonDeserializer import JsonDeserializier

class KafkaJsonConsumer(KafkaConsumer):
    def __init__(self,record_class, consumer_group_id):
        """Set up the JSON consumer, its deserializers, and its retry settings.

        Args:
            record_class: Passed to ``JsonDeserializier`` to build the value deserializer.
            consumer_group_id: Forwarded to ``KafkaConsumer.__init__`` and kept
                on the instance as ``_consumer_group_id``.
        """
        super(KafkaJsonConsumer,self).__init__(consumer_group_id)
        
        # Dedicated logger handed to the confluent-kafka Consumer, at DEBUG level.
        json_consumer_logger = logging.getLogger('json_consumer')
        json_consumer_logger.setLevel(logging.DEBUG)
        
        logging.info('Setting up Json Consumer')
        # self.consumer_conf is presumably populated by the base class - not visible here.
        self.json_consumer = Consumer(self.consumer_conf,logger =json_consumer_logger)
        
        # Keys are decoded as UTF-8 strings; values go through the JSON deserializer.
        self._key_deserializer = StringDeserializer('utf-8')
        self._value_deserializer = JsonDeserializier(record_class)
        self._byte_product_factory = ByteProducerFactory()
        self._consumer_group_id = consumer_group_id
        
        # Retry bookkeeping; limits are read from self.config.
        self._retry_counter = 0
        self._max_retries = self.config.max_retries
        # NOTE(review): the /1000.0 suggests config.retry_interval is in milliseconds - confirm.
        self._retry_interval = (self.config.retry_interval) / 1000.0
        
    def message_retry_handler(self, consumer_record,retry_topic, commit_result):

In [None]:
import logging
import random
import string
import json
from confluent_kafka import Producer
import uuid

class DataClass:
    """Holds a parsed JSON payload and overwrites selected fields in place.

    The payload is stored by reference (not copied), so every update made
    through this class is visible on the object passed to the constructor.
    """

    def __init__(self, json_data):
        """Keep a reference to ``json_data`` as the payload to edit."""
        self.data = json_data

    def update_fields(self, first_name, cx_status, cx_substatus, loan_number):
        """Overwrite the per-message fields of the payload.

        Args:
            first_name: New value for every ``FirstName`` key.
            cx_status: New ``numericValue`` for every dict whose ``fieldName``
                is ``'CX.STATUS'`` and that already has a ``numericValue`` key.
            cx_substatus: New value for every ``CX.SUBSTATUS`` key.
            loan_number: New value for every ``loanNumber`` key.

        Every ``encompassId`` key is additionally set to a fresh UUID4 string.
        """
        self.update_variable('FirstName', first_name)

        def update_numeric_value(node):
            # Fix: the original only handled dicts, so a CX.STATUS entry stored
            # inside a list was never reached even though lists were recursed into.
            if isinstance(node, dict):
                if node.get('fieldName') == 'CX.STATUS' and 'numericValue' in node:
                    node['numericValue'] = cx_status
                children = node.values()
            elif isinstance(node, list):
                children = node
            else:
                return

            for child in children:
                if isinstance(child, (dict, list)):
                    update_numeric_value(child)

        update_numeric_value(self.data)

        self.update_variable('CX.SUBSTATUS', cx_substatus)
        self.update_variable('encompassId', str(uuid.uuid4()))
        self.update_variable('loanNumber', loan_number)

    def update_variable(self, variable_name, new_value, data=None):
        """Recursively set every key named ``variable_name`` to ``new_value``.

        Args:
            variable_name: Dictionary key to look for at any nesting depth.
            new_value: Value assigned to each matching key.
            data: Sub-tree to walk; defaults to the whole payload.

        A matching key's old value is replaced and not descended into.
        """
        if data is None:
            data = self.data

        if isinstance(data, dict):
            # Assigning to an existing key does not resize the dict, so it is
            # safe to do while iterating over items().
            for key, value in data.items():
                if key == variable_name:
                    data[key] = new_value
                elif isinstance(value, (dict, list)):
                    self.update_variable(variable_name, new_value, value)

        elif isinstance(data, list):
            for item in data:
                self.update_variable(variable_name, new_value, item)

class JsonProducer:
    """Publishes ``DataClass`` payloads through a ``KafkaJsonProducer`` and logs each send."""

    def __init__(self, producer_client_id='python-sample'):
        """Create the underlying Kafka producer and a module-named logger.

        Args:
            producer_client_id: Client id handed to ``KafkaJsonProducer``;
                defaults to the previously hard-coded ``'python-sample'``.
        """
        self.json_producer = KafkaJsonProducer(producer_client_id=producer_client_id)
        self.logger = logging.getLogger(__name__)

    def produce_json_message(self, data_class, topic_name='example'):
        """Send ``data_class.data`` to ``topic_name`` under a random UUID4 key.

        Args:
            data_class: Object exposing the payload as a ``data`` attribute.
            topic_name: Destination topic; defaults to the previously
                hard-coded ``'example'``.
        """
        key = str(uuid.uuid4())
        # NOTE(review): the payload is passed as a plain dict; confirm that the
        # imported KafkaJsonProducer.send accepts a dict value.
        value = data_class.data
        self.json_producer.send(topic_name=topic_name, key=key, value=value)
        self.logger.info('Produced a message')  # Log the message production

# Specify the path to the JSON file
file_path = 'path_to_your_json_file.json'

# Number of messages to generate
message_count = 500

# Configure the logger
logging.basicConfig(filename='producer.log', level=logging.INFO)

# Fix: instantiate the JsonProducer defined in this cell. The original loop
# relied on a `json_producer` left over from an earlier cell, which is an
# instance of the older class (no logging) or missing on a fresh kernel.
json_producer = JsonProducer()

start_time = time.time()

# Read the template once instead of re-opening the file for every message;
# it is re-parsed per iteration so each message starts from a pristine copy.
with open(file_path, 'r') as file:
    template_text = file.read()

# Generate unique messages with random values for "CX.STATUS", "CX.SUBSTATUS", "encompassId", and "loanNumber"
for _ in range(message_count):
    first_name = ''.join(random.choices(string.ascii_letters, k=8))
    cx_status = random.randint(1, 9)
    cx_substatus = random.choice(string.ascii_uppercase)
    loan_number = str(random.randint(10**8, 10**9-1))

    json_data = json.loads(template_text)

    data = DataClass(json_data)
    data.update_fields(first_name, cx_status, cx_substatus, loan_number)

    json_producer.produce_json_message(data)

end_time = time.time()
execution_time = end_time - start_time

# Get the desired values
# NOTE(review): nothing visible in this notebook sets these environment
# variables, so they print as None unless exported externally - confirm the source.
min_insync_replicas = os.environ.get('min_insync_replicas')
cleanup_policy = os.environ.get('cleanup_policy')
retention_ms = os.environ.get('retention_ms')
max_message_bytes = os.environ.get('max_message_bytes')

print(f"min_insync_replicas: {min_insync_replicas}")
print(f"cleanup_policy: {cleanup_policy}")
print(f"retention_ms: {retention_ms}")
# Previously fetched/computed but never reported.
print(f"max_message_bytes: {max_message_bytes}")
print(f"execution_time: {execution_time:.2f}s for {message_count} messages")


In [None]:
@Slf4j
@Component
public class StreamProcessor extends MRCKafkaStream{
    RestTemplates restTemplate = new RestTemplate();
    
    private static final String LEFT = "STREAMING.SAMPLE-APPLICATION.STRING-TOPIC";
    private static final String RIGHT = "STREAMING.SAMPLE-APPLICATION.JSON-TOPIC";
    private static final String OUT = "STREAMING.SAMPLE-APPLICATION.AVRO-TOPIC";
    
    private void employeeStreamProcessingLogic(StreamsBuilder streamsBuilder){
        Ktable<String, String> right = streamsBuilder.table(
            RIGHT,
            consumeWith(
                ContentType.STRING, String.class,
                ContentType.STRING, String.class
            ),
            Materialized.as("store-name")
        );
        KStream<String, String> left = streamsbuilder.stream(
            LEFT,
            consumedWith(
                ContentType.STRING, String.class,
                ContentType.STRING, String.class
            ))
                .join(right, (leftObject, rightObject) -> leftObject+rightObject,
                      Joined.with(
                          getSerde(ContentType.STRING, String.class, true),
                          getSerde(ContentType.STRING, String.class, false),
                          getSerde(ContentType.STRING, String.class, false)
                      ));
        streamTo(OUT, left, ContentType.STRING, String.class, ContentType.STRING, String.class);
    }
    
    @Override
    public void defineStreamProcessingLogic(SteamsBuilder streamsBuilder) {
        employeeStreamProcessingLogic(streamsBuilder)
    }
    
}