## Transform Data Using PySpark Functionalities

Databricks Data Management and Analysis

Objective: Participants will implement a fully modular ETL pipeline using Python to interact with a Databricks workspace, load data, perform transformations, and generate insights.

They must:

Use Python functions to manage Databricks SQL operations (setup, data loading, transformations).
Analyze data using Python for data analysis and visualization.
Utilize PySpark for advanced transformations where applicable.
Provide the solution in Python functions following the specified structure.
Refrain from modifying any provided boilerplate code, as it may cause unexpected behavior.
The solution should be implemented between the comments # code starts here and # code ends here.
Before submission:

Ensure there are no errors while executing the notebook.
Set the kernel of the Jupyter notebook to Python 3.10.6 if it is not set already.


In [26]:
import json
import os
from azure.identity import ClientSecretCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.databricks import AzureDatabricksManagementClient
from pyspark.sql import SparkSession

from databricks import sql
import subprocess

import requests
from requests.auth import HTTPBasicAuth

In [27]:
import random
random_number = random.randint(1000, 9999)
workspace_name = f"demoworkspacesi{random_number}"
resource_group_name = "Test"
location = "eastus"
cluster_name = "my-databricks-cluster"
managed_resource_group_name = "test-rg"

Loading Azure Credentials

Loads Azure credentials from a JSON file and returns an authenticated credential object together with the subscription ID.

This function reads the azure_credentials.json file (or a specified file path), extracts required authentication details (tenant_id, client_id, client_secret, and subscription_id), and validates their presence. It then authenticates using ClientSecretCredential, allowing secure access to Azure resources.

In [28]:


def load_azure_credentials(credentials_file: str = "azure_credentials.json"):
    """
    Loads Azure credentials from a JSON file and returns an authenticated credential object.

    Args:
        credentials_file (str): Path to the JSON file containing Azure credentials.

    Returns:
        tuple: A tuple containing:
            - ClientSecretCredential object for authentication
            - Subscription ID as a string
    """
    # Ensure the file exists
    if not os.path.exists(credentials_file):
        raise FileNotFoundError(f"The credentials file '{credentials_file}' does not exist.")
    
    # Load credentials from the file
    with open(credentials_file, "r") as file:
        creds = json.load(file)
    global tenant_id, client_id, client_secret, subscription_id
    # Extract required fields
    tenant_id = creds.get("tenant_id")
    client_id = creds.get("client_id")
    client_secret = creds.get("client_secret")
    subscription_id = creds.get("subscription_id")
    
    # Validate required fields
    if not all([tenant_id, client_id, client_secret, subscription_id]):
        raise ValueError("The credentials file is missing one or more required fields: 'tenant_id', 'client_id', 'client_secret', 'subscription_id'.")
    
    # Authenticate using ClientSecretCredential
    credential = ClientSecretCredential(tenant_id=tenant_id, client_id=client_id, client_secret=client_secret)
    
    return credential, subscription_id




In [29]:
# Example usage

credential, subscription_id = load_azure_credentials("azure_credentials.json")
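
The validation step in load_azure_credentials lists all four field names whenever any one of them is missing. As a minimal sketch, a helper could report only the fields that are actually absent or empty. The helper name `missing_credential_fields` and the sample values are hypothetical and not part of the provided notebook.

```python
import json

REQUIRED_FIELDS = ("tenant_id", "client_id", "client_secret", "subscription_id")

def missing_credential_fields(creds: dict) -> list:
    """Return the required keys that are absent or empty in creds."""
    return [name for name in REQUIRED_FIELDS if not creds.get(name)]

# Placeholder values only; a real file holds the service principal's details.
sample = json.loads('{"tenant_id": "t", "client_id": "c", "client_secret": ""}')
print(missing_credential_fields(sample))
```

A caller could raise ValueError with this list so the message points at the exact field to fix.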
    

Creating Databricks Workspace and Retrieving Access Token

Creates an Azure Databricks workspace and returns its name. Despite its name, the function does not retrieve an access token; the token is supplied in a later cell.

This function initializes the Azure Resource Management Client to ensure the resource group exists, then uses the Azure Databricks Management Client to create a Databricks workspace in the specified location. The workspace is configured with a managed resource group and a "premium" SKU. Once the workspace is created, the function returns the workspace name.

In [30]:


def create_databricks_workspace_and_get_token(
    credential: ClientSecretCredential,
    subscription_id: str,
    resource_group_name: str,
    managed_resource_group_name: str,
    workspace_name: str,
    location: str
) -> str:
    """
    Creates a Databricks workspace and returns its name.

    Args:
        credential (ClientSecretCredential): Authenticated Azure credential.
        subscription_id (str): Azure subscription ID.
        resource_group_name (str): Name of the Azure resource group.
        managed_resource_group_name (str): Managed resource group name for Databricks.
        workspace_name (str): Name of the Databricks workspace.
        location (str): Azure region for the workspace.

    Returns:
        str: The name of the created workspace.
    """
    # Initialize Resource Management Client
    resource_client = ResourceManagementClient(credential, subscription_id)

    # Create the resource group if it does not exist
    print(f"Ensuring resource group '{resource_group_name}' exists in '{location}'...")
    resource_client.resource_groups.create_or_update(
        resource_group_name,
        {"location": location}
    )
    print(f"Resource group '{resource_group_name}' is ready.")

    # Initialize Databricks Management Client
    databricks_client = AzureDatabricksManagementClient(credential, subscription_id)

    # Construct the managed resource group ID from the name passed in
    managed_resource_group_id = f"/subscriptions/{subscription_id}/resourceGroups/{managed_resource_group_name}"

    # Create the Databricks workspace
    print(f"Creating Databricks workspace '{workspace_name}' in '{location}'...")
    workspace = databricks_client.workspaces.begin_create_or_update(
        resource_group_name=resource_group_name,
        workspace_name=workspace_name,
        parameters={
            "location": location,
            "managed_resource_group_id": managed_resource_group_id,
            "sku": {"name": "premium"}
        }
    ).result()

    print(f"Databricks workspace '{workspace_name}' created successfully.")
    # The workspace URL is looked up later, in get_databricks_http_path()
    return workspace_name


   


In [31]:
workspace_name = create_databricks_workspace_and_get_token(
        credential,
        subscription_id,
        resource_group_name,
        managed_resource_group_name,
        workspace_name,
        location
    )

Ensuring resource group 'Test' exists in 'eastus'...
Resource group 'Test' is ready.
Creating Databricks workspace 'demoworkspacesi4176' in 'eastus'...
Databricks workspace 'demoworkspacesi4176' created successfully.


Retrieving Databricks HTTP Path

Fetches the Databricks workspace URL using the Azure Management API.
Retrieves the Databricks workspace properties, including the workspace URL.
Authenticates and generates an access token for secure API requests.
Queries the available SQL Warehouses within the Databricks workspace.
Extracts the HTTP path from the first available warehouse.
Returns the Databricks server hostname and the HTTP path for database connections.

In [32]:
def get_databricks_http_path(credentials_file="azure_credentials.json"):
    """
    Retrieves the Databricks server hostname and SQL warehouse HTTP path.

    Relies on the module-level credential, subscription_id,
    resource_group_name, and workspace_name set in earlier cells.

    Args:
        credentials_file (str): Path to Azure credentials file (currently unused).

    Returns:
        tuple: (Databricks Server Hostname, HTTP Path)
    """

    # Azure Management API URL of the workspace resource
    workspace_url = f"https://management.azure.com/subscriptions/{subscription_id}/resourceGroups/{resource_group_name}/providers/Microsoft.Databricks/workspaces/{workspace_name}?api-version=2018-04-01"

    access_token = credential.get_token("https://management.azure.com/.default").token

    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }

    response = requests.get(workspace_url, headers=headers)
    if response.status_code != 200:
        raise Exception(f"Failed to retrieve Databricks workspace: {response.text}")

    workspace_data = response.json()
    databricks_host = workspace_data["properties"]["workspaceUrl"]

    # Get SQL Warehouses from Databricks
    sql_endpoint_url = f"https://{databricks_host}/api/2.0/sql/warehouses"
    db_access_token = credential.get_token("2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default").token  # Databricks token

    db_headers = {
        "Authorization": f"Bearer {db_access_token}",
        "Content-Type": "application/json"
    }

    sql_response = requests.get(sql_endpoint_url, headers=db_headers)
    if sql_response.status_code != 200:
        raise Exception(f"Failed to retrieve SQL Warehouses: {sql_response.text}")

    warehouses = sql_response.json()["warehouses"]
    if not warehouses:
        raise Exception("No SQL Warehouses found in Databricks.")

    warehouse = warehouses[0]  # Selecting the first available warehouse
    http_path = warehouse["odbc_params"]["path"]

    return databricks_host, http_path

In [33]:
databricks_url, databricks_http_path = get_databricks_http_path()
print(databricks_url)
databricks_http_path

adb-1384417228045474.14.azuredatabricks.net


'/sql/1.0/warehouses/7e2d54601457ffa5'
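
get_databricks_http_path always takes the first warehouse in the list, whether or not it is running. As a hedged sketch, the selection could prefer a running warehouse and fall back to the first entry. This assumes each warehouse entry carries a `state` field whose value is `"RUNNING"` for a started warehouse; the helper name `pick_http_path` and the example entries are hypothetical.

```python
def pick_http_path(warehouses: list) -> str:
    """Return the HTTP path of a RUNNING warehouse, else of the first one listed."""
    if not warehouses:
        raise ValueError("No SQL warehouses found in the workspace.")
    # Assumption: a started warehouse reports state == "RUNNING".
    running = [w for w in warehouses if w.get("state") == "RUNNING"]
    chosen = running[0] if running else warehouses[0]
    return chosen["odbc_params"]["path"]

# Hypothetical response entries, shaped like the fields the notebook reads.
example = [
    {"name": "a", "state": "STOPPED", "odbc_params": {"path": "/sql/1.0/warehouses/aaa"}},
    {"name": "b", "state": "RUNNING", "odbc_params": {"path": "/sql/1.0/warehouses/bbb"}},
]
print(pick_http_path(example))
```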

Generate a personal access token in the Databricks workspace and make it available as databricks_token:

In [34]:
databricks_token = os.environ["DATABRICKS_TOKEN"]  # Read the token from the environment instead of hard-coding it

Creating a Databricks Cluster

Constructs the Databricks API endpoint for cluster creation.
Formats the workspace URL to ensure proper API request formatting.
Sets up authentication headers using the provided access token.
Defines cluster configuration, including the name, Spark version, node type, and auto-termination settings.
Sends a POST request to the Databricks API to create the cluster.
Handles API responses, printing success messages or error details based on the response status.

In [35]:
import requests
import json

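def create_databricks_cluster(workspaceurl: str, access_token: str):
    """
    Sends a cluster creation request to the Databricks REST API and prints the outcome.

    Args:
        workspaceurl (str): Workspace base URL, including the "https://" scheme.
        access_token (str): Databricks personal access token.
    """
    # API endpoint for cluster creation
    api_endpoint = "/api/2.1/clusters/create"
    api_url = workspaceurl.rstrip("/") + api_endpoint  # Strip any trailing slash before joining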

    # Headers for authentication
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }

    # Cluster configuration
    cluster_config = {
        "cluster_name": "democluster",
        "spark_version": "16.1.x-scala2.12",  # Replace with your required version
        "node_type_id": "Standard_DS3_v2",   # Choose the appropriate node type
        "autotermination_minutes": 30,
        "num_workers": 2
    }

    try:
        # Send POST request to create the cluster
        response = requests.post(api_url, headers=headers, json=cluster_config)

        if response.status_code == 200:
            print("Cluster created successfully!")
            print("Cluster Details:", response.json())
        else:
            print(f"Failed to create cluster. HTTP Status Code: {response.status_code}")
            print("Error Response:", response.text)

    except requests.exceptions.RequestException as e:
        print("An error occurred while connecting to Databricks API:")
        print(e)




In [36]:
access_token = databricks_token
workspaceurl = "https://"+databricks_url
#"https://"+databricks_url
create_databricks_cluster(workspaceurl, access_token)



Cluster created successfully!
Cluster Details: {'cluster_id': '0204-170211-dcovtoor'}
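
The response above contains only a cluster_id; the create request returns before the cluster has finished starting. A minimal polling sketch follows, assuming the cluster reports states such as `PENDING`, `RUNNING`, `TERMINATED`, and `ERROR`. The function `wait_for_state` and its `fetch_state` callback are hypothetical; in the notebook, the callback would wrap a status request for the returned cluster_id.

```python
import time
from typing import Callable

def wait_for_state(fetch_state: Callable[[], str], target: str = "RUNNING",
                   failed=("TERMINATED", "ERROR"), attempts: int = 60,
                   interval_seconds: float = 10.0) -> str:
    """Poll fetch_state() until it returns target; raise on failure or timeout."""
    for _ in range(attempts):
        state = fetch_state()
        if state == target:
            return state
        if state in failed:
            raise RuntimeError(f"Cluster entered state {state!r}")
        time.sleep(interval_seconds)
    raise TimeoutError(f"Cluster did not reach {target!r} after {attempts} polls")

# Stand-in for a real status call, so the sketch runs without a workspace.
fake_states = iter(["PENDING", "PENDING", "RUNNING"])
print(wait_for_state(lambda: next(fake_states), interval_seconds=0))
```

Passing the status lookup in as a callback keeps the loop independent of the HTTP client and easy to exercise offline.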


In [37]:
import mongomock
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, desc, split, trim
import snowflake.connector
import json

In [38]:
def create_Mongo_Resources():

    # Initialize mongomock client to simulate MongoDB
    mongo_client = mongomock.MongoClient()
    db = mongo_client["population_database"]

    titles_collection = db["peoples"]
    #credits_collection = db["credits"]
    
    # Load Titles data
    titles_df = pd.read_csv("population-vs-price.csv")
    titles_data = titles_df.to_dict("records")
    titles_collection.insert_many(titles_data)

    # Load Credits data
    # credits_df = pd.read_csv("Credits.csv")
    # credits_data = credits_df.to_dict("records")
    # credits_collection.insert_many(credits_data)

    # Verify insertion by printing the number of documents in the collection
    print(f"Titles count: {titles_collection.count_documents({})}")
    #print(f"Credits count: {credits_collection.count_documents({})}")

    return titles_collection

In [39]:
titles_collection = create_Mongo_Resources()

Titles count: 294


In [40]:
def extract_data_from_mongodb(titles_collection):
    titles_df= None
    # code starts here
    titles_df = pd.DataFrame(list(titles_collection.find({}, {"_id": 0})))
    #credits_df = pd.DataFrame(list(credits_collection.find({}, {"_id": 0,"rank":1})))
    # code ends here
    return titles_df

In [41]:
titles_df = extract_data_from_mongodb(titles_collection)
print("Titles Data:")
print(titles_df.head(5))

Titles Data:
   rank           City    State Code  Population  price
0   101     Birmingham  Alabama   AL    212247.0  162.9
1   125     Huntsville  Alabama   AL    188226.0  157.7
2   122         Mobile  Alabama   AL    194675.0  122.5
3   114     Montgomery  Alabama   AL    200481.0  129.0
4    64  Anchorage[19]   Alaska   AK    301010.0    NaN


In [25]:
titles_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 294 entries, 0 to 293
Data columns (total 6 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   rank        294 non-null    int64  
 1   City        294 non-null    object 
 2   State       294 non-null    object 
 3   Code        294 non-null    object 
 4   Population  293 non-null    float64
 5   price       109 non-null    float64
dtypes: float64(2), int64(1), object(3)
memory usage: 13.9+ KB


Establishing a Databricks SQL Connection

Sets up connection details using the Databricks workspace URL, HTTP path, and access token.
Establishes a connection to Databricks SQL using the provided credentials.
Returns the connection object for executing SQL queries.
Leaves closing the connection to the caller, which must close it after use to prevent resource leaks.

In [57]:
import os

def get_databricks_connection():
    """
    Establish and return a Databricks SQL connection.
    Ensure to close the connection after use.

    Returns:
        conn (sql.Connection): Databricks SQL connection object.
    """
    # Set up connection details
    DATABRICKS_SERVER_HOSTNAME = databricks_url  # Workspace hostname, without the "https://" scheme
    DATABRICKS_HTTP_PATH = databricks_http_path # Replace with your warehouse's HTTP Path
    DATABRICKS_ACCESS_TOKEN = access_token  # Replace with your access token

    # Establish connection
    conn = sql.connect(
        server_hostname=DATABRICKS_SERVER_HOSTNAME,
        http_path=DATABRICKS_HTTP_PATH,
        access_token=DATABRICKS_ACCESS_TOKEN
    )

    return conn



In [58]:
conn = get_databricks_connection()
conn

<databricks.sql.client.Connection at 0x715cd607f640>
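
get_databricks_connection returns an open connection and relies on the caller to close it. One way to guarantee the close, sketched here with the standard library's `contextlib.closing`, is to wrap the work in a helper. `FakeConnection` and `run_with_connection` are hypothetical names used so the example runs without a workspace.

```python
from contextlib import closing

class FakeConnection:
    """Stand-in for a Databricks SQL connection; only close() matters here."""
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

def run_with_connection(connect, work):
    """Open a connection, run work(conn), and always close the connection."""
    with closing(connect()) as conn:
        return work(conn)

fake = FakeConnection()
result = run_with_connection(lambda: fake, lambda conn: "done")
print(result, fake.closed)
```

In the notebook, `connect` would be get_databricks_connection and `work` one of the analysis functions that take conn.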


Initializing SparkSession for Local File Reading

Creates and returns a SparkSession instance.
Sets the application name to "DatabricksUpload".
Enables interaction with Spark for data processing and transformations.

In [59]:
# Initialize SparkSession for local file reading
def initialize_spark():
    """Returns a Spark session."""
    return SparkSession.builder.appName("DatabricksUpload").getOrCreate()



In [60]:
spark = initialize_spark()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/31 13:30:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Loading CSV File into DataFrame

Reads a CSV file from the specified path using Spark.
Sets header=True to use the first row as column names.
Enables inferSchema=True to automatically detect data types.
Returns the loaded DataFrame for further processing.

In [61]:
# Load a CSV file into a Spark DataFrame
def load_csv(csv_path):
    """Reads the CSV at csv_path with a header row and inferred column types."""
    df = spark.read.csv(csv_path, header=True, inferSchema=True)
    return df


In [62]:
csv_path = "/home/labuser/Desktop/Project/population-vs-price.csv"
df = load_csv(csv_path)

                                                                                

Creating a Table in Databricks SQL

Defines an SQL query to create a population_vs_price table if it does not already exist.
Specifies column names and data types, including rank, city, state, code, population, and price.
Executes the query using a database connection cursor.
Ensures table creation for storing population and price data in Databricks SQL.

In [63]:
def create_table(conn):
    """
    Creates a table in Databricks SQL if it doesn't exist.
    """
    create_query = """
    CREATE TABLE IF NOT EXISTS population_vs_price (
        rank INT,
        city STRING,
        state STRING,
        code STRING,
        population STRING,
        price STRING
    )
    """
    with conn.cursor() as cursor:
        
        result = cursor.execute(create_query)

    return result
        


In [64]:
result = create_table(conn)


Inserting Data from Spark DataFrame into Databricks SQL Table

Converts the Spark DataFrame to a Pandas DataFrame for easier SQL insertion.
Defines an SQL INSERT query to add data into the population_vs_price table.
Iterates over the rows of the Pandas DataFrame, inserting each row into the table.
Executes the query using a database connection cursor.
Returns a success message once the data has been successfully inserted into the table.

In [65]:
def insert_data(conn, df):
    """
    Inserts data from a Spark DataFrame into the Databricks SQL table.
    """
    # Convert Spark DataFrame to Pandas for easy SQL insertion
    pandas_df = df.toPandas()

    insert_query = """
    INSERT INTO population_vs_price (rank, City, State, Code, Population, Price) 
    VALUES (?, ?, ?, ?, ?, ?)
    """
    with conn.cursor() as cursor:
        for _, row in pandas_df.iterrows():
            cursor.execute(insert_query, tuple(row))

    return "Successfully Inserted"


In [66]:
result = insert_data(conn, df)
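
insert_data passes each pandas row straight to the cursor, so missing prices travel as NaN floats. The query results recorded further down show the text 'null' in the price column, which suggests these values were not stored as SQL NULL. A minimal sketch of cleaning each row before insertion follows. The helper `clean_row` is hypothetical, the two rows are taken from the data preview above, and `sqlite3` is only a local stand-in for the Databricks SQL connection.

```python
import math
import sqlite3

def clean_row(row):
    """Replace float NaN with None so the driver sends SQL NULL."""
    return tuple(None if isinstance(v, float) and math.isnan(v) else v for v in row)

rows = [
    (101, "Birmingham", "Alabama", "AL", 212247.0, 162.9),
    (64, "Anchorage[19]", "Alaska", "AK", 301010.0, float("nan")),
]

# sqlite3 is only a local stand-in for the Databricks SQL connection.
local_conn = sqlite3.connect(":memory:")
local_conn.execute(
    "CREATE TABLE population_vs_price "
    "(rank INT, city TEXT, state TEXT, code TEXT, population REAL, price REAL)"
)
local_conn.executemany(
    "INSERT INTO population_vs_price VALUES (?, ?, ?, ?, ?, ?)",
    [clean_row(r) for r in rows],
)
null_prices = local_conn.execute(
    "SELECT COUNT(*) FROM population_vs_price WHERE price IS NULL"
).fetchone()[0]
print(null_prices)
local_conn.close()
```

In insert_data, the equivalent change would be to pass `clean_row(tuple(row))` to `cursor.execute`.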

Find the top 5 cities with the highest median sales price.

Finding the Cities with Highest Sales

Defines an SQL query to fetch the City, State, and price from the population_vs_price table, excluding null prices.
Orders the results by price in descending order to identify the top 5 most expensive cities.
Executes the query using a database connection cursor.
Fetches the query results and returns the list of the top 5 cities with the highest prices.
Ensures the cursor is closed automatically after fetching the results.

In [73]:
def highest_sales(conn):
    """
    Finds the five cities with the highest median sales price.
    """
    # Note: price is a STRING column, so ORDER BY compares text rather than numbers
    query = """
    SELECT City, State, price
    FROM population_vs_price
    WHERE price IS NOT NULL
    ORDER BY price DESC
    LIMIT 5
    """
    with conn.cursor() as cursor:  # The cursor is closed automatically on exit
        cursor.execute(query)
        highest = cursor.fetchall()  # Fetch results while the cursor is still open

    return highest


In [74]:

top_cities = highest_sales(conn)
print(top_cities)

[Row(City='Cambridge', State='Massachusetts', price='null'), Row(City='Charleston', State='South Carolina', price='null'), Row(City='Thousand Oaks', State='California', price='null'), Row(City='North Charleston', State='South Carolina', price='null'), Row(City='West Palm Beach', State='Florida', price='null')]
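
Every row in the result above has price='null'. This is consistent with price being a STRING column: the text 'null' is not SQL NULL, so it passes the IS NOT NULL filter, and text comparison places it above any value that starts with a digit. The sketch below reproduces the effect locally with illustrative values and shows numeric ordering after a tolerant conversion. `try_float` is a hypothetical helper, and the SQL at the end is an untested variant of the notebook's query that uses `TRY_CAST`, which the Databricks error messages in this notebook recommend for malformed input.

```python
def try_float(text):
    """Return float(text), or None when text is not numeric (similar to TRY_CAST)."""
    try:
        return float(text)
    except (TypeError, ValueError):
        return None

prices = ["162.9", "null", "99.5", "1200.0"]  # illustrative values, stored as text

as_text = sorted(prices, reverse=True)
as_numbers = sorted((p for p in map(try_float, prices) if p is not None), reverse=True)

print(as_text)     # text order puts 'null' first and '1200.0' last
print(as_numbers)  # numeric order puts 1200.0 first

# Untested sketch of the same idea in SQL for the notebook's table.
QUERY = """
SELECT City, State, TRY_CAST(price AS DOUBLE) AS price_num
FROM population_vs_price
WHERE TRY_CAST(price AS DOUBLE) IS NOT NULL
ORDER BY price_num DESC
LIMIT 5
"""
```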



Finding the Cities with Lowest Sales

Defines an SQL query to fetch the City, State, and price from the population_vs_price table, excluding null prices.
Orders the results by price in ascending order to identify the top 5 least expensive cities.
Executes the query using a database connection cursor.
Fetches the query results and returns the list of the top 5 cities with the lowest prices.
Ensures the cursor is automatically closed after fetching the results.

In [81]:
def lowest_sales(conn):
    """
    Finds the five cities with the lowest median sales price.
    """
    # Note: Databricks SQL reads double-quoted "price" as a string literal,
    # not as the price column; backticks (`price`) quote an identifier
    query = """
    SELECT City, State, "price"
    FROM population_vs_price
    WHERE "price" IS NOT NULL
    ORDER BY "price" ASC
    LIMIT 5
    """
    with conn.cursor() as cursor:  # The cursor is closed automatically on exit
        cursor.execute(query)
        lowest = cursor.fetchall()  # Fetch results while the cursor is still open

    return lowest

    


In [82]:
cheapest_cities = lowest_sales(conn)
print(cheapest_cities)

[Row(City='Long Beach', State='California', price='price'), Row(City='Gilbert[20]', State='Arizona', price='price'), Row(City='Arlington', State='Texas', price='price'), Row(City='Cincinnati', State='Ohio', price='price'), Row(City='Aurora', State='Colorado', price='price')]
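
Each row above carries price='price', the literal text rather than a column value. That matches the default behaviour of Databricks SQL, where double quotes delimit a string literal and backticks delimit an identifier. The sketch below builds the query with backtick quoting and orders by the numeric value. `quote_identifier` and `lowest_price_query` are hypothetical helpers, and the generated SQL has not been run against the notebook's table.

```python
def quote_identifier(name: str) -> str:
    """Backtick-quote a column name, doubling any embedded backticks."""
    return "`" + name.replace("`", "``") + "`"

def lowest_price_query(column: str = "price", limit: int = 5) -> str:
    """Build a query that orders by the numeric value of the given column."""
    col = quote_identifier(column)
    return (
        f"SELECT City, State, {col} FROM population_vs_price "
        f"WHERE TRY_CAST({col} AS DOUBLE) IS NOT NULL "
        f"ORDER BY TRY_CAST({col} AS DOUBLE) ASC LIMIT {int(limit)}"
    )

print(lowest_price_query())
```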


Performing Correlation Analysis

Defines an SQL query to fetch the Population and price columns from the population_vs_price table, ensuring both are cast to DOUBLE for proper numerical analysis.
Converts the columns to numeric types, coercing any errors into NaN.
Drops any rows with NaN values, ensuring clean data for analysis.
Calculates the correlation matrix between the Population and price columns.
Visualizes the relationship between population and price using a scatter plot.
Returns the correlation matrix after displaying the results.

In [92]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

def correlation_analysis(conn):
    """
    Fetches data from Databricks and performs correlation analysis.
    """
    query = """
    SELECT CAST(Population AS DOUBLE) AS population, 
           CAST(price AS DOUBLE) AS price
    FROM population_vs_price
    WHERE price IS NOT NULL AND Population IS NOT NULL
    """
    df = pd.read_sql(query, conn)

    # Ensure numeric conversion (if still needed)
    df["population"] = pd.to_numeric(df["population"], errors="coerce")
    df["price"] = pd.to_numeric(df["price"], errors="coerce")

    # Drop any rows with NaN values after conversion
    df.dropna(inplace=True)

    # Calculate correlation
    correlation = df.corr()
    print("Correlation Matrix:\n", correlation)

    # Visualization
    plt.figure(figsize=(8,6))
    sns.scatterplot(x=df['population'], y=df['price'])
    plt.title("Population vs Median Sales Price")
    plt.xlabel("Population")
    plt.ylabel("Median Sales Price")
    plt.show()

    return correlation


In [93]:
analysis = correlation_analysis(conn)
print(analysis)

  df = pd.read_sql(query, conn)


DatabaseError: Execution failed on sql: 
    SELECT CAST(Population AS DOUBLE) AS population, 
           CAST(price AS DOUBLE) AS price
    FROM population_vs_price
    WHERE price IS NOT NULL AND Population IS NOT NULL
    
[CAST_INVALID_INPUT] The value 'null' of the type "STRING" cannot be cast to "DOUBLE" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. If necessary set "ansi_mode" to "false" to bypass this error. SQLSTATE: 22018
== SQL (line 3, position 12) ==
           CAST(price AS DOUBLE) AS price
           ^^^^^^^^^^^^^^^^^^^^^

unable to rollback

Find the average house price for each state.



Analyzing Average Sales Price by State

Defines an SQL query to fetch the State and the average price from the population_vs_price table.
Groups the data by State and calculates the average sales price for each state.
Orders the results by average price in descending order, highlighting the states with the highest average sales prices.
Returns the result as a DataFrame containing the State and avg_price for each state.

In [91]:
def state_price_analysis(conn):
    """
    Finds the average sales price for each state.
    """
    query = """
    SELECT State, AVG("price") AS avg_price 
    FROM population_vs_price
    GROUP BY State
    ORDER BY avg_price DESC
    """
    df = pd.read_sql(query, conn)

    return df

In [90]:
state_analysis = state_price_analysis(conn)
print(state_analysis)


  df = pd.read_sql(query, conn)


DatabaseError: Execution failed on sql: 
    SELECT State, AVG("price") AS avg_price 
    FROM population_vs_price
    GROUP BY State
    ORDER BY avg_price DESC
    
[CAST_INVALID_INPUT] The value 'price' of the type "STRING" cannot be cast to "DOUBLE" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. If necessary set "ansi_mode" to "false" to bypass this error. SQLSTATE: 22018
== SQL (line 5, position 5) ==
    ORDER BY avg_price DESC
    ^^^^^^^^^^^^^^^^^^^^^^^

unable to rollback

In [None]:
conn.close()  # Release the Databricks SQL connection once the analysis is finished
