# Retail Data Management and Analysis

## Objective:

- Participants will implement a fully modular ETL pipeline using Python. 
- They must:
    - Use Python functions to manage Azure databricks database operations (setup, data loading, transformations).
    - Analyze data using Python for Data Science.
    - Use PySpark for advanced transformations.
- Provide solution to the python functions according to the specifications.
- __Refrain from modifying the boilerplate code as it may lead to unexpected behavior.__
- The solution is to be written between the comments `# code starts here` and `# code ends here`.
- On completing all the questions, the assessment is to be submitted on moodle for evaluation.
- Before submitting the assessment make sure there are no errors while executing the assessment notebook.
- The kernel of the Jupyter notebook is to be set as `Python 3.10.6` if not set already. 

### All the libraries which are required to interact with Snowflake are imported here 

In [1]:
import json
import os
from azure.identity import ClientSecretCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.databricks import AzureDatabricksManagementClient
from pyspark.sql import SparkSession
from databricks import sql
import subprocess
import requests
from requests.auth import HTTPBasicAuth
import pandas as pd
from mongomock import MongoClient
import mongomock
from pyspark.sql import functions as F
import json

# Databricks details

In [2]:
import random
random_number = random.randint(1000, 9999)
workspace_name = "demoworkspace"
resource_group_name = "Test"
location = "eastus"
managed_resource_group_name = "test-rg1"

Loading Azure Credentials

Executes the process of loading Azure credentials from a JSON file and returning an authenticated credential object.

This function reads the azure_credentials.json file (or a specified file path), extracts required authentication details (tenant_id, client_id, client_secret, and subscription_id), and validates their presence. It then authenticates using ClientSecretCredential, allowing secure access to Azure resources.

In [3]:
# Ensure the file exists
credentials_file= "/home/labuser/Desktop/Project/azure_credentials.json"
if not os.path.exists(credentials_file):
    raise FileNotFoundError(f"The credentials file '{credentials_file}' does not exist.")

# Load credentials from the file
with open(credentials_file, "r") as file:
    creds = json.load(file)
global tenant_id , client_id,client_secret,subscription_id,credential
# Extract required fields
tenant_id = creds.get("tenant_id")
client_id = creds.get("client_id")
client_secret = creds.get("client_secret")
subscription_id = creds.get("subscription_id")

# Validate required fields
if not all([tenant_id, client_id, client_secret, subscription_id]):
    raise ValueError("The credentials file is missing one or more required fields: 'tenant_id', 'client_id', 'client_secret', 'subscription_id'.")

# Authenticate using ClientSecretCredential
credential = ClientSecretCredential(tenant_id=tenant_id, client_id=client_id, client_secret=client_secret)

### MongoDB Client Creation ; Loading Product and Order Data 

#### Execute the cell below to upload the data from the Products, Orders, Order_Details CSV file into product_collection, order_collection,order_details_collection onject.

In [4]:
def create_Mongo_Resources():
    # Initialize mongomock client to simulate MongoDB
    client = mongomock.MongoClient()

    # Accessing the simulated MongoDB database
    db = client['inventory_system']

    order_collection = db['orders']
    product_collection = db['products']
    order_details_collection = db['order_details']

    # Loading CSV files
    orders_df = pd.read_csv('Orders.csv')
    products_df = pd.read_csv('Products.csv')
    order_details_df = pd.read_csv('Order_Details.csv')

    # Convert DataFrames to dictionaries (for MongoDB insert)
    orders_data = orders_df.to_dict(orient='records')
    products_data = products_df.to_dict(orient='records')
    order_details_data = order_details_df.to_dict(orient='records')

    # Insert data into MongoDB collections
    order_collection.insert_many(orders_data)
    product_collection.insert_many(products_data)
    order_details_collection.insert_many(order_details_data)

    # Verify insertion by printing counts of documents in each collection
    print(f"Orders count: {order_collection.count_documents({})}")
    print(f"Products count: {product_collection.count_documents({})}")
    print(f"Order Details count: {order_details_collection.count_documents({})}")

    return product_collection,order_collection,order_details_collection

In [5]:
products_collection, orders_collection,order_details_collection = create_Mongo_Resources()

Orders count: 50
Products count: 20
Order Details count: 100


