First, we create the datasets:
- quotes table
- data enrichment tables
- postcodes table

In [0]:
# Configuration shared by every later cell in this notebook
catalog_name = "lrcatalog"
schema_name = "agentic_underwriting"
user_path = "laurence.ryszka@databricks.com"  # NOTE(review): not referenced by any other cell shown here

# Make sure the target catalog and schema exist before any table is written
create_catalog_sql = "CREATE CATALOG IF NOT EXISTS " + catalog_name
create_schema_sql = "CREATE SCHEMA IF NOT EXISTS {}.{}".format(catalog_name, schema_name)
spark.sql(create_catalog_sql)
spark.sql(create_schema_sql)


In [0]:
# Reset step: drop the tables listed below so the following cells rebuild them.

# Same values as the configuration cell, assigned again here.
catalog_name = "lrcatalog"
schema_name = "agentic_underwriting"

# Tables to remove (note: uk_postcodes is not part of this list)
tables_to_drop = [
    "quotes",
    "claims_disclosure_validation",
    "vehicle_risk_attributes",
    "driver_risk_profile",
    "property_attributes",
    "agent_review",
    "sales_call_transcripts",
]

schema_prefix = f"{catalog_name}.{schema_name}"
for table in tables_to_drop:
    full_name = schema_prefix + "." + table
    spark.sql("DROP TABLE IF EXISTS " + full_name)
    # The message is printed for every name, whether or not the table existed.
    print("Dropped: " + full_name)

In [0]:
# Create the table that will hold agent output (two string columns), unless it already exists
agent_review_ddl = f"""CREATE TABLE IF NOT EXISTS {catalog_name}.{schema_name}.agent_review (
  quote_id STRING,
  agent_output STRING)
"""
spark.sql(agent_review_ddl)

In [0]:
# Location of the postcode CSV; adjust this to match the actual path in your repo
file_path = "Repo/Users/laurence.ryszka@databricks.com/actuarial-pricing-demo/Agentic-Motor-underwriting/data/ONSPD_MAY_2025_UK_CR.csv"  # relative path from notebook location

# Read the CSV with Spark, taking column names from the header row
df = spark.read.csv(file_path, header=True)

# Persist to Unity Catalog, replacing any previous copy of the table
table_name = "uk_postcodes"
postcodes_target = ".".join([catalog_name, schema_name, table_name])
df.write.saveAsTable(postcodes_target, mode="overwrite")

In [0]:
import random
import pandas as pd
from faker import Faker

# Seed both random sources so a Restart & Run All regenerates the same synthetic quotes.
SEED = 42
random.seed(SEED)
Faker.seed(SEED)

faker = Faker("en_GB")  # Use UK-style names

num_quotes = 15116 # don't change this as it must correspond to the number of postcodes in uk_postcodes table

# Get list of real postcodes (at most num_quotes of them)
postcode_df = spark.table(f"{catalog_name}.{schema_name}.uk_postcodes").select("pcd")
postcode_list = [row.pcd for row in postcode_df.limit(num_quotes).collect()]

# Reference lists the random attributes are drawn from
vehicle_types = ["Hatchback", "SUV", "Sedan", "Hot Hatch", "Van"]
storage_options = ["garage", "driveway", "street"]
channels = ["aggregator", "direct"]

# Generate one synthetic quote per collected postcode.
# quote_id is "Q" followed by 1000 + the postcode's position in the list.
quotes_data = []
for i, pcd in enumerate(postcode_list):
    quotes_data.append({
        "quote_id": f"Q{1000+i}",
        "first_name": faker.first_name(),
        "last_name": faker.last_name(),
        "age": random.randint(20, 75),
        "postcode": pcd,
        "vehicle_type": random.choice(vehicle_types),
        "storage_declared": random.choice(storage_options),
        "ncd_amount": random.choice([0, 1, 3, 5, 10]),
        "claims_amount": random.choice([0, 1, 2]),
        "channel": random.choice(channels),
    })

# Convert to a Spark DataFrame via pandas and save to Unity Catalog, replacing any existing table
quotes_df = pd.DataFrame(quotes_data)
quotes_sdf = spark.createDataFrame(quotes_df)
quotes_sdf.write.mode("overwrite").saveAsTable(f"{catalog_name}.{schema_name}.quotes")

In [0]:
from pyspark.sql.functions import col, when, round as round_col

quotes_table = f"{catalog_name}.{schema_name}.quotes"

# Load the quotes to be priced
quotes_df = spark.table(quotes_table)

