## Kafka Producer
**Intended Utility**
>This Databricks notebook is one half of a "producer - consumer" pair. <br>
>It is intended to simulate a stream of data between a source and a data lake.<br>

>In this scenario, the source is an Azure Blob, and the data is Home Price Data, Mortgage Rates, and New Building Permits.

**Configuration:**
> These cells configure the primary aspects of the Databricks notebook.<br>

**Config Part 1:** Import Libraries

In [0]:
import json
import random
# import os.path
# import requests
from time import sleep
from pyspark.sql.functions import when, col


**Config Part 2:** Designate Mount Points

In [0]:
# Mounting container to read from z-arctic-analysts-capstone-backup file main_table_prepped_for_kafka.json

def mount_storage(mount_goal):
    """
        Mount an Azure storage container (abfss) into DBFS using OAuth client credentials.

        Any existing mount at the requested mount point is unmounted first, then
        the container is mounted again with the configuration built below.

        Parameters:
            mount_goal (dict): Must provide the keys 'account' (storage account
                name), 'container' (container name) and 'mount' (DBFS mount point).

        Returns:
            The mount point taken from mount_goal['mount'].
    """
    storageAccount = mount_goal['account']
    storageContainer = mount_goal['container']
    # NOTE(security): these credentials are hard-coded in the notebook. They
    # should be rotated and read from a Databricks secret scope
    # (dbutils.secrets.get) instead of being stored in source.
    clientSecret = "B4g8Q~1VyZJa5WszLHwdEQNq4YIaHmT4DevRBcwI"
    clientid = "2ca50102-5717-4373-b796-39d06568588d"
    mount_point = mount_goal['mount']

    configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": clientid,
           "fs.azure.account.oauth2.client.secret": clientSecret,
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/d46b54b2-a652-420b-aa5a-2ef7f8fc706e/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}

    # Best-effort unmount so a stale mount does not block the mount below.
    # Catch Exception rather than using a bare except so that
    # KeyboardInterrupt / SystemExit are not swallowed.
    try:
        dbutils.fs.unmount(mount_point)
    except Exception:
        pass

    dbutils.fs.mount(
        source = "abfss://"+storageContainer+"@"+storageAccount+".dfs.core.windows.net/",
        mount_point = mount_point,
        extra_configs = configs)

    return mount_point
    
# "<storage account>/<container>" holding the data to be streamed.
in_path = 'gen10datafund2202/z-arctic-analysts-capstone-backup'.split("/")

storage_info = {
    'read': {'account': in_path[0], 'container': in_path[1], 'mount': "/mnt/main_table_prepped_for_kafka.json"},
}

try:
    read_path = mount_storage(storage_info['read'])
except Exception as E:
    # Exception objects are not subscriptable, so convert to text before
    # trimming the last 50 characters (slicing E directly raised TypeError
    # and hid the real mount error).
    print(str(E)[:-50])
    # Re-raise: without a successful mount read_path is undefined and the
    # rest of the notebook cannot run.
    raise

print(f'Read Path: {read_path}')

**Config Part 3:** Define Kafka Error and Delivery Callbacks

In [0]:
def error_cb(err):
    """ Callback for generic client errors.

        These errors are generally to be considered informational: the client
        will automatically try to recover from all errors, and no extra action
        is typically required by the application.
        In this example, however, the application is terminated when the
        client is unable to connect to any broker (_ALL_BROKERS_DOWN) and on
        authentication errors (_AUTHENTICATION).

        Parameters:
            err: Error object passed to the callback; its code() is inspected.
    """

    print("Client error: {}".format(err))

    code = err.code()
    is_fatal = (code == KafkaError._ALL_BROKERS_DOWN
                or code == KafkaError._AUTHENTICATION)
    if not is_fatal:
        return

    # Any exception raised from this callback will be re-raised from the
    # triggering flush() or poll() call.
    raise KafkaException(err)


def acked(err, msg):
    """
        Print the outcome of a produced message.

        Presumably intended as a producer delivery callback; it is not
        registered anywhere in the visible cells.

        Parameters:
            err: Error for the message, or None when there was no error.
            msg: The message the callback refers to; printed via str().
    """
    if err is None:
        print("Message produced: %s" % (str(msg)))
        return

    print("Failed to deliver message: %s: %s" % (str(msg), str(err)))

**Data Lake Read:** Get the data from the data lake.

In [0]:
# Read the mounted JSON data into a Spark DataFrame.
main_table = spark.read.json(read_path)

# Turn every row into a JSON string, parse it back into a Python object,
# and collect all parsed records into a list.
results = (
    main_table.toJSON()
    .map(json.loads)
    .collect()
)