### Question 1: Extract data from mongodb

* The function `extract_data_from_mongodb` extracts the data from the given MongoDB collections and returns pandas Dataframes.
* Retrieve all documents from the products_collection in MongoDB, including only the __"ProductId", "Name", "Category", "Price" and "Stock"__ fields for each document. The result in the form of list must be converted into Pandas dataframe.
* Retrieve all documents from the orders_collection in MongoDB, including only the __"OrderId", "CustomerId", and "Status"__ fields for each document. The result in the form of list must be converted into Pandas dataframe.
* Retrieve all documents from the order_details_collection in MongoDB, including only the __"OrderDetailId", "OrderId", "ProductId" and "Quantity"__ fields for each document. The result in the form of list must be converted into Pandas dataframe.
* Arguments: 
    - products_collection: Products collection in MongoDB
    - orders_collection: Orders collection in MongoDB
    - order_details_collection : Order Details collection in the MongoDB
* Returns:
    - products_df: Pandas dataframe representing data in Products collection with the specified columns
    - orders_df: Pandas dataframe representing data in Orders collection with the specified columns
    - order_details_df: Pandas dataframe representing data in Order Details collection with the specified columns
* Sample Output:
```
Products Data:
   ProductId          Name     Category  Price  Stock
0          1      iPhone15  Electronics    999    100
1          2     GalaxyS24  Electronics    899     80

Orders Data:
   OrderId  CustomerId     Status
0        1         125   Canceled
1        2         187  Delivered

Order Details Data:
   OrderDetailId  OrderId  ProductId  Quantity
0              1        9          1         3
1              2        2          8         2
```
* Your solution should be provided within __code starts here__ and __code ends here__ block.

In [6]:
def extract_data_from_mongodb(products_collection, orders_collection, order_details_collection): 
    orders_df, products_df,order_details_df = None, None, None
    # code starts here
    # Extract Products data
    products = list(products_collection.find({}, {"_id": 0, "ProductId": 1, "Name":1,"Category":1,"Price": 1,"Stock": 1}))
    products_df = pd.DataFrame(products)

    # Extract Orders data
    orders = list(orders_collection.find({}, {"_id": 0, "OrderId": 1, "CustomerId":1,"Status":1}))
    orders_df = pd.DataFrame(orders)   

    # Extract Order Details data
    order_details = list(order_details_collection.find({}, {"_id": 0, "OrderDetailId": 1, "OrderId":1,"ProductId":1,"Quantity": 1}))
    order_details_df = pd.DataFrame(order_details) 
    # code ends here
    return products_df, orders_df,order_details_df

In [9]:
products_df, orders_df,order_details_df = extract_data_from_mongodb(products_collection, orders_collection, order_details_collection)
# if (products_df != None and orders_df != None and order_details_df != None):
# print("Data extraction failed.")
print("\nProducts Data:")
print(products_df.head())
print("Orders Data:")
print(orders_df.head())
print("Order Details Data:")
print(order_details_df.head())


Products Data:
   ProductId          Name     Category  Price  Stock
0          1      iPhone15  Electronics    999    100
1          2     GalaxyS24  Electronics    899     80
2          3    AirPodsPro  Electronics    249    150
3          4    MacBookPro  Electronics   1999     50
4          5  PlayStation5  Electronics    499     70
Orders Data:
   OrderId  CustomerId     Status
0        1         125   Canceled
1        2         187  Delivered
2        3         186    Pending
3        4         137   Canceled
4        5         194    Shipped
Order Details Data:
   OrderDetailId  OrderId  ProductId  Quantity
0              1        9          1         3
1              2        2          8         2
2              3       22         11         1
3              4       33         14         2
4              5       24         10         2


# Creating databricks workspace and retrieving token

