# `CensusStat` Populator
*Populates all tables (`CensusStat`, `Demographic`, `State`, and `Metric`) related to Census and CDC data.* Replaces all individual populators.

## Step 1: Set-Up

In [1]:
# Imports
import uuid
from confluent_kafka.admin import AdminClient, NewTopic
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.types import FloatType
from pyspark.sql.types import StringType
import pandas as pd

# Get config
from config import user
from config import password

ModuleNotFoundError: No module named 'confluent_kafka'

In [None]:
# Mount the ADLS Gen2 container on DBFS using OAuth (service principal) credentials.
# SECURITY NOTE(review): the client secret and SQL password below are committed in
# plain text. They should be rotated and loaded from a secret store (for example a
# Databricks secret scope via dbutils.secrets.get) rather than kept in the notebook.
storageAccount = "gen10datafund2205"
storageContainer = "group5container"
clientSecret = "-ZS8Q~NwOKfwEpVOg3Teb1pPtxDbz616XjlXLbuU"
clientid = "2ca50102-5717-4373-b796-39d06568588d"
mount_point = "/mnt/jacklynn/census"

# Configuration dictionary passed to dbutils.fs.mount as extra_configs
configs = {"fs.azure.account.auth.type": "OAuth",
       "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
       "fs.azure.account.oauth2.client.id": clientid,
       "fs.azure.account.oauth2.client.secret": clientSecret,
       "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/d46b54b2-a652-420b-aa5a-2ef7f8fc706e/oauth2/token",
       "fs.azure.createRemoteFileSystemDuringInitialization": "true"}

# Unmount only when the mount point is currently mounted. Checking the mount
# list (instead of a bare `except: pass`) lets genuine unmount failures surface.
if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)

# Mount the storage container
dbutils.fs.mount(
    source = "abfss://"+storageContainer+"@"+storageAccount+".dfs.core.windows.net/",
    mount_point = mount_point,
    extra_configs = configs)

# SQL Server connection settings used by the JDBC helper functions.
# NOTE(review): `user` and `password` rebind the names imported from `config`
# in the import cell, so the imported values are not the ones used below.
database = "group5database"
user = "jacklynn"
password  = 'Peanut-Hazel-Tails-1500-Cat!!'
server = "gen10-data-fundamentals-22-05-sql-server.database.windows.net"
port = "1433"

In [None]:
%fs 
ls /mnt/jacklynn/census

path,name,size,modificationTime
dbfs:/mnt/jacklynn/census/CGM_Data.csv,CGM_Data.csv,35978185,1659468631000
dbfs:/mnt/jacklynn/census/Diabetes Prevalence in the US by State and Demographic.csv,Diabetes Prevalence in the US by State and Demographic.csv,180068,1659497875000
dbfs:/mnt/jacklynn/census/Education by state.csv,Education by state.csv,3516,1659576860000
dbfs:/mnt/jacklynn/census/ExerciseData_2013_150min.csv,ExerciseData_2013_150min.csv,1544,1659645073000
dbfs:/mnt/jacklynn/census/Food Insecurity.csv,Food Insecurity.csv,6779,1659533925000
dbfs:/mnt/jacklynn/census/Income Brackets by State.csv,Income Brackets by State.csv,4675,1659578726000
dbfs:/mnt/jacklynn/census/U.S. NHANES Survey Data.csv,U.S. NHANES Survey Data.csv,1005266,1659710519000
dbfs:/mnt/jacklynn/census/chinese-diabetes-clean.csv,chinese-diabetes-clean.csv,33367142,1659541878000


In [None]:
# Helper function: load a SQL table into a Spark DataFrame
def readInTable(table_name):
    """Read `table_name` over JDBC and return it as a Spark DataFrame.

    The connection is built from the notebook-level `server`, `port`,
    `database`, `user` and `password` settings.
    """
    jdbc_url = f"jdbc:sqlserver://{server}:{port};databaseName={database};"
    reader = spark.read.format("jdbc").options(
        url=jdbc_url,
        dbtable=table_name,
        user=user,
        password=password,
        driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
    )
    return reader.load()

# Helper function: load a CSV file into a Spark DataFrame
def readInFile(f):
    """Read the comma-delimited CSV at path `f`, using its first row as the
    header and letting Spark infer the column types."""
    reader = (
        spark.read
        .option("inferSchema", 'True')
        .option("delimiter", ',')
        .option("header", 'True')
    )
    return reader.csv(f)

