In [None]:
# Name of the file under /mnt/sales/landing to validate, read from the 'filename'
# notebook widget (presumably supplied by the calling job/pipeline).
filename = dbutils.widgets.get('filename')
# Every later path is '/mnt/sales/<dir>/' + filename, and those paths feed spark.read
# and dbutils.fs.mv. An empty value or one containing '/' would target a whole directory
# (or escape landing), so fail fast instead of reading or moving the wrong thing.
if not filename.strip() or '/' in filename or filename in ('.', '..'):
    raise ValueError("widget 'filename' must be a bare file name, got {!r}".format(filename))
print(filename)

In [None]:
# Storage account key used by the /mnt/sales mount below; it is read from the
# Databricks secret scope instead of being hardcoded in the notebook.
storage_access_key = dbutils.secrets.get(key='storage-access-key', scope='databricksScope1')

In [None]:
# Mount the 'sales' blob container of the storage account at /mnt/sales.
# The mount is skipped when that mount point already exists, so re-running the cell is safe.
# NOTE(review): wasbs:// is the legacy Blob driver; consider abfss:// if the account supports it.
SALES_MOUNT_POINT = '/mnt/sales'
STORAGE_ACCOUNT = 'datalakeazureproject'

alreadyMounted = any(m.mountPoint == SALES_MOUNT_POINT for m in dbutils.fs.mounts())

if not alreadyMounted:
    dbutils.fs.mount(
        source='wasbs://sales@{}.blob.core.windows.net'.format(STORAGE_ACCOUNT),
        mount_point=SALES_MOUNT_POINT,
        # Account-key auth; the key comes from the secret scope (storage_access_key).
        extra_configs={
            'fs.azure.account.key.{}.blob.core.windows.net'.format(STORAGE_ACCOUNT): storage_access_key,
        },
    )
    alreadyMounted = True
    print("mounting done successfully")
else:
    print("Already mounted")
        


In [None]:
# Sanity check: list the order_items folder on the mount (same listing as `%fs ls`).
# The order_items file read further down lives in this folder.
display(dbutils.fs.ls('/mnt/sales/order_items'))



In [None]:
# Raw orders file from the landing zone: the first row is the header, and column types are inferred.
orders_df = spark.read.options(header=True, inferSchema=True).csv(
    "/mnt/sales/landing/{}".format(filename)
)

In [None]:
# Preview the raw landing data loaded in the previous cell.
display(orders_df)

In [None]:
# Validation 1: order_id must be unique within the landing file.
# On failure, the file is moved to /mnt/sales/discarded and the notebook exits with an
# error payload for the caller. On success, the data is exposed to Spark SQL as the
# `orders` view.

orders_count = orders_df.count()
print(orders_count)

orders_distinct_count = orders_df.select('order_id').distinct().count()
print(orders_distinct_count)

# errorFlg is also read by the order-status routing cell further down.
errorFlg = orders_count != orders_distinct_count

if errorFlg:
    # Give mv the full destination file path, not just the directory, so the file lands
    # at discarded/<filename>.
    # NOTE(review): with a bare directory target, the outcome depends on whether that
    # directory already exists.
    dbutils.fs.mv('/mnt/sales/landing/{}'.format(filename), '/mnt/sales/discarded/{}'.format(filename))
    dbutils.notebook.exit('{"errorFlg":"true","errormsg":"order_id is repeated"}')

orders_df.createOrReplaceTempView('orders')


In [None]:
# Validation 2 checks order_status against a reference table in Azure SQL DB.
# These are the connection settings for that database.

dbServer = 'ttsqlserver2580'
dbPort = '1433'
dbName = 'ttsqldb'
dbUser = 'nithin'
# This is the *name* of the secret that holds the SQL password, not the password itself.
# The connection cell overwrites dbPassword with the value read from the secret scope.
dbPassword = 'SQL-password'
# Must match the scope name exactly. The previous value had a trailing space
# ('databricksScope1 '), which does not match the 'databricksScope1' scope used
# for the storage key.
databricksScope = 'databricksScope1'

In [None]:
# JDBC URL for the Azure SQL database. The user goes in the URL; the password is passed
# separately via connectionProperties, so it is not part of the URL string.
connectionUrl = 'jdbc:sqlserver://{server}.database.windows.net:{port};database={db};user={user};'.format(
    server=dbServer, port=dbPort, db=dbName, user=dbUser)

# Real SQL password from the secret scope (overwrites any earlier dbPassword value).
dbPassword = dbutils.secrets.get(key='SQL-password', scope=databricksScope)

connectionProperties = dict(
    password=dbPassword,
    driver='com.microsoft.sqlserver.jdbc.SQLServerDriver',
)

In [None]:
# Read the reference table of valid order statuses from Azure SQL DB.

valid_status_df = spark.read.jdbc(
    table='dbo.valid_order_status',
    url=connectionUrl,
    properties=connectionProperties,
)

