### Multi-Tenant Streaming from nested json files then performs mapping, validation and writes to CosmosDB/Postgres

#### Installation: This cluster needs to be installed with following libraries:
1. org.apache.kafka:kafka-clients:3.3.1 (maven)
2. azure-cosmos (pypi)
3. psycopg2 (pypi)

### Connect to Event Hubs

In [3]:
from pyspark.sql.functions import from_json, col,explode, split,get_json_object
from pyspark.sql.types import *
con_str = dbutils.secrets.get("scope1", "ehns001-con")
EH_SASL = f"org.apache.kafka.common.security.plain.PlainLoginModule required username='$ConnectionString' password='{con_str}';"
GROUP_ID = "$Default"

data_schema = StructType([
    StructField("tenant", StringType(), True),
    StructField("data", ArrayType(StringType()), True),
]

)
orders = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "ehns001.servicebus.windows.net:9093") \
  .option("subscribe", "sales_orders") \
  .option("kafka.sasl.mechanism","PLAIN") \
  .option("kafka.security.protocol","SASL_SSL") \
  .option("kafka.sasl.jaas.config", EH_SASL ) \
  .option("kafka.request.timeout.ms", "60000") \
  .option("kafka.session.timeout.ms", "60000") \
  .option("kafka.group.id", GROUP_ID) \
  .option("failOnDataLoss", "false") \
  .load() \
  .select(from_json(col("value").cast("string"), data_schema).alias("value"), "partition") \
  .select("value.tenant",explode("value.data").alias("data"), "partition") \
  .select("tenant","data", "partition") \

orders.createOrReplaceTempView("sales_orders")



In [4]:
%sql select * from sales_orders

### Mapping rule and validation rule per tenant. Easy onboarding new tenant

