In [None]:
import pyspark.sql.functions
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql.functions import *
from pyspark.sql.types import StringType,DecimalType
from pyspark.sql.functions import input_file_name, substring
from pyspark.sql.functions import isnan, when, count, col

# Importing important info
from config import database, server, k_username, k_password

# Kafka producer and consumer imports
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException, TopicPartition
from time import sleep
import uuid
import json
from confluent_kafka.admin import AdminClient, NewTopic

In [None]:
###### Mount Point 1 through Oauth security.
storageAccount = "gen10datafund2207"
storageContainer = "healthcare-capstone-group3"
clientSecret = "Cty8Q~AvEO_qC-MjvPvosYauiNsffOHKnMpj7cmd"
clientid = "2ca50102-5717-4373-b796-39d06568588d"
mount_point = "/mnt/healthcare/dataIn" # the mount point will be unique to you

configs = {"fs.azure.account.auth.type": "OAuth",
       "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
       "fs.azure.account.oauth2.client.id": clientid,
       "fs.azure.account.oauth2.client.secret": clientSecret,
       "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/d46b54b2-a652-420b-aa5a-2ef7f8fc706e/oauth2/token",
       "fs.azure.createRemoteFileSystemDuringInitialization": "true"}

try: 
    dbutils.fs.unmount(mount_point)
except:
    pass

dbutils.fs.mount(
source = "abfss://"+storageContainer+"@"+storageAccount+".dfs.core.windows.net/",
mount_point = mount_point,
extra_configs = configs)

/mnt/healthcare/dataIn has been unmounted.
Out[43]: True

In [None]:
State = spark.read.options(header = 'True').csv("/mnt/healthcare/dataIn/CleanedData/State.csv").sort('StateName').toPandas()
County = spark.read.options(header = 'True').csv("/mnt/healthcare/dataIn/CleanedData/County.csv").sort('CountyName').toPandas()

**Kafka Consumer**

In [None]:
# Error function

def error_cb(err):
    """ The error callback is used for generic client errors. These
        errors are generally to be considered informational as the client will
        automatically try to recover from all errors, and no extra action
        is typically required by the application.
        For this example however, we terminate the application if the client
        is unable to connect to any broker (_ALL_BROKERS_DOWN) and on
        authentication errors (_AUTHENTICATION). """

    print("Client error: {}".format(err))
    if err.code() == KafkaError._ALL_BROKERS_DOWN or \
       err.code() == KafkaError._AUTHENTICATION:
        # Any exception raised from this callback will be re-raised from the
        # triggering flush() or poll() call.
        raise KafkaException(err)

In [None]:
#KAFKA variables, Move to the OS variables or configuration
# This will work in local Jupyter Notebook, but in a databrick, hiding config.py is tougher. 
confluentClusterName = "stage3talent"
confluentBootstrapServers = "pkc-ldvmy.centralus.azure.confluent.cloud:9092"

# Topic name
confluentTopicName = "healthcare-insurance"

schemaRegistryUrl = "https://psrc-gq7pv.westus2.azure.confluent.cloud"
confluentApiKey = "YHMHG7E54LJA55XZ"
confluentSecret = "/XYn+w3gHGMqpe9l0TWvA9FznMYNln2STI+dytyPqtZ9QktH0TbGXUqepEsJ/nR0"
confluentRegistryApiKey = "YHMHG7E54LJA55XZ"
confluentRegistrySecret = "/XYn+w3gHGMqpe9l0TWvA9FznMYNln2STI+dytyPqtZ9QktH0TbGXUqepEsJ/nR0"

#Kakfa Class Setup.
c = Consumer({
    'bootstrap.servers': confluentBootstrapServers,
    'sasl.mechanism': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': confluentApiKey,
    'sasl.password': confluentSecret,
    'group.id': str(uuid.uuid1()),  # this will create a new consumer group on each invocation.
    'auto.offset.reset': 'earliest',
    'error_cb': error_cb,
})

c.subscribe([confluentTopicName])

**Health Insurance Characteristics Consumer**

In [None]:
healthstring = {}
healthListDict = []