In [10]:
def create_databricks_workspace_and_get_token(
    credential,
    subscription_id,
    resource_group_name,
    managed_resource_group_name,
    workspace_name,
    location
):
    """
    Creates a Databricks workspace and returns the workspace URL and access token.

    Args:
        credential (DefaultAzureCredential): Azure DefaultAzureCredential.
        subscription_id (str): Azure subscription ID.
        resource_group_name (str): Name of the Azure resource group.
        managed_resource_group_name (str): Managed resource group name for Databricks.
        workspace_name (str): Name of the Databricks workspace.
        location (str): Azure region for the workspace.

    Returns:
        dict: A dictionary containing the workspace URL and access token.
    """
    # Initialize Resource Management Client
    resource_client = ResourceManagementClient(credential, subscription_id)

    # Create the resource group if it does not exist
    print(f"Ensuring resource group '{resource_group_name}' exists in '{location}'...")
    resource_client.resource_groups.create_or_update(
        resource_group_name,
        {"location": location}
    )
    print(f"Resource group '{resource_group_name}' is ready.")

    # Initialize Databricks Management Client
    databricks_client = AzureDatabricksManagementClient(credential, subscription_id)

    # Construct managed resource group ID
    managed_resource_group_id = f"/subscriptions/{subscription_id}/resourceGroups/{resource_group_name}-managed"

    # Create the Databricks workspace
    print(f"Creating Databricks workspace '{workspace_name}' in '{location}'...")
    workspace = databricks_client.workspaces.begin_create_or_update(
        resource_group_name=resource_group_name,
        workspace_name=workspace_name,
        parameters={
            "location": location,
            "managed_resource_group_id": managed_resource_group_id,
            "sku": {"name": "premium"}
        }
    ).result()

    print(f"Databricks workspace '{workspace_name}' created successfully.")
    # print(workspace.workspace_url)
    # workspaceurl = workspace.workspace_url
    # Retrieve the workspace URL
    
 
    return workspace_name

In [11]:
workspace_name = create_databricks_workspace_and_get_token(
        credential,
        subscription_id,
        resource_group_name,
        managed_resource_group_name,
        workspace_name,
        location
    )

Ensuring resource group 'Test1' exists in 'eastus'...
Resource group 'Test1' is ready.
Creating Databricks workspace 'demoworkspace2379' in 'eastus'...
Databricks workspace 'demoworkspace2379' created successfully.


### Retrieving Databricks HTTP Path

Returns the Databricks server hostname and the HTTP path for database connections which will be using for connection creation for the databricks.

In [12]:
def get_databricks_http_path(credentials_file="azure_credentials.json"):
    """
    Retrieves the Databricks HTTP Path for connection.

    Args:
        credentials_file (str): Path to Azure credentials file.

    Returns:
        tuple: (Databricks Server Hostname, HTTP Path)
    """
    
    # Get Databricks workspace URL
    workspace_url = f"https://management.azure.com/subscriptions/{subscription_id}/resourceGroups/{resource_group_name}/providers/Microsoft.Databricks/workspaces/{workspace_name}?api-version=2018-04-01"

    access_token = credential.get_token("https://management.azure.com/.default").token

    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }

    response = requests.get(workspace_url, headers=headers)
    if response.status_code != 200:
        raise Exception(f"Failed to retrieve Databricks workspace: {response.text}")

    workspace_data = response.json()
    databricks_host = workspace_data["properties"]["workspaceUrl"]

    # Get SQL Warehouses from Databricks
    sql_endpoint_url = f"https://{databricks_host}/api/2.0/sql/warehouses"
    db_access_token = credential.get_token("2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default").token  # Databricks token

    db_headers = {
        "Authorization": f"Bearer {db_access_token}",
        "Content-Type": "application/json"
    }

    sql_response = requests.get(sql_endpoint_url, headers=db_headers)
    if sql_response.status_code != 200:
        raise Exception(f"Failed to retrieve SQL Warehouses: {sql_response.text}")

    warehouses = sql_response.json()["warehouses"]
    if not warehouses:
        raise Exception("No SQL Warehouses found in Databricks.")

    warehouse = warehouses[0]  # Selecting the first available warehouse
    http_path = warehouse["odbc_params"]["path"]

    return databricks_host, http_path

In [13]:
databricks_url, databricks_http_path = get_databricks_http_path()
print(databricks_url)
databricks_http_path

adb-2564247772168554.14.azuredatabricks.net


'/sql/1.0/warehouses/e311dbfc0840100b'

### Create token of the databricks using the below steps

Login to databricks with the credentails , and Generate a personal access token and provide here

In [14]:
def get_token():
    token = "dapi9ad66483a1783de585529a283bd34884-3"
    return token

In [15]:
global workspaceurl,access_token
access_token = get_token()

