# Populating our SQL Database

**Abstract**

This notebook contains the code that we used to load our data (uploaded to a blob container on Azure after transformation) into our SQL database. Any sensitive information has been removed from this notebook and replaced with ''. This notebook was first generated in Azure Databricks and then exported to this Jupyter notebook. To be executed, this notebook must be imported back into Databricks. 

### Connection Strings to Database

In [None]:
### Connection settings for the target SQL database. They are interpolated into the
### JDBC URL and passed as write options when the dataframes are saved.
### The values are blank here because sensitive information was removed from this notebook.
database = ''
user = ''
password = ''
server = ''

### Creating a Mount Point to a Blob Container

In [None]:
### Mount point through OAuth security.
### The credentials below are blank because sensitive information was removed from this notebook.
client_id = ''
client_secret = ''
tenant_id = ''

storage_account = ''
storage_container = ''

### Name of the mount point.
### It must be '/mnt/data-vikings' (with a hyphen) because every read in this
### notebook loads its file from that path.
mount_point = "/mnt/data-vikings"

### OAuth client-credentials settings handed to the mount as extra configs
configs = {"fs.azure.account.auth.type": "OAuth",
       "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
       "fs.azure.account.oauth2.client.id": client_id,
       "fs.azure.account.oauth2.client.secret": client_secret,
       "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
       "fs.azure.createRemoteFileSystemDuringInitialization": "true"}

### Unmount only when the mount point already exists, so the cell can be re-run
### without hiding unrelated failures behind a bare `except`.
if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)

dbutils.fs.mount(
    source = f"abfss://{storage_container}@{storage_account}.dfs.core.windows.net/",
    mount_point = mount_point,
    extra_configs = configs)

In [None]:
### Listing the contents of the mounted blob container to confirm which files are available

mounted_files = dbutils.fs.ls("/mnt/data-vikings")
display(mounted_files)

### Reading in Data

**State Table**

In [None]:
### Loading state_poverty.csv into a dataframe (first row is the header, column types are inferred)

state_poverty_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/mnt/data-vikings/state_poverty.csv')
)

In [None]:
### Building the state lookup dataframe:
###   1. keep only the 'STATE' column
###   2. remove duplicate rows
###   3. order the states alphabetically
###   4. rename the column to 'state_name'

state_df = (
    state_poverty_df
    .select('STATE')
    .distinct()
    .orderBy('STATE')
    .withColumnRenamed('STATE', 'state_name')
)

**Occupation Group Table**

In [None]:
### Loading national_salary.csv into a dataframe (first row is the header, column types are inferred)

national_employment_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/mnt/data-vikings/national_salary.csv')
)

In [None]:
### Building the occupation group lookup dataframe:
###   1. keep only the 'OCC_GROUP' column
###   2. reduce it to its unique values
###   3. remove the null row
###   4. rename the column to 'occupation_group'

occ_group_df = (
    national_employment_df
    .select('OCC_GROUP')
    .distinct()
    .na.drop()
    .withColumnRenamed('OCC_GROUP', 'occupation_group')
)

**National Poverty Table**

In [None]:
### Loading national_poverty.csv (header row, inferred types) and
### renaming its columns to lower case in a single chain

national_poverty_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/mnt/data-vikings/national_poverty.csv')
    .withColumnRenamed('PR_ALL', 'pr_all')
    .withColumnRenamed('PR_YOUTH', 'pr_youth')
    .withColumnRenamed('MED_HH_INCOME', 'med_hh_income')
    .withColumnRenamed('YEAR', 'year')
)

**State Poverty Table**

In [None]:
### Loading state_poverty.csv again so the state poverty table starts from the raw file

state_poverty_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/mnt/data-vikings/state_poverty.csv')
)

In [None]:
### Inner-joining state poverty rows to the state dataframe on the state name

state_poverty_df = state_poverty_df.join(
    state_df,
    on=state_poverty_df['STATE'] == state_df['state_name'],
    how='inner',
)

In [None]:
### Removing the two state-name columns used as join keys, then
### renaming the remaining columns to lower case

state_poverty_df = (
    state_poverty_df
    .drop('STATE', 'state_name')
    .withColumnRenamed('PR_ALL', 'pr_all')
    .withColumnRenamed('PR_YOUTH', 'pr_youth')
    .withColumnRenamed('MED_HH_INCOME', 'med_hh_income')
    .withColumnRenamed('YEAR', 'year')
)

**County Poverty Table**

In [None]:
### Loading county_poverty.csv into a dataframe (first row is the header, column types are inferred)

county_poverty_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/mnt/data-vikings/county_poverty.csv')
)

In [None]:
### Inner-joining county poverty rows to the state dataframe on the state name

county_poverty_df = county_poverty_df.join(
    state_df,
    on=county_poverty_df['STATE_NAME'] == state_df['state_name'],
    how='inner',
)

In [None]:
### Removing the joined state name, then renaming the remaining columns
### to the lower-case / descriptive names shown below

county_poverty_df = (
    county_poverty_df
    .drop('state_name')
    .withColumnRenamed('YEAR', 'year')
    .withColumnRenamed('PR_ALL', 'pr_all')
    .withColumnRenamed('PR_YOUTH', 'pr_youth')
    .withColumnRenamed('MED_HH_INCOME', 'med_hh_income')
    .withColumnRenamed('COUNTY_FULL', 'county')
    .withColumnRenamed('LAT', 'latitude')
    .withColumnRenamed('LNG', 'longitude')
    .withColumnRenamed('POPULATION', 'population')
)

