## Parameterized Job for running encryption

This notebook is a template that illustrates how encryption at scale can be parameterized so that it can be run with Azure Data Factory (ADF) scheduling or Databricks job scheduling. It reads a JSON parameter file that maps each dataset name to its schema definition and encryption definition, so that the following steps can be automated:
1. Read datasets of different source types: flat files (CSV and others) and streams from Event Hubs, decoding raw data into a structured format
2. Automatically pick up the fields to encrypt
3. Write to a designated delta table
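
Step 2 can be sketched in plain Python as follows. This is a minimal illustration, not the notebook's exact code: the helper name `build_encrypt_select` and the `column_prefix` parameter are hypothetical, and `encrypt` is assumed to be a SQL function already registered on the cluster that takes a column and the value from the encryption mapping.

```python
from collections import OrderedDict


def build_encrypt_select(schema_fields, encryption_mapping, table_name,
                         encrypt_func="encrypt", column_prefix=""):
    """Build a SELECT that wraps mapped fields in the encryption function.

    column_prefix is "" for batch input and "json." when the parsed
    streaming payload sits under a struct column named json.
    """
    select_exprs = []
    not_null = []
    for field in schema_fields:
        column = column_prefix + field
        if field in encryption_mapping:
            # Encrypted fields keep their original name through the alias
            select_exprs.append("{}({}, '{}') AS {}".format(
                encrypt_func, column, encryption_mapping[field], field))
            # Rows with a null value in a field to encrypt are filtered out
            not_null.append(column + " IS NOT NULL")
        else:
            select_exprs.append(column)
    sql = "SELECT " + ", ".join(select_exprs) + " FROM " + table_name
    if not_null:
        sql += " WHERE " + " AND ".join(not_null)
    return sql


example_sql = build_encrypt_select(
    ["ID", "Block"], OrderedDict([("Block", "alpha")]), "temp_tbl")
print(example_sql)
```

Collecting the expressions in lists and joining them avoids trimming trailing commas or a trailing `and`, and it still produces a valid statement when no field is mapped for encryption.
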
Note:
- Structuring of the data sources is currently done in a simplistic way: only string, numeric and date columns are supported. More complex cases, such as timestamps and date reformatting, need further work. The main focus is to get the structure out as strings and apply encryption
- To run multiple notebooks like this one in parallel, with each notebook processing one dataset/table, either define parallel execution steps in ADF or use Databricks notebook workflows to run notebooks concurrently (https://docs.databricks.com/user-guide/notebooks/notebook-workflows.html#run-multiple-notebooks-concurrently). In that case, the step that loads the JSON file should be placed in the master notebook, and this notebook is used as a worker notebook that processes one specific dataset
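
The master/worker pattern from the last note could look roughly like the sketch below. It assumes the master notebook passes only `dataset_name` to each worker. The helper name `run_datasets_concurrently` and the worker path `./encrypt_worker` are hypothetical. On Databricks, `dbutils.notebook.run(path, timeout_seconds, arguments)` can be passed in as `run_notebook`; a stand-in function is used here so the sketch also runs outside Databricks.

```python
from concurrent.futures import ThreadPoolExecutor


def run_datasets_concurrently(run_notebook, worker_path, dataset_names,
                              timeout_seconds=3600, max_workers=4):
    """Run the worker notebook once per dataset and return results by dataset name.

    run_notebook is any callable taking (path, timeout_seconds, arguments).
    """
    def run_one(name):
        return run_notebook(worker_path, timeout_seconds, {"dataset_name": name})

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map returns results in the same order as dataset_names
        results = list(pool.map(run_one, dataset_names))
    return dict(zip(dataset_names, results))


def fake_run(path, timeout_seconds, arguments):
    """Stand-in for dbutils.notebook.run, used only for local experimentation."""
    return "ran {} for {}".format(path, arguments["dataset_name"])


results = run_datasets_concurrently(
    fake_run, "./encrypt_worker", ["chicago_crimes", "Event_json_flat"])
print(results)
```

Threads are sufficient here because each call only waits for a remote notebook run to finish; the heavy work happens on the cluster.
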

Parameters for the job/notebook
These are parameters for the notebook that can be set at run time either by a DevOps engineer or by another master notebook/job
1. schema_mapping_json_path (widget `en_schema_path` in the code below): Path to the JSON file that contains the mapping between dataset names, fields, data types, and whether or not each field needs encryption. See below for an example of a JSON file
2. Dataset name: name of the dataset that will be used to look up for schema and encryption mapping in the json file
3. Ingestion type: Streaming or Batch. Streaming is used when the source is EventHub. Batch is used when raw data is copied to a landing folder in ADLS Gen 2 and this notebook picks up from there
4. Input data folder: For batch processing from the landing zone. This can be set with a wildcard to read data recursively
5. Output data folder: If you let the job create the delta table automatically in batch mode, set the output folder so that the encrypted data is created there. In batch mode this may be needed because there can be multiple checkpoint locations before the final append to the target table. In streaming mode the final table can be used directly, because streaming supports a checkpoint location
6. Table name: output table name. Can be an existing table name
7. Eventhub Account: the account name of the eventhub in case a streaming dataset is used
8. Secret scope/secret: name of secret scope and key to retrieve EH's key 
9. EH topic: topic to read data from. As a deviation from the original design, made possible by the use of Delta tables, it's recommended to write the streaming encryption output to a Delta table instead of an EH topic. The reason is that a Delta table supports change capture, so the next job can just subscribe to the Delta table to read new changes. This may be faster than reading from an EH topic.
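
When a master notebook or an external scheduler sets these parameters, it can help to assemble and check them in one place. The sketch below is an assumption-laden illustration: the helper `build_worker_arguments` and its consistency checks are hypothetical, while the dictionary keys match the widget names defined in the Python cell of this notebook.

```python
VALID_INGESTION_TYPES = ("Batch", "Streaming")


def build_worker_arguments(dataset_name, ingestion_type, en_schema_path,
                           output_tbl, input_path="", output_path="",
                           eh_account="", eh_topic="",
                           secret_scope="", secret_key="",
                           checkpointLocation=""):
    """Return the widget values as a dict of strings after basic checks."""
    if ingestion_type not in VALID_INGESTION_TYPES:
        raise ValueError(
            "ingestion_type must be one of {}".format(VALID_INGESTION_TYPES))
    # Assumed rule: batch runs read from a landing folder
    if ingestion_type == "Batch" and not input_path:
        raise ValueError("Batch ingestion needs input_path")
    # Assumed rule: streaming runs read from an Event Hub topic
    if ingestion_type == "Streaming" and not (eh_account and eh_topic):
        raise ValueError("Streaming ingestion needs eh_account and eh_topic")
    return {
        "dataset_name": dataset_name,
        "ingestion_type": ingestion_type,
        "en_schema_path": en_schema_path,
        "output_tbl": output_tbl,
        "input_path": input_path,
        "output_path": output_path,
        "eh_account": eh_account,
        "eh_topic": eh_topic,
        "secret_scope": secret_scope,
        "secret_key": secret_key,
        "checkpointLocation": checkpointLocation,
    }


batch_args = build_worker_arguments(
    "chicago_crimes", "Batch", "/dbfs/FileStore/tables/enc_schema.json",
    "output_table", input_path="/FileStore/tables/Chicago_*.csv",
    output_path="/FileStore/output/output.delta")
print(batch_args["ingestion_type"], batch_args["input_path"])
```

All values are kept as strings because notebook widgets hold string values.
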

{
  "datasets": {
    "chicago_crimes": {
      "first_row_is_header": "true",
      "field_target_dtype_mapping": {
        "Seq": "IntegerType()",
        "ID": "IntegerType()",
        "Case_Number": "StringType()",
        "Date": "StringType()",
        "Block": "StringType()",
        "IUCR": "StringType()",
        "Primary_Type": "StringType()",
        "Description": "StringType()",
        "Location_Description": "StringType()",
        "Beat": "StringType()",
        "Arrest": "StringType()",
        "Domestic": "StringType()",
        "District": "StringType()",
        "Ward": "StringType()",
        "Community_Area": "StringType()",
        "FBI_Code": "StringType()",
        "X_Coordinate": "StringType()",
        "Y_Coordinate": "StringType()",
        "Year": "StringType()",
        "Updated_On": "StringType()",
        "Latitude": "StringType()",
        "Longitude": "StringType()",
        "Location": "StringType()"
      },
      "field_encryption_mapping": {
        "Description": "alpha",
        "Location_Description": "alpha",
        "Block": "alpha",
        "X_Coordinate": "alpha",
        "Y_Coordinate": "alpha"
      },
      "delimiter": ",",
      "format": "csv"
    },
    "Event_json_flat": {
      "field_target_dtype_mapping": {
        "time": "TimestampType()",
        "action": "StringType()"
      },
      "field_encryption_mapping": {
        "time": "alpha",
        "action": "alpha"
      },
      "format": "json"
    },
    "Event_json_stream": {
      "field_target_dtype_mapping": {
        "time": "TimestampType()",
        "action": "StringType()"
      },
      "field_encryption_mapping": {
        "time": "alpha",
        "action": "alpha"
      },
      "format": "json"
    }
  }
}
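
Before the mapping is used, a quick consistency check can catch typos in the file. The following is a minimal sketch under stated assumptions: the helper `load_dataset_mapping` is hypothetical, and the rule that every field in `field_encryption_mapping` must also appear in `field_target_dtype_mapping` is an assumption inferred from the example above, not a documented requirement.

```python
import json
from collections import OrderedDict


def load_dataset_mapping(json_text, dataset_name):
    """Parse the mapping JSON and return one dataset entry, preserving field order."""
    config = json.loads(json_text, object_pairs_hook=OrderedDict)
    datasets = config.get("datasets", {})
    if dataset_name not in datasets:
        raise KeyError("Dataset '{}' not found in mapping".format(dataset_name))
    entry = datasets[dataset_name]
    schema_fields = entry.get("field_target_dtype_mapping", {})
    encrypted_fields = entry.get("field_encryption_mapping", {})
    # Assumed rule: a field can only be encrypted if it is part of the schema
    unknown = [f for f in encrypted_fields if f not in schema_fields]
    if unknown:
        raise ValueError(
            "Fields marked for encryption but missing from schema: {}".format(unknown))
    return entry


sample = """{"datasets": {"Event_json_flat": {
  "field_target_dtype_mapping": {"time": "TimestampType()", "action": "StringType()"},
  "field_encryption_mapping": {"time": "alpha", "action": "alpha"},
  "format": "json"}}}"""

entry = load_dataset_mapping(sample, "Event_json_flat")
print(list(entry["field_target_dtype_mapping"].keys()))
```
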

In [4]:
//Uncomment and run this here, or set it at the cluster level, if you'd like to access the ADLS Gen2 store directly instead of using a mount point. Currently a mount point of ADLS Gen2 is not supported for Delta tables
// spark.conf.set("fs.azure.account.auth.type..dfs.core.windows.net", "OAuth")
// spark.conf.set("fs.azure.account.oauth.provider.type.adlsdatalakegen6.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
// spark.conf.set("fs.azure.account.oauth2.client.id..dfs.core.windows.net", "")
// spark.conf.set("fs.azure.account.oauth2.client.secret..dfs.core.windows.net", "")
// spark.conf.set("fs.azure.account.oauth2.client.endpoint..dfs.core.windows.net", "")
// spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
// dbutils.fs.ls("abfss://test@.dfs.core.windows.net/")
// spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "false")


In [5]:
%sql drop table if exists test_db.my_table5 

In [6]:
%sql 
-- Replace this with the name of the table you'll use if you want to test the job with an existing table instead of letting the job create the table itself
-- CREATE TABLE IF NOT EXISTS test_db.my_table5 (action STRING, time STRING)
-- USING DELTA LOCATION 'abfss://test@adlsdatalakegen6.dfs.core.windows.net/testencrypt2'

In [7]:
%python
# dbutils.widgets.removeAll()



In [8]:
%python
from pyspark.sql.types import *
import json
from json import JSONDecoder
from collections import OrderedDict

dbutils.widgets.dropdown("ingestion_type", "Batch", ["Batch", "Streaming"], "Ingestion type")
dbutils.widgets.text("dataset_name", "chicago_crimes", "Dataset name")
dbutils.widgets.text("output_tbl", "output_table", "output delta table name ")

dbutils.widgets.text("eh_topic", "kafka_spark", "Input eventHub topic")
dbutils.widgets.text("secret_scope", "encryptionkey", "Secret scope")
dbutils.widgets.text("secret_key", "evh", "Secret key")

dbutils.widgets.text("checkpointLocation", "dbfs:/mnt/cp/testcp", "Check point Location for streaming")

dbutils.widgets.text("input_path", "/FileStore/tables/Chicago_*.csv", "Input Data Folder")
dbutils.widgets.text("eh_account", "kafkaeventhub01", "Event Hub Account")
dbutils.widgets.text("en_schema_path", "/dbfs/FileStore/tables/enc_schema.json", "Json Encryption Schema Path")
dbutils.widgets.text("output_path", "/FileStore/output/output.delta", "Output File Path")

dataset_name = dbutils.widgets.get("dataset_name")

ingestion_type = dbutils.widgets.get("ingestion_type")
en_schema_path =dbutils.widgets.get("en_schema_path")
input_path = dbutils.widgets.get("input_path")
checkpointLocation = dbutils.widgets.get("checkpointLocation")

eh_account = dbutils.widgets.get("eh_account")
output_path=dbutils.widgets.get("output_path")
output_tbl=dbutils.widgets.get("output_tbl")

eh_topic=dbutils.widgets.get("eh_topic")
secret_scope=dbutils.widgets.get("secret_scope")
secret_key=dbutils.widgets.get("secret_key")


#Decode with OrderedDict so that the order of the JSON fields in the mapping is preserved
with open(en_schema_path) as schema_file:
  json_data = schema_file.read()
customdecoder = JSONDecoder(object_pairs_hook=OrderedDict)
jsondata = customdecoder.decode(json_data)

field_encryption_mapping =jsondata.get('datasets').get(dataset_name).get('field_encryption_mapping')
data_format = jsondata.get('datasets').get(dataset_name).get("format")
delimiter= jsondata.get('datasets').get(dataset_name).get("delimiter")
schema_text = jsondata.get('datasets').get(dataset_name).get('field_target_dtype_mapping')
schema = StructType()

#Map the type names used in the JSON file to Spark data types
dtype_lookup = {
  'StringType()': StringType(),
  'IntegerType()': IntegerType(),
  'LongType()': LongType(),
  'DateType()': DateType(),
  'TimestampType()': TimestampType()
}
for (field, dtype) in schema_text.items():
  #Names that are not in the lookup are passed to schema.add unchanged
  schema.add(field, data_type=dtype_lookup.get(dtype, dtype))

print(ingestion_type)
print(en_schema_path)
print(data_format)
print(input_path)
print(eh_account)
print(output_path)
print(schema)
print(eh_topic)
print(secret_scope)
print(secret_key)


In [9]:
%python
#Clear the output path, checkpoint location and output table before each run (for testing only; disable this for a real run)
from pyspark.sql.functions import *

dbutils.fs.rm(output_path, True)
dbutils.fs.rm(checkpointLocation, True)
spark.sql("drop table if exists "+output_tbl)

#Batch copy and encryption
temp_tbl_name = 'temp_tbl'
encrypt_func='encrypt'
if ingestion_type == "Batch":
  if data_format =='csv':
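    #Take the header flag from the JSON mapping, defaulting to "true" when it is not set
    first_row_is_header = jsondata.get('datasets').get(dataset_name).get("first_row_is_header", "true")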
    df = spark.read.format(data_format) \
    .option("header", first_row_is_header) \
    .option("sep", delimiter) \
    .schema(schema) \
    .load(input_path) 
  else:
    df = spark.read.format(data_format) \
    .schema(schema) \
    .load(input_path)     
else: #Prepare for loading eventHub info
  eh_key = dbutils.secrets.get(secret_scope,secret_key)
  BOOTSTRAP_SERVERS = eh_account+".servicebus.windows.net:9093"
  EH_SASL = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='$ConnectionString' password='Endpoint=sb://"+eh_account+".servicebus.windows.net;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey="+eh_key+"';"
  GROUP_ID = "$Default"
  df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
  .option("subscribe", eh_topic) \
  .option("kafka.sasl.mechanism","PLAIN") \
  .option("kafka.security.protocol","SASL_SSL") \
  .option("kafka.sasl.jaas.config", EH_SASL ) \
  .option("kafka.request.timeout.ms", "60000") \
  .option("kafka.session.timeout.ms", "60000") \
  .option("kafka.group.id", GROUP_ID) \
  .option("failOnDataLoss", "false") \
  .load() \
  .select(col("timestamp"), col("value").cast("STRING").alias("value")) \
  .select(col("timestamp"), from_json(col("value"), schema).alias("json"))
  

#registerTempTable is deprecated; createOrReplaceTempView is its replacement
df.createOrReplaceTempView(temp_tbl_name)

#Build the select list: fields in the encryption mapping are wrapped in the encryption function, the rest pass through
#In streaming mode the parsed payload sits under the json struct, so fields are accessed as json.field_name
field_prefix = "" if ingestion_type == "Batch" else "json."
select_exprs = []
#Conditions that remove rows where a field to be encrypted is null
not_null_conditions = []
for field in schema_text.keys():
  if field in field_encryption_mapping.keys():
    select_exprs.append(encrypt_func + "(" + field_prefix + field + ",'" + field_encryption_mapping.get(field) + "') " + field)
    not_null_conditions.append(field_prefix + field + " is not null")
  else:
    select_exprs.append(field_prefix + field)
sql_statement = "Select " + ", ".join(select_exprs) + " from " + temp_tbl_name
#Only add the where clause when at least one field is encrypted
if not_null_conditions:
  sql_statement = sql_statement + " where " + " and ".join(not_null_conditions)
print(sql_statement)
df = spark.sql(sql_statement)

#Write out the encrypted dataset
if ingestion_type == "Batch":
  df.write.format("delta").saveAsTable(output_tbl, path=output_path)
else:#write stream to the delta table
  df.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", checkpointLocation) \
  .table(output_tbl) 


In [10]:
//Shows how the next job can subscribe to the Delta table to read new data automatically without having to read from EH.
// display(spark.readStream.table("test_db.my_table5"))

In [11]:
%python
#To simulate a stream to Kafka for testing purposes, uncomment and run the following 
# from pyspark.sql.types import *
# from pyspark.sql.functions import *

# inputPath = "/databricks-datasets/structured-streaming/events/"

# # Since we know the data format already, let's define the schema to speed up processing (no need for Spark to infer schema)
# jsonSchema = StructType([ StructField("time", TimestampType(), True), StructField("action", StringType(), True) ])


# # Define a streaming DataFrame with `readStream` (the streaming counterpart of `read`)
# streamingInputDF = (
#   spark
#     .readStream                       
#     .schema(jsonSchema)               # Set the schema of the JSON data
#     .option("maxFilesPerTrigger", 1)  # Treat a sequence of files as a stream by picking one file at a time
#     .json(inputPath)
# )


# EH_SASL = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='$ConnectionString' password='Endpoint=sb://"+eh_account+".servicebus.windows.net;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey="+eh_key+"';"
# BOOTSTRAP_SERVERS = eh_account+".servicebus.windows.net:9093"

# GROUP_ID = "$Default"
# df = streamingInputDF.selectExpr("CAST(time AS STRING) AS key", "to_json(struct(*)) AS value") \
#   .writeStream \
#   .format("kafka") \
#   .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
#   .option("topic", "kafka_spark") \
#   .option("kafka.sasl.mechanism","PLAIN") \
#   .option("kafka.security.protocol","SASL_SSL") \
#   .option("kafka.sasl.jaas.config", EH_SASL ) \
#   .option("checkpointLocation", "dbfs:/mnt/demo/checkpoint19") \
#   .start() 

