In [0]:
def error_cb(err):
    """ Generic client-level error callback.

        Every error handed to this callback is printed. Most of them are
        informational only: the client retries and recovers by itself, so the
        application normally has nothing to do.
        Two error codes are treated as fatal in this notebook and are turned
        into an exception: _ALL_BROKERS_DOWN (no broker reachable) and
        _AUTHENTICATION (credentials rejected).

        Parameters:
            err: the error object passed in by the client; must expose code().

        Raises:
            KafkaException: when err.code() is one of the two fatal codes. """

    print("Client error: {}".format(err))

    fatal_codes = (KafkaError._ALL_BROKERS_DOWN, KafkaError._AUTHENTICATION)
    if err.code() not in fatal_codes:
        return

    # An exception raised inside this callback surfaces from the flush() or
    # poll() call that triggered the callback.
    raise KafkaException(err)


def acked(err, msg):
    """
        Print the outcome of one produce attempt.

        The (err, msg) signature matches what a producer delivery-report
        callback receives -- presumably this is meant to be passed as
        produce(..., callback=acked); confirm against the caller.

        Parameters:
            err: None when the message was delivered, otherwise the error
                 describing why delivery failed. Only str(err) is used.
            msg: The message the report refers to. Only str(msg) is used.
    """
    if err is None:
        print("Message produced: %s" % (str(msg)))
        return

    print("Failed to deliver message: %s: %s" % (str(msg), str(err)))

In [0]:
from confluent_kafka import Consumer
from time import sleep
import uuid
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException
import json
from confluent_kafka.admin import AdminClient, NewTopic

import os
import warnings

# Kafka / Schema Registry settings.
#
# Every value is read from an environment variable so that no credential is
# stored in the notebook source (set them in the cluster / job configuration,
# or in the shell that launches a local Jupyter server). Non-secret settings
# fall back to the values this notebook has always used; secrets have no
# fallback on purpose.
#
# SECURITY: an API key/secret pair used to be hard-coded in this cell. It has
# been exposed in source control and must be rotated in Confluent Cloud.
confluentClusterName = os.environ.get("CONFLUENT_CLUSTER_NAME", "stage3talent")
confluentBootstrapServers = os.environ.get(
    "CONFLUENT_BOOTSTRAP_SERVERS",
    "pkc-ldvmy.centralus.azure.confluent.cloud:9092",
)
confluentTopicName = os.environ.get("CONFLUENT_TOPIC_NAME", "Group3Data")
schemaRegistryUrl = os.environ.get(
    "SCHEMA_REGISTRY_URL",
    "https://psrc-gq7pv.westus2.azure.confluent.cloud",
)

# Secrets: an empty string means "not configured".
confluentApiKey = os.environ.get("CONFLUENT_API_KEY", "")
confluentSecret = os.environ.get("CONFLUENT_API_SECRET", "")

# The registry credentials were identical to the cluster credentials in the
# original notebook, so they default to them unless overridden explicitly.
confluentRegistryApiKey = os.environ.get("CONFLUENT_REGISTRY_API_KEY", confluentApiKey)
confluentRegistrySecret = os.environ.get("CONFLUENT_REGISTRY_API_SECRET", confluentSecret)

_missing_secrets = [
    _name
    for _name, _value in (
        ("CONFLUENT_API_KEY", confluentApiKey),
        ("CONFLUENT_API_SECRET", confluentSecret),
    )
    if not _value
]
if _missing_secrets:
    # Warn here, next to the configuration, instead of leaving the reader with
    # only an authentication failure from a later cell.
    warnings.warn(
        "Missing Kafka credentials; set environment variable(s): "
        + ", ".join(_missing_secrets)
    )

In [0]:
# Create the Kafka admin client (needed for topic management, e.g. creating
# the topic). This cell only builds the client; it does not create anything.
#
# Only connection and authentication settings are passed. 'group.id' and
# 'auto.offset.reset' were removed: they are consumer-only properties, an
# admin client never joins a consumer group, and the client only logs a
# configuration warning for them.
admin_client = AdminClient({
    'bootstrap.servers': confluentBootstrapServers,
    'sasl.mechanism': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': confluentApiKey,
    'sasl.password': confluentSecret,
    'error_cb': error_cb,  # raises on "all brokers down" / authentication errors
})

In [0]:
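# # Add topic
# NOTE: this cell is disabled (everything below is commented out). Before
# re-enabling it: keep only ONE create_topics() call (the code below issues
# the request twice), import traceback, and catch KafkaException rather than
# KafkaError -- future.result() raises KafkaException on failure.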
# topic_list = []
# topic_list.append(NewTopic(confluentTopicName, 1, 3))
# admin_client.create_topics(topic_list)
# futures = admin_client.create_topics(topic_list)
# try:
#     record_metadata = []
#     for k, future in futures.items():
#         # f = i.get(timeout=10)
#         print(f"type(k): {type(k)}")
#         print(f"type(v): {type(future)}")
#         print(future.result())
# except KafkaError:
#     # Decide what to do if produce request failed...
#     print(traceback.format_exc())
#     result = 'Fail'
# finally:
#     print("finally")

In [0]:
# Create the producer used to publish the district records.
#
# Only connection and authentication settings are passed. 'group.id' and
# 'auto.offset.reset' were removed: they are consumer-only properties, a
# producer has no consumer group, and the client only logs a configuration
# warning for them (the old "new consumer group" comment did not apply here).
p = Producer({
    'bootstrap.servers': confluentBootstrapServers,
    'sasl.mechanism': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': confluentApiKey,
    'sasl.password': confluentSecret,
    'error_cb': error_cb,  # raises on "all brokers down" / authentication errors
})

In [0]:
%fs
ls /mnt/capstone-group3-raw/

path,name,size,modificationTime
dbfs:/mnt/capstone-group3-raw/DistrictDataCleaned.csv,DistrictDataCleaned.csv,48613,1659732238000
dbfs:/mnt/capstone-group3-raw/FederallyFundedDistricts.csv,FederallyFundedDistricts.csv,510945,1659754753000
dbfs:/mnt/capstone-group3-raw/NYCSchoolDistricts.csv,NYCSchoolDistricts.csv,347923,1659751742000
dbfs:/mnt/capstone-group3-raw/csvfiles/,csvfiles/,0,0
dbfs:/mnt/capstone-group3-raw/district-householdincome.csv,district-householdincome.csv,29882,1659730056000
dbfs:/mnt/capstone-group3-raw/districtdata.csv/,districtdata.csv/,0,1659751605000
dbfs:/mnt/capstone-group3-raw/districtdatafinal.csv/,districtdatafinal.csv/,0,1659837010000
dbfs:/mnt/capstone-group3-raw/districtdatafinalcleaning.csv/,districtdatafinalcleaning.csv/,0,1659835886000
dbfs:/mnt/capstone-group3-raw/districtdatamedianincomerevised.csv/,districtdatamedianincomerevised.csv/,0,1659792537000
dbfs:/mnt/capstone-group3-raw/districtdatanycrevision.csv/,districtdatanycrevision.csv/,0,1659835834000


In [0]:
# Load the final district dataset; the first line of each file is the header.
# No schema / inferSchema option is given, so every column is read as a string.
df = spark.read.csv('dbfs:/mnt/capstone-group3-raw/districtdatafinal.csv/', header=True)

In [0]:
# Render the loaded DataFrame as a table to eyeball the columns and values
# before they are published to Kafka.
display(df)

DistrictName,GraduationPercentageRate,DropoutPercentageRate,FundingPerStudent,TotalEnrolled,MalesEnrolled,FemalesEnrolled,Percentage of Minority Students,HouseholdIncome,FederallyFunded
NYCGEOGDIST#9-BRONX,74.0,9.0,20785.08,29653.0,14989.0,14664.0,98.7,47668.0,Yes
NYCGEOGDIST#10-BRONX,80.0,7.0,18515.51,49381.0,25187.0,24194.0,94.1,67737.0,Yes
NYCGEOGDIST#11-BRONX,75.0,8.0,17704.26,35438.0,18643.0,16795.0,91.1,75889.0,Yes
NYCGEOGDIST#12-BRONX,63.0,12.0,20174.92,19427.0,10007.0,9420.0,98.6,52783.0,Yes
NYCGEOGDIST#13-BROOKLYN,90.0,2.0,16254.63,19994.0,10672.0,9322.0,82.8,146314.0,Yes
NYCGEOGDIST#14-BROOKLYN,82.0,5.0,19282.51,16583.0,8684.0,7899.0,85.1,100410.0,Yes
NYCGEOGDIST#15-BROOKLYN,71.0,5.0,18079.91,29846.0,14881.0,14965.0,69.9,136417.0,Yes
NYCGEOGDIST#16-BROOKLYN,60.0,5.0,24918.87,5425.0,2891.0,2534.0,97.3,78197.0,Yes
NYCGEOGDIST#17-BROOKLYN,74.0,6.0,19294.66,19019.0,9361.0,9658.0,96.9,87778.0,Yes
NYCGEOGDIST#18-BROOKLYN,67.0,6.0,20378.87,12417.0,6499.0,5918.0,96.6,71335.0,Yes


In [0]:
from time import sleep

# Publish every district row to Kafka as one JSON document per message.
#
# Changes from the previous version of this cell:
#   * the loop that appended to an undefined `columns` list was removed; it
#     raised NameError on a fresh kernel and its result was never used;
#   * rows are turned into dicts with Row.asDict() instead of an index loop;
#   * delivery reports are routed to acked(), so a failed delivery is printed
#     instead of being dropped silently;
#   * flush() is called once after the loop instead of after every message,
#     which forced a blocking broker round trip per record.

# collect() brings the whole DataFrame to the driver. That is fine for this
# small dataset, but do not reuse the pattern for large tables.
fulldistrictinfo = [row.asDict() for row in df.collect()]

for district in fulldistrictinfo:
    p.produce(confluentTopicName, json.dumps(district), callback=acked)
    # Serve delivery callbacks for messages already sent; does not block.
    p.poll(0)
    print(district)

# Block until every queued message has been delivered (or has failed) and all
# remaining delivery callbacks have run.
p.flush()
    
    
    
    
 