In [0]:
import uuid
import json
from confluent_kafka import Consumer
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException

In [0]:
# KAFKA variables. Move to the OS variables or configuration.
# This will work in a local Jupyter Notebook, but in a Databricks notebook, hiding config.py is tougher.
# NOTE(security): the API keys and secrets below are hard-coded credentials.
# They should be rotated and loaded from a secret store (for example a
# Databricks secret scope via dbutils.secrets.get) rather than kept in source.
confluentClusterName = "stage3talent"
confluentBootstrapServers = "pkc-ldvmy.centralus.azure.confluent.cloud:9092"
confluentTopicName = "arctic_analysts_main_table"
schemaRegistryUrl = "https://psrc-gq7pv.westus2.azure.confluent.cloud"
confluentApiKey = "YHMHG7E54LJA55XZ"
confluentSecret = "/XYn+w3gHGMqpe9l0TWvA9FznMYNln2STI+dytyPqtZ9QktH0TbGXUqepEsJ/nR0"
confluentRegistryApiKey = "YHMHG7E54LJA55XZ"
confluentRegistrySecret = "/XYn+w3gHGMqpe9l0TWvA9FznMYNln2STI+dytyPqtZ9QktH0TbGXUqepEsJ/nR0"


# Admin client used for topic management (create / delete topics).
# 'auto.offset.reset' is a consumer-only setting, so it is not passed here;
# only connection, authentication and the error callback are configured.
admin_client = AdminClient({
    'bootstrap.servers': confluentBootstrapServers,
    'sasl.mechanism': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': confluentApiKey,
    'sasl.password': confluentSecret,
    'error_cb': error_cb,
})


In [0]:
#delete a topic

# try:
#     topics =['arctic_analysts_zillow']
#     fs = admin_client.delete_topics(topics, request_timeout=30)

#     for topic, f in fs.items():
#         try:
#             f.result()  # The result itself is None
#             print("Topic {} deleted".format(topic))
#         except Exception as e:
#             print("Failed to delete topic {}: {}".format(topic, e))
# except Exception as e:
#     print(e)

In [0]:
# Request creation of the topic with 1 partition and a replication factor of 3.
topic_list = [NewTopic(confluentTopicName, num_partitions=1, replication_factor=3)]

# create_topics() is asynchronous and returns a dict of {topic name: future}.
# It is issued exactly once: a second request for the same topic would only
# compete with the first one and fail with "topic already exists".
futures = admin_client.create_topics(topic_list)

for topic, future in futures.items():
    try:
        # Block until the request has completed; the result itself is None.
        future.result()
        print(f"Topic {topic} created")
    except KafkaException as e:
        # The underlying KafkaError is carried as the first exception argument.
        if e.args[0].code() == KafkaError.TOPIC_ALREADY_EXISTS:
            # Expected when the notebook is re-run; nothing to do.
            print(f"Topic {topic} already exists")
        else:
            print(f"Failed to create topic {topic}: {e}")

In [0]:
# Kafka producer setup.
# Only connection, authentication and the error callback are configured.
# 'group.id' and 'auto.offset.reset' are consumer-only settings that a
# producer ignores, so they are not passed here.
p = Producer({
    'bootstrap.servers': confluentBootstrapServers,
    'sasl.mechanism': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': confluentApiKey,
    'sasl.password': confluentSecret,
    'error_cb': error_cb,
})

In [0]:
# Sending messages: publish every record in `results` to the Kafka topic as a
# JSON string, printing progress every 50 messages.

def _report_delivery_failure(err, msg):
    """Delivery callback that reports only failed deliveries."""
    if err is not None:
        print(f"Failed to deliver message: {err}")


total = len(results)

for i, record in enumerate(results):
    if i % 50 == 0:
        print(f'Messages Sent: {i} | {round(i/total * 100,2)}% completed.')

    p.produce(confluentTopicName, json.dumps(record), on_delivery=_report_delivery_failure)
    # Flush after every message so each record is handed to the broker before
    # the next one is produced (and before the optional sleep below, if it is
    # re-enabled to pace the simulated stream).
    p.flush()

    # Jed added this to test a functionality. You can comment it out if you need to
#     if (i > 10 and i % 100 == 0):
#         sleep_duration = random.randint(35,45)
#     else:
#         sleep_duration = random.choices([1,1.5,2,3,4,5,6,7], weights = [.68, .155, .05, .037, .035, .023, .01, .01])[0]

#     sleep(sleep_duration)

if total:
    print(f'Messages Sent: {total} | {round(total/total * 100, 2)}% completed.')
else:
    # Nothing was read from the data lake; avoid dividing by zero above.
    print('No records to send.')