## Notebook 2
## Read and Write data to Azure SQL Managed Instance

This notebook demonstrates connectivity to Azure SQL Managed Instance (SQL MI), including read and write operations, using the [SQL Spark connector](https://github.com/microsoft/sql-spark-connector).<br> The demo uses a Business Critical SQL MI with a readable secondary replica.

> In this demo, connectivity to SQL MI is
> established through the **public endpoint**, which requires port 3342 to be specified.<br> For SQL MI connectivity without
> public endpoints, consider [injecting Databricks into the SQL MI
> VNET](https://docs.microsoft.com/en-us/azure/databricks/administration-guide/cloud-configurations/azure/vnet-inject)
> or, alternatively, [connecting through a VPN gateway or VNET
> peering](https://docs.microsoft.com/en-us/azure/azure-sql/managed-instance/connect-application-instance).

Import the Spark SQL functions used in this notebook for data transformation.

In [3]:
from pyspark.sql.functions import to_date, concat_ws

The SQL MI connection and credential information is stored as secrets in ***Azure Key Vault***. The following snippet retrieves the secrets and stores them in variables.<br>
> Secret values are always redacted when displayed in the notebook.

In [5]:
sqlmiconnection = dbutils.secrets.get(scope = "sqlmi-kv-secrets", key = "sqlmiconn")
sqlmiuser = dbutils.secrets.get(scope = "sqlmi-kv-secrets", key = "sqlmiuser")
sqlmipwd = dbutils.secrets.get(scope = "sqlmi-kv-secrets", key = "sqlmipwd")
dbname = "Covid19datamart"
servername = "jdbc:sqlserver://" + sqlmiconnection
# "databaseName" is the JDBC driver's connection property for the target database
url = servername + ";" + "databaseName=" + dbname + ";"
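The URL assembly above can also be wrapped in a small helper. The following is a minimal sketch and not part of the original notebook: the function name `build_jdbc_url` and the example host name are hypothetical, it uses the JDBC driver's `databaseName` property, and it assumes the host is supplied without a port so that the public-endpoint port 3342 can be appended in the JDBC `host:port` form.

```python
def build_jdbc_url(host: str, database: str, port: int = 3342) -> str:
    """Return a URL of the form jdbc:sqlserver://host:port;databaseName=db;"""
    if not host or not database:
        raise ValueError("host and database must be non-empty")
    return f"jdbc:sqlserver://{host}:{port};databaseName={database};"


# Hypothetical public-endpoint host name, used only for illustration
example_url = build_jdbc_url(
    "myinstance.public.abc123.database.windows.net", "Covid19datamart"
)
print(example_url)
```

If the secret already contains the port, the helper would need to skip the port suffix; that case is deliberately left out of this sketch.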

Read the *DimCountry* table from the SQL MI ***ReadOnly replica*** and store it in a DataFrame for the transformations later in this notebook.

In [7]:
table_name = "[Covid19datamart].[dbo].[DimCountry]"

try:
  dfCountry = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("applicationintent", "ReadOnly") \
        .option("user", sqlmiuser) \
        .option("port", 3342) \
        .option("password", sqlmipwd).load()
except Exception as error:
    # Connector failures surface as Spark/Py4J exceptions, not ValueError
    print("Connector read failed", error)
    raise

display(dfCountry)

DimCountryPK,CountryName,GeoID,CountryCode,Population,Continent,CountryISOCode
1,Afghanistan,AF,AFG,37172386,Asia,AF
2,Albania,AL,ALB,2866376,Europe,AL
3,Algeria,DZ,DZA,42228429,Africa,DZ
4,Andorra,AD,AND,77006,Europe,AD
5,Angola,AO,AGO,30809762,Africa,AO
6,Anguilla,AI,,0,America,AI
7,Antigua_and_Barbuda,AG,ATG,96286,America,AG
8,Argentina,AR,ARG,44494502,America,AR
9,Armenia,AM,ARM,2951776,Europe,AM
10,Aruba,AW,ABW,105845,America,AW
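
The connector options used for this read recur in every cell that talks to SQL MI, so they can be collected in one place. The sketch below is an illustration rather than part of the original notebook: `connector_options` is a hypothetical helper that only builds a plain dictionary, and the placeholder values stand in for the Key Vault secrets.

```python
from typing import Dict

CONNECTOR_FORMAT = "com.microsoft.sqlserver.jdbc.spark"


def connector_options(url: str, table: str, user: str, password: str,
                      read_only: bool = False, port: int = 3342) -> Dict[str, str]:
    """Collect the connector options used in this notebook into one dictionary."""
    return {
        "url": url,
        "dbtable": table,
        "user": user,
        "password": password,
        "port": str(port),
        # ReadOnly requests a read-only session (the readable secondary in this demo)
        "applicationintent": "ReadOnly" if read_only else "ReadWrite",
    }


# Placeholder values for illustration only
opts = connector_options(
    "jdbc:sqlserver://example-host",
    "[Covid19datamart].[dbo].[DimCountry]",
    "example-user",
    "example-password",
    read_only=True,
)

# In a Databricks notebook the dictionary could then be applied like this:
# dfCountry = spark.read.format(CONNECTOR_FORMAT).options(**opts).load()
```

Keeping `read_only` as an explicit argument makes the choice between the read-only replica and the primary visible at each call site.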


Read the *StagingPredictedCovid19* table from the SQL MI ***ReadOnly replica*** and store it in a DataFrame for the transformations below. <br>This table contains the predicted values from the previous notebook. The *Population* column is dropped on load, since *DimCountry* also provides it.

In [9]:
table_name = "[Covid19datamart].[dbo].[StagingPredictedCovid19]"

try:
  dfStagingPredict = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("applicationintent", "ReadOnly") \
        .option("user", sqlmiuser) \
        .option("port", 3342) \
        .option("password", sqlmipwd).load().drop("Population")
except Exception as error:
    # Connector failures surface as Spark/Py4J exceptions, not ValueError
    print("Connector read failed", error)
    raise

display(dfStagingPredict)

Day,Month,Year,Cases,Deaths,Prediction,CountryName,CountryMLIndex
24,6,2020,338,20,12,Afghanistan,62
23,6,2020,310,17,9,Afghanistan,62
22,6,2020,409,12,9,Afghanistan,62
21,6,2020,546,21,10,Afghanistan,62
20,6,2020,346,2,15,Afghanistan,62
19,6,2020,658,42,19,Afghanistan,62
18,6,2020,564,13,8,Afghanistan,62
17,6,2020,783,13,13,Afghanistan,62
16,6,2020,761,7,9,Afghanistan,62
15,6,2020,664,20,14,Afghanistan,62


Join the predicted data with the country data on *CountryName* (left join) to create a *denormalised dataset* for reporting and visualisation.

In [11]:
dfJoinedPredict = dfStagingPredict.join(dfCountry, dfStagingPredict.CountryName == dfCountry.CountryName,how='left').drop(dfCountry.CountryName)
display(dfJoinedPredict)

Day,Month,Year,Cases,Deaths,Prediction,CountryName,CountryMLIndex,DimCountryPK,GeoID,CountryCode,Population,Continent,CountryISOCode
24,6,2020,2,0,0,Chad,159,41,TD,TCD,15477751,Africa,TD
23,6,2020,0,0,0,Chad,159,41,TD,TCD,15477751,Africa,TD
22,6,2020,0,0,0,Chad,159,41,TD,TCD,15477751,Africa,TD
21,6,2020,0,0,0,Chad,159,41,TD,TCD,15477751,Africa,TD
20,6,2020,4,0,0,Chad,159,41,TD,TCD,15477751,Africa,TD
19,6,2020,0,0,0,Chad,159,41,TD,TCD,15477751,Africa,TD
18,6,2020,1,0,0,Chad,159,41,TD,TCD,15477751,Africa,TD
17,6,2020,3,1,0,Chad,159,41,TD,TCD,15477751,Africa,TD
16,6,2020,0,0,0,Chad,159,41,TD,TCD,15477751,Africa,TD
15,6,2020,2,1,0,Chad,159,41,TD,TCD,15477751,Africa,TD
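
Because this is a left join, every prediction row is kept even when its *CountryName* has no match in *DimCountry*; the country columns are then null. The plain-Python sketch below mimics that behaviour on a few in-memory rows so it can be run without a Spark session. The helper `left_join` and the unmatched country name are hypothetical, and the sketch assumes country names are unique in the lookup table.

```python
from typing import Dict, List, Optional


def left_join(left: List[dict], right: List[dict], key: str) -> List[dict]:
    """Keep every left row; fill right-hand columns with None when no match exists."""
    lookup: Dict[str, dict] = {row[key]: row for row in right}
    right_columns = [c for c in (right[0] if right else {}) if c != key]
    joined = []
    for row in left:
        match: Optional[dict] = lookup.get(row[key])
        extra = {c: (match[c] if match else None) for c in right_columns}
        joined.append({**row, **extra})
    return joined


predictions = [
    {"CountryName": "Chad", "Cases": 2, "Prediction": 0},
    {"CountryName": "Atlantis", "Cases": 5, "Prediction": 1},  # hypothetical, no match
]
countries = [{"CountryName": "Chad", "DimCountryPK": 41, "Continent": "Africa"}]

joined = left_join(predictions, countries, "CountryName")
unmatched = [row["CountryName"] for row in joined if row["DimCountryPK"] is None]
print(unmatched)
```

In the notebook itself, a comparable check would be to filter `dfJoinedPredict` for rows where `DimCountryPK` is null before writing the result.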


Add a *DateRecorded* column built by concatenating the day, month and year values.<br>
Clean up the final dataset by renaming *Prediction* to *PredictedDeaths* and dropping the columns that are not required for reporting.

In [13]:
dfResult = dfJoinedPredict.withColumn("DateRecorded", to_date(concat_ws("-",dfJoinedPredict.Year,dfJoinedPredict.Month,dfJoinedPredict.Day)).cast('timestamp')) \
  .withColumnRenamed("Prediction","PredictedDeaths") \
  .drop("CountryMLIndex") \
  .drop("LoadDate") \
  .drop("DimCountryPK") \
  .drop("GeoID")

display(dfResult)

Day,Month,Year,Cases,Deaths,PredictedDeaths,CountryName,CountryCode,Population,Continent,CountryISOCode,DateRecorded
24,6,2020,2,0,0,Chad,TCD,15477751,Africa,TD,2020-06-24T00:00:00.000+0000
23,6,2020,0,0,0,Chad,TCD,15477751,Africa,TD,2020-06-23T00:00:00.000+0000
22,6,2020,0,0,0,Chad,TCD,15477751,Africa,TD,2020-06-22T00:00:00.000+0000
21,6,2020,0,0,0,Chad,TCD,15477751,Africa,TD,2020-06-21T00:00:00.000+0000
20,6,2020,4,0,0,Chad,TCD,15477751,Africa,TD,2020-06-20T00:00:00.000+0000
19,6,2020,0,0,0,Chad,TCD,15477751,Africa,TD,2020-06-19T00:00:00.000+0000
18,6,2020,1,0,0,Chad,TCD,15477751,Africa,TD,2020-06-18T00:00:00.000+0000
17,6,2020,3,1,0,Chad,TCD,15477751,Africa,TD,2020-06-17T00:00:00.000+0000
16,6,2020,0,0,0,Chad,TCD,15477751,Africa,TD,2020-06-16T00:00:00.000+0000
15,6,2020,2,1,0,Chad,TCD,15477751,Africa,TD,2020-06-15T00:00:00.000+0000
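
The `DateRecorded` expression joins year, month and day with hyphens and parses the result as a date. The same idea is sketched below in plain Python so it can be run without Spark; `to_recorded_date` is a hypothetical name, and the sketch relies on `strptime` accepting unpadded month and day values such as `2020-6-24`.

```python
from datetime import datetime


def to_recorded_date(year: int, month: int, day: int) -> datetime:
    """Concatenate the parts as 'Y-M-D' and parse them, mirroring concat_ws + to_date."""
    text = "-".join(str(part) for part in (year, month, day))
    return datetime.strptime(text, "%Y-%m-%d")


recorded = to_recorded_date(2020, 6, 24)
print(recorded.isoformat())
```

An invalid combination such as day 31 in month 6 raises `ValueError` here, whereas the Spark expression may yield null instead, so the two are not equivalent in their error handling.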


Write the final dataset to the *FactCovid19* table in SQL MI.<br>
Note that *applicationintent* is set to "*ReadWrite*", which is also the default when not specified.

In [15]:
table_name = "[Covid19datamart].[dbo].[FactCovid19]"

try:
  dfResult.write \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("user", sqlmiuser) \
        .option("port", 3342) \
        .option("password", sqlmipwd) \
        .option("applicationintent", "ReadWrite") \
        .mode("append") \
        .save()
except Exception as error:
    # Connector failures surface as Spark/Py4J exceptions, not ValueError
    print("Connector write failed", error)
    raise


In [16]:
display(dfResult)

Day,Month,Year,Cases,Deaths,PredictedDeaths,CountryName,CountryCode,Population,Continent,CountryISOCode,DateRecorded
24,6,2020,2,0,0,Chad,TCD,15477751,Africa,TD,2020-06-24T00:00:00.000+0000
23,6,2020,0,0,0,Chad,TCD,15477751,Africa,TD,2020-06-23T00:00:00.000+0000
22,6,2020,0,0,0,Chad,TCD,15477751,Africa,TD,2020-06-22T00:00:00.000+0000
21,6,2020,0,0,0,Chad,TCD,15477751,Africa,TD,2020-06-21T00:00:00.000+0000
20,6,2020,4,0,0,Chad,TCD,15477751,Africa,TD,2020-06-20T00:00:00.000+0000
19,6,2020,0,0,0,Chad,TCD,15477751,Africa,TD,2020-06-19T00:00:00.000+0000
18,6,2020,1,0,0,Chad,TCD,15477751,Africa,TD,2020-06-18T00:00:00.000+0000
17,6,2020,3,1,0,Chad,TCD,15477751,Africa,TD,2020-06-17T00:00:00.000+0000
16,6,2020,0,0,0,Chad,TCD,15477751,Africa,TD,2020-06-16T00:00:00.000+0000
15,6,2020,2,1,0,Chad,TCD,15477751,Africa,TD,2020-06-15T00:00:00.000+0000


-- End of notebook --