In [None]:
from notebookutils import mssparkutils
import multiprocessing
from multiprocessing.pool import ThreadPool
from ast import literal_eval
from pyspark.sql.types import *
import json
import pytz
import datetime
import requests
import hashlib
import hmac
import base64

In [None]:
# Set up the variables used to send logs to Log Analytics
## Retrieve the Log Analytics workspace credentials from Key Vault
# Key Vault endpoint URL (the host below looks like a redacted placeholder — confirm before running)
key_vault = "https://xxxxxxx.vault.azure.net/"

# Get the customer ID and shared key of the Log Analytics workspace.
# Both are handed to post_data(), which uses them to build the request URI and signature.
customer_id =  mssparkutils.credentials.getSecret(key_vault, 'LogAnalyticCustomerID')
shared_key = mssparkutils.credentials.getSecret(key_vault, 'LogAnalyticsShareKey')

# Log table name; post_data() sends it as the 'Log-Type' request header
log_type = 'FabricActivityLog'

In [None]:
# Date/time settings used when stamping log rows
# Timezone that run timestamps are converted to
autz = pytz.timezone('Australia/Melbourne')
# Month-first format applied to StartTime/EndTime in run_notebook().
# Day-first alternative, currently disabled:
# dateformat = '%d/%m/%Y %H:%M:%S'
dateformat = '%m/%d/%Y %H:%M:%S'
# Timestamp taken when this cell runs, expressed in the timezone above
# (not referenced by the other cells visible here)
autodaydate =  datetime.datetime.now().astimezone(autz)

In [None]:
## Schema of the log table: one (column name, Spark type) pair per field, in column order
log_schema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (
        ("WorkspaceName", StringType()),
        ("NotebookName", StringType()),
        ("LakehouseName", StringType()),
        ("TableName", StringType()),
        ("LoadType", StringType()),
        ("Environment", StringType()),
        ("Status", StringType()),
        ("StartTime", StringType()),
        ("EndTime", StringType()),
        ("DurationSec", DoubleType()),
        ("RowWritten", IntegerType()),
        ("StatusMessage", StringType()),
    )
])

In [None]:
#######################################
######Log Analytics Functions##########  
#######################################

# Build the API signature
def build_signature(customer_id, shared_key, date, content_length, method, content_type, resource):
    """Return the ``SharedKey`` Authorization header value for a request.

    The string to sign is the newline-joined sequence of ``method``, the
    content length, ``content_type``, ``x-ms-date:<date>`` and ``resource``.
    It is signed with HMAC-SHA256 using the base64-decoded ``shared_key``,
    and the digest is base64-encoded into the header value.

    Args:
        customer_id: Identifier placed before the colon in the header value.
        shared_key: Base64-encoded signing key.
        date: Date string, signed as the ``x-ms-date`` header.
        content_length: Length of the request body (converted with ``str``).
        method: HTTP method name.
        content_type: Request content type.
        resource: Resource path being requested.

    Returns:
        A string of the form ``"SharedKey <customer_id>:<signature>"``.
    """
    canonical_parts = [method, str(content_length), content_type, 'x-ms-date:' + date, resource]
    message = "\n".join(canonical_parts).encode("utf-8")
    secret = base64.b64decode(shared_key)
    digest = hmac.new(secret, message, digestmod=hashlib.sha256).digest()
    signature = base64.b64encode(digest).decode()
    return f"SharedKey {customer_id}:{signature}"