# Helper function: write a Spark DataFrame to a SQL table
def saveToTable(df, table, change='append'):
    """Write `df` to the SQL table `table` over JDBC.

    Args:
        df: Spark DataFrame to persist.
        table: target table name, e.g. "dbo.Metric".
        change: Spark save mode; 'append' (default) or 'overwrite'.

    The connection uses the notebook-level `server`, `port`, `database`,
    `user` and `password` settings. The port comes from `port` (it used to be
    hard-coded to 1433 here) so reads and writes always target the same
    endpoint.
    """
    df.write.format('jdbc').option("url", f"jdbc:sqlserver://{server}:{port};databaseName={database};") \
                .mode(change) \
                .option("dbtable", table) \
                .option("user", user) \
                .option("password", password) \
                .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
                .save()

# Helper function: build a {key column -> value column} lookup from a SQL table
def formDictConverter(table, key, value):
    """Return a dict mapping each distinct `key` entry of `table` to its `value`.

    Args:
        table: name of the SQL table to read (via readInTable).
        key: column whose entries become the dictionary keys.
        value: column whose entries become the dictionary values.

    When a key appears with several different values, the first one
    encountered is kept.
    """
    data = readInTable(table).select([key, value]).distinct().toPandas()

    # Single pass over the rows; setdefault keeps the first value seen for a
    # key, matching the previous per-key lookup without re-scanning the frame.
    converter = dict()
    for this_key, this_value in zip(data[key].to_list(), data[value].to_list()):
        converter.setdefault(this_key, this_value)
    return converter

## Step 2: Populate `Metric` Table

In [None]:
# Write function for adding metric data
def addMetrics(metrics, measurements, overwrite=False):
    """Insert metric definitions into dbo.Metric and return a metric -> metricID map.

    Args:
        metrics: list of metric names.
        measurements: list of units; measurements[i] is the unit of metrics[i].
        overwrite: when True the table is replaced and IDs restart at 0;
            otherwise new metrics are appended after the current maximum
            metricID and metrics already present in the table are skipped.

    Returns:
        dict mapping metric name -> metricID, read back from dbo.Metric.
    """
    # Get table connection already
    table_met = "dbo.Metric"
    df_met_existing = readInTable(table_met)

    # Get starting value: continue after the largest stored ID unless overwriting.
    # max() is None when the table has no rows, in which case IDs start at 0.
    next_id = 0
    if not overwrite:
        max_id = df_met_existing.agg({"metricID": "max"}).collect()[0]['max(metricID)']
        if max_id is not None:
            next_id = max_id + 1

    # Iterate through each metric
    data = []
    for i, metric in enumerate(metrics):
        measurement = measurements[i]

        # Check to see if the metric is already in database (irrelevant when
        # the table is about to be overwritten, so the query is skipped then)
        already_stored = (not overwrite) and \
            df_met_existing.filter(df_met_existing['metric'] == metric).count() > 0
        if already_stored:
            print(f'"{metric}" is already in database!')
            continue

        # Convert this metric to dictionary entry
        data.append({'metricID': next_id, 'metric': metric, 'unit': measurement})
        next_id += 1

    # Only write when there is something new: Spark cannot infer a schema
    # from an empty list.
    if data:
        df_metric = spark.createDataFrame(data)
        saveToTable(df_metric, table_met, change='overwrite' if overwrite else 'append')

    # Return dictionary with metric conversions
    return formDictConverter(table_met, 'metric', 'metricID')

In [None]:
# Add all metrics to database (every metric is stored as a percentage;
# measurements[i] is the unit of metrics[i])
metrics = ['diabetes', 'highest education', 'exercise (150+ min/wk)', 'food security', 'very low food security', 'income bracket']
measurements = ['percentage', 'percentage', 'percentage', 'percentage', 'percentage', 'percentage']
metricToID = addMetrics(metrics, measurements, overwrite=True)

## Step 3: Populate `State` Table