In [None]:
# Inspect the reference statuses read from dbo.valid_order_status.
display(valid_status_df)

In [None]:
# Expose the reference statuses to Spark SQL as `valid_status` for the validation query.
valid_status_df.createOrReplaceTempView('valid_status')

In [None]:
# Validation 2: find orders whose order_status does not appear in the valid_status table.
# A LEFT ANTI JOIN is used instead of `NOT IN (subquery)` because NOT IN follows SQL NULL
# semantics:
#   - a single NULL in valid_status makes the predicate NULL for every row, so nothing
#     would be flagged;
#   - an order with a NULL order_status is never flagged.
# With the anti join, a NULL status simply fails to match and is reported as invalid.
valid_status_cols = spark.table('valid_status').columns
# `IN (select * ...)` only worked for a single-column reference table; keep that contract explicit.
if len(valid_status_cols) != 1:
    raise ValueError('valid_status must have exactly one column, found: {}'.format(valid_status_cols))
invalidRowsdf = spark.sql(
    "select o.* from orders o left anti join valid_status v "
    "on o.order_status = v.`{}`".format(valid_status_cols[0].replace('`', '``'))
)

In [None]:
# Show the orders that failed the status check (empty when validation passes).
display(invalidRowsdf)

In [None]:
# Route the landing file after validation 2.
# errorFlg is carried over from the duplicate-order_id cell; it is False here, because
# that cell exits the notebook on failure.
# head(1) stops at the first offending row, whereas count() would scan every invalid row.
if len(invalidRowsdf.head(1)) > 0:
    errorFlg = True

# Use explicit destination file paths: the reporting cells read the file back from
# /mnt/sales/staging/<filename>, so do not rely on the target directory already existing.
if errorFlg:
    dbutils.fs.mv('/mnt/sales/landing/{}'.format(filename), '/mnt/sales/discarded/{}'.format(filename))
    dbutils.notebook.exit('{"errorFlg":"true","errormsg":"order_status is not valid"}')
else:
    # Validation passed. The notebook deliberately does not exit, so the reporting cells below run.
    dbutils.fs.mv('/mnt/sales/landing/{}'.format(filename), '/mnt/sales/staging/{}'.format(filename))

In [None]:
# Order line items, exposed to Spark SQL as `order_items`.
# The report query joins this view to orders on order_item_order_id.
orderItemDf = spark.read.options(header=True, inferSchema=True).csv('/mnt/sales/order_items/order_items.txt')
orderItemDf.createOrReplaceTempView("order_items")

In [None]:
# Customer data from Azure SQL DB, exposed to Spark SQL as `customers`.
customer_df = spark.read.jdbc(
    properties=connectionProperties,
    table='dbo.customers',
    url=connectionUrl,
)
# NOTE(review): this renders customer names and addresses into the notebook output;
# consider limiting or redacting it before sharing run results.
display(customer_df)
customer_df.createOrReplaceTempView("customers")

In [None]:
# Re-read the validated file from staging, so the `orders` view now points at the staged
# copy rather than the landing path (the landing file was moved by the routing cell).
orders_df = spark.read.options(header=True, inferSchema=True).csv(
    "/mnt/sales/staging/{}".format(filename)
)
orders_df.createOrReplaceTempView("orders")

In [None]:
# Per-customer sales summary, highest spenders first:
#   - number of distinct orders placed;
#   - total order-item subtotal, rounded to 2 decimal places.
# The join to order_items yields one row per matching order_items row, so
# count(DISTINCT o.order_id) is required. A plain count(order_id) counts item rows,
# not orders, and inflates num_orders_placed.
result1_df = spark.sql("""
    SELECT c.customer_id,
           c.customer_fname,
           c.customer_lname,
           c.customer_city,
           c.customer_state,
           c.customer_zipcode,
           count(DISTINCT o.order_id)          AS num_orders_placed,
           round(sum(order_item_subtotal), 2)  AS total_amount
    FROM customers c
    JOIN orders o       ON c.customer_id = o.customer_id
    JOIN order_items oi ON o.order_id = oi.order_item_order_id
    GROUP BY c.customer_id, c.customer_fname, c.customer_lname,
             c.customer_city, c.customer_state, c.customer_zipcode
    ORDER BY total_amount DESC
""")

In [None]:
# Preview the per-customer report before it is written to dbo.sales_reporting.
display(result1_df) 

In [None]:
# Publish the report to Azure SQL; mode 'overwrite' replaces the contents of
# dbo.sales_reporting on every run.
# NOTE(review): Spark's JDBC overwrite drops and recreates the table unless the
# `truncate` option is set. Confirm that losing indexes/grants on it is acceptable.
result1_df.write.mode('overwrite').jdbc(
    url=connectionUrl,
    table='dbo.sales_reporting',
    properties=connectionProperties,
)