In [6]:
import pandas as pd
rule1 = {
  "mapping" :{"ORDER_NUMBER":"ORDERNUMBER","QTY_ORDERED":"QUANTITYORDERED","PRICE_EACH":"PRICEEACH","ORDER_LINE":"ORDERLINENUMBER","SALES":"SALES","ORDERDATE":"ORDERDATE","STATUS":"STATUS","QTR_ID":"QTR_ID","MONTH_ID":"MONTH_ID","YEAR_ID":"YEAR_ID","PRODUCTLINE":"PRODUCTLINE","MSRP":"MSRP","PRODUCTCODE":"PRODUCTCODE","CUSTOMERNAME":"CUSTOMERNAME","PHONE":"PHONE","ADDRESSLINE1":"ADDRESSLINE1","ADDRESSLINE2":"ADDRESSLINE2","CITY":"CITY","STATE":"STATE","POSTALCODE":"POSTALCODE","COUNTRY":"COUNTRY","TERRITORY":"TERRITORY","CONTACTLASTNAME":"CONTACTLASTNAME","CONTACTFIRSTNAME":"CONTACTFIRSTNAME","DEALSIZE":"DEALSIZE"},
"data_validation"
  :{"CITY":"NOT_NULL", "QUANTITYORDERED":"POSITIVE_NUMBER"}
}
rule2 = {
  "mapping" :{"ORDER_NUMBER":"ORDERNUMBER","QTY_ORDERED":"QUANTITYORDERED","PRICE_EACH":"PRICEEACH","ORDER_LINE":"ORDERLINENUMBER","SALES":"SALES","ORDERDATE":"ORDERDATE","STATUS":"STATUS","QTR_ID":"QTR_ID","MONTH_ID":"MONTH_ID","YEAR_ID":"YEAR_ID","PRODUCTLINE":"PRODUCTLINE","MSRP":"MSRP","PRODUCTCODE":"PRODUCTCODE","CUSTOMERNAME":"CUSTOMERNAME","PHONE":"PHONE","ADDRESSLINE1":"ADDRESSLINE1","ADDRESSLINE2":"ADDRESSLINE2","CITY":"CITY","STATE":"STATE","POSTALCODE":"POSTALCODE","COUNTRY":"COUNTRY","TERRITORY":"TERRITORY","CONTACTLASTNAME":"CONTACTLASTNAME","CONTACTFIRSTNAME":"CONTACTFIRSTNAME","DEALSIZE":"DEALSIZE"},
"data_validation"
  :{"CITY":"NOT_NULL", "QUANTITYORDERED":"POSITIVE_NUMBER"}
}
rule3 = {
  "mapping" :{"ORDER_NUMBER":"ORDERNUMBER","QTY_ORDERED":"QUANTITYORDERED","PRICE_EACH":"PRICEEACH","ORDER_LINE":"ORDERLINENUMBER","SALES":"SALES","ORDERDATE":"ORDERDATE","STATUS":"STATUS","QTR_ID":"QTR_ID","MONTH_ID":"MONTH_ID","YEAR_ID":"YEAR_ID","PRODUCTLINE":"PRODUCTLINE","MSRP":"MSRP","PRODUCTCODE":"PRODUCTCODE","CUSTOMERNAME":"CUSTOMERNAME","PHONE":"PHONE","ADDRESSLINE1":"ADDRESSLINE1","ADDRESSLINE2":"ADDRESSLINE2","CITY":"CITY","STATE":"STATE","POSTALCODE":"POSTALCODE","COUNTRY":"COUNTRY","TERRITORY":"TERRITORY","CONTACTLASTNAME":"CONTACTLASTNAME","CONTACTFIRSTNAME":"CONTACTFIRSTNAME","DEALSIZE":"DEALSIZE"},
"data_validation"
  :{"CITY":"NOT_NULL", "QUANTITYORDERED":"POSITIVE_NUMBER"}
}
tenant_rules = {"tenant":['tenant11','tenant12','tenant2','tenant3','tenant4','tenant5','tenant6','tenant7','tenant8','tenant9','tenant10'], "rule":[rule2, rule1, rule2, rule3, rule2, rule1, rule2, rule3, rule1, rule3, rule1]}
tenant_rules = pd.DataFrame(tenant_rules)
tenant_rule_df = spark.createDataFrame(tenant_rules)
tenant_rule_df.cache()

  Unable to convert the field rule. If this column is not necessary, you may consider dropping it or converting to primitive type before the conversion.
Direct cause: Nested StructType not supported in conversion from Arrow: struct<data_validation: struct<CITY: string, QUANTITYORDERED: string>, mapping: struct<ADDRESSLINE1: string, ADDRESSLINE2: string, CITY: string, CONTACTFIRSTNAME: string, CONTACTLASTNAME: string, COUNTRY: string, CUSTOMERNAME: string, DEALSIZE: string, MONTH_ID: string, MSRP: string, ORDERDATE: string, ORDER_LINE: string, ORDER_NUMBER: string, PHONE: string, POSTALCODE: string, PRICE_EACH: string, PRODUCTCODE: string, PRODUCTLINE: string, QTR_ID: string, QTY_ORDERED: string, SALES: string, STATE: string, STATUS: string, TERRITORY: string, YEAR_ID: string>>
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)
Out[8]: DataFrame[tenant: string, rule: map<string,map<string,string>>]

In [7]:
display(tenant_rule_df)

