In [0]:
# Name of the landing file to process, passed in through the `filename` notebook widget.
filename = dbutils.widgets.get('filename')
# Fail fast on a blank value: later cells build paths as '/mnt/sales/landing/{}'.format(filename),
# so an empty name would address the landing directory itself (including in the dbutils.fs.mv calls).
if not filename.strip():
    raise ValueError("The 'filename' widget is empty; expected the name of a file under /mnt/sales/landing")
print(filename)

In [0]:
# Check whether something is already mounted at /mnt/sales so the mount cell can be skipped on re-runs.
salesMountPoint = "/mnt/sales"
alreadyMounted = any(
    mount.mountPoint == salesMountPoint for mount in dbutils.fs.mounts()
)
print(alreadyMounted)

In [0]:
# --- SQL Server connection settings (used when the JDBC URL is built) ---
dbServer = "vetrisqlserver"
dbPort = "1433"
dbName = "salesdb"
dbUser = "vetrisa"
# NOTE: this is a placeholder string, not the real password. `dbPassword` is
# reassigned later in the notebook with the value read from the secret scope.
dbPassword = "sql-password"

# --- Databricks secret scope holding the credentials ---
databricksScope = "salesprojectscope"

In [0]:
# Access key for the storage account, read from the Databricks secret scope.
# The scope comes from the `databricksScope` setting (same value as before) so the
# scope name is defined in exactly one place.
# NOTE(review): the key name is spelled "acccount" (three c's); it is kept unchanged
# because it must match the name of the stored secret — confirm against the scope.
storageAccountKey = dbutils.secrets.get(scope=databricksScope, key='storage-acccount-key')

In [0]:
# Mount the `sales` blob container at /mnt/sales, unless the earlier check found it already mounted.
if alreadyMounted:
    print("It is already mounted")
else:
    dbutils.fs.mount(
        source='wasbs://sales@vetristoragesa.blob.core.windows.net',
        mount_point='/mnt/sales',
        extra_configs={
            'fs.azure.account.key.vetristoragesa.blob.core.windows.net': storageAccountKey
        },
    )
    alreadyMounted = True
    print('mounting done')

In [0]:
# Read the incoming orders file from the landing folder.
# The '{}' placeholder has to be filled with the widget-supplied file name (as the
# dbutils.fs.mv calls in this notebook do); previously the literal '{}' was passed as the path.
ordersDf = spark.read.csv(
    '/mnt/sales/landing/{}'.format(filename),
    inferSchema=True,
    header=True,
)

In [0]:
# Render the freshly loaded orders DataFrame for a visual check before validation.
display(ordersDf)

In [0]:
# Validation 1: every order_id in the file must be unique.
# Compare the total row count with the number of distinct order_id values.
ordersCount = ordersDf.count()
print(ordersCount)
distinctOrdersCount = ordersDf.select('order_id').distinct().count()
print(distinctOrdersCount)

# errorFlg is read again by the later order-status validation cell.
errorFlg = ordersCount != distinctOrdersCount
if errorFlg:
    # Duplicates found: move the file out of landing and end the run with an error payload.
    landingFilePath = '/mnt/sales/landing/{}'.format(filename)
    dbutils.fs.mv(landingFilePath, '/mnt/sales/discarded')
    dbutils.notebook.exit('{"errorFlg":"true","errorMsg":"order id is repeated"}')

# Expose the orders to SQL under the name `orders`.
ordersDf.createOrReplaceTempView("orders")

In [0]:
# Quick check that the `orders` temp view is queryable through SQL.
ordersPreviewQuery = "select * from orders"
df = spark.sql(ordersPreviewQuery)
display(df)

In [0]:
# Build the JDBC URL for the SQL Server database from the connection settings.
# The assignment and its value must form one expression; a bare `connectionUrl =`
# followed by a line break is a syntax error.
connectionUrl = 'jdbc:sqlserver://{}.database.windows.net:{};database={};user={};'.format(
    dbServer, dbPort, dbName, dbUser
)
# The password is read from the secret scope rather than stored in the notebook.
dbPassword = dbutils.secrets.get(scope=databricksScope, key='sql-password')
# Properties passed to every spark.read.jdbc / DataFrame.write.jdbc call below.
connectionProperties = {
    'password': dbPassword,
    'driver': 'com.microsoft.sqlserver.jdbc.SQLServerDriver',
}