In [None]:
# Write function for adding states data
def addStates():
    """Rebuild dbo.State from the diabetes prevalence file and return a
    state name -> stateID map read back from the table.

    Each distinct (LocationAbbr, LocationDesc) pair in the file yields one
    row, and a 'DC' / 'Washington D.C.' row is always appended. The table is
    overwritten.
    """
    source_path = '/mnt/jacklynn/census/Diabetes Prevalence in the US by State and Demographic.csv'
    table_states = "dbo.State"

    # Distinct abbreviation/description pairs, pulled to the driver
    pairs = readInFile(source_path).select(['LocationAbbr', 'LocationDesc']).distinct().toPandas()

    # One row per listed abbreviation, named after the first description found for it
    state_rows = [
        {
            'stateID': abbr,
            'name': pairs.loc[pairs['LocationAbbr'] == abbr, 'LocationDesc'].to_list()[0],
        }
        for abbr in pairs['LocationAbbr'].to_list()
    ]

    # Add Washington DC
    state_rows.append({'stateID': 'DC', 'name': 'Washington D.C.'})

    # Save state data to the State SQL table
    saveToTable(spark.createDataFrame(state_rows), table_states, change='overwrite')

    # Return dictionary with states conversions
    return formDictConverter(table_states, 'name', 'stateID')

In [None]:
# Add all states to database
statesToID = addStates()

# Register the alternative spellings of the national total and of DC that
# the source files use, pointing them at the IDs already in the lookup
statesToID.update({
    'National': statesToID['United States'],
    'District of Columbia': statesToID['Washington D.C.'],
    'U.S. total': statesToID['United States'],
    'U.S.': statesToID['United States'],
})

## Step 4: Populate `Demographic` Table

In [None]:
# Get starting demoID: one past the largest ID already stored.
# max() is None when dbo.Demographic has no rows, so fall back to 0 instead
# of failing on `None + 1`.
table_demo = "dbo.Demographic"
df_demo = readInTable(table_demo)
max_demo_id = df_demo.agg({"demoID": "max"}).collect()[0]['max(demoID)']
j = 0 if max_demo_id is None else max_demo_id + 1

# Write function for adding demographics data
def addDemographics():
    """Insert the demographic groups used by the census files into dbo.Demographic.

    Groups come from the diabetes file (`Stratification1` values, minus
    'Overall'), the education and income files (their data column names) and
    a single 'General' group. A group is skipped when it, its capitalized
    form or its lower-cased form is already in the table. New rows take their
    demoID from the notebook-level counter `j`, which must be initialised
    before this function is called and is advanced by it.

    Returns:
        dict mapping demo_group -> demoID, read back from dbo.Demographic.
    """
    # Useful variables
    schema = ['demoID', 'demo_group', 'category']
    table_demo = "dbo.Demographic"

    # Helper function: create one table row and advance the global ID counter
    def createCategory(group, category):
        global j
        demoID = j
        j += 1
        return [demoID, group, category]

    # Helper function: drop groups that are already stored. Builds a new list;
    # removing from a list while iterating over it skips elements.
    def dedup(starting_list):
        used = set(readInTable(table_demo).select(['demo_group']).toPandas()['demo_group'].to_list())
        return [
            demo for demo in starting_list
            if not (demo in used or demo.capitalize() in used or demo.lower() in used)
        ]

    # Helper function: store the not-yet-present groups under one category
    def saveNewDemos(candidates, category):
        new_demos = dedup(candidates)
        if len(new_demos) == 0:
            return  # nothing to add; Spark cannot build a frame from no rows
        rows = [createCategory(demo, category) for demo in new_demos]
        saveToTable(spark.createDataFrame(data = rows, schema = schema), table_demo)

    # Helper function: column names of a file minus its non-demographic columns
    # (df.columns is a fresh list, so the DataFrame's schema is left untouched)
    def demoColumns(df, non_demo_cols):
        return [name for name in df.columns if name not in non_demo_cols]

    # DIABETES DATA
    df_diabetes = readInFile('/mnt/jacklynn/census/Diabetes Prevalence in the US by State and Demographic.csv')
    diabetes_demos = df_diabetes.select('Stratification1').distinct().toPandas()['Stratification1'].to_list()
    # 'Overall' is replaced by the 'General' group added below; null labels are ignored
    diabetes_demos = [demo for demo in diabetes_demos if demo is not None and demo != 'Overall']
    saveNewDemos(diabetes_demos, 'race/ethnicity')

    # EDUCATION DATA
    df_education = readInFile('/mnt/jacklynn/census/Education by state.csv')
    saveNewDemos(demoColumns(df_education, ['_c0', 'State', 'Total Population']), 'education level')

    # EXERCISE DATA (and FOOD SECURITY DATA): only the general category
    saveNewDemos(['General'], 'general')

    # INCOME DATA
    # NOTE(review): 'Median income (dollars)' is not excluded here, so it is
    # stored as an income bracket even though the CensusStat income cell skips
    # that column - confirm whether that is intended.
    df_income = readInFile('/mnt/jacklynn/census/Income Brackets by State.csv')
    saveNewDemos(demoColumns(df_income, ['_c0', 'State', 'Total']), 'income bracket')

    # Return dictionary with demographic conversions
    return formDictConverter(table_demo, 'demo_group', 'demoID')