# Vehicle multiplier applied to the whole base price.
# The quote generator emits "Hot Hatch" and never "Sports", so matching only
# "Sports" left the 1.5 branch unreachable; both labels are accepted here.
vehicle_multiplier = (
    when(col("vehicle_type") == "SUV", 1.2)
    .when(col("vehicle_type") == "Hatchback", 1.0)
    .when(col("vehicle_type").isin("Sports", "Hot Hatch"), 1.5)
    .otherwise(1.1)
)

# Age loading: +200 under 25, +100 for 25-34, nothing otherwise
age_adjustment = (
    when(col("age") < 25, 200)
    .when(col("age") < 35, 100)
    .otherwise(0)
)

# Storage loading: garage -50, driveway 0, street +50, any other value +25
storage_adjustment = (
    when(col("storage_declared") == "garage", -50)
    .when(col("storage_declared") == "driveway", 0)
    .when(col("storage_declared") == "street", 50)
    .otherwise(25)
)

# Base price before the vehicle multiplier:
# 500 + 100 per claim + age loading + storage loading - 30 per unit of ncd_amount
base_price_expr = (
    500 +
    (col("claims_amount") * 100) +
    age_adjustment +
    storage_adjustment -
    (col("ncd_amount") * 30)
)

# Apply the vehicle multiplier and round to 2 decimal places
priced_df = quotes_df.withColumn(
    "quote_value",
    round_col(base_price_expr * vehicle_multiplier, 2)
)

# Overwrite the quotes table with the priced rows (mergeSchema lets the new column through)
priced_df.write.option("mergeSchema", "true").mode("overwrite").saveAsTable(quotes_table)

Create the enrichment tables

In [0]:
import pandas as pd
import random

# Pull every postcode (column "pcd") from the uk_postcodes table into a Python list
postcodes_table = f"{catalog_name}.{schema_name}.uk_postcodes"
postcode_df = spark.table(postcodes_table).select("pcd")
postcode_list = [record["pcd"] for record in postcode_df.collect()]
num_postcodes = len(postcode_list)

# Driver risk lookup: one row per age from 18 to 100 inclusive
def _risk_segment_for(driver_age):
    """Return the risk segment label for a driver age.

    Up to 21 and over 75 are "very_high_risk"; 22-30 and 61-75 are
    "high_risk"; 31-60 is "medium_risk".
    """
    if driver_age <= 21 or driver_age > 75:
        return "very_high_risk"
    if driver_age <= 30 or driver_age > 60:
        return "high_risk"
    return "medium_risk"


ages = list(range(18, 101))
risk_segments = [_risk_segment_for(a) for a in ages]

# Two-column frame: age and its segment
driver_df = pd.DataFrame({"age": ages, "risk_segment": risk_segments})


# Generate claims_disclosure_validation
# Load quotes from Unity Catalog, using the configured catalog/schema rather
# than a hard-coded table name so this stays in step with the other cells.
quotes_df = spark.table(f"{catalog_name}.{schema_name}.quotes").toPandas()

# Build one validation row per quote, keyed by the same name and postcode.
claims_data = []
for _, row in quotes_df.iterrows():
    declared_claims = row['claims_amount']
    # Recorded claims differ from the declared count by -1, 0 or +1 ...
    actual_claims = declared_claims + random.choice([-1, 0, 1])
    claims_data.append({
        "first_name": row["first_name"],
        "last_name": row["last_name"],
        "postcode": row["postcode"],
        # ncd_amount is drawn independently of the value on the quote
        "ncd_amount": random.choice([0, 1, 3, 5, 10]),
        # ... and are floored at zero
        "claims_amount": max(0, actual_claims)
    })

# Convert to pandas; the table itself is written by the save loop at the end of this cell
claims_df = pd.DataFrame(claims_data)

# Vehicle risk lookup: one row per vehicle type with a randomly drawn risk band
vehicle_types = ["Hatchback", "SUV", "Sedan", "Hot Hatch", "Van"]
high_value_candidates = ["Hot Hatch", "SUV"]

vehicle_risk_data = []
for vehicle_type in set(vehicle_types):
    band = random.choice(["low", "medium", "high"])
    # Only a Hot Hatch or SUV that also drew the "high" band is flagged as high value
    flagged = (vehicle_type in high_value_candidates) and (band == "high")
    vehicle_risk_data.append(
        dict(vehicle_type=vehicle_type, vehicle_risk_band=band, is_high_value=flagged)
    )

vehicle_df = pd.DataFrame(vehicle_risk_data)

# Property lookup: one row per postcode, with random garage/driveway flags
risk_levels = ["high", "mid_high", "mid_low", "low"]
num_levels = len(risk_levels)
num_rows = len(postcode_list)

# Split the rows into consecutive blocks, one per risk level in list order;
# the first `remaining` levels each take one extra row so the blocks sum to num_rows.
risk_per_block, remaining = divmod(num_rows, num_levels)

