In [0]:
# Project Summary: Customer Sales Reporting

This notebook documents the process of analyzing customer sales data and reporting it to a SQL Server database. The project involves extracting an orders file from an Azure Blob Storage container, validating it (order IDs must be unique and every order status must appear in the reference list held in SQL Server), performing data transformations and analysis using Apache Spark, and writing the results to a SQL Server database for reporting purposes.

Key Steps:
1. Data ingestion from Azure Blob Storage, with validation of the incoming orders file (unique order IDs, valid order statuses).
2. Data transformation and analysis using Apache Spark.
3. Creating temporary views for querying and analysis.
4. Generating customer sales reporting data.
5. Writing the results to a SQL Server database.

In [0]:
# Name of the orders file to process, supplied through the 'filename' widget
# (e.g. "orders_new.csv").
filename = dbutils.widgets.get('filename')

# With an empty name, every path built below ('/mnt/sales/landing/{}' etc.)
# would point at the landing directory itself rather than a single file, so
# stop the run with the same JSON error payload the validation cells use.
if not filename.strip():
    dbutils.notebook.exit('{"errorFlg": "true", "errorMsg":"No filename supplied"}')

print(filename)

In [0]:

# True when some existing mount already uses the /mnt/sales mount point,
# in which case the mount step can be skipped.
alreadyMounted = any(
    mount.mountPoint == "/mnt/sales"
    for mount in dbutils.fs.mounts()
)

print(alreadyMounted)

In [0]:
# Connection settings for the Azure SQL (*.database.windows.net) reporting database.
dbServer, dbPort, dbName, dbUser = 'jdserver', '1433', 'jddatabase', 'jayesh'

# NOTE: this is the *name* of the secret that holds the SQL password, not the
# password itself; dbPassword is reassigned to the secret's value before use.
dbPassword = 'sql-password'

# Databricks secret scope holding the storage-account key and the SQL password.
databricksScope = 'salesprojectscope01'

In [0]:
# Storage-account access key from the project's secret scope. Uses the shared
# databricksScope constant instead of repeating the scope name; the secret key
# name must match the scope exactly, so its spelling is left unchanged.
stroageAccountKey = dbutils.secrets.get(scope = databricksScope, key = 'stroage-account-key')

In [0]:
# Mount the 'sales' blob container at /mnt/sales unless it is already mounted.
if not alreadyMounted:
    dbutils.fs.mount(
        source='wasbs://sales@jdstroage.blob.core.windows.net',
        mount_point='/mnt/sales',
        # Pass the key *value* fetched from the secret scope. The previous code
        # passed the literal string 'stroageAccountKey' instead of the key.
        extra_configs={'fs.azure.account.key.jdstroage.blob.core.windows.net': stroageAccountKey},
    )

    alreadyMounted = True
    print("mounting done successfully")
else:
    print("it is already mounted")

In [0]:
# Load the incoming orders file from the landing zone; the first row is the header.
orders_df = spark.read.csv(
    f'dbfs:/mnt/sales/landing/{filename}',
    header=True,
    inferSchema=True,
)

In [0]:
# Preview the first 20 rows of the raw orders file.
display(orders_df.limit(num=20))

In [0]:
# Row count of the raw orders file, shown as this cell's output.
# The validation cell recomputes the same count into ordersCount.
orders_df.count()

In [0]:
# Reject the file if any order_id occurs more than once: the total row count
# must equal the number of distinct order_id values.
errorFlg = False

ordersCount = orders_df.count()
print(ordersCount)

distinct_count = orders_df.select('order_id').distinct().count()
print(distinct_count)

if ordersCount != distinct_count:
    errorFlg = True

if errorFlg:
    # Move the rejected file out of the landing zone, then end the run with a
    # JSON error payload. No temp view is registered here: the next cell
    # registers "orders" unconditionally once this check has passed.
    dbutils.fs.mv('/mnt/sales/landing/{}'.format(filename), '/mnt/sales/discarded')
    dbutils.notebook.exit('{"errorFlg": "true", "errorMsg":"Orderid is repeated"}')

In [0]:
# Expose the orders DataFrame to Spark SQL under the name "orders".
orders_df.createOrReplaceTempView(name="orders")

In [0]:
# Read the "orders" view back through Spark SQL and preview 20 rows.
df = spark.sql(sqlQuery="select * from orders")
display(df.limit(num=20))

In [0]:
# JDBC URL for the Azure SQL database. The password is not part of the URL;
# it is supplied separately through connectionProperties.
connectionUrl = (
    f'jdbc:sqlserver://{dbServer}.database.windows.net:{dbPort};'
    f'database={dbName};user={dbUser};'
)

In [0]:
# Replace the secret-name placeholder with the actual SQL password from the scope.
dbPassword = dbutils.secrets.get(key='sql-password', scope=databricksScope)