workspaceurl = "https://"+databricks_url

### Question 2: Spark Dataframe creation

* The function `create_spark_dataframe` initializes a Spark session and converts three given Pandas DataFrames (products_df, orders_df, order_details_df) into corresponding Spark DataFrames (products_spark_df, orders_spark_df, orderdetails_spark_df).
* Create a Spark session with the application name "InventoryProcessing".
* Convert the product, orders and order details dataframe to __Spark dataframe__
* Arguments: 
    - products_df: Pandas Dataframe representing Products 
    - orders_df: Pandas Dataframe representing Orders 
    - order_details_df : Pandas Dataframe representing Order Details 
* Returns:
    - products_spark_df: Spark DataFrame created from the products Pandas DataFrame.
    - orders_spark_df: Spark DataFrame created from the orders Pandas DataFrame.
    - orderdetails_spark_df: Spark DataFrame created from the order details Pandas DataFrame.
* Your solution should be provided within __code starts here__ and __code ends here__ block.

In [16]:
def create_spark_dataframe(products_df, orders_df, order_details_df):
    products_spark_df, orders_spark_df, orderdetails_spark_df = None,None,None
    # code starts here
    # Initialize Spark Session
    spark = SparkSession.builder.appName("InventoryProcessing").getOrCreate()

    # Convert orders and products dataframes to Spark DataFrames
    products_spark_df = spark.createDataFrame(products_df)
    orders_spark_df = spark.createDataFrame(orders_df)
    orderdetails_spark_df = spark.createDataFrame(order_details_df) 
    
    # code ends here
    return products_spark_df, orders_spark_df, orderdetails_spark_df

In [17]:
products_spark_df, orders_spark_df, orderdetails_spark_df = create_spark_dataframe(products_df, orders_df, order_details_df)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/12 17:55:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Question 3: Using Spark, analyse the orders dataframe and find out the top 5 customers who placed the most orders

* The function `get_top5_customers_by_orders` analyzes the orders_spark_df Spark DataFrame to identify the top 5 customers who placed the highest number of orders. It returns a Spark DataFrame containing the CustomerId and OrderCount for these 5 top customers, sorted in descending order of OrderCount.
* Arguments: 
    - orders_spark_df: Spark DataFrame containing order details.
* Returns:
    - top_customers_df (pyspark.sql.DataFrame): Spark DataFrame containing CustomerId and OrderCount of the top 5 customers.
* Make sure the column names are as specified
* Sample return data
```
        +----------+----------+
        |CustomerId|OrderCount|
        +----------+----------+
        |       146|         4|
        |       110|         3|
```
* Your solution should be provided within __code starts here__ and __code ends here__ block.

In [18]:
def get_top5_customers_by_orders(orders_spark_df):
    top_customers_df = None  
    # code starts here
    try:
        # Group by CustomerId and count the number of orders
        customer_order_count = orders_spark_df.groupBy("CustomerId").agg(
            F.count("OrderId").alias("OrderCount")
        )
        
        # Order by the count in descending order and get the top 5
        top_customers_df = customer_order_count.orderBy(F.desc("OrderCount")).limit(5)
    
    except Exception as e:
        print(f"Error occurred while calculating top customers: {e}")
    # code ends here
    return top_customers_df

In [19]:
top5_customers_df = get_top5_customers_by_orders(orders_spark_df)
if (top5_customers_df!= None):
    top5_customers_df.show()

                                                                                

+----------+----------+
|CustomerId|OrderCount|
+----------+----------+
|       146|         4|
|       110|         3|
|       170|         2|
|       137|         2|
|       165|         2|
+----------+----------+



### Question 4: Calculate Top 5 Revenue Products

* The function `calculate_top5_revenue_products` analyzes the orderdetails_spark_df and products_spark_df Spark DataFrames to calculate the top 5 products that generated the highest total revenue. It returns a Spark DataFrame containing the ProductId, ProductName, and TotalRevenue for these top 5 products, sorted in descending order of TotalRevenue.
* Rename `ProductId` column in products DataFrame to `ProductId_Products` avoid ambiguity
* Rename `Name` column in joined dataframe to `ProductName`
* Calculate `TotalRevenue` as Quantity * Price
* Arguments:
    - products_spark_df (pyspark.sql.DataFrame): Spark DataFrame containing product details, including ProductId, Name (ProductName), and Price.
    - orderdetails_spark_df (pyspark.sql.DataFrame): Spark DataFrame containing order details, including ProductId, Quantity, and other order-specific details.    
