## create UC schema

In [0]:
%sql
CREATE CATALOG IF NOT EXISTS prod_silver;
USE CATALOG prod_silver;
CREATE SCHEMA IF NOT EXISTS dts_ops;
USE SCHEMA dts_ops;

In [0]:
import pandas as pd
from pyspark.sql.types import *

# Read CSV file using pandas
csv_path = "../incidents.csv"
df_pandas = pd.read_csv(csv_path)

# Display basic info about the dataframe
print(f"CSV Shape: {df_pandas.shape}")
print(f"Columns: {list(df_pandas.columns)}")
print("\nFirst few rows:")
print(df_pandas.head())

# Convert pandas DataFrame to Spark DataFrame
df_spark = spark.createDataFrame(df_pandas)

# Display Spark DataFrame schema
print("\nSpark DataFrame Schema:")
df_spark.printSchema()

# Create a temporary view for SQL operations
df_spark.createOrReplaceTempView("temp_incidents")

CSV Shape: (50, 93)
Columns: ['active', 'activity_due', 'additional_assignee_list', 'approval', 'approval_history', 'approval_set', 'assigned_to', 'assignment_group', 'business_duration', 'business_service', 'calendar_duration', 'closed_at', 'closed_by', 'close_notes', 'cmdb_ci', 'cmdb_ci_business_app', 'comments', 'comments_and_work_notes', 'company', 'contact_type', 'contract', 'correlation_display', 'correlation_id', 'delivery_plan', 'delivery_task', 'description', 'due_date', 'edw_load_dts', 'edp_load_dts', 'escalation', 'expected_start', 'follow_up', 'group_list', 'impact', 'knowledge', 'location', 'made_sla', 'number', 'opened_at', 'opened_by', 'order', 'parent', 'priority', 'reassignment_count', 'route_reason', 'service_offering', 'short_description', 'skills', 'sla_due', 'sn_esign_document', 'sn_esign_esignature_configuration', 'sn_hr_le_activity', 'state', 'sys_class_name', 'sys_created_by', 'sys_created_on', 'sys_domain', 'sys_domain_path', 'sys_id', 'sys_mod_count', 'sys_tag

In [0]:
# Define the table name
catalog = "prod_silver"
schema = "dts_ops"
table_name = "servicehub_task_displayvalue"
full_table_name = f"{catalog}.{schema}.{table_name}"

# Generate CREATE TABLE statement based on the DataFrame schema
# Get column definitions from the Spark DataFrame
columns_with_types = []
for field in df_spark.schema.fields:
    # Map Spark types to SQL types
    spark_type = field.dataType
    
    if isinstance(spark_type, StringType):
        sql_type = "STRING"
    elif isinstance(spark_type, IntegerType):
        sql_type = "INT"
    elif isinstance(spark_type, LongType):
        sql_type = "BIGINT"
    elif isinstance(spark_type, DoubleType):
        sql_type = "DOUBLE"
    elif isinstance(spark_type, FloatType):
        sql_type = "FLOAT"
    elif isinstance(spark_type, BooleanType):
        sql_type = "BOOLEAN"
    elif isinstance(spark_type, DateType):
        sql_type = "DATE"
    elif isinstance(spark_type, TimestampType):
        sql_type = "TIMESTAMP"
    elif isinstance(spark_type, DecimalType):
        sql_type = f"DECIMAL({spark_type.precision},{spark_type.scale})"
    else:
        sql_type = "STRING"  # Default to STRING for unknown types
    
    # Handle column names with spaces or special characters
    column_name = f"`{field.name}`" if ' ' in field.name or '-' in field.name else field.name
    columns_with_types.append(f"{column_name} {sql_type}")

# Create the column definition string
columns_definition = ",\n    ".join(columns_with_types)

# Create table if not exists
create_table_sql = f"""
CREATE TABLE IF NOT EXISTS {full_table_name} (
    {columns_definition}
)
USING DELTA
COMMENT 'ServiceHub task display value data loaded from CSV'
TBLPROPERTIES (
    'delta.autoOptimize.optimizeWrite' = 'true',
    'delta.autoOptimize.autoCompact' = 'true'
)
"""

In [0]:
print("\n" + "="*50)
print("CREATE TABLE Statement:")
print("="*50)
print(create_table_sql)

# Execute CREATE TABLE
try:
    spark.sql(create_table_sql)
    print(f"\n✓ Table {full_table_name} created successfully (or already exists)")
except Exception as e:
    print(f"\n✗ Error creating table: {str(e)}")
    raise


CREATE TABLE Statement:

CREATE TABLE IF NOT EXISTS prod_silver.dts_ops.servicehub_task_displayvalue (
    active BOOLEAN,
    activity_due STRING,
    additional_assignee_list DOUBLE,
    approval STRING,
    approval_history DOUBLE,
    approval_set DOUBLE,
    assigned_to STRING,
    assignment_group STRING,
    business_duration STRING,
    business_service STRING,
    calendar_duration STRING,
    closed_at STRING,
    closed_by STRING,
    close_notes STRING,
    cmdb_ci STRING,
    cmdb_ci_business_app DOUBLE,
    comments STRING,
    comments_and_work_notes STRING,
    company STRING,
    contact_type STRING,
    contract DOUBLE,
    correlation_display STRING,
    correlation_id STRING,
    delivery_plan DOUBLE,
    delivery_task DOUBLE,
    description STRING,
    due_date DOUBLE,
    edw_load_dts STRING,
    edp_load_dts STRING,
    escalation STRING,
    expected_start DOUBLE,
    follow_up DOUBLE,
    group_list DOUBLE,
    impact STRING,
    knowledge BOOLEAN,
    locati

## load data

In [0]:
insert_sql = f"""
INSERT OVERWRITE {full_table_name}
SELECT * FROM temp_incidents
"""

print("\n" + "="*50)
print("INSERT Statement (OVERWRITE):")
print("="*50)
print(insert_sql)

try:
    result = spark.sql(insert_sql)
    print(f"\n✓ Data loaded successfully into {full_table_name}")
except Exception as e:
    print(f"\n✗ Error loading data: {str(e)}")
    raise

# Verify the data load
count_sql = f"SELECT COUNT(*) as row_count FROM {full_table_name}"
result = spark.sql(count_sql)
row_count = result.collect()[0]['row_count']
print(f"\n✓ Verification: {row_count} rows loaded into the table")

# Show sample data from the loaded table
print("\n" + "="*50)
print("Sample data from loaded table:")
print("="*50)
sample_sql = f"SELECT * FROM {full_table_name} LIMIT 5"
spark.sql(sample_sql).show(truncate=False)


INSERT Statement (OVERWRITE):

INSERT OVERWRITE prod_silver.dts_ops.servicehub_task_displayvalue
SELECT * FROM temp_incidents


✓ Data loaded successfully into prod_silver.dts_ops.servicehub_task_displayvalue

✓ Verification: 50 rows loaded into the table

Sample data from loaded table:
+------+------------+------------------------+-----------------+----------------+------------+---------------------+----------------------------------+-----------------+---------------------------------------+-----------------+-------------------+---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------+--------------------+--------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------

## generate function in UC

In [0]:
# Define function parameters
catalog = "prod_silver"
schema = "dts_ops"
function_name = "get_incident_info"
full_function_name = f"{catalog}.{schema}.{function_name}"

# First, drop the function if it exists (optional - uncomment if needed)
drop_function_sql = f"""
DROP FUNCTION IF EXISTS {full_function_name}
"""

# Uncomment the following lines if you want to drop and recreate the function
# print("Dropping existing function if it exists...")
# try:
#     spark.sql(drop_function_sql)
#     print(f"✓ Function {full_function_name} dropped (if existed)")
# except Exception as e:
#     print(f"Note: {str(e)}")

# Create the SQL function
create_function_sql = f"""
CREATE OR REPLACE FUNCTION {full_function_name}(incident_number STRING)
RETURNS STRING
LANGUAGE SQL
DETERMINISTIC
COMMENT 'Returns formatted incident information for a given incident number'
RETURN (
    SELECT 
        MAX(
            COALESCE(
                CONCAT(
                    'Opened By:\\n', opened_by,
                    '\\n\\nOpened At:\\n', opened_at,
                    '\\n\\nShort Description:\\n', short_description,
                    '\\n\\nDescription:\\n', description,
                    '\\n\\nEscalation:\\n', escalation,
                    '\\n\\nImpact:\\n', impact,
                    '\\n\\nLocation:\\n', location,
                    '\\n\\nPriority:\\n', priority
                ),
                ''
            )
        ) AS incident_info
    FROM 
        {catalog}.{schema}.servicehub_task_displayvalue
    WHERE 
        sys_class_name = 'Incident'
        AND number = incident_number
)
"""

print("="*60)
print("Creating Unity Catalog SQL Function")
print("="*60)
print(f"Function Name: {full_function_name}")
print("="*60)
print("SQL Statement:")
print(create_function_sql)
print("="*60)

# Execute the CREATE FUNCTION statement
try:
    spark.sql(create_function_sql)
    print(f"\n✓ Function {full_function_name} created successfully!")
except Exception as e:
    print(f"\n✗ Error creating function: {str(e)}")
    raise

Creating Unity Catalog SQL Function
Function Name: prod_silver.dts_ops.get_incident_info
SQL Statement:

CREATE OR REPLACE FUNCTION prod_silver.dts_ops.get_incident_info(incident_number STRING)
RETURNS STRING
LANGUAGE SQL
DETERMINISTIC
COMMENT 'Returns formatted incident information for a given incident number'
RETURN (
    SELECT 
        MAX(
            COALESCE(
                CONCAT(
                    'Opened By:\n', opened_by,
                    '\n\nOpened At:\n', opened_at,
                    '\n\nShort Description:\n', short_description,
                    '\n\nDescription:\n', description,
                    '\n\nEscalation:\n', escalation,
                    '\n\nImpact:\n', impact,
                    '\n\nLocation:\n', location,
                    '\n\nPriority:\n', priority
                ),
                ''
            )
        ) AS incident_info
    FROM 
        prod_silver.dts_ops.servicehub_task_displayvalue
    WHERE 
        sys_class_name = 'Incident'