for i in range(2_520):
    try:
        message = c.poll(timeout=1.0)
        if message is None:
            break
        elif message.error():
            print("Consumer error: {}".format(message.error()))
            break
        else:
            healthstring = json.loads('{}'.format(message.value().decode('utf-8')))
            healthListDict.append(healthstring)

    except Exception as e:
        print(e)

print(json.dumps(healthListDict, indent=1, sort_keys=True))

[
 {
  "African_American": "17668",
  "American_Indian": "0",
  "Asian": "0",
  "Bachelors_Degree_or_Higher": "51187",
  "County": "Baldwin County",
  "Female": "113950",
  "Foreign_Born": "11002",
  "High_School_or_Equivalet": "39988",
  "Hispanic": "10517",
  "Less_than_High_School": "14600",
  "Male": "106961",
  "Native_Born": "209909",
  "Naturalized": "4495",
  "Not_a_Citizen": "6507",
  "Over_100000S": "76781",
  "Pacific_Islander": "0",
  "Population_Category": "Total",
  "Some_College": "49486",
  "Some_Other_Race": "0",
  "State": "Alabama",
  "Total_Population": "220911",
  "Under_25000S": "28456",
  "Under_6_Y": "13169",
  "White": "189325",
  "_19_to_25_Y": "16549",
  "_25000_to_49999S": "52417",
  "_26_to_34_Y": "21835",
  "_35_to_44_Y": "27872",
  "_45_to_54_Y": "27174",
  "_50000_to_74999S": "26888",
  "_55_to_64_Y": "31542",
  "_65_to_74_Y": "28682",
  "_6_to_18_Y": "35932",
  "_75000_to_99999S": "35705",
  "_75_and_Older": "18156"
 },
 {
  "African_American": "16020",

In [None]:
health = spark.createDataFrame(healthListDict).select('State', 'County', 'Population_Category', 'Total_Population', 'Under_6_Y', '_6_to_18_Y', '_19_to_25_Y', '_26_to_34_Y', '_35_to_44_Y', '_45_to_54_Y', '_55_to_64_Y', '_65_to_74_Y', '_75_and_Older', 'Male', 'Female', 'White', 'African_American', 'American_Indian', 'Asian', 'Pacific_Islander', 'Some_Other_Race', 'Hispanic', 'Native_Born', 'Foreign_Born', 'Naturalized', 'Not_a_Citizen', 'Less_than_High_School', 'High_School_or_Equivalet', 'Some_College', 'Bachelors_Degree_or_Higher', 'Under_25000S', '_25000_to_49999S', '_50000_to_74999S', '_75000_to_99999S', 'Over_100000S').toPandas()

*Replacing State and County name with State and County ID from tables*

In [None]:
for i in State.StateName:
    health['State'].replace(i, int(State.loc[State['StateName'] == i, 'StateID']), inplace = True)

for i in County.CountyName:
    health['County'].replace(i, int(County.loc[County['CountyName'] == i, 'CountyID']), inplace = True)

# Renaming the columns
health.columns = ['StateID', 'CountyID', 'Insurance_Category', 'Total_Population',
       'Under_6Y', '_6_to_18Y', '_19_to_25Y', '_26_to_34Y', '_35_to_44Y',
       '_45_to_54Y', '_55_to_64Y', '_65_to_74Y', '_75_and_Older', 'Male',
       'Female', 'White', 'African_American', 'American_Indian', 'Asian',
       'Pacific_Islander', 'Some_Other_Race', 'Hispanic', 'Native_Born',
       'Foreign_Born', 'Naturalized', 'Not_a_Citizen', 'Less_than_High_School',
       'High_School_or_Equivalent', 'Some_College',
       'Bachelors_or_Higher', 'Under_25000S', '_25000_to_49999S',
       '_50000_to_74999S', '_75000_to_99999S', 'Over_100000S']

# List of columns to be changed to integers
columns = ['Total_Population',
       'Under_6Y', '_6_to_18Y', '_19_to_25Y', '_26_to_34Y', '_35_to_44Y',
       '_45_to_54Y', '_55_to_64Y', '_65_to_74Y', '_75_and_Older', 'Male',
       'Female', 'White', 'African_American', 'American_Indian', 'Asian',
       'Pacific_Islander', 'Some_Other_Race', 'Hispanic', 'Native_Born',
       'Foreign_Born', 'Naturalized', 'Not_a_Citizen', 'Less_than_High_School',
       'High_School_or_Equivalent', 'Some_College',
       'Bachelors_or_Higher', 'Under_25000S', '_25000_to_49999S',
       '_50000_to_74999S', '_75000_to_99999S', 'Over_100000S']

for i in columns:
    health[i] = health[i].astype('float')

healthID = []
for i in range(1, health.shape[0]+1):
    healthID.append(i)

health2 = health.sort_values(by = ['StateID', 'CountyID']).copy()

health2['HealthID'] = healthID
health2 = health2[['HealthID', 'StateID', 'CountyID', 'Insurance_Category', 'Total_Population',
       'Under_6Y', '_6_to_18Y', '_19_to_25Y', '_26_to_34Y', '_35_to_44Y',
       '_45_to_54Y', '_55_to_64Y', '_65_to_74Y', '_75_and_Older', 'Male',
       'Female', 'White', 'African_American', 'American_Indian', 'Asian',
       'Pacific_Islander', 'Some_Other_Race', 'Hispanic', 'Native_Born',
       'Foreign_Born', 'Naturalized', 'Not_a_Citizen', 'Less_than_High_School',
       'High_School_or_Equivalent', 'Some_College',
       'Bachelors_or_Higher', 'Under_25000S', '_25000_to_49999S',
       '_50000_to_74999S', '_75000_to_99999S', 'Over_100000S']]

health = spark.createDataFrame(health)
health2 = spark.createDataFrame(health2)

In [None]:
States = spark.createDataFrame(State)
Counties = spark.createDataFrame(County)

**SQL Connection**

In [None]:
table = 'dbo.County'

CountyCheck = spark.read.format("jdbc") \
    .option("url", f"jdbc:sqlserver://{server}:1433;databaseName={database};") \
    .option("dbtable", table) \
    .option("user", k_username) \
    .option("password", k_password) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()

if CountyCheck.count() != 1_999: # Total number of rows in dataset
    Counties.select('CountyName').sort('CountyName').write.format('jdbc').option("url", f"jdbc:sqlserver://{server}:1433;databaseName={database};") \
        .mode("append") \
        .option("dbtable", table) \
        .option("user", k_username) \
        .option("password", k_password) \
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
        .save()

In [None]:
table = "dbo.State"

StateCheck = spark.read.format("jdbc") \
    .option("url", f"jdbc:sqlserver://{server}:1433;databaseName={database};") \
    .option("dbtable", table) \
    .option("user", k_username) \
    .option("password", k_password) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()

if StateCheck.count() != 56: # Total number of rows in dataset
    States.select('StateName').sort('StateName').write.format('jdbc').option("url", f"jdbc:sqlserver://{server}:1433;databaseName={database};") \
        .mode("append") \
        .option("dbtable", table) \
        .option("user", k_username) \
        .option("password", k_password) \
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
        .save()

In [None]:
table = "dbo.HealthInsurance"

healthNew = spark.read.format("jdbc") \
    .option("url", f"jdbc:sqlserver://{server}:1433;databaseName={database};") \
    .option("dbtable", table) \
    .option("user", k_username) \
    .option("password", k_password) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()

# Adding the first 
if healthNew.count() != 2520: # Total number of rows in original dataset
    health.sort('StateID', 'CountyID').write.format('jdbc').option("url", f"jdbc:sqlserver://{server}:1433;databaseName={database};") \
        .mode("append") \
        .option("dbtable", table) \
        .option("user", k_username) \
        .option("password", k_password) \
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
        .save()

else:
    health2.sort('StateID', 'CountyID').write.format('jdbc').option("url", f"jdbc:sqlserver://{server}:1433;databaseName={database};") \
        .mode("overwrite") \
        .option("dbtable", table) \
        .option("user", k_username) \
        .option("password", k_password) \
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
        .save()