### Set up connection information for KQL Database

Information on variables below.
- kustoCluster - Copy the value found in the walkthrough [here](https://learn.microsoft.com/en-us/fabric/data-factory/connector-kusto-dataflows#copy-the-query-uri).
- kustoDb - The name of your KQL Database
- kustoTable - The KQL Database table you'll be copying data from into the Lakehouse.

In [None]:
# Connection settings for the KQL Database -- fill these in before running
# the rest of the notebook.
kustoCluster = ""  # Query URI of the KQL Database
kustoDb = ""  # name of the KQL Database
kustoTable = ""  # KQL Database table that data is copied from

# Access token requested for the cluster URI; runKustoQuery passes it to the
# Kusto Spark connector as the "accessToken" option.
token = mssparkutils.credentials.getToken(kustoCluster)

### Set up function to accept a KQL Query against the KQL Database
You will make multiple calls to KQL Database in this process and this function works in conjunction with the variables you have already set up to execute those queries.

In [None]:
def runKustoQuery(kustoQuery):
    """Run a KQL query against the configured KQL Database.

    The connection details come from the notebook-level variables
    ``kustoCluster``, ``kustoDb`` and ``token``, which must be set before
    this function is called.

    Args:
        kustoQuery: Text of the KQL query to execute.

    Returns:
        The Spark DataFrame loaded by the Kusto Spark data source for the
        given query.
    """
    # Each connector option is set exactly once; the chain is parenthesised
    # so no trailing backslashes are needed to continue the expression.
    dfKustoResult = (
        spark.read
        .format("com.microsoft.kusto.spark.datasource")
        .option("kustoCluster", kustoCluster)
        .option("kustoDatabase", kustoDb)
        .option("accessToken", token)
        .option("kustoQuery", kustoQuery)
        .load()
    )

    return dfKustoResult

### Get a _cleaned_ week of data from KQL Database
This function accepts a week date and runs a query to clean the data in the KQL Database table before bringing it to the Silver Lakehouse table. This will serve as the base of the merge statement.

In [None]:
def getWeekData(weekInProcess):
    """Aggregate one week of fare data in KQL and expose it as ``v_aggWeek``.

    The KQL query keeps the rows of ``faredata_raw`` whose ``pickup_datetime``
    has the same end-of-week as ``weekInProcess``, sums the fare amounts and
    takes the latest ingestion time per medallion / hack license / vendor /
    pickup time / payment type, and adds a ``rowKey`` column holding the
    SHA-256 hash of those grouping values concatenated together.

    Args:
        weekInProcess: Date identifying the week to load; it is interpolated
            into the KQL text and parsed there with ``todatetime``.

    Returns:
        None. The result is registered as the temp view ``v_aggWeek``.
    """
    aggregateWeekKql = f"""
    faredata_raw
    | where endofweek(todatetime(pickup_datetime)) == endofweek(todatetime("{weekInProcess}"))
    | summarize fareAmount=sum(fare_amount),
                surcharge=sum(surcharge),
                mtaTax=sum(mta_tax),
                tipAmount=sum(tip_amount),
                tollsAmount=sum(tolls_amount),
                totalAmount=sum(total_amount),
                maxIngestDate=max(ingestion_time())
            by  medallion,
                hackLicense=hack_license,
                vendorId=vendor_id,
                pickupDatetime=pickup_datetime,
                paymentType=payment_type
    | extend ingestDate = ingestion_time()
    | extend rowKey = hash_sha256(strcat(medallion,hackLicense,vendorId,pickupDatetime,paymentType))
    """

    # Run the query and publish the result under the view name that the
    # silver-table merge reads from.
    runKustoQuery(aggregateWeekKql).createOrReplaceTempView("v_aggWeek")

### Set up Control table for Batching the stream data coming from KQL DB
The Fabric capacity I'm working on in this demo is very small, so I have to batch my streaming data into chunks that will not overtax the capacity. Since my data is reliable, I know that each week constitutes ~2 million rows, which is acceptable for the downstream processing.

In [None]:
# KQL query that returns, for every week of fare data present in KQL DB, the
# latest ingestion time seen for that week. It is used to work out which weeks
# have data that has not yet been brought over.
kqlIngestByWeek= """
faredata_raw
| summarize maxIngestDate = max(ingestion_time()) by endOfWeekDate = endofweek(todatetime(pickup_datetime)) 
"""

# Run the query with runKustoQuery (defined in a previous step) and expose the
# result as a temporary view so it can be referenced from Spark SQL.
dfIngestByWeek = runKustoQuery(kqlIngestByWeek)
dfIngestByWeek.createOrReplaceTempView("v_ingestByWeek")

# Merge the temporary view into the control_kustoLog Lakehouse Delta table.
# - A week that is not in the log yet is inserted with status 'new'.
# - A week that is already logged but whose latest ingestion time has moved
#   forward gets its maxIngestDate refreshed and its status reset to 'new'.
# - A logged week with no newer data is left untouched.
kustoLogMergeQuery = """
MERGE INTO control_kustoLog dest
USING v_ingestByWeek src
ON dest.endOfWeekDate = src.endOfWeekDate
WHEN MATCHED AND src.maxIngestDate > dest.maxIngestDate THEN
    UPDATE 
    SET dest.maxIngestDate = src.maxIngestDate,
        dest.status = 'new'
WHEN NOT MATCHED THEN 
    INSERT (maxIngestDate, endOfWeekDate, status)
    VALUES (src.maxIngestDate, src.endOfWeekDate, 'new')
"""

# Execute the merge; the returned DataFrame is kept for inspection.
dfLogMergeQueryResults = spark.sql(kustoLogMergeQuery)

### Process New or Changed Weeks


In [None]:
# Determine which weeks are flagged 'new' in the control log. (A week is
# flagged 'new' both when it is first seen and when it has received newer data.)
# The ORDER BY / LIMIT 1 restricts each run to the oldest outstanding week.
dfWeeksToProcess = spark.sql("SELECT * FROM control_kustolog WHERE status = 'new' order by endOfWeekDate asc limit 1")

# Merge statement applied to a week's data. It only reads the v_aggWeek temp
# view, so it is the same for every week and is built once, outside the loop.
# When a row already exists and the source carries newer data, every aggregated
# amount and maxIngestDate are refreshed. Updating fareAmount alone would leave
# the other amounts stale and the stored maxIngestDate permanently older than
# the source, so the same rows would be matched again on every later run.
weekMergeQuery = """
MERGE INTO silver_faredata dest
USING v_aggWeek src
ON dest.rowKey = src.rowKey
WHEN MATCHED AND to_timestamp(src.maxIngestDate) > dest.maxIngestDate THEN
    UPDATE
    SET fareAmount = src.fareAmount,
        surcharge = src.surcharge,
        mtaTax = src.mtaTax,
        tipAmount = src.tipAmount,
        tollsAmount = src.tollsAmount,
        totalAmount = src.totalAmount,
        maxIngestDate = to_timestamp(src.maxIngestDate)
WHEN NOT MATCHED THEN
    INSERT *
"""

# Collect the selected weeks to the driver and process them one at a time.
for row in dfWeeksToProcess.collect():

    # Bring the current week into a variable
    weekInProcess = row.endOfWeekDate

    # Run the KQL DB query needed to get the week's data. The output is the
    # v_aggWeek temp view that the merge statement reads from.
    getWeekData(weekInProcess)

    # Merge the data and capture the outcome of that action in a DataFrame
    # To-do: Add try/catch handling and maybe some row count logging based on this output
    dfWeekMergeResults = spark.sql(weekMergeQuery)

    # Update the log to reflect that the batch has completed successfully
    updateLogQuery = f"UPDATE control_kustolog SET status = 'completed' WHERE endofWeekDate = '{weekInProcess}'"

    dfUpdateLogResults = spark.sql(updateLogQuery)

    # display(dfUpdateLogResults)


### Pre-requisite Tables

In [None]:
# CREATE TABLE IF NOT EXISTS silver_faredata
# (
#     medallion int,
#     hackLicense int,
#     vendorId string,
#     pickupDatetime timestamp,
#     paymentType string,
#     fareAmount decimal(38,5),
#     surcharge decimal(38,5),
#     mtaTax decimal(38,5),
#     tipAmount decimal(38,5),
#     tollsAmount decimal(38,5),
#     totalAmount decimal(38,5),
#     maxIngestDate TIMESTAMP,
#     rowKey string
# )
# USING DELTA

# CREATE TABLE IF NOT EXISTS control_kustoLog
# (
#     endOfWeekDate timestamp,
#     maxIngestDate timestamp,
#     status  string
# )
# USING DELTA