In [0]:
catalog = dbutils.widgets.get("catalog")
schema = dbutils.widgets.get("schema")

In [0]:
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{schema}")
spark.sql(f"USE CATALOG {catalog}")
spark.sql(f"USE SCHEMA {schema}")

DataFrame[]

In [0]:
# Get all tables under the specified catalog and schema
tables_df = spark.sql(f"SHOW TABLES IN {catalog}.{schema}")
table_names = [row['tableName'] for row in tables_df.collect()]

# Delete all records from each table
for table in table_names:
    spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{table}")

# Display the list of tables that were cleared
display(spark.createDataFrame([(f"{catalog}.{schema}.{t}",) for t in table_names], ["Dropped_Table"]))

Dropped_Table
shared.yash_underwriter_demo.actuarial
shared.yash_underwriter_demo.client_submission
shared.yash_underwriter_demo.quote
shared.yash_underwriter_demo.quote_status
shared.yash_underwriter_demo.risk


In [0]:
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {catalog}.{schema}.actuarial (
  Coverage_Type STRING,
  Base_Rate DOUBLE,
  Adjustment_Factors BIGINT,
  Deductible STRING,
  Reinsurance_Cost BIGINT
)
USING delta
""")

spark.sql(f"""
CREATE TABLE IF NOT EXISTS {catalog}.{schema}.client_submission (
  Client_ID STRING,
  Business_Name STRING,
  Industry STRING,
  Revenue_USD BIGINT,
  Number_of_Employees BIGINT,
  Coverage_Needed STRING,
  Any_Claims_History STRING,
  Location STRING,
  Risk_Level STRING,
  submission_date DATE
)
USING delta
""")

spark.sql(f"""
CREATE TABLE IF NOT EXISTS {catalog}.{schema}.quote (
  Quote_ID STRING,
  Client_ID STRING,
  Coverage_Details STRING,
  Premium BIGINT,
  Coverage_Limits BIGINT,
  Terms_and_Conditions_Summary STRING
)
USING delta
""")

spark.sql(f"""
CREATE TABLE IF NOT EXISTS {catalog}.{schema}.quote_status (
  Quote_ID STRING,
  Client_ID STRING,
  Stage STRING,
  Status STRING,
  Assigned_To STRING,
  Comments STRING
)
USING delta
""")

spark.sql(f"""
CREATE TABLE IF NOT EXISTS {catalog}.{schema}.risk (
  Business_ID STRING,
  NAICS_Code STRING,
  Regulatory_Violations_Count BIGINT,
  Environmental_Risk BIGINT,
  Credit_Score BIGINT,
  Claims_Count_Last_5_Years BIGINT,
  Safety_Rating BIGINT
)
USING delta
""")

DataFrame[]

In [0]:
spark.sql(f"""
INSERT INTO {catalog}.{schema}.actuarial VALUES
('Property', 0.5, 3, '1-5 (0.8x to 1.5x)', 10000),
('Liability', 0.35, 2, '1-5 (0.7x to 1.6x)', 25000),
('Cyber', 1.2, 4, '1-5 (0.9x to 1.8x)', 15000)
""")

spark.sql(f"""
INSERT INTO {catalog}.{schema}.client_submission VALUES
('C001', 'Acme Corp', 'Manufacturing', 1500000, 50, 'Property', 'Yes', 'Chicago, IL', '3', DATE '2025-01-03'),
('C002', 'Green Valley LLC', 'Agriculture', 800000, 15, 'Liability', 'No', 'Fresno, CA', '2', DATE '2025-01-05'),
('C003', 'Urban Eats Inc.', 'Hospitality', 2200000, 120, 'Cyber', 'Yes', 'Austin, TX', '4', DATE '2025-01-07'),
('C004', 'Tech Innovations', 'Technology', 3000000, 200, 'Liability', 'Yes', 'San Francisco, CA', '5', DATE '2025-01-10'),
('C005', 'Healthy Foods', 'Food & Beverage', 500000, 25, 'Property', 'No', 'Portland, OR', '2', DATE '2025-01-12'),
('C006', 'Quick Logistics', 'Transportation', 1000000, 75, 'Liability', 'Yes', 'Los Angeles, CA', '3', DATE '2025-01-14'),
('C007', 'NextGen Electronics', 'Electronics', 4500000, 500, 'Cyber', 'No', 'Dallas, TX', '4', DATE '2025-01-16'),
('C008', 'Eco Solutions', 'Energy', 2200000, 80, 'Liability', 'Yes', 'Denver, CO', '3', DATE '2025-01-18'),
('C009', 'Urban Designs', 'Construction', 3500000, 250, 'Property', 'No', 'New York, NY', '5', DATE '2025-01-20'),
('C010', 'Luxe Retailers', 'Retail', 1700000, 40, 'Cyber', 'Yes', 'Miami, FL', '2', DATE '2025-01-22'),
('C011', 'Sunshine Farms', 'Agriculture', 1200000, 60, 'Liability', 'Yes', 'Sacramento, CA', '3', DATE '2025-01-21'),
('C012', 'Global Enterprises', 'Manufacturing', 8000000, 1000, 'Property', 'No', 'Seattle, WA', '5', DATE '2025-01-06'),
('C013', 'Future Health', 'Healthcare', 1500000, 80, 'Cyber', 'Yes', 'Boston, MA', '4', DATE '2025-01-08'),
('C014', 'Fast Movers', 'Transportation', 2000000, 150, 'Property', 'No', 'Houston, TX', '2', DATE '2025-01-03'),
('C015', 'SmartTech', 'Technology', 3500000, 250, 'Property', 'Yes', 'Austin, TX', '4', DATE '2025-01-01'),
('C016', 'Dream Builders', 'Construction', 4200000, 300, 'Liability', 'Yes', 'Chicago, IL', '5', DATE '2025-01-03'),
('C017', 'Blue Skies', 'Retail', 2500000, 90, 'Liability', 'No', 'Phoenix, AZ', '3', DATE '2025-01-05'),
('C018', 'Fresh Greens', 'Food & Beverage', 1000000, 40, 'Property', 'Yes', 'Atlanta, GA', '2', DATE '2025-01-07'),
('C019', 'Innovative Solutions', 'Technology', 3800000, 300, 'Cyber', 'No', 'San Diego, CA', '4', DATE '2025-01-09'),
('C020', 'Red Mountain Energy', 'Energy', 5000000, 200, 'Property', 'Yes', 'Salt Lake City, UT', '5', DATE '2025-01-11'),
('C021', 'Speedy Logistics', 'Transportation', 1200000, 60, 'Cyber', 'No', 'Orlando, FL', '3', DATE '2025-01-13'),
('C022', 'Bright Future', 'Healthcare', 1500000, 85, 'Property', 'Yes', 'Chicago, IL', '2', DATE '2025-01-15'),
('C023', 'Precision Design', 'Construction', 2000000, 100, 'Liability', 'No', 'San Antonio, TX', '4', DATE '2025-01-17')
""")

spark.sql(f"""
INSERT INTO {catalog}.{schema}.risk VALUES
('C001', '331110', 2, 3, 700, 3, 6),
('C002', '111199', 0, 2, 800, 0, 8),
('C003', '722110', 4, 4, 650, 5, 5),
('C004', '331110', 2, 3, 700, 3, 6),
('C005', '111199', 0, 2, 800, 0, 8),
('C006', '722110', 4, 4, 650, 5, 5),
('C007', '331112', 2, 3, 700, 3, 6),
('C008', '331115', 4, 3, 700, 3, 6),
('C009', '331117', 1, 3, 700, 3, 6),
('C010', '331119', 2, 2, 700, 3, 6),
('C011', '321110', 3, 3, 700, 3, 6),
('C012', '331110', 2, 2, 700, 3, 6),
('C013', '341110', 2, 3, 700, 5, 6),
('C014', '351110', 4, 3, 700, 5, 6),
('C015', '361110', 2, 1, 700, 3, 6),
('C016', '371110', 2, 3, 700, 3, 6),
('C017', '381110', 2, 3, 700, 4, 6),
('C018', '391110', 4, 3, 700, 3, 6),
('C019', '331120', 2, 2, 700, 3, 6),
('C020', '331130', 2, 3, 700, 4, 6),
('C021', '331140', 4, 3, 700, 3, 6),
('C022', '331150', 2, 4, 700, 2, 6),
('C023', '331160', 2, 3, 700, 3, 6)
""")

spark.sql(f"""
INSERT INTO {catalog}.{schema}.quote VALUES
('Q001', 'C001', 'Property: $1M, Liability: $500K', 12500, 1500000, 'Standard exclusions apply'),
('Q002', 'C002', 'Liability: $1M', 7000, 1000000, 'Deductible of $25K applies'),
('Q003', 'C003', 'Cyber: $1M, Property: $2M', 24800, 3000000, 'Higher premium due to claims history'),
('Q004', 'C004', 'Liability: $2M, Cyber: $1M', 31000, 3000000, 'Includes data breach coverage; high-risk tech exposure'),
('Q005', 'C005', 'Property: $500K', 6500, 500000, 'Small operation, standard deductible'),
('Q006', 'C006', 'Liability: $1.5M, Auto: $500K', 17500, 2000000, 'Moderate exposure due to logistics operations'),
('Q007', 'C007', 'Cyber: $3M', 42000, 3000000, 'Excellent credit score; reduced reinsurance cost'),
('Q008', 'C008', 'Liability: $2.5M', 26000, 2500000, 'Energy sector exposure with regulatory oversight'),
('Q009', 'C009', 'Property: $4M, Liability: $2M', 55000, 6000000, 'Construction project with multiple subcontractors'),
('Q010', 'C010', 'Cyber: $1.5M', 15500, 1500000, 'Retail POS exposure; data protection coverage'),
('Q011', 'C011', 'Liability: $1.5M, Property: $500K', 23000, 2000000, 'Agriculture operations; moderate equipment risk'),
('Q012', 'C012', 'Property: $5M', 48000, 5000000, 'High-value manufacturing facility; risk mitigation applied'),
('Q013', 'C013', 'Cyber: $2M, Liability: $1M', 27000, 3000000, 'Healthcare data security and patient record coverage'),
('Q014', 'C014', 'Property: $2.5M, Auto: $1M', 29500, 3500000, 'Fleet exposure; physical damage deductible applies'),
('Q015', 'C015', 'Property: $3M, Cyber: $1M', 36000, 4000000, 'Tech manufacturer; property and network coverage bundled'),
('Q016', 'C016', 'Liability: $3.5M', 39000, 3500000, 'High construction exposure; claims history considered'),
('Q017', 'C017', 'Liability: $2.5M', 31000, 2500000, 'Retail exposure moderate; consistent risk controls in place'),
('Q018', 'C018', 'Property: $1M', 12000, 1000000, 'Food & beverage; perishable goods coverage'),
('Q019', 'C019', 'Cyber: $2M', 27000, 2000000, 'Tech firm with low loss ratio; adjusted rate applied'),
('Q020', 'C020', 'Property: $5M', 52000, 5000000, 'Energy sector with equipment coverage extension'),
('Q021', 'C021', 'Cyber: $1.2M', 19000, 1200000, 'Transportation data systems; modest exposure'),
('Q022', 'C022', 'Property: $2M', 25000, 2000000, 'Healthcare clinic; facility risk mitigated'),
('Q023', 'C023', 'Liability: $2M, Property: $1M', 29000, 3000000, 'Construction design liability; reviewed by compliance')
""")

spark.sql(f"""
INSERT INTO {catalog}.{schema}.quote_status VALUES
('Q001', 'C001', 'Risk Assessment', 'Completed', 'John_Doe', 'Moderate risk due to location'),
('Q002', 'C002', 'Quote Generation', 'In Progress', 'Jane_Smith', 'Low risk; finalizing premium'),
('Q003', 'C003', 'Submission Review', 'Completed', 'Alex_Taylor', 'High claims frequency noted'),
('Q004', 'C004', 'Underwriting Review', 'In Progress', 'Sarah_Lee', 'Awaiting cyber risk assessment results'),
('Q005', 'C005', 'Quote Generation', 'Completed', 'John_Doe', 'Low exposure; auto-approved quote'),
('Q006', 'C006', 'Underwriting Review', 'In Progress', 'Jane_Smith', 'Fleet size increases risk; pending actuarial input'),
('Q007', 'C007', 'Quote Generation', 'Completed', 'Alex_Taylor', 'Cyber coverage approved; strong controls in place'),
('Q008', 'C008', 'Risk Assessment', 'Completed', 'Michael_Ross', 'Moderate environmental risk in energy sector'),
('Q009', 'C009', 'Submission Review', 'Completed', 'Priya_Kapoor', 'Construction exposure high; reinsurer involvement required'),
('Q010', 'C010', 'Quote Generation', 'In Progress', 'Sarah_Lee', 'Pending POS security audit results'),
('Q011', 'C011', 'Risk Assessment', 'Completed', 'Jane_Smith', 'Farm machinery adds moderate liability risk'),
('Q012', 'C012', 'Final Approval', 'Completed', 'John_Doe', 'Approved for renewal; stable manufacturing operations'),
('Q013', 'C013', 'Underwriting Review', 'In Progress', 'Alex_Taylor', 'Awaiting HIPAA compliance certification'),
('Q014', 'C014', 'Quote Generation', 'Completed', 'Michael_Ross', 'Transportation and property bundled quote approved'),
('Q015', 'C015', 'Final Approval', 'Completed', 'Priya_Kapoor', 'Strong controls; premium adjusted downward'),
('Q016', 'C016', 'Underwriting Review', 'In Progress', 'Sarah_Lee', 'Construction project details under review'),
('Q017', 'C017', 'Quote Generation', 'Completed', 'John_Doe', 'Retail liability exposure moderate; quote issued'),
('Q018', 'C018', 'Submission Review', 'Completed', 'Jane_Smith', 'Food spoilage coverage added per request'),
('Q019', 'C019', 'Final Approval', 'Completed', 'Alex_Taylor', 'Approved under standard cyber program'),
('Q020', 'C020', 'Underwriting Review', 'In Progress', 'Michael_Ross', 'Awaiting reinsurer feedback on energy exposure'),
('Q021', 'C021', 'Quote Generation', 'Completed', 'Priya_Kapoor', 'Transportation exposure within tolerance'),
('Q022', 'C022', 'Risk Assessment', 'Completed', 'Sarah_Lee', 'Healthcare property exposure rated low'),
('Q023', 'C023', 'Final Approval', 'Completed', 'John_Doe', 'Liability and property combination approved')
""")

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
spark.sql(f"""
CREATE OR REPLACE FUNCTION {catalog}.{schema}.get_quote(client STRING)
RETURNS TABLE (
    Quote_ID STRING,
    Client_ID STRING,
    Coverage_Details STRING,
    Premium INT,
    Coverage_Limits STRING,
    Terms_and_Conditions_Summary STRING
)
RETURN
SELECT q.*
FROM {catalog}.{schema}.quote q
JOIN {catalog}.{schema}.client_submission c
  ON q.Client_ID = c.Client_ID
