### Setting up Kafka Consumer

In [None]:
#error handling for consumer
def error_cb(err):
    """Callback for generic, client-level Kafka errors.

    Most errors reported here are informational: the client retries on its
    own and the application normally does not need to react. Two cases are
    treated as fatal in this notebook: no broker is reachable
    (_ALL_BROKERS_DOWN) and authentication failed (_AUTHENTICATION).

    Raises:
        KafkaException: for the two fatal error codes above. An exception
            raised from this callback is re-raised by the flush() or poll()
            call that triggered it.
    """
    print("Client error: {}".format(err))

    fatal_codes = (KafkaError._ALL_BROKERS_DOWN, KafkaError._AUTHENTICATION)
    if err.code() in fatal_codes:
        raise KafkaException(err)

In [None]:
#creating consumer
import json
import os
import uuid
from time import sleep

from confluent_kafka import Consumer
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException

# Kafka connection settings.
# Non-secret settings keep their previous values as defaults and can be
# overridden through environment variables. Credentials are read ONLY from the
# environment so they never live in the notebook source (on Databricks, set
# them as cluster environment variables, optionally backed by a secret scope).
# NOTE: any key/secret that was previously committed in this notebook should be
# rotated in Confluent Cloud.
confluentClusterName = os.environ.get("CONFLUENT_CLUSTER_NAME", "stage3talent")
confluentBootstrapServers = os.environ.get(
    "CONFLUENT_BOOTSTRAP_SERVERS",
    "pkc-ldvmy.centralus.azure.confluent.cloud:9092",
)
confluentTopicName = os.environ.get("CONFLUENT_TOPIC_NAME", "g6ft3")
schemaRegistryUrl = os.environ.get(
    "CONFLUENT_SCHEMA_REGISTRY_URL",
    "https://psrc-gq7pv.westus2.azure.confluent.cloud",
)

# Required credentials: a missing variable fails fast with a KeyError instead
# of attempting an unauthenticated connection.
confluentApiKey = os.environ["CONFLUENT_API_KEY"]
confluentSecret = os.environ["CONFLUENT_API_SECRET"]

# The schema-registry credentials default to the cluster credentials, which
# matches how they were configured before.
confluentRegistryApiKey = os.environ.get("CONFLUENT_REGISTRY_API_KEY", confluentApiKey)
confluentRegistrySecret = os.environ.get("CONFLUENT_REGISTRY_API_SECRET", confluentSecret)

# Kafka consumer setup.
c = Consumer({
    'bootstrap.servers': confluentBootstrapServers,
    'sasl.mechanism': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': confluentApiKey,
    'sasl.password': confluentSecret,
    # A fresh group id per invocation means every run starts without committed
    # offsets, so together with 'earliest' the whole topic is re-read.
    'group.id': str(uuid.uuid1()),
    'auto.offset.reset': 'earliest',
    'error_cb': error_cb,
})

c.subscribe([confluentTopicName])


### Consuming messages from kafka topic, and saving them as a list of dictionaries

In [None]:
#consuming messages
aString = {}  # most recently decoded record (kept at module level as before)
kafkaListDictionaries = []  # consumed entities are stored in this list

# A newly created consumer group needs some time to join and get partitions
# assigned, during which poll() returns None. Stopping on the very first empty
# poll can therefore end the loop before any message is delivered, so the loop
# only stops after several consecutive empty polls.
maxEmptyPolls = 10
emptyPolls = 0

try:
    while emptyPolls < maxEmptyPolls:
        # Exceptions raised by poll() (e.g. KafkaException from error_cb) are
        # deliberately NOT caught here: swallowing them would retry forever.
        msg = c.poll(timeout=1.0)
        if msg is None:
            emptyPolls += 1
            continue
        emptyPolls = 0

        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            break

        payload = msg.value()
        if payload is None:
            # Message without a value (e.g. a tombstone): nothing to decode.
            continue

        try:
            record = json.loads(payload.decode('utf-8'))
        except ValueError as e:
            # Covers both UnicodeDecodeError and json.JSONDecodeError; skip
            # the malformed message and keep consuming.
            print(e)
            continue

        if not isinstance(record, dict):
            # Only JSON objects can carry the extra 'timestamp' key.
            print("Skipping non-object message: {!r}".format(record))
            continue

        aString = record
        aString['timestamp'] = msg.timestamp()[1]
        kafkaListDictionaries.append(aString)