In [None]:
# Add all demos to database
demosToID = addDemographics()

# Alias the labels used by the source files to stored groups: capitalized
# 'Female' / 'Male' point at the lower-case entries, and 'Overall' is
# tracked under 'General'
demosToID.update({
    'Female': demosToID['female'],
    'Male': demosToID['male'],
    'Overall': demosToID['General'],
})

## Step 5: Populate `CensusStat` Table

In [None]:
# Helper function: rename columns
def renameCols(df, prev_cols, new_cols):
    """Return `df` with each column prev_cols[i] renamed to new_cols[i].

    Renames are applied one after another, in list order.
    """
    for position, old_name in enumerate(prev_cols):
        df = df.withColumnRenamed(old_name, new_cols[position])
    return df

# Helper function: drop columns
def dropCols(df, drop_cols):
    """Return `df` without the columns named in `drop_cols`.

    Columns are dropped by name, so this helper no longer depends on the
    module-level `col` function (which a later cell can rebind) and no longer
    needs a bare `except` that would also hide unrelated errors.
    """
    for this_col in drop_cols:
        df = df.drop(this_col)
    return df

# Helper function: add constant-valued columns
def addCols(df, colNames, addValues):
    """Return `df` with one literal column per entry of `colNames`;
    column colNames[i] holds addValues[i] on every row."""
    for position, name in enumerate(colNames):
        df = df.withColumn(name, lit(addValues[position]))
    return df

# Helper function: convert category labels (e.g. demo or state names) to IDs
def catToID(df, merge_val, cat_cols, dictionary):
    """Replace the labels in `cat_cols` with their IDs from `dictionary`.

    Args:
        df: Spark DataFrame containing the `cat_cols` and `merge_val` columns.
        merge_val: list of column names identifying a row; used to join the
            converted columns back onto `df`.
        cat_cols: list of column names whose values are looked up.
        dictionary: mapping from label to ID. Labels without an entry are
            left as they are (pandas `replace` semantics).

    Returns:
        Spark DataFrame in which each column of `cat_cols` holds the
        converted values.
    """
    # Convert on the driver with pandas
    df_replace = df.select(cat_cols + merge_val).toPandas()
    for cat in cat_cols:
        df_replace = df_replace.replace({cat: dictionary})
        # Drop the original column by name; this avoids relying on the global
        # `col` function and on a bare `except` to recover when it is shadowed.
        df = df.drop(cat)

    # Join the converted columns back on the row identifier
    df_replace = spark.createDataFrame(df_replace)
    return df.join(df_replace, on=merge_val)

In [None]:
# DIABETES DATA
# Read the file and swap each Stratification1 label for its demoID
df_diabetes = catToID(
    readInFile('/mnt/jacklynn/census/Diabetes Prevalence in the US by State and Demographic.csv'),
    ['_c0'],
    ['Stratification1'],
    demosToID,
)

# Rename to the CensusStat column names, drop the leftover columns and tag
# every row with the diabetes metric
df_diabetes = dropCols(
    renameCols(
        df_diabetes,
        ['LocationAbbr', 'Stratification1', 'YearEnd', 'DataValue'],
        ['stateID', 'demoID', 'year', 'value'],
    ),
    ['LocationDesc', 'personID', '_c0'],
).withColumn("metricID", lit(metricToID['diabetes']))

# Save database (overwrite mode: this cell replaces the table, later cells append)
table_census = "dbo.CensusStat"
saveToTable(df_diabetes, table_census, change='overwrite')

In [None]:
# EDUCATION DATA
df_education = readInFile('/mnt/jacklynn/census/Education by state.csv')

# Convert state values
df_education = catToID(df_education, ['_c0'], ['State'], statesToID)

# Get columns to iterate through: every column except the row id, the state
# and the population total. A new list is built so the DataFrame's schema
# object is not mutated.
non_demo_cols = ['_c0', 'State', 'Total Population']
cats = [name for name in df_education.columns if name not in non_demo_cols]