tenant,rule
tenant11,"Map(data_validation -> Map(CITY -> NOT_NULL, QUANTITYORDERED -> POSITIVE_NUMBER), mapping -> Map(PRODUCTLINE -> PRODUCTLINE, CUSTOMERNAME -> CUSTOMERNAME, MONTH_ID -> MONTH_ID, MSRP -> MSRP, ORDERDATE -> ORDERDATE, CITY -> CITY, PRICE_EACH -> PRICEEACH, CONTACTLASTNAME -> CONTACTLASTNAME, TERRITORY -> TERRITORY, ADDRESSLINE1 -> ADDRESSLINE1, PRODUCTCODE -> PRODUCTCODE, STATUS -> STATUS, QTY_ORDERED -> QUANTITYORDERED, SALES -> SALES, YEAR_ID -> YEAR_ID, PHONE -> PHONE, ORDER_NUMBER -> ORDERNUMBER, POSTALCODE -> POSTALCODE, DEALSIZE -> DEALSIZE, CONTACTFIRSTNAME -> CONTACTFIRSTNAME, COUNTRY -> COUNTRY, ADDRESSLINE2 -> ADDRESSLINE2, STATE -> STATE, QTR_ID -> QTR_ID, ORDER_LINE -> ORDERLINENUMBER))"
tenant12,"Map(data_validation -> Map(CITY -> NOT_NULL, QUANTITYORDERED -> POSITIVE_NUMBER), mapping -> Map(PRODUCTLINE -> PRODUCTLINE, CUSTOMERNAME -> CUSTOMERNAME, MONTH_ID -> MONTH_ID, MSRP -> MSRP, ORDERDATE -> ORDERDATE, CITY -> CITY, PRICE_EACH -> PRICEEACH, CONTACTLASTNAME -> CONTACTLASTNAME, TERRITORY -> TERRITORY, ADDRESSLINE1 -> ADDRESSLINE1, PRODUCTCODE -> PRODUCTCODE, STATUS -> STATUS, QTY_ORDERED -> QUANTITYORDERED, SALES -> SALES, YEAR_ID -> YEAR_ID, PHONE -> PHONE, ORDER_NUMBER -> ORDERNUMBER, POSTALCODE -> POSTALCODE, DEALSIZE -> DEALSIZE, CONTACTFIRSTNAME -> CONTACTFIRSTNAME, COUNTRY -> COUNTRY, ADDRESSLINE2 -> ADDRESSLINE2, STATE -> STATE, QTR_ID -> QTR_ID, ORDER_LINE -> ORDERLINENUMBER))"
tenant2,"Map(data_validation -> Map(CITY -> NOT_NULL, QUANTITYORDERED -> POSITIVE_NUMBER), mapping -> Map(PRODUCTLINE -> PRODUCTLINE, CUSTOMERNAME -> CUSTOMERNAME, MONTH_ID -> MONTH_ID, MSRP -> MSRP, ORDERDATE -> ORDERDATE, CITY -> CITY, PRICE_EACH -> PRICEEACH, CONTACTLASTNAME -> CONTACTLASTNAME, TERRITORY -> TERRITORY, ADDRESSLINE1 -> ADDRESSLINE1, PRODUCTCODE -> PRODUCTCODE, STATUS -> STATUS, QTY_ORDERED -> QUANTITYORDERED, SALES -> SALES, YEAR_ID -> YEAR_ID, PHONE -> PHONE, ORDER_NUMBER -> ORDERNUMBER, POSTALCODE -> POSTALCODE, DEALSIZE -> DEALSIZE, CONTACTFIRSTNAME -> CONTACTFIRSTNAME, COUNTRY -> COUNTRY, ADDRESSLINE2 -> ADDRESSLINE2, STATE -> STATE, QTR_ID -> QTR_ID, ORDER_LINE -> ORDERLINENUMBER))"
tenant3,"Map(data_validation -> Map(CITY -> NOT_NULL, QUANTITYORDERED -> POSITIVE_NUMBER), mapping -> Map(PRODUCTLINE -> PRODUCTLINE, CUSTOMERNAME -> CUSTOMERNAME, MONTH_ID -> MONTH_ID, MSRP -> MSRP, ORDERDATE -> ORDERDATE, CITY -> CITY, PRICE_EACH -> PRICEEACH, CONTACTLASTNAME -> CONTACTLASTNAME, TERRITORY -> TERRITORY, ADDRESSLINE1 -> ADDRESSLINE1, PRODUCTCODE -> PRODUCTCODE, STATUS -> STATUS, QTY_ORDERED -> QUANTITYORDERED, SALES -> SALES, YEAR_ID -> YEAR_ID, PHONE -> PHONE, ORDER_NUMBER -> ORDERNUMBER, POSTALCODE -> POSTALCODE, DEALSIZE -> DEALSIZE, CONTACTFIRSTNAME -> CONTACTFIRSTNAME, COUNTRY -> COUNTRY, ADDRESSLINE2 -> ADDRESSLINE2, STATE -> STATE, QTR_ID -> QTR_ID, ORDER_LINE -> ORDERLINENUMBER))"
tenant4,"Map(data_validation -> Map(CITY -> NOT_NULL, QUANTITYORDERED -> POSITIVE_NUMBER), mapping -> Map(PRODUCTLINE -> PRODUCTLINE, CUSTOMERNAME -> CUSTOMERNAME, MONTH_ID -> MONTH_ID, MSRP -> MSRP, ORDERDATE -> ORDERDATE, CITY -> CITY, PRICE_EACH -> PRICEEACH, CONTACTLASTNAME -> CONTACTLASTNAME, TERRITORY -> TERRITORY, ADDRESSLINE1 -> ADDRESSLINE1, PRODUCTCODE -> PRODUCTCODE, STATUS -> STATUS, QTY_ORDERED -> QUANTITYORDERED, SALES -> SALES, YEAR_ID -> YEAR_ID, PHONE -> PHONE, ORDER_NUMBER -> ORDERNUMBER, POSTALCODE -> POSTALCODE, DEALSIZE -> DEALSIZE, CONTACTFIRSTNAME -> CONTACTFIRSTNAME, COUNTRY -> COUNTRY, ADDRESSLINE2 -> ADDRESSLINE2, STATE -> STATE, QTR_ID -> QTR_ID, ORDER_LINE -> ORDERLINENUMBER))"
tenant5,"Map(data_validation -> Map(CITY -> NOT_NULL, QUANTITYORDERED -> POSITIVE_NUMBER), mapping -> Map(PRODUCTLINE -> PRODUCTLINE, CUSTOMERNAME -> CUSTOMERNAME, MONTH_ID -> MONTH_ID, MSRP -> MSRP, ORDERDATE -> ORDERDATE, CITY -> CITY, PRICE_EACH -> PRICEEACH, CONTACTLASTNAME -> CONTACTLASTNAME, TERRITORY -> TERRITORY, ADDRESSLINE1 -> ADDRESSLINE1, PRODUCTCODE -> PRODUCTCODE, STATUS -> STATUS, QTY_ORDERED -> QUANTITYORDERED, SALES -> SALES, YEAR_ID -> YEAR_ID, PHONE -> PHONE, ORDER_NUMBER -> ORDERNUMBER, POSTALCODE -> POSTALCODE, DEALSIZE -> DEALSIZE, CONTACTFIRSTNAME -> CONTACTFIRSTNAME, COUNTRY -> COUNTRY, ADDRESSLINE2 -> ADDRESSLINE2, STATE -> STATE, QTR_ID -> QTR_ID, ORDER_LINE -> ORDERLINENUMBER))"
tenant6,"Map(data_validation -> Map(CITY -> NOT_NULL, QUANTITYORDERED -> POSITIVE_NUMBER), mapping -> Map(PRODUCTLINE -> PRODUCTLINE, CUSTOMERNAME -> CUSTOMERNAME, MONTH_ID -> MONTH_ID, MSRP -> MSRP, ORDERDATE -> ORDERDATE, CITY -> CITY, PRICE_EACH -> PRICEEACH, CONTACTLASTNAME -> CONTACTLASTNAME, TERRITORY -> TERRITORY, ADDRESSLINE1 -> ADDRESSLINE1, PRODUCTCODE -> PRODUCTCODE, STATUS -> STATUS, QTY_ORDERED -> QUANTITYORDERED, SALES -> SALES, YEAR_ID -> YEAR_ID, PHONE -> PHONE, ORDER_NUMBER -> ORDERNUMBER, POSTALCODE -> POSTALCODE, DEALSIZE -> DEALSIZE, CONTACTFIRSTNAME -> CONTACTFIRSTNAME, COUNTRY -> COUNTRY, ADDRESSLINE2 -> ADDRESSLINE2, STATE -> STATE, QTR_ID -> QTR_ID, ORDER_LINE -> ORDERLINENUMBER))"
tenant7,"Map(data_validation -> Map(CITY -> NOT_NULL, QUANTITYORDERED -> POSITIVE_NUMBER), mapping -> Map(PRODUCTLINE -> PRODUCTLINE, CUSTOMERNAME -> CUSTOMERNAME, MONTH_ID -> MONTH_ID, MSRP -> MSRP, ORDERDATE -> ORDERDATE, CITY -> CITY, PRICE_EACH -> PRICEEACH, CONTACTLASTNAME -> CONTACTLASTNAME, TERRITORY -> TERRITORY, ADDRESSLINE1 -> ADDRESSLINE1, PRODUCTCODE -> PRODUCTCODE, STATUS -> STATUS, QTY_ORDERED -> QUANTITYORDERED, SALES -> SALES, YEAR_ID -> YEAR_ID, PHONE -> PHONE, ORDER_NUMBER -> ORDERNUMBER, POSTALCODE -> POSTALCODE, DEALSIZE -> DEALSIZE, CONTACTFIRSTNAME -> CONTACTFIRSTNAME, COUNTRY -> COUNTRY, ADDRESSLINE2 -> ADDRESSLINE2, STATE -> STATE, QTR_ID -> QTR_ID, ORDER_LINE -> ORDERLINENUMBER))"
tenant8,"Map(data_validation -> Map(CITY -> NOT_NULL, QUANTITYORDERED -> POSITIVE_NUMBER), mapping -> Map(PRODUCTLINE -> PRODUCTLINE, CUSTOMERNAME -> CUSTOMERNAME, MONTH_ID -> MONTH_ID, MSRP -> MSRP, ORDERDATE -> ORDERDATE, CITY -> CITY, PRICE_EACH -> PRICEEACH, CONTACTLASTNAME -> CONTACTLASTNAME, TERRITORY -> TERRITORY, ADDRESSLINE1 -> ADDRESSLINE1, PRODUCTCODE -> PRODUCTCODE, STATUS -> STATUS, QTY_ORDERED -> QUANTITYORDERED, SALES -> SALES, YEAR_ID -> YEAR_ID, PHONE -> PHONE, ORDER_NUMBER -> ORDERNUMBER, POSTALCODE -> POSTALCODE, DEALSIZE -> DEALSIZE, CONTACTFIRSTNAME -> CONTACTFIRSTNAME, COUNTRY -> COUNTRY, ADDRESSLINE2 -> ADDRESSLINE2, STATE -> STATE, QTR_ID -> QTR_ID, ORDER_LINE -> ORDERLINENUMBER))"
tenant9,"Map(data_validation -> Map(CITY -> NOT_NULL, QUANTITYORDERED -> POSITIVE_NUMBER), mapping -> Map(PRODUCTLINE -> PRODUCTLINE, CUSTOMERNAME -> CUSTOMERNAME, MONTH_ID -> MONTH_ID, MSRP -> MSRP, ORDERDATE -> ORDERDATE, CITY -> CITY, PRICE_EACH -> PRICEEACH, CONTACTLASTNAME -> CONTACTLASTNAME, TERRITORY -> TERRITORY, ADDRESSLINE1 -> ADDRESSLINE1, PRODUCTCODE -> PRODUCTCODE, STATUS -> STATUS, QTY_ORDERED -> QUANTITYORDERED, SALES -> SALES, YEAR_ID -> YEAR_ID, PHONE -> PHONE, ORDER_NUMBER -> ORDERNUMBER, POSTALCODE -> POSTALCODE, DEALSIZE -> DEALSIZE, CONTACTFIRSTNAME -> CONTACTFIRSTNAME, COUNTRY -> COUNTRY, ADDRESSLINE2 -> ADDRESSLINE2, STATE -> STATE, QTR_ID -> QTR_ID, ORDER_LINE -> ORDERLINENUMBER))"


