# Consumer: 2021 Air Quality USA by County

In [0]:
# Create a mount point
# Mount the ADLS Gen2 container at mount_point so later cells can read and write through it.

storageAccount = "gen10datafund2111"
storageContainer = "jadr-health-insights"
clientSecret = dbutils.secrets.get(scope = "jadr_blob", key = "clientSecret")
clientid = dbutils.secrets.get(scope = "jadr_blob", key = "clientid")
mount_point = "/mnt/jadr"

# OAuth client-credentials configuration for the ABFS driver; the client id/secret come from
# the "jadr_blob" secret scope above.
configs = {"fs.azure.account.auth.type": "OAuth",
   "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
   "fs.azure.account.oauth2.client.id": clientid,
   "fs.azure.account.oauth2.client.secret": clientSecret,
   "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/d46b54b2-a652-420b-aa5a-2ef7f8fc706e/oauth2/token",
   "fs.azure.createRemoteFileSystemDuringInitialization": "true"}

# Remount from scratch. Only unmount when something is actually mounted there, instead of a
# bare `except: pass` that would also hide genuine unmount failures (which would then surface
# as a confusing "already mounted" error from mount() below).
if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)

dbutils.fs.mount(
    source = "abfss://{}@{}.dfs.core.windows.net/".format(storageContainer, storageAccount),
    mount_point = mount_point,
    extra_configs = configs)
display(dbutils.fs.ls(mount_point))

path,name,size,modificationTime
dbfs:/mnt/jadr/Data/,Data/,0,1643742636000
dbfs:/mnt/jadr/ML-Models/,ML-Models/,0,1643906451000
dbfs:/mnt/jadr/deleteme.txt,deleteme.txt,8,1643742578000


In [0]:
# Error Callback Functions
def error_cb(err):
    """ Kafka client error callback.

        Every error is printed. Most client errors are informational, because the client
        retries on its own. Two codes are treated as fatal here: no broker reachable
        (_ALL_BROKERS_DOWN) and failed authentication (_AUTHENTICATION). For those the
        callback raises KafkaException, which is re-raised from the poll() or flush() call
        that triggered the callback. """

    print("Client error: {}".format(err))
    fatal_codes = (KafkaError._ALL_BROKERS_DOWN, KafkaError._AUTHENTICATION)
    if err.code() in fatal_codes:
        raise KafkaException(err)

        
        
# Consumer Setup
import json
import uuid
from time import sleep
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException



# Kafka connection settings. Endpoints and the topic name are plain constants; the API keys
# and secrets are read from the Databricks secret scope "jadr_blob" rather than hard-coded.
confluentClusterName = "stage3talent"
confluentBootstrapServers = "pkc-ldvmy.centralus.azure.confluent.cloud:9092"
confluentTopicName = "jadr-AQI"
schemaRegistryUrl = "https://psrc-gq7pv.westus2.azure.confluent.cloud"
confluentApiKey = dbutils.secrets.get(scope = "jadr_blob", key = "confluentApiKey")
confluentSecret = dbutils.secrets.get(scope = "jadr_blob", key = "confluentSecret")
confluentRegistryApiKey = dbutils.secrets.get(scope = "jadr_blob", key = "confluentRegistryApiKey")
confluentRegistrySecret = dbutils.secrets.get(scope = "jadr_blob", key = "confluentRegistrySecret")



# Kafka consumer setup (SASL_SSL with PLAIN auth using the API key/secret above).
c = Consumer({
    'bootstrap.servers': confluentBootstrapServers,
    'sasl.mechanism': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': confluentApiKey,
    'sasl.password': confluentSecret,
    # A new consumer group on each invocation. It has no committed offsets, so together with
    # 'auto.offset.reset': 'earliest' every run reads the topic from the beginning.
    'group.id': str(uuid.uuid1()),
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True,
    'error_cb': error_cb,
})

c.subscribe([confluentTopicName])

In [0]:
# CONSUME
# Drain the subscribed topic into kafkaListDictionaries. Each message value is decoded as
# UTF-8 JSON and appended as one dict. The loop stops when:
#   * maxEmptyPolls consecutive 1-second polls return nothing. A single empty poll is not
#     treated as "end of topic", because the first polls of a freshly created consumer group
#     can come back empty while it is still being assigned partitions.
#   * a message carries an error.
#   * poll() raises KafkaException (error_cb re-raises fatal client errors from poll()).
# Messages that fail to decode are reported and skipped.

aString = {}

kafkaListDictionaries = []

maxEmptyPolls = 5
emptyPolls = 0

while True:
    try:
        msg = c.poll(timeout=1.0)
    except KafkaException as e:
        # Fatal errors (e.g. all brokers down, auth failure) - retrying cannot help.
        print("Consumer error: {}".format(e))
        break

    if msg is None:
        emptyPolls += 1
        if emptyPolls >= maxEmptyPolls:
            break
        continue
    emptyPolls = 0

    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        break

    try:
        aString = json.loads(msg.value().decode('utf-8'))
    except (AttributeError, ValueError) as e:
        # AttributeError: message without a value (None).
        # ValueError also covers UnicodeDecodeError and json.JSONDecodeError.
        print(e)
        continue
    kafkaListDictionaries.append(aString)

    try:
        c.commit()
    except KafkaException as e:
        # A failed commit should not stop the drain; offsets are also auto-committed.
        print(e)

### Clean Kafka Data

In [0]:
# Convert list of dictionaries to pyspark dataframe. Remove duplicates.
if not kafkaListDictionaries:
    # spark.createDataFrame cannot infer a schema from an empty list and would fail with an
    # opaque error; stop here with a message that says what actually went wrong.
    raise ValueError("No records were consumed from Kafka; nothing to transform.")