* Returns:
    - top_revenue_products_df (pyspark.sql.DataFrame): Spark DataFrame containing __ProductId, ProductName, and TotalRevenue__ of the top 5 revenue-generating products.
* Make sure the column names are as specified
* Sample return data:
```
        +------------------+--------------+------------+
        |ProductId_Products|   ProductName|TotalRevenue|
        +------------------+--------------+------------+
        |                 1|      iPhone15|       30969|
        |                11|  Refrigerator|       22485|
```
* Your solution should be provided within __code starts here__ and __code ends here__ block.

In [20]:
def calculate_top5_revenue_products(products_spark_df, orderdetails_spark_df):
    top5_revenue_products = None
    # code starts here
    # Rename ProductId column in products DataFrame to avoid ambiguity
    products_spark_df = products_spark_df.withColumnRenamed("ProductId", "ProductId_Products")

    # Join order_details with products DataFrame to get product prices
    joined_df = orderdetails_spark_df.join(
        products_spark_df,
        orderdetails_spark_df.ProductId == products_spark_df.ProductId_Products
    )

    # Rename columns to avoid ambiguity
    joined_df = joined_df.withColumnRenamed("ProductId", "ProductId_Orders")
    joined_df = joined_df.withColumnRenamed("Name", "ProductName")

    # Calculate total revenue for each product
    revenue_df = joined_df.withColumn("Revenue", joined_df.Quantity * joined_df.Price)

    # Group by ProductId and ProductName, and calculate total revenue
    revenue_per_product = revenue_df.groupBy("ProductId_Products", "ProductName").agg(
        F.sum("Revenue").alias("TotalRevenue")
    )

    # Order by TotalRevenue in descending order and return the top 5 products
    top5_revenue_products = revenue_per_product.orderBy(F.desc("TotalRevenue")).limit(5) 
    
    # code ends here
    return top5_revenue_products


In [21]:
top5_revenue_products = calculate_top5_revenue_products(products_spark_df,orderdetails_spark_df)
if top5_revenue_products!= None:
    top5_revenue_products.show()

+------------------+--------------+------------+
|ProductId_Products|   ProductName|TotalRevenue|
+------------------+--------------+------------+
|                 1|      iPhone15|       30969|
|                11|  Refrigerator|       22485|
|                 5|  PlayStation5|       12475|
|                 6|   XboxSeriesX|       12475|
|                12|WashingMachine|        9588|
+------------------+--------------+------------+



### Question 5: Transform data to get detailed order details

* The function `transform_order_details` transforms the order details data in orderdetails_spark_df by joining it with orders_spark_df and products_spark_df to include customer and product details, and calculates the total value for each order detail.
* The final_order_details_spark_df should have only the following columns:
    - OrderDetailId,
    - OrderId,
    - ProductId,
    - CustomerId,
    - Name,
    - Category,
    - Quantity,
    - Price,
    - TotalValue
* Args:
    - products_spark_df (pyspark.sql.DataFrame): Spark DataFrame containing product details.
    - orders_spark_df (pyspark.sql.DataFrame): Spark DataFrame containing order information.
    - orderdetails_spark_df (pyspark.sql.DataFrame): Spark DataFrame containing order details.
* Returns:
    - final_order_details_spark_df (pyspark.sql.DataFrame): Transformed Spark DataFrame containing enhanced order details 
        with selected columns and calculated TotalValue.
* Make sure the column names are as specified
* Sample return data:
```
        +-------------+-------+---------+----------+------------+-----------+--------+-----+----------+
        |OrderDetailId|OrderId|ProductId|CustomerId|        Name|   Category|Quantity|Price|TotalValue|
        +-------------+-------+---------+----------+------------+-----------+--------+-----+----------+
        |           90|      3|        5|       186|PlayStation5|Electronics|       4|  499|      1996|
        |           98|     32|        5|       164|PlayStation5|Electronics|       2|  499|       998|
        |           91|     34|        5|       165|PlayStation5|Electronics|       5|  499|      2495|
```
* Your solution should be provided within __code starts here__ and __code ends here__ block.