**National Employment Table**

In [None]:
### Loading national_salary.csv again so the national employment table starts from the raw file

national_employment_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/mnt/data-vikings/national_salary.csv')
)

In [None]:
### Left-joining the national employment dataframe with the occupation group dataframe.
### The occupation group dataframe is named `occ_group_df` in the cell that builds it,
### so that is the name that has to be used here.

national_employment_df = national_employment_df.join(
    occ_group_df,
    [national_employment_df.OCC_GROUP == occ_group_df.occupation_group],
    'left')

In [None]:
### Removing the two occupation-group columns used as join keys, then
### renaming the remaining columns to lower case

national_employment_df = (
    national_employment_df
    .drop('OCC_GROUP', 'occupation_group')
    .withColumnRenamed('OCC_TITLE', 'occ_title')
    .withColumnRenamed('TOT_EMP', 'tot_emp')
    .withColumnRenamed('H_MEAN', 'h_mean')
    .withColumnRenamed('A_MEAN', 'a_mean')
    .withColumnRenamed('H_MEDIAN', 'h_median')
    .withColumnRenamed('A_MEDIAN', 'a_median')
    .withColumnRenamed('YEAR', 'year')
)

In [None]:
### Loading state_salary.csv (header row, inferred types) and
### renaming its columns to lower case in a single chain

state_employment_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/mnt/data-vikings/state_salary.csv')
    .withColumnRenamed('AREA', 'area')
    .withColumnRenamed('OCC_TITLE', 'occ_title')
    .withColumnRenamed('TOT_EMP', 'tot_emp')
    .withColumnRenamed('JOBS_1000', 'jobs_1000')
    .withColumnRenamed('LOC_QUOTIENT', 'loc_quotient')
    .withColumnRenamed('H_MEAN', 'h_mean')
    .withColumnRenamed('A_MEAN', 'a_mean')
    .withColumnRenamed('H_MEDIAN', 'h_median')
    .withColumnRenamed('A_MEDIAN', 'a_median')
    .withColumnRenamed('YEAR', 'year')
)

In [None]:
### Left-joining state employment rows to the state dataframe on the state name

state_employment_df = state_employment_df.join(
    state_df,
    on=state_employment_df['STATE'] == state_df['state_name'],
    how='left',
)

In [None]:
### Left-joining the state employment dataframe with the occupation group dataframe.
### The occupation group dataframe is named `occ_group_df` in the cell that builds it,
### so that is the name that has to be used here.

state_employment_df = state_employment_df.join(
    occ_group_df,
    [state_employment_df.OCC_GROUP == occ_group_df.occupation_group],
    'left')

In [None]:
### Removing the state and occupation group columns that served as join keys

state_employment_df = state_employment_df.drop(
    'STATE',
    'OCC_GROUP',
    'state_name',
    'occupation_group',
)

**Poverty Threshold Table**

In [None]:
### Loading poverty_threshold.csv into a dataframe (first row is the header, column types are inferred)

poverty_threshold_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/mnt/data-vikings/poverty_threshold.csv')
)

In [None]:
### Giving the '1 Person' column a descriptive, SQL-friendly name

poverty_threshold_df = poverty_threshold_df.withColumnRenamed(
    '1 Person',
    'amount_for_one_person',
)

### Writing Data to SQL Database

In [None]:
### Names of the destination tables in the SQL database.
### Each one is passed as the "dbtable" option of a JDBC write further down.
table1 = 'dbo.state'
table2 = 'dbo.occupation_group'
table3 = 'dbo.national_poverty'
table4 = 'dbo.state_poverty'
table5 = 'dbo.county_poverty'
table6 = 'dbo.national_employment'
table7 = 'dbo.state_employment'
table8 = 'dbo.poverty_threshold'

In [None]:
def write_to_sql(df, table):
    """Append the rows of `df` to `table` in the SQL database over JDBC.

    Parameters
    ----------
    df : Spark DataFrame whose rows are written.
    table : str, destination table name (for example 'dbo.state').

    The connection details come from the `server`, `database`, `user` and
    `password` variables defined in the connection cell of this notebook.
    The write uses "append" mode, so existing rows in the table are kept.
    """
    (
        df.write.format("jdbc")
        .option("url", f"jdbc:sqlserver://{server}:1433;databaseName={database};")
        .mode("append")
        .option("dbtable", table)
        .option("user", user)
        .option("password", password)
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
        .save()
    )


### Writing every dataframe to its own table, in the same order as before.
### occ_group_df goes to dbo.occupation_group (table2) and state_employment_df
### goes to dbo.state_employment (table7), instead of being appended to the
### state and county poverty tables.
write_to_sql(state_df, table1)
write_to_sql(occ_group_df, table2)
write_to_sql(national_poverty_df, table3)
write_to_sql(state_poverty_df, table4)
write_to_sql(county_poverty_df, table5)
write_to_sql(national_employment_df, table6)
write_to_sql(state_employment_df, table7)
write_to_sql(poverty_threshold_df, table8)