In [0]:
# JDBC options passed to every SQL Server read and write in this notebook.
connectionProperties = dict(
    password=dbPassword,
    driver='com.microsoft.sqlserver.jdbc.SQLServerDriver',
)

In [0]:
# Reference list of accepted order statuses, read from SQL Server.
valid_status = spark.read.jdbc(
    url=connectionUrl,
    table='dbo.valid_order_status',
    properties=connectionProperties,
)

In [0]:
# Show the accepted order statuses used by the validation query.
display(valid_status)

In [0]:
# Expose the reference statuses to Spark SQL as "valid_status".
valid_status.createOrReplaceTempView(name="valid_status")

In [0]:
# Orders whose status is missing or not present in the valid_status list.
# `x NOT IN (...)` evaluates to NULL (not TRUE) when x is NULL, so rows with a
# NULL order_status used to pass validation silently; the explicit IS NULL
# test now flags them.
# NOTE(review): if valid_status itself contains a NULL, `NOT IN` matches no
# rows at all. The subquery also relies on valid_status having one column.
invaild_rows_df = spark.sql(
    "select * from orders "
    "where order_status is null "
    "or order_status not in (select * from valid_status)"
)

In [0]:
# Show any orders that failed the status check (empty when the file is valid).
display(invaild_rows_df)

In [0]:
# Route the landing file: any invalid status sends it to discarded/ and ends
# the run; otherwise it moves on to staging/ for the reporting steps.
if invaild_rows_df.count() > 0:
    errorFlg = True

if not errorFlg:
    dbutils.fs.mv(f'/mnt/sales/landing/{filename}', '/mnt/sales/staging')
else:
    dbutils.fs.mv(f'/mnt/sales/landing/{filename}', '/mnt/sales/discarded')
    dbutils.notebook.exit('{"errorFlg": "true", "errorMsg":"Invalid order status found"}')

In [0]:
# Order line items come from a fixed CSV (not parameterised by `filename`).
order_item_df = spark.read.csv(
    '/mnt/sales/orders_item/order_items.csv',
    header=True,
    inferSchema=True,
)
display(order_item_df.limit(num=20))

In [0]:
# Expose the line items to Spark SQL as "order_items".
order_item_df.createOrReplaceTempView(name="order_items")

In [0]:
# Customer master data, read from SQL Server.
customer_df = spark.read.jdbc(
    url=connectionUrl,
    table='dbo.customers',
    properties=connectionProperties,
)

In [0]:
# Preview customers without the customer_password column, so credentials are
# not rendered into (and saved with) the notebook output. drop() ignores a
# missing column. The SQL view below still exposes every column.
display(customer_df.drop('customer_password').limit(20))
customer_df.createOrReplaceTempView("customers")

In [0]:
# Reload the validated orders from staging/ and re-point the "orders" view at them.
orders_df = spark.read.csv(
    f'dbfs:/mnt/sales/staging/{filename}',
    header=True,
    inferSchema=True,
)
orders_df.createOrReplaceTempView(name="orders")

In [0]:
# Per-customer sales summary: number of distinct orders placed and the total
# value of their order items (rounded to 2 dp), highest spenders first.
#
# Joining orders to order_items yields one row per order *item*, so the order
# count must be COUNT(DISTINCT ...). A plain COUNT counted line items and
# overstated num_orders_placed for multi-item orders.
#
# Table names are written exactly as the temp views were registered
# ("customers", "orders", "order_items").
#
# NOTE(review): customer_password is copied into the report that is written to
# SQL Server. Confirm it really belongs in a reporting table.
result_df1 = spark.sql("""
    SELECT
        customers.customer_id,
        customers.customer_fname,
        customers.customer_lname,
        customers.customer_email,
        customers.customer_password,
        customers.customer_street,
        customers.customer_city,
        customers.customer_state,
        customers.customer_zipcode,
        COUNT(DISTINCT orders.order_id) AS num_orders_placed,
        ROUND(SUM(order_items.order_item_subtotal), 2) AS total_amt
    FROM
        customers
    JOIN
        orders ON customers.customer_id = orders.customer_id
    JOIN
        order_items ON orders.order_id = order_items.order_item_order_id
    GROUP BY
        customers.customer_id,
        customers.customer_fname,
        customers.customer_lname,
        customers.customer_email,
        customers.customer_password,
        customers.customer_street,
        customers.customer_city,
        customers.customer_state,
        customers.customer_zipcode
    ORDER BY
        total_amt DESC
""")


In [0]:
# Preview the top 20 rows of the report.
display(result_df1.limit(num=20))


In [0]:


# Publish the report to dbo.sales_reporting. Overwrite mode replaces whatever
# the table held previously.
result_df1.write.jdbc(
    url=connectionUrl,
    table='dbo.sales_reporting',
    mode='overwrite',
    properties=connectionProperties,
)