WHERE c.Business_Name LIKE concat('%', client, '%');
""")

DataFrame[]

In [0]:
display(spark.sql(f"""
    SELECT q.* FROM {catalog}.{schema}.get_quote('Acme Corp') q
"""))

Quote_ID,Client_ID,Coverage_Details,Premium,Coverage_Limits,Terms_and_Conditions_Summary
Q001,C001,"Property: $1M, Liability: $500K",12500,1500000,Standard exclusions apply


In [0]:
spark.sql(f"""
CREATE OR REPLACE FUNCTION {catalog}.{schema}.get_quote_status(client STRING)
RETURNS TABLE (
    Quote_ID STRING,
    Client_ID STRING,
    Stage STRING,
    Status STRING,
    Assigned_To STRING,
    Comments STRING
)
RETURN
SELECT q.*
FROM {catalog}.{schema}.quote_status q
JOIN {catalog}.{schema}.client_submission c
  ON q.Client_ID = c.Client_ID
WHERE c.Business_Name LIKE concat('%', client, '%');
""")

DataFrame[]

In [0]:
display(spark.sql(f"""
    SELECT q.* FROM {catalog}.{schema}.get_quote_status('Acme Corp') q
"""))

Quote_ID,Client_ID,Stage,Status,Assigned_To,Comments
Q001,C001,Risk Assessment,Completed,John_Doe,Moderate risk due to location


In [0]:
spark.sql(f"""
CREATE OR REPLACE FUNCTION {catalog}.{schema}.get_risk_score(client STRING)
RETURNS TABLE (
    Business_ID STRING,
    NAICS_Code STRING,
    Regulatory_Violations_Count INT,
    Environmental_Risk INT,
    Credit_Score INT,
    Claims_Count_Last_5_Years INT,
    Safety_Rating INT
)
RETURN
SELECT q.*
FROM {catalog}.{schema}.risk q
JOIN {catalog}.{schema}.client_submission c
  ON q.Business_ID = c.Client_ID
WHERE c.Business_Name LIKE concat('%', client, '%');
""")

DataFrame[]

In [0]:
display(spark.sql(f"""
    SELECT * FROM {catalog}.{schema}.get_risk_score('Acme Corp')
"""))

Business_ID,NAICS_Code,Regulatory_Violations_Count,Environmental_Risk,Credit_Score,Claims_Count_Last_5_Years,Safety_Rating
C001,331110,2,3,700,3,6


### 🚀 How to Use in Databricks Genie

Here are example Genie prompts for your underwriting demo:

**Prompt	Genie’s Possible Response**

“Explain why this client’s premium is $950.”	“80% of the increase is due to smoking status (+50%), 15% from above-average risk score, and 5% from age.”

“Show top 10 applicants where smoking had the largest impact.”	Genie lists those with highest Premium_from_Smoking.

“Compare average explained premiums between genders.”	Genie produces an aggregated view.

“Which factor contributes most to premium variance across all applicants?”	Genie visualizes variance drivers across Age, Smoking, Risk.

**Then you can ask Genie things like**:

“Compare average premium factors by pricing tier.”
“Which group has the highest smoking-related premium impact?”