# Collect the needed columns once instead of once per category
education_pd = df_education.select(['Total Population', 'State'] + cats).toPandas()
total_population = education_pd['Total Population'].str.replace(',','').astype(int)

# Add each category individually
for cat in cats:

    # Get percentage of population (counts are strings with thousands separators)
    population = (education_pd[cat].str.replace(',','').astype(int) / total_population * 100)
    population = pd.concat([education_pd['State'], population.to_frame()], axis=1)
    this_df = spark.createDataFrame(population, schema=['stateID', 'value'])

    # Add other columns
    names = ['metricID', 'demoID', 'year']
    vals = [metricToID['highest education'], demosToID[cat], 2017]
    this_df = addCols(this_df, names, vals)

    # Add the data to SQL
    saveToTable(this_df, table_census)

In [None]:
# EXERCISE DATA
df_exercise = readInFile('/mnt/jacklynn/census/ExerciseData_2013_150min.csv')

# Drop unnecessary columns
df_exercise = dropCols(df_exercise, ['Low_Confidence_Limit', 'High_Confidence_Limit', 'Sample_Size'])

# Rename columns to correct names
df_exercise = renameCols(df_exercise, ['YearStart', 'Data_value', 'LocationDesc'], ['year', 'value', 'stateID'])

# Convert state values
df_exercise = catToID(df_exercise, ['_c0'], ['stateID'], statesToID)

# Add other columns. `year` already comes from the file's YearStart column,
# so only the metric and demographic IDs are added; the stray third value
# (2017) that had no matching column name has been removed.
names = ['metricID', 'demoID']
vals = [metricToID['exercise (150+ min/wk)'], demosToID['General']]
df_exercise = addCols(df_exercise, names, vals)

# Remove unnecessary column
df_exercise = dropCols(df_exercise, ['_c0'])

# Add the data to SQL
saveToTable(df_exercise, table_census)

In [None]:
# FOOD SECURITY DATA
df_food = readInFile('/mnt/jacklynn/census/Food Insecurity.csv')

# Convert state values
df_food = catToID(df_food, ['_c0'], ['State'], statesToID)

# Rename columns to correct names
df_food = renameCols(df_food, ['Year', 'State'], ['year', 'stateID'])

# Source column -> metric it is stored under.
# NOTE(review): 'Food insecurity prevalence' is stored under the metric named
# 'food security' - confirm the naming is intended.
food_metric_by_column = {
    'Food insecurity prevalence': 'food security',
    'Very low food security prevalence': 'very low food security',
}

# Separate data into two parts, one per metric. The loop variable is
# deliberately NOT called `col`: that would rebind the notebook-global `col`
# imported from pyspark.sql.functions and break later `col(...)` calls.
for value_col, metric_name in food_metric_by_column.items():

    # Get this df and format it for CensusStat
    this_df = df_food.select(['year', value_col, 'stateID'])
    this_df = this_df.withColumnRenamed(value_col, 'value')

    # Last metrics
    this_df = this_df.withColumn("demoID", lit(demosToID['General']))
    this_df = this_df.withColumn("metricID", lit(metricToID[metric_name]))

    # Add the data to SQL
    saveToTable(this_df, table_census)

In [None]:
# INCOME DATA
df_income = readInFile('/mnt/jacklynn/census/Income Brackets by State.csv')

# Convert state values
df_income = catToID(df_income, ['_c0'], ['State'], statesToID)

# Get categories: every column except the row id, the totals, the state and
# the median income. A new list is built so the DataFrame's schema object is
# not mutated.
non_bracket_cols = ['_c0', 'Total', 'State', 'Median income (dollars)']
cats = [name for name in df_income.columns if name not in non_bracket_cols]

# Collect the needed columns once instead of once per category
income_pd = df_income.select(['State'] + cats).toPandas()

# Iterate through categories
for cat in cats:

    # Remove percent mark and convert to a number. Building the frame straight
    # from pandas avoids the previous drop-and-rejoin on (stateID, year), which
    # depended on the global `col` and a bare `except`.
    bracket_values = income_pd[cat].str.replace('%', '').astype(float)
    bracket_pd = pd.concat([income_pd['State'], bracket_values], axis=1)
    this_df = spark.createDataFrame(bracket_pd, schema=['stateID', 'value'])

    # Add other columns
    this_df = addCols(this_df, ['metricID', 'demoID', 'year'], [metricToID['income bracket'], demosToID[cat], 2017])

    # Add the data to SQL
    saveToTable(this_df, table_census)