finally:
    # Leave the consumer group and release the connection. To consume again,
    # re-run the cell that creates the consumer.
    c.close()

#keeping this for debugging; view entities consumed from the kafka topic
# (printed once, after consumption has finished)
for message in kafkaListDictionaries:
    print(message)


In [None]:
#list of consumed entities is converted to a Spark DataFrame
# `spark` is not defined before this cell in the notebook; presumably it is the
# session provided by the notebook runtime - TODO confirm.
# No schema is passed, so Spark infers the columns from the dictionaries.
vgsales_consumed = spark.createDataFrame(kafkaListDictionaries)
# Print a preview of the DataFrame for a quick visual check.
vgsales_consumed.show()
# Last expression of the cell: the row count is presumably echoed by the notebook.
vgsales_consumed.count()

### Setting up mount point for backup storage 

In [None]:
#creating mount point for storage through OAuth security ---> the mount point gives access back to the data lake for storage
storageAccount = "gen10dbcdatalake"
storageContainer = "group6-capstone"
# The service-principal secret is read from the environment so it never lives
# in the notebook source; a missing variable fails fast with a KeyError.
# NOTE: a secret that was previously committed here should be rotated.
clientSecret = os.environ["AZURE_CLIENT_SECRET"]
clientid = "2ca50102-5717-4373-b796-39d06568588d"
mount_point = "/mnt/ghcst1"

configs = {"fs.azure.account.auth.type": "OAuth",
       "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
       "fs.azure.account.oauth2.client.id": clientid,
       "fs.azure.account.oauth2.client.secret": clientSecret,
       "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/d46b54b2-a652-420b-aa5a-2ef7f8fc706e/oauth2/token",
       "fs.azure.createRemoteFileSystemDuringInitialization": "true"}

# Remount from scratch so the mount always uses the current configuration.
# Only unmount when the mount point actually exists: this replaces a bare
# `except: pass`, which also hid genuine unmount failures.
if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)

dbutils.fs.mount(
    source="abfss://" + storageContainer + "@" + storageAccount + ".dfs.core.windows.net/",
    mount_point=mount_point,
    extra_configs=configs)

In [None]:
#data frame is stored as .json and .csv backups in the data lake
# Each format gets its own directory: writing both with mode("overwrite") to
# the same path makes the second write delete the output of the first.
# The CSV backup stays at the original location; the JSON backup goes to a
# sibling directory with a "_json" suffix.
vgsales_consumed.write.mode("overwrite").json("/mnt/ghcst1/vgsales_consumed_factoryrun_json")
vgsales_consumed.write.mode("overwrite").csv("/mnt/ghcst1/vgsales_consumed_factoryrun", header=True)

### Writing data from dataframe to sql database

In [None]:
#saving the dataframe (consumed entities from kafka topic) to a sql database, automated in a data factory via triggered runs

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

#Create spark configuration object
conf = SparkConf()
conf.setMaster("local").setAppName("My app")

#Create spark context and sparksession
# getOrCreate() hands back the already-running SparkContext when there is one,
# in which case the configuration above is not applied to it.
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

#set variables used to connect to the database
database = "group6"
table = "FactoryDump_vg"
user = "group6user"
# The database password is read from the environment so it never lives in the
# notebook source; a missing variable fails fast with a KeyError.
# NOTE: a password that was previously committed here should be rotated.
password = os.environ["SQL_PASSWORD"]
server = "database2108.database.windows.net"

#write the dataframe into a sql table
# mode("overwrite") replaces the existing contents of the target table.
vgsales_consumed.write.format("jdbc") \
       .mode("overwrite") \
       .option("url", f"jdbc:sqlserver://{server}:1433;databaseName={database};") \
       .option("dbtable", table) \
       .option("user", user) \
       .option("password", password) \
       .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
       .save()