In [0]:
filename = dbutils.widgets.get("filename") # to get filename from pipeline created in ADF which is invoking this notebook
fnameWithoutExt = filename.split('.')[0]
print(fnameWithoutExt)

orders


In [0]:
storage_account = 'san17'
container_name = 'sales'
scope_name = 'supsasscope'
secret_name = 'supsastoken'

In [0]:
spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net","SAS")

In [0]:
spark.conf.set(f"fs.azure.sas.token.provider.type.{storage_account}.dfs.core.windows.net","org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")

In [0]:
spark.conf.set(f"fs.azure.sas.fixed.token.{storage_account}.dfs.core.windows.net",dbutils.secrets.get(f'{scope_name}',f'{secret_name}'))

In [0]:
ordersDf = spark.read.csv(f"abfs://{container_name}@{storage_account}.dfs.core.windows.net/landing/{filename}}",inferSchema=True,header=True)

In [0]:
display(ordersDf)

order_id,order_date,customer_id,order_status
1111111,2013-07-25T00:00:00.000+0000,11599,CLOSED
2222222,2013-07-25T00:00:00.000+0000,256,PENDING_PAYMENT
3333333,2013-07-25T00:00:00.000+0000,12111,COMPLETE
4444444,2013-07-25T00:00:00.000+0000,8827,CLOSED


In [0]:
errorFlg = False
ordersCount = ordersDf.count()
ordersDistinctCount = ordersDf.select('order_id').distinct().count()
print("orders count =",ordersCount,"\norders distinct count =",ordersDistinctCount)

In [0]:
if ordersCount != ordersDistinctCount:
    errorFlg = True
if errorFlg:
    dbutils.fs.mv(f"abfs://{container_name}@{storage_account}.dfs.core.windows.net/landing/{filename}",f"abfs://{container_name}@{storage_account}.dfs.core.windows.net/discarded")
    dbutils.notebook.exit('{"errorFlg":"true", "errorMsg":"Orderid is repeated"}')
ordersDf.createOrReplaceTempView('orders')

connect to sql server

In [0]:
dbServer='supsqlserver'
dbPort='1433'
dbName='supsqldb17'
dbUser='supsqladmin'
dbSecret='supsqlsec17'

In [0]:
connectionUrl='jdbc:sqlserver://{}.database.windows.net:{};database={};user={};'.format(dbServer,dbPort,dbName,dbUser)
dbSecret=dbutils.secrets.get(scope=scope_name,key="supsqlsec17")
connectionProperties={'password':dbSecret,'driver':'com.microsoft.sqlserver.jdbc.SQLServerDriver'}

In [0]:
display(dbutils.secrets.listScopes())

name
supsasscope
supscope17


In [0]:
dbutils.secrets.list("supsasscope")

In [0]:
dbutils.secrets.get(scope_name,"supsqlsec17")

In [0]:
validStatusDf = spark.read.jdbc(url=connectionUrl,table='dbo.valid_order_status',properties=connectionProperties)

In [0]:
display(validStatusDf)

status_name
ON_HOLD
PAYMENT_REVIEW
PROCESSING
CLOSED
SUSPECTED_FRAUD
COMPLETE
PENDING
CANCELED
PENDING_PAYMENT


In [0]:
validStatusDf.createOrReplaceTempView('valid_status')

In [0]:
invalidStatusCount = spark.sql('''
                               select * from orders where order_status not in (select status_name from valid_status)
                               ''')
display(invalidStatusCount)

order_id,order_date,customer_id,order_status


In [0]:
print(invalidStatusCount.count())

In [0]:
if invalidStatusCount.count() > 0:
    errorFlg = True
if errorFlg:
   dbutils.fs.mv(f"abfs://{container_name}@{storage_account}.dfs.core.windows.net/landing/orders.csv",f"abfs://{container_name}@{storage_account}.dfs.core.windows.net/discarded")
   dbutils.notebook.exit('{"errorFlg":"true", "errorMsg":"Invalid order_status found"}') 
else:
    dbutils.fs.mv(f"abfs://{container_name}@{storage_account}.dfs.core.windows.net/landing/orders.csv",f"abfs://{container_name}@{storage_account}.dfs.core.windows.net/staging")
    dbutils.notebook.exit('{"errorFlg":"false", "errorMsg":"file is successfully validated"}')

push the file from Amazon S3 bucket to Azure storage account (order_items directory in sales container) via azure data factory pipeline created

In [0]:
orderItemsDf = spark.read.csv(f"abfs://{container_name}@{storage_account}.dfs.core.windows.net/order_items/order_items.csv",inferSchema=True,header=True)
orderItemsDf.createOrReplaceTempView('order_items')

load the data from azure storage account customers directory in sales container into customers table created in azure SQL database via azure data factory pipeline created

In [0]:
customerDf = spark.read.jdbc(url=connectionUrl,table='dbo.customers',properties=connectionProperties)

In [0]:
customerDf.createOrReplaceTempView('customers')

In [0]:
ordersDf = spark.read.csv(f"abfs://{container_name}@{storage_account}.dfs.core.windows.net/staging/{filename}}",inferSchema=True,header=True)
ordersDf.createOrReplaceTempView('orders')

In [0]:
resultDf1 = spark.sql("SELECT customers.customer_id, customers.customer_fname, customers.customer_lname, customers.customer_city, customers.customer_state, customers.customer_zipcode, COUNT(order_id) AS num_orders_placed, ROUND(SUM(order_item_subtotal), 2) AS total_amount FROM customers, orders, order_items WHERE customers.customer_id = orders.customer_id AND orders.order_id = order_items.order_item_order_id GROUP BY customers.customer_id, customers.customer_fname, customers.customer_lname, customers.customer_city, customers.customer_state, customers.customer_zipcode ORDER BY total_amount DESC")

In [0]:
resultDf1.show()

+-----------+--------------+--------------+-------------+--------------+----------------+-----------------+------------+
|customer_id|customer_fname|customer_lname|customer_city|customer_state|customer_zipcode|num_orders_placed|total_amount|
+-----------+--------------+--------------+-------------+--------------+----------------+-----------------+------------+
|      11318|          Mary|         Henry|       Caguas|            PR|           00725|                5|     1129.86|
|       8827|         Brian|        Wilson|  San Antonio|            TX|           78240|                4|      699.85|
|        256|         David|     Rodriguez|      Chicago|            IL|           60625|                3|      579.98|
|      11599|          Mary|        Malone|      Hickory|            NC|           28601|                1|      299.98|
+-----------+--------------+--------------+-------------+--------------+----------------+-----------------+------------+



writing the results to table in azure sql DB for reporting purpose

In [0]:
resultDf1.write.jdbc(url = connectionUrl, table = 'dbo.sales_reporting', properties=connectionProperties, mode = 'overwrite')