# This notebook explores the Databricks Auto Loader (`cloudFiles`) source and the Spark Structured Streaming API

In [2]:
# Configure service-principal (SPN) OAuth access to the ADLS Gen2 storage account "dstore".
# Access is granted through file/folder ACLs on the Data Lake filesystem (container);
# no RBAC role assignment on the top-level storage account resource is required.
_ADLS_HOST = "dstore.dfs.core.windows.net"
_adls_oauth_conf = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": dbutils.secrets.get("myscope", key="clientid"),
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get("myscope", key="clientsecret"),
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/{}/oauth2/token".format(dbutils.secrets.get("myscope", key="tenantid")),
}
for _conf_prefix, _conf_value in _adls_oauth_conf.items():
    # Each ABFS setting is scoped to one account by suffixing the account host to the property name
    spark.conf.set("{}.{}".format(_conf_prefix, _ADLS_HOST), _conf_value)
# Drop the helper globals so the client secret does not linger in the notebook namespace
del _adls_oauth_conf, _conf_prefix, _conf_value

In [3]:
# Read one sample CSV file (header row, inferred column types) purely to capture its schema
df_csv = (
    spark.read
    .format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load("abfss://rawcontainer@dstore.dfs.core.windows.net/mockcsv/file1.csv")
)

In [4]:
# Show the schema Spark inferred from the sample file
inferred_schema = df_csv.schema
inferred_schema

In [5]:
# Show the Python type of the inferred schema object
schema_cls = type(df_csv.schema)
schema_cls

In [6]:
# Persist the inferred schema as JSON so it can be reloaded as a fixed schema for the stream below.
# The file handle is deliberately not named `f`: a later cell binds `f` to pyspark.sql.functions,
# and re-running this cell afterwards would otherwise overwrite that alias.
import json
with open("/dbfs/tmp/csvschema.json", "w") as schema_out:
    json.dump(df_csv.schema.jsonValue(), schema_out)

In [7]:
# Reload the saved schema JSON into a StructType; it is used as the fixed schema of the Auto Loader stream.
import json  # repeated here so this cell also works when run on its own
from pyspark.sql.types import StructType
with open("/dbfs/tmp/csvschema.json") as schema_in:
    dataset_schema = StructType.fromJson(json.load(schema_in))
dataset_schema

In [8]:
# Show the Python type of the reloaded schema (expected to match the inferred schema's type)
reloaded_schema_cls = type(dataset_schema)
reloaded_schema_cls

In [9]:
# Import required modules
from pyspark.sql import functions as f
from datetime import datetime
from pyspark.sql.types import StringType

In [10]:
# Storage-account connection string passed to Auto Loader's notification mode (read from the secret scope)
connection_string = dbutils.secrets.get(scope="myscope", key="storageconnstr")

In [11]:
# Incrementally ingest CSV files under mockcsv/ with Auto Loader in file-notification mode,
# including files already present, using the schema reloaded from JSON above.
_autoloader_options = {
    "cloudFiles.useNotifications": True,
    "cloudFiles.format": "csv",
    "cloudFiles.connectionString": connection_string,
    "cloudFiles.resourceGroup": "rganalytics",
    "cloudFiles.subscriptionId": dbutils.secrets.get("myscope", key="subid"),
    "cloudFiles.tenantId": dbutils.secrets.get("myscope", key="tenantid"),
    "cloudFiles.clientId": dbutils.secrets.get("myscope", key="clientid"),
    "cloudFiles.clientSecret": dbutils.secrets.get("myscope", key="clientsecret"),
    "cloudFiles.region": "eastus",
    "cloudFiles.includeExistingFiles": True,
}
df_stream_in = (
    spark.readStream
    .format("cloudFiles")
    .options(**_autoloader_options)
    .schema(dataset_schema)
    .load("abfss://rawcontainer@dstore.dfs.core.windows.net/mockcsv/")
)
# The options dict holds secrets; drop it from the notebook namespace
del _autoloader_options