ordered_risks = []
for position, level in enumerate(risk_levels):
    block_size = risk_per_block + 1 if position < remaining else risk_per_block
    ordered_risks += [level] * block_size

# Pair each postcode with the risk level at the same position
property_data = [
    {
        "postcode": pc,
        "has_garage": random.choice([True, False]),
        "has_driveway": random.choice([True, False]),
        "property_risk_level": risk,
    }
    for pc, risk in zip(postcode_list, ordered_risks)
]

property_df = pd.DataFrame(property_data)

# Write each pandas enrichment frame to Unity Catalog under its table name
tables_to_save = {
    "driver_risk_profile": driver_df,
    "claims_disclosure_validation": claims_df,
    "vehicle_risk_attributes": vehicle_df,
    "property_attributes": property_df,
}

target_schema = f"{catalog_name}.{schema_name}"
for table_name, enrichment_pdf in tables_to_save.items():
    # Convert to Spark, then replace any existing table of the same name
    spark.createDataFrame(enrichment_pdf).write.saveAsTable(
        f"{target_schema}.{table_name}", mode="overwrite"
    )

Add the quote(s) we will base our demo on. Each one must be added to both the quotes table and the claims disclosure validation table.

In [0]:
# Create the sales-call transcript table, starting with the call for quote R9999
from pyspark.sql import Row

# Target table name within the configured catalog and schema
table = "sales_call_transcripts"

# Full text of the call
transcript = """Agent: Good morning, you're speaking with Amy from Swift Insurance. How can I help you today?

Customer: Hi, I'd like to get a quote for my car insurance.

Agent: Sure. Can I take your name, please?

Customer: John Doe.

Agent: Thanks, Mr. Doe. Can I confirm your postcode?

Customer: CR3 6JF.

Agent: And the type of vehicle?

Customer: It's an SUV.

Agent: Great. Where is the vehicle usually stored?

Customer: On the driveway.

Agent: Got it. And how many years of no claims discount do you have?

Customer: 6 years.

Agent: And how many claims have you had in the last 5 years?

Customer: Three.

Agent: Thank you. Just a moment while I generate your quote.

...

Agent: The quote I have for you today is £1332.

Customer: Okay, thanks for your help."""

# Single-row DataFrame: quote id plus transcript text
rows = [Row(quote_id="R9999", call_transcript=transcript)]
df = spark.createDataFrame(rows)

# Overwrite mode: this cell (re)creates the table with just this one row
transcripts_target = f"{catalog_name}.{schema_name}.{table}"
df.write.saveAsTable(transcripts_target, mode="overwrite")

In [0]:
from pyspark.sql import Row

# Target table name within the configured catalog and schema
table = "sales_call_transcripts"

# Full text of the call for quote R9998
transcript = """Agent: Good afternoon, you're speaking with Olivia from Swift Insurance. How can I help today?

Customer: Hi, I'm looking to get a quote for my car insurance.

Agent: Of course. Can I take your name, please?

Customer: Lauren Fish.

Agent: Thank you, Ms. Fish. What's your postcode?

Customer: CR3 6JF.

Agent: Got it. What type of vehicle is it?

Customer: It's a Hatchback.

Agent: And where is the vehicle usually kept?

Customer: On the driveway.

Agent: Perfect. How many years of no claims discount do you have?

Customer: 5 years.

Agent: And any claims in the past 5 years?

Customer: None.

Agent: Thanks. Let me just generate your quote...

...

Agent: Alright, the quote I have for you is £450.

Customer: That sounds good. Thank you!"""

# Single-row DataFrame: quote id plus transcript text
rows = [Row(quote_id="R9998", call_transcript=transcript)]
df = spark.createDataFrame(rows)

# Append mode: adds this row to the table rather than replacing it
transcripts_target = f"{catalog_name}.{schema_name}.{table}"
df.write.saveAsTable(transcripts_target, mode="append")

In [0]:
from pyspark.sql import Row
# Demo quote R9998 (Lauren Fish): add matching rows to quotes and
# claims_disclosure_validation, then pin the property_attributes row for her postcode.
demo_schema = f"{catalog_name}.{schema_name}"

# 1) quotes table: the quote as declared
new_quote = [
    Row(
        quote_id="R9998",
        first_name="Lauren",
        last_name="Fish",
        age=61,
        postcode="CR3 6JF",
        vehicle_type="Hatchback",
        storage_declared="driveway",
        ncd_amount=5,
        claims_amount=0,
        channel="direct",
        quote_value=450.0,
    )
]
new_quote_df = spark.createDataFrame(new_quote)
new_quote_df.write.saveAsTable(f"{demo_schema}.quotes", mode="append")