In [0]:
# Load the reference list of valid order statuses from the database.
validStatusDf = spark.read.jdbc(
    url=connectionUrl,
    table='dbo.valid_order_status',
    properties=connectionProperties,
)
display(validStatusDf)
# Register the view that the status validation query refers to as `valid_status`;
# without it that query has no such table to resolve.
validStatusDf.createOrReplaceTempView("valid_status")

In [0]:
# Validation 2: collect the orders whose order_status is not in the list of valid statuses.
# The query requires a temp view named `valid_status` to be registered before this cell runs.
# NOTE(review): `select *` in the subquery presumably relies on valid_status having a single
# column, and NOT IN matches nothing if that column contains a NULL — confirm the table.
invalidRowsDf = spark.sql("select * from orders where order_status not in (select * from valid_status)")
# Show the offending rows (the cell previously re-displayed validStatusDf instead).
display(invalidRowsDf)

In [0]:
# Route the file according to the validation outcome:
# any row with an invalid status -> discarded (and end the run with an error payload),
# otherwise -> staging for the reporting steps.
hasInvalidStatus = invalidRowsDf.count() > 0
if hasInvalidStatus:
    errorFlg = True

landingFilePath = '/mnt/sales/landing/{}'.format(filename)
if not errorFlg:
    dbutils.fs.mv(landingFilePath, '/mnt/sales/staging')
else:
    dbutils.fs.mv(landingFilePath, '/mnt/sales/discarded')
    dbutils.notebook.exit('{"errorFlg":"true","errorMsg":"Invalid order status"}')
In [0]:
# Load the order items CSV and expose it to SQL as `order_items`.
orderItemsPath = '/mnt/sales/order_items/order_items.csv'
orderItemsDf = spark.read.csv(orderItemsPath, header=True, inferSchema=True)
orderItemsDf.createOrReplaceTempView("order_items")
display(orderItemsDf)

In [0]:
# Load the customers table over JDBC and expose it to SQL as `customers`.
customersDf = spark.read.jdbc(
    url=connectionUrl,
    table='dbo.customers',
    properties=connectionProperties,
)
customersDf.createOrReplaceTempView("customers")
display(customersDf)

In [0]:
# Re-read the validated orders file from the staging folder, where the routing step moved it.
# The '{}' placeholder has to be filled with the file name; previously the literal '{}'
# was passed as the path.
ordersDf = spark.read.csv(
    '/mnt/sales/staging/{}'.format(filename),
    inferSchema=True,
    header=True,
)
# Point the `orders` view at the staged copy.
ordersDf.createOrReplaceTempView("orders")

In [0]:
# Per-customer sales summary: joined row count and total amount, highest spenders first.
# The joins are written as explicit inner JOIN ... ON clauses; the previous form
# (comma-separated FROM list followed by a single ON) is not part of Spark SQL's
# documented join syntax. Join conditions and selected columns are unchanged.
# NOTE(review): count(order_id) counts joined rows, i.e. one per matching order_items row,
# so `no_of_orders` may exceed the number of distinct orders — confirm whether
# count(distinct o.order_id) is what the report intends.
resultDf1 = spark.sql("""
    select c.customer_id,
           c.customer_firstname,
           c.customer_lastname,
           c.customer_city,
           c.customer_state,
           c.customer_zipcode,
           count(order_id) as no_of_orders,
           round(sum(order_item_subtotal), 2) as total_amount
    from customers c
    join orders o
      on c.customer_id = o.customer_id
    join order_items i
      on o.order_id = i.order_item_order_id
    group by c.customer_id,
             c.customer_firstname,
             c.customer_lastname,
             c.customer_city,
             c.customer_state,
             c.customer_zipcode
    order by total_amount desc
""")

display(resultDf1)

In [0]:
# Write the customer summary to dbo.sales_reporting over JDBC, using save mode 'overwrite'.
resultDf1.write.jdbc(
    url=connectionUrl,
    table='dbo.sales_reporting',
    mode='overwrite',
    properties=connectionProperties,
)