### Perform data mapping and validation and write output to CosmosDB (and Delta table)

In [9]:
from azure.cosmos import CosmosClient
import psycopg2
import pandas as pd
import ast
URL ="https://cosmosdbnative01.documents.azure.com:443/"
KEY =dbutils.secrets.get("scope1", "cosmosdbnative01-con")
postgres_pass = dbutils.secrets.get("scope1", "cosmospostgres01-pass")
schema = "status int"
def write_cosmos(data):
  client = CosmosClient(URL, credential=KEY)
  DATABASE_NAME = 'sales'
  database = client.get_database_client(DATABASE_NAME)
  CONTAINER_NAME = 'sales_orders'
  container = database.get_container_client(CONTAINER_NAME)
  for item in data:
    container.upsert_item(item)
def write_postgress(data):
  columns = ",".join(data.columns)
  columns = "("+columns + ")"
  data = data.to_dict(orient="records")
  connection = psycopg2.connect(user="citus",
                                password=postgres_pass,
                                host="c.cosmospostgres01.postgres.database.azure.com",
                                port="5432",
                                database="citus",
                                sslmode='require')
  values = [tuple([value for value in row.values()]) for row in data ]
  cursor = connection.cursor()
  args = ','.join(cursor.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", i).decode('utf-8')
                  for i in values)
                      
  # executing the sql statement
  cursor.execute(f"INSERT INTO sales_orders {columns} VALUES " + (args))
  connection.commit()