In [12]:
# Helper producing a formatted timestamp string; wrapped as a Spark UDF in the next cell
def time_col():
  """Return the current local time as a string formatted "%d/%m/%Y %H:%M:%S".

  Takes no arguments. The value is read from the clock of whichever process calls it,
  at the moment it is called (datetime.now() is naive, i.e. no timezone attached).
  """
  return datetime.now().strftime("%d/%m/%Y %H:%M:%S")

In [13]:
# Create a UDF from the custom function above and register it for SQL use as "time_col_sql_udf".
# It is marked non-deterministic because its result depends on the wall clock: Spark treats
# UDFs as deterministic by default, which allows the optimizer to drop or duplicate calls.
from pyspark.sql.functions import udf
time_col_udf = spark.udf.register(
    "time_col_sql_udf", udf(time_col, StringType()).asNondeterministic()
)

In [14]:
# Add a "time_col" column holding the processing-time string produced by the UDF above.
# Note: this is NOT a constant column. The UDF is evaluated row by row when each micro-batch runs.
# NOTE(review): if one value per micro-batch is intended, f.current_timestamp() is the built-in
# alternative (it yields TimestampType rather than a string) — TODO decide.
df_transform = df_stream_in.withColumn("time_col", time_col_udf())

In [15]:
# Write the transformed stream to JSON files. trigger(once=True) processes the currently
# available input as a single batch and then stops; df_out holds the StreamingQuery handle.
json_sink_path = "abfss://rawcontainer@dstore.dfs.core.windows.net/autoloaderjson"
json_checkpoint = "abfss://checkpointcontainer@dstore.dfs.core.windows.net/checkpointcsv"
json_writer = (
    df_transform.writeStream
    .format("json")
    .outputMode("append")
    .option("checkpointLocation", json_checkpoint)
    .trigger(once=True)
)
df_out = json_writer.start(json_sink_path)

In [16]:
# foreachBatch callback that appends each streaming micro-batch to Azure SQL DB
def output_sqldb(batch_df, batch_id):
  """Append one micro-batch to the ``staging`` table of Azure SQL DB ``demodb`` over JDBC.

  Args:
    batch_df: the micro-batch DataFrame handed over by Structured Streaming's foreachBatch.
    batch_id: the micro-batch id handed over by foreachBatch. It is unused here, so a batch
      replayed after a failure would be appended again (foreachBatch is at-least-once).
  """
  # SQL credentials come from the Databricks secret scope, never from source code
  sql_pwd = dbutils.secrets.get(scope = "myscope", key = "sqlpwd")
  sql_user = dbutils.secrets.get(scope = "myscope", key = "sqluser")
  dbtable = "staging"
  # Credentials are passed as separate JDBC options rather than spliced into the URL:
  # a password containing ';' or '=' would corrupt the URL, and it keeps the secret
  # out of the URL string itself.
  jdbc_url = ("jdbc:sqlserver://sqlserver09.database.windows.net:1433;database=demodb;"
              "encrypt=true;trustServerCertificate=false;"
              "hostNameInCertificate=*.database.windows.net;loginTimeout=30;")
  (batch_df.write
     .format("jdbc")
     .mode("append")
     .option("url", jdbc_url)
     .option("dbtable", dbtable)
     .option("user", sql_user)
     .option("password", sql_pwd)
     .save())

In [17]:
# Append the transformed stream to Azure SQL DB: every micro-batch is handed to output_sqldb.
# Runs once over the currently available input, with its own checkpoint separate from the JSON sink.
sqldb_checkpoint = "abfss://checkpointcontainer@dstore.dfs.core.windows.net/checkpointdb"
sqldb_writer = (
    df_transform.writeStream
    .foreachBatch(output_sqldb)
    .outputMode("append")
    .option("checkpointLocation", sqldb_checkpoint)
    .trigger(once=True)
)
df_sqldb = sqldb_writer.start()