df = spark.createDataFrame(kafkaListDictionaries)
df = df.distinct()

### Transform Kafka Data

In [0]:
# The dataset reports each pollutant with several methods and metrics. Keep only the
# preferred parameter/method/metric combinations so there is one measurement per site per year.
parameters = [
    'PM10 Total 0-10um STP',
    'Lead (TSP) LC',
    'PM2.5 - Local Conditions',
    'Sulfur dioxide',
    'Ozone',
    'Nitrogen dioxide (NO2)',
]
preferredMethods = [
    'Hi-Vol - ICAP SPECTRA (ICP-MS); 0.45M HNO3 Boil30 min',
    'INSTRUMENTAL - GAS PHASE CHEMILUMINESCENCE',
    'INSTRUMENTAL - CHEMILUMINESCENCE',
    'INSTRUMENTAL - ULTRA VIOLET ABSORPTION',
    'INSTRUMENTAL - ULTRA VIOLET',
    'Andersen RAAS2.5-300 PM2.5 SEQ w/WINS - GRAVIMETRIC',
    'R & P Model 2025 PM-2.5 Sequential Air Sampler w/VSCC - Gravimetric',
    'Multiple Methods Used',
    'INSTRUMENTAL - Pulsed Fluorescent 43C-TLE/43i-TLE',
]
# Both capitalisations of the 1-hour-average metric are listed on purpose.
preferredMetrics = [
    'Daily Maximum 1-hour average',
    'Daily maxima of observed hourly values (between 9:00 AM and 8:00 PM)',
    'Daily Mean',
    'Daily maximum 1-hour average',
    'Observed Values',
]

# One combined predicate. 'Observed Values' is wanted for some pollutants, but for sulfur
# dioxide it brings in SO2 types we don't want, so that pairing is excluded.
keepRow = (
    df.parameter.isin(parameters)
    & df.method.isin(preferredMethods)
    & df.metric_used.isin(preferredMetrics)
    & ~((df.parameter == 'Sulfur dioxide') & (df.metric_used == 'Observed Values'))
)
df = df.filter(keepRow)

In [0]:
# Convert rows of different measurements per county to columns. Should end up with one row per county per year with columns for every measurement
from pyspark.sql import functions as F

# Pivot each pollutant's statistics into its own columns, named "<parameter>_<aggregate>",
# e.g. "Ozone_avg(arithmetic_mean)".
# The pivot values are listed explicitly for two reasons:
#   * Spark skips the extra job it would otherwise run to discover them.
#   * Every pollutant's columns exist even when a batch has no rows for it; without them the
#     name lookups in AQIDFAgg2 below fail with an AnalysisException.
# sorted() reproduces the column order Spark uses for values it discovers itself.
# NOTE(review): F.first() over method/metric/units is order-dependent; it assumes those are
# constant within a group - TODO confirm.
AQIDFAgg = df.groupBy('state', 'county', 'year', 'parameter').pivot('parameter', sorted(parameters)).agg(
    F.mean('arithmetic_mean'),
    F.mean('first_max_value'),
    F.mean('ninety_ninth_percentile'),
    F.mean('standard_deviation'),
    F.mean('second_max_value'),
    F.first('method'),
    F.first('metric_used'),
    F.first('units_of_measure'))

# Name-based lookups such as F.first('PM2.5 ...') read a '.' as struct-field access, so the
# PM2.5 columns are renamed to use 'PM25' instead.
for pivotCol in AQIDFAgg.columns:
    if pivotCol.startswith('PM2.5 - Local Conditions_'):
        AQIDFAgg = AQIDFAgg.withColumnRenamed(pivotCol, 'PM25' + pivotCol[len('PM2.5'):])

# Suffix produced by the pivot aggregates above -> suffix of the output column.
measureSuffixes = [
    ('avg(arithmetic_mean)', 'Mean'),
    ('avg(first_max_value)', '1stMax'),
    ('avg(ninety_ninth_percentile)', '99perc'),
    ('avg(standard_deviation)', 'Std'),
    ('avg(second_max_value)', '2ndMax'),
    ('first(method)', 'Method'),
    ('first(metric_used)', 'Metric'),
    ('first(units_of_measure)', 'Units'),
]
# Pivoted column prefix (after the PM2.5 rename) -> short prefix of the output column.
# This order fixes the column order of AQIDFAgg2, and therefore of the written JSON.
pollutantPrefixes = [
    ('Lead (TSP) LC', 'Lead'),
    ('Nitrogen dioxide (NO2)', 'NO2'),
    ('Ozone', 'Ozone'),
    ('PM10 Total 0-10um STP', 'PM10'),
    ('PM25 - Local Conditions', 'PM25'),
    ('Sulfur dioxide', 'SO2'),
]

# AQIDFAgg was also grouped by 'parameter', so each row only has values in its own
# parameter's columns. Taking the first non-null value per (state, county, year) merges the
# rows into one, e.g. 'Ozone_avg(arithmetic_mean)' becomes 'OzoneMean'.
AQIDFAgg2 = AQIDFAgg.groupBy('state', 'county', 'year').agg(*[
    F.first('{}_{}'.format(pivotPrefix, sourceSuffix), ignorenulls=True).alias(shortPrefix + outputSuffix)
    for pivotPrefix, shortPrefix in pollutantPrefixes
    for sourceSuffix, outputSuffix in measureSuffixes
])

### Load Kafka Data (to DataLake)

In [0]:
# Write the per-county, per-year aggregates to the Data Lake as JSON.
# NOTE: append mode - re-running this cell adds the same rows again.
AQIDFAgg2.write.json('/mnt/jadr/Data/aqi_stream', mode="append")