def process_validation(inputdf):
  rule = inputdf.iloc[0]['rule']
  mapping = rule['mapping']
  mapping = {item[0]:item[1] for item in mapping }
  source_columns = list(mapping.keys())
  target_columns = list(mapping.values())
  exploded_df = inputdf["data"].to_list()
  exploded_df = [ast.literal_eval(item) for item in exploded_df]
  exploded_df = pd.DataFrame(exploded_df)
  exploded_df['tenant'] = inputdf['tenant']

  #doing data mapping
  if set(source_columns).issubset(exploded_df.columns):
    source_columns.append("tenant")
    outputdf = exploded_df[source_columns]
    outputdf.rename(columns = mapping, inplace=True)
  else:
    target_columns.append("tenant")
    outputdf = pd.DataFrame([["9999"]*len(target_columns)], columns=target_columns)
    outputdf["valid_flag"] = "false"
    outputdf["reason"] = "column mapping failed"
    write_postgress(outputdf)
    return pd.DataFrame({"status":[0]})
  ###doing data validation
  
  data_validation = rule['data_validation']
  data_validation = {item[0]:item[1] for item in data_validation }
  outputdf['valid_flag'] = "true"
  outputdf['reason'] = ""
  #process data mapping
  for column in data_validation.keys():
    rule = data_validation[column]
    if rule == "POSITIVE_NUMBER":
      outputdf.loc[outputdf[column] <= 0, 'valid_flag'] = "false"
      outputdf.loc[outputdf[column] <= 0, 'reason'] = f"{column} has negative value"
#   write_cosmos(outputdf.to_dict(orient="records"))
  write_postgress(outputdf)

  return pd.DataFrame({"status":[1]})
def process_batch(batchdf, batchid):
  
  joined_df = batchdf.join(tenant_rule_df, on = "tenant")
  outputdf = joined_df.groupby("tenant").applyInPandas(process_validation, schema) 
  outputdf.collect()
#   outputdf.write.format("delta").mode("append").save("dbfs:/tmp/tmp_df")
writer = orders.writeStream.foreachBatch(process_batch)
writer.start()

Out[22]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f3044231a60>