# 2) claims_disclosure_validation table: same name/postcode, same ncd and claims values
new_val = [
    Row(
        first_name="Lauren",
        last_name="Fish",
        postcode="CR3 6JF",
        ncd_amount=5,
        claims_amount=0,
    )
]
new_val_df = spark.createDataFrame(new_val)
# Append to the existing claims_disclosure_validation table
new_val_df.write.saveAsTable(f"{demo_schema}.claims_disclosure_validation", mode="append")

# 3) property_attributes: the generated flags are random, so fix this postcode's
#    row to known values (no garage, has driveway)
updated_row = [
    Row(
        postcode="CR3 6JF",
        has_garage=False,
        has_driveway=True,
        property_risk_level="mid_high",
    )
]
updated_df = spark.createDataFrame(updated_row)

# Overwrite the existing row(s) for the same postcode
(
    updated_df.write
    .mode("overwrite")
    .options(overwriteSchema="true", replaceWhere="postcode = 'CR3 6JF'")
    .saveAsTable(f"{demo_schema}.property_attributes")
)

In [0]:
from pyspark.sql import Row
# Demo quote R9997 (James Bond): add matching rows to quotes and
# claims_disclosure_validation, then pin the property_attributes row for his postcode.
demo_schema = f"{catalog_name}.{schema_name}"

# 1) quotes table: the quote as declared (storage declared as "garage")
new_quote = [
    Row(
        quote_id="R9997",
        first_name="James",
        last_name="Bond",
        age=61,
        postcode="CR3 6JE",
        vehicle_type="Hot Hatch",
        storage_declared="garage",
        ncd_amount=2,
        claims_amount=0,
        channel="direct",
        quote_value=450.0,
    )
]
new_quote_df = spark.createDataFrame(new_quote)
new_quote_df.write.saveAsTable(f"{demo_schema}.quotes", mode="append")

# 2) claims_disclosure_validation table: same name/postcode, same ncd and claims values
new_val = [
    Row(
        first_name="James",
        last_name="Bond",
        postcode="CR3 6JE",
        ncd_amount=2,
        claims_amount=0,
    )
]
new_val_df = spark.createDataFrame(new_val)
# Append to the existing claims_disclosure_validation table
new_val_df.write.saveAsTable(f"{demo_schema}.claims_disclosure_validation", mode="append")

# 3) property_attributes: the generated flags are random, so fix this postcode's
#    row to known values (no garage and no driveway, unlike the declared "garage")
updated_row = [
    Row(
        postcode="CR3 6JE",
        has_garage=False,
        has_driveway=False,
        property_risk_level="mid_high",
    )
]
updated_df = spark.createDataFrame(updated_row)

# Overwrite the existing row(s) for the same postcode
(
    updated_df.write
    .mode("overwrite")
    .options(overwriteSchema="true", replaceWhere="postcode = 'CR3 6JE'")
    .saveAsTable(f"{demo_schema}.property_attributes")
)

    

In [0]:
from pyspark.sql import Row
# Demo quote R9999 (John Doe): add rows to quotes and claims_disclosure_validation.
demo_schema = f"{catalog_name}.{schema_name}"

# 1) quotes table.
# NOTE(review): ncd_amount=3 / claims_amount=6 is the reverse of what the R9999
# call transcript and the validation row below say (6 years NCD, 3 claims).
# Presumably a deliberate discrepancy for the demo - confirm.
# quote_value 1332 equals the pricing cell's formula for the values stored here:
# (500 + 6*100 + 100 + 0 - 3*30) * 1.2.
new_quote = [
    Row(
        quote_id="R9999",
        first_name="John",
        last_name="Doe",
        age=32,
        postcode="CR3 6JF",
        vehicle_type="SUV",
        storage_declared="driveway",
        ncd_amount=3,
        claims_amount=6,
        channel="direct",
        quote_value=1332.0,
    )
]
new_quote_df = spark.createDataFrame(new_quote)
new_quote_df.write.saveAsTable(f"{demo_schema}.quotes", mode="append")

# 2) claims_disclosure_validation table: ncd_amount=6, claims_amount=3
new_val = [
    Row(
        first_name="John",
        last_name="Doe",
        postcode="CR3 6JF",
        ncd_amount=6,
        claims_amount=3,
    )
]
new_val_df = spark.createDataFrame(new_val)
# Append to the existing claims_disclosure_validation table
new_val_df.write.saveAsTable(f"{demo_schema}.claims_disclosure_validation", mode="append")

Scoring for inputs

In [0]:
# TODO: add a cars table (the data is in the volume), assign a risk score to each model, then generate quotes using real cars from that table. For now we keep the simple vehicle-type categories (car/van/etc.).