# Build and send a request to the POST API
def post_data(customer_id, shared_key, body, log_type, timeout=30):
    """Send a JSON payload to the Log Analytics HTTP Data Collector endpoint.

    Builds the signed ``Authorization`` header with ``build_signature`` and
    POSTs ``body`` to ``https://<customer_id>.ods.opinsights.azure.com/api/logs``.
    Prints ``Accepted`` for a 2xx response, otherwise prints the response code.

    Args:
        customer_id: Log Analytics workspace (customer) ID.
        shared_key: Base64-encoded shared key of the workspace.
        body: JSON document to send, as ``str`` or ``bytes``.
        log_type: Value of the ``Log-Type`` header (the target log table name).
        timeout: Seconds to wait for the HTTP request before giving up.

    Raises:
        requests.exceptions.RequestException: If the request cannot be
            completed (connection failure, timeout, ...).
    """
    method = 'POST'
    content_type = 'application/json'
    resource = '/api/logs'
    # Timezone-aware replacement for the deprecated datetime.utcnow(); same formatted output.
    rfc1123date = datetime.datetime.now(datetime.timezone.utc).strftime('%a, %d %b %Y %H:%M:%S GMT')

    # Sign and send the exact same bytes: the signed length must be the byte
    # length of the payload, which differs from len(str) for non-ASCII text.
    payload = body.encode('utf-8') if isinstance(body, str) else body
    content_length = len(payload)
    signature = build_signature(customer_id, shared_key, rfc1123date, content_length, method, content_type, resource)
    uri = 'https://' + customer_id + '.ods.opinsights.azure.com' + resource + '?api-version=2016-04-01'

    headers = {
        'content-type': content_type,
        'Authorization': signature,
        'Log-Type': log_type,
        'x-ms-date': rfc1123date
    }

    # A timeout stops a stalled connection from blocking the caller forever.
    response = requests.post(uri, data=payload, headers=headers, timeout=timeout)
    if 200 <= response.status_code <= 299:
        print('Accepted')
    else:
        print("Response code: {}".format(response.status_code))

In [None]:
## Run notebook function
def run_notebook(notebook_info):
    """Run the child notebook for one table and log the outcome.

    Calls ``mssparkutils.notebook.run`` on the notebook named by the global
    ``childnotebook`` with a one-hour timeout. Whatever the outcome, a single
    log row is appended to the Delta table named by ``fabriclogtable`` and the
    same row is posted to Log Analytics through ``post_data``.

    Args:
        notebook_info: Mapping with the keys ``TableName``, ``LoadType``,
            ``BusinessKeys`` and ``WatermarkColumn``.

    Notes:
        * Depends on names defined outside this function: ``childnotebook``,
          ``shortcutname``, ``lakehousename``, ``workspacename``,
          ``fabriclogtable`` and ``spark`` (none are defined in the cells
          visible here), plus ``autz``, ``dateformat``, ``log_schema``,
          ``customer_id``, ``shared_key`` and ``log_type``.
        * The child notebook's exit value is parsed as JSON and must contain
          the keys ``rowswritten`` and ``nbresult``.
        * Any exception raised while running the child notebook or parsing its
          output is recorded as a ``Failure`` row instead of being re-raised.
          Errors raised while writing the log itself still propagate.
    """
    table_name = notebook_info['TableName']
    notebook_name = childnotebook
    loadtype = notebook_info['LoadType']
    businesskey = notebook_info['BusinessKeys']
    watermarkcolumn = notebook_info['WatermarkColumn']
    start_time = datetime.datetime.now().astimezone(autz)

    # Try to run the child notebook
    try:
        nboutput = mssparkutils.notebook.run(
            notebook_name,
            timeout_seconds=60 * 60,
            arguments={
                'shortcutname': shortcutname,
                'lakehousename': lakehousename,
                'tablename': table_name,
                'loadtype': loadtype,
                'businesskey': businesskey,
                'watermarkcolumn': watermarkcolumn,
            },
        )
        end_time = datetime.datetime.now().astimezone(autz)
        nboutput = json.loads(nboutput)
        rows_written, status_message = nboutput['rowswritten'], nboutput['nbresult']
        status = "Success"
        print('success - ' + table_name)

    # Run failed: collect the error message
    except Exception as e:
        # Keep end_time as a datetime; it is still needed for the duration
        # and is only formatted when the log row is built below.
        end_time = datetime.datetime.now().astimezone(autz)
        rows_written = 0
        # Plain string (not a set) so it fits the StringType column and json.dumps.
        status_message = "Error Message - " + str(e)
        status = "Failure"
        print('fail - ' + table_name)

    duration = (end_time - start_time).total_seconds()
    data = [{
        'WorkspaceName': workspacename,
        "NotebookName": notebook_name,
        'LakehouseName': lakehousename,
        "TableName": table_name,
        "LoadType": loadtype,
        "Environment": "Dev",
        "Status": status,
        "StartTime": start_time.strftime(dateformat),
        "EndTime": end_time.strftime(dateformat),
        "DurationSec": duration,
        "RowWritten": rows_written,
        "StatusMessage": status_message,
    }]

    # Save the log row into the Fabric table and the Log Analytics workspace
    new_log_row = spark.createDataFrame(data, schema=log_schema)
    new_log_row.write.format("delta").mode("append").saveAsTable(fabriclogtable)
    log_body = json.dumps(data)
    post_data(customer_id, shared_key, log_body, log_type)