In [22]:
def transform_order_details(products_spark_df, orders_spark_df, orderdetails_spark_df):
    final_order_details_spark_df = None
    # Code starts here
    # Step 1: Renaming columns in orders DataFrame to avoid conflicts
    orders_spark_df = orders_spark_df.withColumnRenamed("OrderId", "OrderId_Orders")

    # Step 2: Join order_details with orders DataFrame to get CustomerId
    order_with_customer_df = orderdetails_spark_df.join(
        orders_spark_df,
        orderdetails_spark_df.OrderId == orders_spark_df.OrderId_Orders,
        "inner"  # Use "inner" join
    )

    # Step 3: Rename ProductId column in products DataFrame to avoid conflicts
    products_spark_df = products_spark_df.withColumnRenamed("ProductId", "ProductId_Products")

    # Step 4: Join the resulting DataFrame with products DataFrame to get product details (ProductName, Category)
    enhanced_order_details_df = order_with_customer_df.join(
        products_spark_df,
        order_with_customer_df.ProductId == products_spark_df.ProductId_Products,
        "inner"  # Use "inner" join
    )

    # Step 5: Calculate TotalValue (Quantity * Price)
    enhanced_order_details_df = enhanced_order_details_df.withColumn(
        "TotalValue",
        enhanced_order_details_df.Quantity * enhanced_order_details_df.Price
    )
    # Step 6: Select relevant columns for enhanced report
    final_order_details_spark_df = enhanced_order_details_df.select(
        "OrderDetailId",
        "OrderId",
        "ProductId",
        "CustomerId",
        "Name",
        "Category",
        "Quantity",
        "Price",
        "TotalValue"
    ) 
    
    # Code ends here
    return final_order_details_spark_df

In [23]:
final_order_details_spark_df = transform_order_details(products_spark_df, orders_spark_df, orderdetails_spark_df)
if final_order_details_spark_df != None:
    final_order_details_spark_df.show()

                                                                                

+-------------+-------+---------+----------+------------+-----------+--------+-----+----------+
|OrderDetailId|OrderId|ProductId|CustomerId|        Name|   Category|Quantity|Price|TotalValue|
+-------------+-------+---------+----------+------------+-----------+--------+-----+----------+
|           97|     16|        7|       170|  SmartWatch|  Wearables|       1|  199|       199|
|           83|     16|        7|       170|  SmartWatch|  Wearables|       5|  199|       995|
|           66|     22|        7|       168|  SmartWatch|  Wearables|       3|  199|       597|
|           15|     11|        7|       137|  SmartWatch|  Wearables|       2|  199|       398|
|           77|     47|        6|       118| XboxSeriesX|Electronics|       5|  499|      2495|
|           56|     47|        6|       118| XboxSeriesX|Electronics|       1|  499|       499|
|           94|     30|        6|       110| XboxSeriesX|Electronics|       3|  499|      1497|
|           63|      2|        6|       

### Establishing a Databricks SQL Connection

The function get_databricks_connection establishes and returns a connection to a Databricks SQL warehouse or cluster using the Databricks SQL Connector for Python. It is essential to close the connection after use to free up resources.

Process:

Set up connection details using the Databricks workspace URL, HTTP path, and access token. Establish a connection using the sql.connect method from the Databricks SQL Connector for Python. Args:

None Returns:

conn (sql.Connection): The Databricks SQL connection object.

Error Handling:

The function checks if all necessary environment variables (DATABRICKS_SERVER_HOSTNAME, DATABRICKS_HTTP_PATH, DATABRICKS_ACCESS_TOKEN) are set. If any are missing, it raises a ValueError. If the connection attempt fails, it catches the exception, prints an error message, and returns None.

In [24]:
import os

def get_databricks_connection():
    """
    Establish and return a Databricks SQL connection.
    Ensure to close the connection after use.

    Returns:
        conn (sql.Connection): Databricks SQL connection object.
    """
    # Set up connection details
    DATABRICKS_SERVER_HOSTNAME = workspaceurl  # Replace with your Databricks workspace URL
    DATABRICKS_HTTP_PATH = databricks_http_path # Replace with your warehouse's HTTP Path
    DATABRICKS_ACCESS_TOKEN = access_token  # Replace with your access token

    # Establish connection
    conn = sql.connect(
        server_hostname=DATABRICKS_SERVER_HOSTNAME,
        http_path=DATABRICKS_HTTP_PATH,
        access_token=DATABRICKS_ACCESS_TOKEN
    )

    return conn

In [25]:
global conn
conn = get_databricks_connection()
conn

<databricks.sql.client.Connection at 0x7e1498dd2bf0>

### Question 7: Create snowflake Resources

* This function `setup_database_and_table` creates a database named __PRODUCT_INVENTORY__ and a schema named __ANALYTICS__ in the Snowflake instance if they do not already exist, using the provided cursor object.
* Create table named __TABLE ORDER_DETAILS__ in __PRODUCT_INVENTORY__ and schema __ANALYTICS__
* Table __TABLE ORDER_DETAILS__ should have the following schema:
    - OrderDetailId (INT): Unique identifier for each order detail record.
    - OrderId (INT): Identifier for the associated order.
    - ProductId (INT): Identifier for the product.
    - CustomerId (INT): Identifier for the customer who placed the order.
    - Name (STRING): Name of the product.
    - Category (STRING): Category of the product.
    - Quantity (INT): Number of units ordered.
    - Price (FLOAT): Price per unit of the product.
    - TotalValue (FLOAT): Total value of the order detail (calculated as `Quantity * Price`).
* The database, schema, and table are created using SQL commands executed through the provided Snowflake cursor.
* Arguments:
    - cursor (snowflake.connector.cursor.SnowflakeCursor): A Snowflake cursor object used to execute SQL commands.
* Returns:
    - result (str): A message indicating the result of the database creation operation.
        - Success: "Database, schema, and table setup successfully."
        - Failure: "An error occurred during setup: {e}"
* Your solution should be provided within __code starts here__ and __code ends here__ block.
* Implement suitable error handling.

In [26]:
def setup_database_and_table(conn):
    # Code starts here

    # Step 2: Create table if it does not exist
    create_query = """
    CREATE OR REPLACE TABLE ORDER_DETAILS (
        OrderDetailId INT,
        OrderId INT,
        ProductId INT,
        CustomerId INT,
        Name STRING,
        Category STRING,
        Quantity INT,
        Price FLOAT,
        TotalValue FLOAT
    )
    """

    with conn.cursor() as cursor:
        
        result = cursor.execute(create_query)

    return result

In [27]:
result = setup_database_and_table(conn)
result

<databricks.sql.client.Cursor at 0x7e1498dd31c0>

### Question 8: Insert data into the ORDER_DETAILS table

* This function `insert_order_details_into_snowflake` inserts order details into the Snowflake ORDER_DETAILS table using the provided Snowflake cursor object.
* Arguments:
    - cursor (snowflake.connector.cursor.SnowflakeCursor): A Snowflake cursor object used to execute SQL commands.
    - final_order_details_df (pandas.DataFrame): A DataFrame containing the final order details to be inserted into the Snowflake ORDER_DETAILS table.
* Returns:
    - insert_status (str): A message indicating the result of the insertion operation.
        - Success: "Order details data successfully inserted into ORDER_DETAILS."
        - Failure: "Error occurred while inserting order details data: {e}"
* Your solution should be provided within code starts here and code ends here block.
* Suitable error handling is implemented to capture and return any issues during the insertion process.

In [28]:
def insert_order_details_into_sql(conn, final_order_details_spark_df):
    insert_status = None
    # Code starts here
    try:
        # Convert PySpark DataFrame to Pandas DataFrame if necessary
        if hasattr(final_order_details_spark_df, "toPandas"):
            final_order_details_df = final_order_details_spark_df.toPandas()

        # Ensure data types are appropriate for the columns
        final_order_details_df['OrderDetailId'] = final_order_details_df['OrderDetailId'].astype(int)
        final_order_details_df['OrderId'] = final_order_details_df['OrderId'].astype(int)
        final_order_details_df['ProductId'] = final_order_details_df['ProductId'].astype(int)
        final_order_details_df['CustomerId'] = final_order_details_df['CustomerId'].astype(int)
        final_order_details_df['Name'] = final_order_details_df['Name'].astype(str)
        final_order_details_df['Category'] = final_order_details_df['Category'].astype(str)
        final_order_details_df['Quantity'] = final_order_details_df['Quantity'].astype(int)
        final_order_details_df['Price'] = final_order_details_df['Price'].astype(float)
        final_order_details_df['TotalValue'] = final_order_details_df['TotalValue'].apply(lambda x: float(x))

        # Iterate over the rows in the Pandas DataFrame
        for index, row in final_order_details_df.iterrows():
            # Explicitly cast to Python's native types
            order_detail_id = int(row['OrderDetailId'])
            order_id = int(row['OrderId'])
            product_id = int(row['ProductId'])
            customer_id = int(row['CustomerId'])
            product_name = str(row['Name'])
            category = str(row['Category'])
            quantity = int(row['Quantity'])
            price = float(row['Price'])
            total_value = float(row['TotalValue'])

            # Construct the SQL insert statement
            insert_sql = """
            INSERT INTO ORDER_DETAILS (OrderDetailId, OrderId, ProductId, CustomerId, Name, Category, Quantity, Price, TotalValue)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """
            # Execute the SQL statement with the values
            with conn.cursor() as cursor:
                cursor.execute(insert_sql, (order_detail_id, order_id, product_id, customer_id,
                                                product_name, category, quantity, price, total_value))
 
        insert_status = "Order details data successfully inserted into ORDER_DETAILS."
    except Exception as e:
        # Catch any unforeseen errors during the iteration over the Pandas DataFrame
        insert_status = f"Error occurred while inserting order details data: : {str(e)}" 
    # Code ends here
    return insert_status

In [29]:
insert_status = insert_order_details_into_sql(conn, final_order_details_spark_df)
insert_status

                                                                                

'Order details data successfully inserted into ORDER_DETAILS.'

### Question 9: Find the total revenue by category from the ORDER_DETAILS table.

* This function `find_total_revenue_by_category` fetches the total revenue for each product category from the ORDER_DETAILS table.
* The result should be ordered by Total Revenue in Descending Order.
* Covert the result from the SQL Query into a dictionary 
* Arguments: 
    - cursor: Snowflake database cursor.
* Returns:
    - category_revenue (list): A list of dictionaries containing the Category and TotalRevenue of each category.
* Sample return data:
```
        [{'Category': 'Electronics', 'TotalRevenue': 70347.0},
        {'Category': 'Appliances', 'TotalRevenue': 34469.0}....]
```
* Your solution should be provided within code starts here and code ends here block.
* Suitable error handling is implemented to capture and return any issues during the insertion process.

In [30]:
def find_total_revenue_by_category(conn):
    category_revenue  = []
    # Code starts here
    try:
        with conn.cursor() as cursor:  # Cursor opens here
            # Use the appropriate database and schema
            # cursor.execute("USE DATABASE PRODUCT_INVENTORY;")
            # cursor.execute("USE SCHEMA ANALYTICS;")

            # SQL query to find total revenue by category
            query = """
                SELECT Category, SUM(TotalValue) AS TotalRevenue
                FROM ORDER_DETAILS
                GROUP BY Category
                ORDER BY TotalRevenue DESC;
            """
            cursor.execute(query)
            results = cursor.fetchall()

            # Convert results to a list of dictionaries
            for row in results:
                category_revenue.append({
                    "Category": row[0],
                    "TotalRevenue": row[1]
                })
    except Exception as e:
        # Handle errors and return an error message in the result
        category_revenue = [{"Error": f"Error occurred: {str(e)}"}]
    # code ends here
    return category_revenue

In [31]:
category_revenue = find_total_revenue_by_category(conn)
category_revenue

[{'Category': 'Electronics', 'TotalRevenue': 70347.0},
 {'Category': 'Appliances', 'TotalRevenue': 34469.0},
 {'Category': 'Home', 'TotalRevenue': 8767.0},
 {'Category': 'Transportation', 'TotalRevenue': 6986.0},
 {'Category': 'Sports', 'TotalRevenue': 6746.0},
 {'Category': 'Kitchen', 'TotalRevenue': 6338.0},
 {'Category': 'Wearables', 'TotalRevenue': 2189.0}]

------------------------------------------------------- END OF ASSESSMENT --------------------------------------------------------------------------------