# PipelineIQ Incremental Processing

This notebook processes new and updated use cases incrementally using watermark-based loading.

**Pipeline Flow:**
1. Load only new/changed records from source (based on `piq_last_updated` watermark)
2. Apply AI enrichments sequentially
3. MERGE results into target table

**Design: Minimized Column Storage**
- The `pipelineiq` table stores only essential columns:
  - `usecase_id` (primary key)
  - `last_modified_date` (from source, to detect updates)
  - `piq_last_updated` (watermark, set to CURRENT_DATE() when processed)
  - `competition_string` and `num_of_blockers` (source fields needed for AI inference)
  - All AI-generated fields (context, confidence scores, recommendations, etc.)
- For reporting, join back to `main.gtm_data.core_usecase_curated` on `usecase_id` to get source fields like `usecase_name`, `account_name`, `stage`, etc.
- This minimizes storage and focuses the pipeline on AI enrichment only
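
As a rough illustration of the three-step flow above, the sketch below reproduces the watermark filter and a MERGE-style upsert on plain Python dictionaries. The helper names (`select_changed`, `merge_into`) and the sample rows are hypothetical; the notebook itself performs these steps in Spark SQL against Delta tables.

```python
from datetime import date

def select_changed(source_rows, watermark):
    """Step 1: keep only rows modified strictly after the watermark date."""
    return [r for r in source_rows if r["last_modified_date"] > watermark]

def merge_into(target, enriched_rows, run_date):
    """Step 3: upsert by usecase_id and stamp the watermark column."""
    for row in enriched_rows:
        target[row["usecase_id"]] = {**row, "piq_last_updated": run_date}
    return target

# Hypothetical sample data
source = [
    {"usecase_id": "UC-1", "last_modified_date": date(2024, 1, 10)},
    {"usecase_id": "UC-2", "last_modified_date": date(2024, 3, 5)},
]
target = {
    "UC-1": {
        "usecase_id": "UC-1",
        "last_modified_date": date(2024, 1, 10),
        "piq_last_updated": date(2024, 2, 1),
    }
}

# The watermark is the latest processing date, or 1900-01-01 for an empty target
watermark = max(
    (r["piq_last_updated"] for r in target.values()),
    default=date(1900, 1, 1),
)
changed = select_changed(source, watermark)
# Step 2 (AI enrichment) is omitted in this sketch
merge_into(target, changed, run_date=date(2024, 3, 6))
print(sorted(target))
```

Only `UC-2` passes the filter here, because `UC-1` was last modified before the watermark.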


In [0]:
CONFIG = {
    # Filter Criteria
    "min_monthly_dbus": 1000,
    "excluded_stages": ['U1', 'Lost', 'Closed', 'Disqualified'],
    "target_live_date_months_ahead": 6,
    "last_modified_months_back": -6,
    
    # AI Model Settings
    "ai_model": "databricks-gpt-oss-20b",
    "temperature": 0.0,
    "top_p": 0.1,
    
    # Token Limits
    "max_tokens_summarize": 100,
    "max_tokens_oneliner": 50,
    "max_tokens_confidence": 200,
    "max_tokens_classification": 20,
    "max_tokens_recommendation": 200,
    
    # Confidence Thresholds
    "confidence_high_threshold": 75,
    "confidence_medium_threshold": 45,
    
    # MEDDPICC Weights
    "meddpicc_weights": {
        "pain": 0.25,
        "champion": 0.20,
        "implementation_plan": 0.20,
        "decision_process": 0.10,
        "urgency": 0.10,
        "competition_awareness": 0.05,
        "measurable_impact": 0.02,
        "major_blockers": -0.08
    }
}

# Use Case Type Categories
USECASE_TYPE_CATEGORIES = [
    'Machine Learning', 'Generative AI', 'Data Warehousing', 
    'Migration AI', 'Migration Unity Catalog', 'Migration DWH', 
    'Migration ETL', 'Migration Streaming', 'Migration BI',
    'BI', 'ETL', 'Governance', 'Streaming', 'Ingestion', 'Platform', 'Other'
]

# =============================================================================
# WIDGETS FOR PARAMETERIZATION
# =============================================================================

# Create widgets for catalog and schema
dbutils.widgets.text("catalog", "users", "Target Catalog")
dbutils.widgets.text("schema", "sam_lecorre", "Target Schema")

# Get widget values
TARGET_CATALOG = dbutils.widgets.get("catalog")
TARGET_SCHEMA = dbutils.widgets.get("schema")
TARGET_TABLE = f"{TARGET_CATALOG}.{TARGET_SCHEMA}.pipelineiq"
SOURCE_TABLE = "main.gtm_data.core_usecase_curated"

print(f"Target Catalog: {TARGET_CATALOG}")
print(f"Target Schema: {TARGET_SCHEMA}")
print(f"Target Table: {TARGET_TABLE}")
print(f"Source Table: {SOURCE_TABLE}")

# =============================================================================
# SETUP & VALIDATION
# =============================================================================

# Set Spark configurations for performance
spark.conf.set("spark.sql.shuffle.partitions", "64")
spark.conf.set("spark.databricks.execution.timeout", "14400")

# Check if target table exists
table_exists = spark.catalog.tableExists(TARGET_TABLE)

if not table_exists:
    print(f"⚠️  Target table {TARGET_TABLE} does not exist. Creating empty table...")
    
    # Create the empty target table with an explicit schema:
    # identifiers, inference inputs, and every AI enrichment column
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {TARGET_TABLE}
        (
        -- Core identifiers and timestamps
        usecase_id STRING COMMENT 'Primary key - use case identifier from source system',
        last_modified_date DATE COMMENT 'Last modified date from source system (used to detect updates)',
        piq_last_updated DATE COMMENT 'Watermark column - date when this record was last processed by PipelineIQ',
        
        -- Fields needed for AI inference
        competition_string STRING COMMENT 'Competition information from source (used for AI inference)',
        num_of_blockers BIGINT COMMENT 'Number of blockers from source',
        
        -- AI enriched fields
        context STRING COMMENT 'AI-generated summary of BDR context and business impact for the use case',
        one_liner STRING COMMENT 'AI-generated concise summary of the use case',
        next_steps_summary STRING COMMENT 'AI-generated summary of next steps for the use case',
        implementation_notes_summary STRING COMMENT 'AI-generated summary of implementation notes',
        blockers_summary STRING COMMENT 'AI-generated summary of blockers',
        usecase_type STRING COMMENT 'AI-classified technical use case category',
        business_usecase_type STRING COMMENT 'AI-generated business use case label (max 5 words)',
        soundbyte STRING COMMENT 'AI-generated executive elevator pitch connecting technical solution to business outcome',
        
        -- Confidence scoring
        ai_confidence_score_advanced STRING COMMENT 'Raw AI-generated MEDDPICC confidence assessment output',
        confidence_score INT COMMENT 'AI-extracted confidence score (0-100) for successful go-live',
        confidence_level STRING COMMENT 'AI-extracted confidence level (High, Medium, Low)',
        rationale_for_confidence_score STRING COMMENT 'AI-extracted rationale for confidence score',
        pain_score INT COMMENT 'AI-extracted MEDDPICC Pain dimension score (0-10)',
        champion_score INT COMMENT 'AI-extracted MEDDPICC Champion dimension score (0-10)',
        implementationplan_score INT COMMENT 'AI-extracted MEDDPICC Implementation Plan dimension score (0-10)',
        decisionprocess_score INT COMMENT 'AI-extracted MEDDPICC Decision Process dimension score (0-10)',
        urgency_score INT COMMENT 'AI-extracted MEDDPICC Urgency dimension score (0-10)',
        competitionawareness_score INT COMMENT 'AI-extracted MEDDPICC Competition Awareness dimension score (0-10)',
        majorblockers_score INT COMMENT 'AI-extracted MEDDPICC Major Blockers dimension score (0-10)',
        measurableimpact_score INT COMMENT 'AI-extracted MEDDPICC Measurable Impact dimension score (0-10)',
        confidence_level_normalized STRING COMMENT 'Normalized confidence level (High, Medium, Low, Not computed)',
        
        -- Recommendations and classifications
        next_best_action_recommendation STRING COMMENT 'AI-generated next best action recommendation for account team',
        likely_to_slip BOOLEAN COMMENT 'Flag if use case is likely to slip (true/false)',
        slippage_category STRING COMMENT 'AI-classified primary reason for slippage',
        can_be_accelerated BOOLEAN COMMENT 'Flag if use case can be accelerated (true/false)',
        accel_category STRING COMMENT 'AI-classified best acceleration lever'
        )
        USING DELTA
        COMMENT 'PipelineIQ AI-enriched use case insights with minimized storage footprint'
        TBLPROPERTIES (
        'delta.enableChangeDataFeed' = 'true'
        )
    """)
    
    print(f"✅ Created empty table {TARGET_TABLE}")
    watermark_date = "1900-01-01"
else:
    print(f"✅ Target table {TARGET_TABLE} exists")
    
    # Query watermark - use piq_last_updated as the watermark column
    watermark_result = spark.sql(f"""
        SELECT COALESCE(MAX(piq_last_updated), '1900-01-01') AS max_date
        FROM {TARGET_TABLE}
    """).first()
    
    watermark_date = str(watermark_result['max_date'])
    print(f"📅 Watermark date (piq_last_updated): {watermark_date}")

# Store watermark as temp variable for SQL
spark.sql(f"""DECLARE OR REPLACE VARIABLE watermark_date STRING DEFAULT '{watermark_date}'""")

# Store config values as SQL variables
spark.sql(f"""DECLARE OR REPLACE VARIABLE min_dbus INT DEFAULT {CONFIG['min_monthly_dbus']}""")
spark.sql(f"""DECLARE OR REPLACE VARIABLE target_months INT DEFAULT {CONFIG['target_live_date_months_ahead']}""")
spark.sql(f"""DECLARE OR REPLACE VARIABLE last_modified_months INT DEFAULT {CONFIG['last_modified_months_back']}""")

print("\n✅ Setup complete. Ready to process incremental data.")


In [0]:
%sql
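-- Load and filter new/updated use cases based on watermark
-- Only load records where source last_modified_date > our piq_last_updated watermark
-- (the comparison is strict, so a row whose last_modified_date equals the watermark date is not reloaded)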
CREATE OR REPLACE TEMPORARY VIEW filtered_usecases AS
SELECT 
  usecase_id,
  last_modified_date,
  
  -- Fields needed for AI inference
  sdr_bdr_notes,
  business_impact,
  business_outcome_description,
  ae_authority,
  ae_money_budget,
  account_name,  -- referenced by the ai_soundbyte view
  usecase_name,
  usecase_description,
  demand_plan_stage_next_steps,
  implementation_notes,
  COALESCE(num_of_blockers, 0) AS num_of_blockers,
  last_modified_blocker_category,
  last_modified_blocker_comment,
  competition_string,
  industry_imperative
  
FROM main.gtm_data.core_usecase_curated
WHERE 
  last_modified_date > watermark_date
  AND estimated_monthly_dollar_dbus >= min_dbus
  AND stage NOT IN ('U1', 'Lost', 'Closed', 'Disqualified')  -- keep in sync with CONFIG["excluded_stages"]
  AND target_live_date BETWEEN CURRENT_DATE() AND ADD_MONTHS(CURRENT_DATE(), target_months)
  AND last_modified_date >= ADD_MONTHS(CURRENT_DATE(), last_modified_months)
ORDER BY estimated_monthly_dollar_dbus DESC;

-- Display the count and modified-date range of the loaded use cases
SELECT 
  COUNT(*) AS new_usecases_count,
  MIN(last_modified_date) AS oldest_modified,
  MAX(last_modified_date) AS newest_modified
FROM filtered_usecases;
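
The stage list in the query above repeats `CONFIG["excluded_stages"]` by hand. One possible way to keep the two in sync is to render the predicate from the config in Python and pass it to `spark.sql`. The helper `sql_in_list` is hypothetical and not part of the notebook.

```python
def sql_in_list(values):
    """Render a list of strings as a quoted SQL IN list."""
    # Double any single quote so each literal stays valid SQL
    quoted = ["'" + v.replace("'", "''") + "'" for v in values]
    return "(" + ", ".join(quoted) + ")"

# Mirrors CONFIG["excluded_stages"] from the setup cell
excluded_stages = ['U1', 'Lost', 'Closed', 'Disqualified']
predicate = f"stage NOT IN {sql_in_list(excluded_stages)}"
print(predicate)
# stage NOT IN ('U1', 'Lost', 'Closed', 'Disqualified')
```

The rendered string matches the hard-coded predicate, so swapping it in would not change which rows are loaded.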


In [0]:
%sql
-- Generate contextual summaries using AI
CREATE OR REPLACE TEMPORARY VIEW ai_context AS
SELECT 
  usecase_id,
  CASE 
    WHEN (sdr_bdr_notes IS NOT NULL AND LENGTH(TRIM(sdr_bdr_notes)) > 0)
      OR (business_impact IS NOT NULL AND LENGTH(TRIM(business_impact)) > 0)
      OR (business_outcome_description IS NOT NULL AND LENGTH(TRIM(business_outcome_description)) > 0)
      OR (ae_authority IS NOT NULL AND LENGTH(TRIM(ae_authority)) > 0)
      OR (ae_money_budget IS NOT NULL AND LENGTH(TRIM(ae_money_budget)) > 0)
    THEN
      ai_summarize(
        CONCAT(
          'Summarize the following context and business impact into a concise paragraph (max 4 sentences). Focus on the main challenge, expected business value, the main type of contact (e.g., Executive sponsor, technical decision maker, Economic decision maker, etc), and whether funding is available or approved. Context: ',
          COALESCE(TRIM(LEFT(regexp_replace(sdr_bdr_notes, '<[^>]+>', ''), 2000)), 'No context provided. '),
          'Business Impact: ', COALESCE(TRIM(business_impact), 'No business impact provided. '),
          'Business Outcome: ', COALESCE(TRIM(business_outcome_description), 'No business outcome provided. '),
          'Main Contact Type: ', COALESCE(TRIM(ae_authority), 'Not specified.'),
          'Funding Status: ', COALESCE(TRIM(ae_money_budget), 'Not specified.')
        ),
        100
      )
    ELSE
      'No context or business impact provided.'
  END AS context
FROM filtered_usecases;

-- Preview
-- SELECT * FROM ai_context LIMIT 5;


In [0]:
%sql
-- Generate one-liner summaries using AI
CREATE OR REPLACE TEMPORARY VIEW ai_oneliner AS
SELECT 
  f.usecase_id,
  CASE
    WHEN (f.usecase_name IS NOT NULL AND LENGTH(TRIM(f.usecase_name)) > 0)
      OR (f.usecase_description IS NOT NULL AND LENGTH(TRIM(f.usecase_description)) > 0)
    THEN
      ai_summarize(
        CONCAT(
          'Generate a short, clear one-line summary (max 50 words) describing the core business objective of this use case. ',
          'Account Context: ', COALESCE(TRIM(c.context), 'No account context'),
          ' Use Case Name: ', COALESCE(TRIM(f.usecase_name), 'Use Case name not provided.'),
          '. Description: ', COALESCE(TRIM(f.usecase_description), 'No description provided.')
        ),
        50
      )
    ELSE
      'No use case name or description provided'
  END AS one_liner
FROM filtered_usecases f
LEFT JOIN ai_context c ON f.usecase_id = c.usecase_id;

-- SELECT * FROM ai_oneliner LIMIT 5;


In [0]:
%sql
-- Generate next steps summaries using AI
CREATE OR REPLACE TEMPORARY VIEW ai_nextsteps AS
SELECT 
  usecase_id,
  CASE 
    WHEN demand_plan_stage_next_steps IS NOT NULL 
         AND LENGTH(TRIM(demand_plan_stage_next_steps)) > 0 THEN
      ai_summarize(
        CONCAT(
          'Summarize the following next steps into a short, action-oriented sentence focusing on immediate priorities: ',
          TRIM(demand_plan_stage_next_steps)
        ),
        100
      )
    ELSE
      'No next steps provided'
  END AS next_steps_summary
FROM filtered_usecases;

-- SELECT * FROM ai_nextsteps LIMIT 5;


In [0]:
%sql
-- Generate implementation notes summaries using AI
CREATE OR REPLACE TEMPORARY VIEW ai_implementation AS
SELECT 
  usecase_id,
  CASE 
    WHEN implementation_notes IS NOT NULL AND LENGTH(TRIM(implementation_notes)) > 0 THEN
      ai_summarize(
        CONCAT(
          'Summarize the following implementation notes into a short, action-oriented paragraph focusing on key steps, milestones, and ownership: ',
          implementation_notes
        ),
        100
      )
    ELSE
      'There is no implementation plan or not specified in Salesforce'
  END AS implementation_notes_summary
FROM filtered_usecases;

-- SELECT * FROM ai_implementation LIMIT 5;


In [0]:
%sql
-- Generate blockers summaries using AI
CREATE OR REPLACE TEMPORARY VIEW ai_blockers AS
SELECT 
  usecase_id,
  CASE 
    WHEN num_of_blockers = 0 THEN 'There are 0 blockers'
    WHEN (last_modified_blocker_category IS NULL OR LENGTH(TRIM(last_modified_blocker_category)) = 0)
      AND (last_modified_blocker_comment IS NULL OR LENGTH(TRIM(last_modified_blocker_comment)) = 0)
      THEN 'Blocker details not provided'
    ELSE CONCAT(
      'There are ', CAST(num_of_blockers AS STRING), ' blockers. ',
      ai_summarize(
        CONCAT(
          'There are ', CAST(num_of_blockers AS STRING), ' blockers. Briefly summarize the main theme or issue based on the latest blocker comment: ',
          CASE 
            WHEN last_modified_blocker_category IS NOT NULL AND LENGTH(TRIM(last_modified_blocker_category)) > 0 
              THEN CONCAT('Category: ', TRIM(last_modified_blocker_category), '. ')
            ELSE ''
          END,
          COALESCE(TRIM(last_modified_blocker_comment), 'No blocker comment provided.')
        ),
        50
      )
    )
  END AS blockers_summary
FROM filtered_usecases;

-- SELECT * FROM ai_blockers LIMIT 5;


In [0]:
%sql
-- Classify use case types using AI
CREATE OR REPLACE TEMPORARY VIEW ai_usecase_types AS
SELECT 
  usecase_id,
  CASE
    WHEN (usecase_name IS NOT NULL AND LENGTH(TRIM(usecase_name)) > 0)
      OR (usecase_description IS NOT NULL AND LENGTH(TRIM(usecase_description)) > 0)
    THEN COALESCE(
      ai_classify(
        concat('Usecase Name: ', coalesce(trim(usecase_name), '.'), ' Description: ', coalesce(trim(usecase_description), '.')),
        array(
          'Machine Learning', 'Generative AI', 'Data Warehousing', 'Migration AI', 'Migration Unity Catalog',
          'Migration DWH', 'Migration ETL', 'Migration Streaming', 'Migration BI', 
          'BI', 'ETL', 'Governance', 'Streaming', 'Ingestion', 'Platform', 'Other'
        )
      ),
      'Other'
    )
    ELSE 'Other'
  END AS usecase_type,
  
  COALESCE(
    ai_query(
      'databricks-gemma-3-12b',
      CONCAT(
        'Given a use case name, description and industry imperative, return ONLY a concise business use case label (e.g. Fraud Detection, Inventory Forecasting, Customer Segmentation). Output exactly one short phrase, max 5 words, no punctuation or explanation. Usecase Name: ',
        COALESCE(TRIM(usecase_name), 'Unknown'), 
        '. Description: ', COALESCE(TRIM(usecase_description), 'No description'),
        '. Industry Imperative: ', COALESCE(TRIM(industry_imperative), 'No industry imperative')
      ),
      named_struct('max_tokens', 20, 'temperature', 0.0, 'failOnError', false)
    ),
    'Unknown Business Use Case'
  ) AS business_usecase_type
FROM filtered_usecases;

-- SELECT * FROM ai_usecase_types LIMIT 5;


In [None]:
%sql
-- Generate executive soundbytes using AI
CREATE OR REPLACE TEMPORARY VIEW ai_soundbyte AS
SELECT
  uct.usecase_id,
  COALESCE(
    ai_gen(
      CONCAT(
        'You are the Value Soundbite Agent. Your task is to generate a single elevator pitch sentence that connects a technical solution to a clear business outcome. You need to extract from the context\n\n',
        'Customer name: ', COALESCE(f.account_name, 'Unknown'), '\n',
        'Technical Challenge (current pain/limitation): ', COALESCE(one.one_liner, 'Not specified'), '\n',
        'Technical Solution / Benefit (what the technology enables): ', COALESCE(uct.usecase_type, 'Not specified'), '\n',
        'Business Benefit (how analysts/users/teams work better, faster, cheaper): ', COALESCE(uct.business_usecase_type, 'Not specified'), '\n',
        'Business Goal (strategic outcome such as cost savings, revenue growth, margin improvement, risk reduction, etc.): ', COALESCE(ctx.context, 'Not specified'), '\n\n',
        'Output:\n',
        'Write one concise, executive-ready sentence using this formula:\n',
        'By [implementing the solution] to tackle [technical challenge], [Customer] will [achieve business benefit] and [achieve business goal], enabling [strategic initiative/program] which will [ultimate business outcome].\n\n',
        'Guidelines:\n',
        'Keep the sentence short, punchy, and outcome-driven.\n',
        'Focus on benefits, not features.\n',
        'Use business-friendly language (no jargon).'
      )
    ),
    'No soundbyte generated'
  ) AS soundbyte
FROM ai_usecase_types uct
LEFT JOIN filtered_usecases f ON uct.usecase_id = f.usecase_id
LEFT JOIN ai_context ctx ON uct.usecase_id = ctx.usecase_id
LEFT JOIN ai_oneliner one ON uct.usecase_id = one.usecase_id;

-- SELECT soundbyte FROM ai_soundbyte LIMIT 5;


In [0]:
%sql
-- Generate AI confidence scores using MEDDPICC framework
CREATE OR REPLACE TEMPORARY VIEW ai_confidence AS
SELECT
  f.usecase_id,
  COALESCE(
    ai_query(
      'databricks-gemma-3-12b',
      CONCAT(
        'You are a Databricks Solutions Architect evaluating a customer use case and scoring its likelihood to reach production and generate sustained usage.\n\n',
        'Rate the use case using these MEDDPICC-based dimensions (0–10 each):\n',
        '1. Pain – Clear, urgent business problem?\n',
        '2. Champion – Identified internal advocate?\n',
        '3. Implementation Plan – Concrete next steps and ownership?\n',
        '4. Decision Process – Clear decision path?\n',
        '5. Urgency – Time pressure or deadline?\n',
        '6. Competition Awareness – Known competitors?\n',
        '7. Measurable Impact – Defined success metrics?\n',
        '8. Major Blockers – Significant obstacles (higher score = **more blockers**).\n\n',
        'Rules:\n',
        '- Base scores strictly on available evidence; do not speculate or imagine missing info.\n',
        '- If information is missing, assign a low score (≤3) for that dimension.\n',
        '- Use these weightings: Pain 25%, Champion 20%, Implementation Plan 20%, Decision Process 10%, Urgency 10%, Competition Awareness 5%, Measurable Impact 2%, Major Blockers -8% (deductive).\n',
        '- Compute the total confidence_score as: (Pain×0.25 + Champion×0.20 + ImplementationPlan×0.20 + DecisionProcess×0.10 + Urgency×0.10 + CompetitionAwareness×0.05 + MeasurableImpact×0.02 - MajorBlockers×0.08) × 10.\n',
        '- A perfect use case (10s across positive dimensions, 0 blockers) scores 92, because the positive weights sum to 0.92.\n',
        '- The total confidence_score must be between 0 and 100, rounded to the nearest multiple of 10.\n\n',
        'Context:\n',
        'Account context: ', COALESCE(ctx.context, 'None'), '\n',
        'Use Case: ', COALESCE(one.one_liner, 'None'), '\n',
        'Next steps: ', COALESCE(ns.next_steps_summary, 'None'), '\n',
        'Implementation notes: ', COALESCE(imp.implementation_notes_summary, 'None'), '\n',
        'Blockers: ', COALESCE(blk.blockers_summary, 'None'), '\n',
        'Competition: ', COALESCE(f.competition_string, 'None'), '\n\n',
        'Return output in **this exact format** (no text before or after):\n\n',
        'CONFIDENCE_SCORE: [0–100 number]\n',
        'LEVEL: [High || Medium || Low]\n',
        'RATIONALE: [Brief explanation, 2–3 sentences max]\n',
        'DIMENSION_SCORES: Pain=x, Champion=x, ImplementationPlan=x, DecisionProcess=x, Urgency=x, CompetitionAwareness=x, MajorBlockers=x, MeasurableImpact=x'
      ),
      named_struct('max_tokens', 200, 'temperature', 0.0, 'top_p', 0.1, 'failOnError', false)
    ),
    'CONFIDENCE_SCORE: 0\nLEVEL: Low\nRATIONALE: No information provided.\nDIMENSION_SCORES: Pain=0, Champion=0, ImplementationPlan=0, DecisionProcess=0, Urgency=0, CompetitionAwareness=0, MajorBlockers=0, MeasurableImpact=0'
  ) AS ai_confidence_score_advanced
FROM filtered_usecases f
LEFT JOIN ai_context ctx ON f.usecase_id = ctx.usecase_id
LEFT JOIN ai_oneliner one ON f.usecase_id = one.usecase_id
LEFT JOIN ai_nextsteps ns ON f.usecase_id = ns.usecase_id
LEFT JOIN ai_implementation imp ON f.usecase_id = imp.usecase_id
LEFT JOIN ai_blockers blk ON f.usecase_id = blk.usecase_id;

-- SELECT * FROM ai_confidence LIMIT 3;
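
The weighting rule in the prompt can be checked outside the model. The sketch below recomputes the score from the `meddpicc_weights` values in `CONFIG`. The function name `weighted_confidence` and the clamping to the 0 to 100 range are assumptions made for illustration; the notebook relies on the model to do this arithmetic.

```python
# Copied from CONFIG["meddpicc_weights"] in the setup cell
MEDDPICC_WEIGHTS = {
    "pain": 0.25,
    "champion": 0.20,
    "implementation_plan": 0.20,
    "decision_process": 0.10,
    "urgency": 0.10,
    "competition_awareness": 0.05,
    "measurable_impact": 0.02,
    "major_blockers": -0.08,  # deductive weight
}

def weighted_confidence(scores):
    """Weighted sum of 0-10 dimension scores, scaled by 10 and clamped to 0-100."""
    raw = sum(w * scores.get(name, 0) for name, w in MEDDPICC_WEIGHTS.items()) * 10
    return max(0.0, min(100.0, round(raw, 2)))

# Best case: 10 on every positive dimension, 0 on blockers
best = {name: 10 for name in MEDDPICC_WEIGHTS}
best["major_blockers"] = 0
print(weighted_confidence(best))
```

With these weights the positive dimensions sum to 0.92, so the best attainable score is 92.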


In [0]:
%sql
-- Extract confidence score components using regex
-- Apply override logic for use cases with >3 blockers
CREATE OR REPLACE TEMPORARY VIEW parsed_confidence AS
SELECT
  ac.usecase_id,
  ac.ai_confidence_score_advanced,
  
  -- Override confidence_score to 10 if >3 blockers, otherwise extract from AI response
  CASE
    WHEN COALESCE(f.num_of_blockers, 0) > 3 THEN 10
    ELSE TRY_CAST(regexp_extract(ac.ai_confidence_score_advanced, 'CONFIDENCE_SCORE:\\s*(\\d+)', 1) AS INT)  -- NULL when the pattern is absent
  END AS confidence_score,
  
  -- Override confidence_level to 'Low' if >3 blockers, otherwise extract from AI response
  CASE
    WHEN COALESCE(f.num_of_blockers, 0) > 3 THEN 'Low'
    ELSE regexp_extract(ac.ai_confidence_score_advanced, 'LEVEL:\\s*(High|Medium|Low)', 1)
  END AS confidence_level,
  
  -- Override rationale if >3 blockers, otherwise extract from AI response
  CASE
    WHEN COALESCE(f.num_of_blockers, 0) > 3 THEN 'The use case has too many blockers'
    ELSE regexp_extract(ac.ai_confidence_score_advanced, '(?s)RATIONALE:\\s*(.+?)\\s*DIMENSION_SCORES:', 1)  -- (?s) lets the rationale span several lines
  END AS rationale_for_confidence_score,
  
  -- Extract MEDDPICC dimension scores (no override - these come from AI analysis)
  -- regexp_extract returns '' when a pattern is absent; TRY_CAST turns that into NULL instead of failing
  TRY_CAST(regexp_extract(ac.ai_confidence_score_advanced, 'Pain=(\\d+)', 1) AS INT) AS pain_score,
  TRY_CAST(regexp_extract(ac.ai_confidence_score_advanced, 'Champion=(\\d+)', 1) AS INT) AS champion_score,
  TRY_CAST(regexp_extract(ac.ai_confidence_score_advanced, 'ImplementationPlan=(\\d+)', 1) AS INT) AS implementationplan_score,
  TRY_CAST(regexp_extract(ac.ai_confidence_score_advanced, 'DecisionProcess=(\\d+)', 1) AS INT) AS decisionprocess_score,
  TRY_CAST(regexp_extract(ac.ai_confidence_score_advanced, 'Urgency=(\\d+)', 1) AS INT) AS urgency_score,
  TRY_CAST(regexp_extract(ac.ai_confidence_score_advanced, 'CompetitionAwareness=(\\d+)', 1) AS INT) AS competitionawareness_score,
  TRY_CAST(regexp_extract(ac.ai_confidence_score_advanced, 'MajorBlockers=(\\d+)', 1) AS INT) AS majorblockers_score,
  TRY_CAST(regexp_extract(ac.ai_confidence_score_advanced, 'MeasurableImpact=(\\d+)', 1) AS INT) AS measurableimpact_score
FROM ai_confidence ac
LEFT JOIN filtered_usecases f ON ac.usecase_id = f.usecase_id;

-- SELECT confidence_score, confidence_level, rationale_for_confidence_score, pain_score, champion_score FROM parsed_confidence LIMIT 5;
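
The extraction patterns can be exercised locally before running them at scale. The sketch below applies equivalent patterns with Python's `re` module to a made-up response. `parse_confidence` and `SAMPLE` are hypothetical, and the use of `re.DOTALL` (so a rationale spanning several lines is still captured) is a choice of this sketch.

```python
import re

# Made-up model response in the format requested by the prompt
SAMPLE = (
    "CONFIDENCE_SCORE: 70\n"
    "LEVEL: Medium\n"
    "RATIONALE: Clear pain and champion.\nPlan lacks owners.\n"
    "DIMENSION_SCORES: Pain=8, Champion=7, ImplementationPlan=4, DecisionProcess=6, "
    "Urgency=5, CompetitionAwareness=3, MajorBlockers=2, MeasurableImpact=4"
)

def parse_confidence(text):
    """Extract score, level, rationale, and dimension scores; missing parts become None or {}."""
    score = re.search(r"CONFIDENCE_SCORE:\s*(\d+)", text)
    level = re.search(r"LEVEL:\s*(High|Medium|Low)", text)
    rationale = re.search(r"RATIONALE:\s*(.+?)\s*DIMENSION_SCORES:", text, re.DOTALL)
    # Only look for Name=number pairs after the DIMENSION_SCORES marker
    _, _, tail = text.partition("DIMENSION_SCORES:")
    dims = {name: int(value) for name, value in re.findall(r"(\w+)=(\d+)", tail)}
    return {
        "confidence_score": int(score.group(1)) if score else None,
        "confidence_level": level.group(1) if level else None,
        "rationale": rationale.group(1) if rationale else None,
        "dimension_scores": dims,
    }

parsed = parse_confidence(SAMPLE)
print(parsed["confidence_score"], parsed["confidence_level"])
```

A response that ignores the requested format yields `None` fields and an empty score dictionary, which corresponds to the "Not computed" path later in the pipeline.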


In [0]:
%sql
-- Apply normalized confidence level thresholds
CREATE OR REPLACE TEMPORARY VIEW normalized_confidence AS
SELECT
  *,
  CASE
    WHEN confidence_score >= 75 THEN 'High'
    WHEN confidence_score >= 45 THEN 'Medium'
    WHEN confidence_score IS NOT NULL THEN 'Low'
    ELSE 'Not computed'
  END AS confidence_level_normalized
FROM parsed_confidence;

-- SELECT confidence_level_normalized, COUNT(*) AS count FROM normalized_confidence GROUP BY confidence_level_normalized ORDER BY count DESC;
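
The same bucketing can be written as a small Python function, which makes the boundary cases easy to test. The defaults below mirror `confidence_high_threshold` and `confidence_medium_threshold` from `CONFIG`; the function name is hypothetical.

```python
def normalize_confidence_level(score, high=75, medium=45):
    """Map a 0-100 confidence score to High, Medium, Low, or Not computed."""
    if score is None:
        return "Not computed"
    if score >= high:
        return "High"
    if score >= medium:
        return "Medium"
    return "Low"

for value in (None, 10, 45, 74, 75):
    print(value, normalize_confidence_level(value))
```

Both thresholds are inclusive, so a score of exactly 45 is Medium and exactly 75 is High.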


In [0]:
%sql
-- Generate next best action recommendations
CREATE OR REPLACE TEMPORARY VIEW ai_next_best_action AS
SELECT
  nc.usecase_id,
  COALESCE(
    ai_query(
      'databricks-gpt-oss-20b',
      CONCAT(
        'You are a Databricks Solutions Architect helping an account team decide the next best action for a customer account and a specific use case.\n',
        'Inputs:\n',
        'Account context: ', COALESCE(ctx.context, 'None'), '\n',
        'Use Case: ', COALESCE(one.one_liner, 'None'), '\n',
        'Next steps summary: ', COALESCE(ns.next_steps_summary, 'None'), '\n',
        'Implementation notes: ', COALESCE(imp.implementation_notes_summary, 'None'), '\n',
        'Blockers: ', COALESCE(blk.blockers_summary, 'None'), '\n',
        'Competition: ', COALESCE(f.competition_string, 'None'), '\n',
        'MEDDPICC dimension scores: Pain=', COALESCE(CAST(nc.pain_score AS STRING), 'None'),
        ', Champion=', COALESCE(CAST(nc.champion_score AS STRING), 'None'),
        ', ImplementationPlan=', COALESCE(CAST(nc.implementationplan_score AS STRING), 'None'),
        ', DecisionProcess=', COALESCE(CAST(nc.decisionprocess_score AS STRING), 'None'),
        ', Urgency=', COALESCE(CAST(nc.urgency_score AS STRING), 'None'),
        ', CompetitionAwareness=', COALESCE(CAST(nc.competitionawareness_score AS STRING), 'None'),
        ', MajorBlockers=', COALESCE(CAST(nc.majorblockers_score AS STRING), 'None'),
        ', MeasurableImpact=', COALESCE(CAST(nc.measurableimpact_score AS STRING), 'None'), '\n',
        'Overall confidence level: ', COALESCE(nc.confidence_level_normalized, 'None'), '\n\n',
        'Rules:\n',
        '1. Base your recommendation strictly on the evidence provided; do not assume missing information.\n',
        '2. Prioritize actions that maximize the likelihood of a successful go-live and reduce blockers.\n',
        '3. If Pain, Champion, or Implementation Plan is low (<5/10), suggest actions to strengthen these dimensions first.\n',
        '4. If Major Blockers > 5/10, suggest actions to mitigate or remove the blockers.\n',
        '5. If Urgency is high (>7/10), prioritize time-sensitive actions.\n',
        '6. Provide one concise, actionable next step, written as if for the account team to execute within the next week.\n',
        '7. Keep your answer short – 1–2 sentences max.\n',
        '8. Be deterministic: always follow the rules exactly and do not vary wording unnecessarily.\n\n',
        'Return output in this exact format (no extra text):\n\n',
        'NEXT_BEST_ACTION: [Short, actionable recommendation for the account team]\n',
        'RATIONALE: [Brief explanation linking the action to the MEDDPICC scores and blockers, ≤2 sentences]'
      ),
      named_struct('max_tokens', 200, 'temperature', 0.0, 'top_p', 0.1, 'failOnError', false)
    ),
    'NEXT_BEST_ACTION: Review MEDDPICC evidence and address missing information.\nRATIONALE: No sufficient context or scores provided to generate a specific action.'
  ) AS next_best_action_recommendation
FROM normalized_confidence nc
LEFT JOIN filtered_usecases f ON nc.usecase_id = f.usecase_id
LEFT JOIN ai_context ctx ON nc.usecase_id = ctx.usecase_id
LEFT JOIN ai_oneliner one ON nc.usecase_id = one.usecase_id
LEFT JOIN ai_nextsteps ns ON nc.usecase_id = ns.usecase_id
LEFT JOIN ai_implementation imp ON nc.usecase_id = imp.usecase_id
LEFT JOIN ai_blockers blk ON nc.usecase_id = blk.usecase_id;

-- SELECT * FROM ai_next_best_action LIMIT 3;


In [0]:
%sql
-- Classify slippage and acceleration categories
CREATE OR REPLACE TEMPORARY VIEW ai_slippage_acceleration AS
SELECT 
  nc.usecase_id,
  
  -- Flag use cases likely to slip
  CASE 
    WHEN nc.confidence_level_normalized = 'Medium' THEN true
    ELSE false
  END AS likely_to_slip,
  
  -- Classify slippage category
  CASE
    WHEN nc.confidence_level_normalized = 'Medium'
    THEN COALESCE(
      ai_classify(
        CONCAT(
          'You are a Databricks Senior Solutions Architect.\n',
          'Classify the PRIMARY reason this use case is likely to slip. Choose exactly ONE from:\n',
          '[Technical, Business, Stakeholder, Budget, Project Timelines, Data, Integration, Partner, Hyperscaler, Competition, Other, External dependencies]\n\n',
          'Decision Rules:\n',
          '- Technical → missing features, blockers, or bugs.\n',
          '- Business → shifting priorities or unclear value.\n',
          '- Stakeholder → low buy-in or sponsor turnover.\n',
          '- Budget → funding or approval delays.\n',
          '- Project Timelines → missed milestones or dependencies.\n',
          '- Data → quality, access, or integration issues.\n',
          '- Integration → tooling or system connectivity issues.\n',
          '- Partner → bandwidth or enablement gaps.\n',
          '- Hyperscaler → regional, quota, or policy constraints.\n',
          '- Competition → market or rival-driven reprioritization.\n',
          '- External dependencies → third-party or customer-side blockers.\n',
          '- Other → none of the above.\n\n',
          'Inputs:\n',
          'Confidence Score Rationale: ', COALESCE(LEFT(nc.rationale_for_confidence_score, 200), 'N/A'), '\n',
          'Next Step: ', COALESCE(LEFT(nba.next_best_action_recommendation, 200), 'N/A')
        ),
        ARRAY(
          'Technical', 'Business', 'Stakeholder', 'Budget', 'Project Timelines', 'Data', 'Integration', 'Partner', 'Hyperscaler', 'Competition', 'Other', 'External dependencies'
        )
      ),
      'Unable to determine slippage category'
    )
    ELSE 'N/A'
  END AS slippage_category,
  
  -- Flag use cases that can be accelerated
  CASE 
    WHEN nc.confidence_level_normalized IN ('Medium', 'High') THEN true
    ELSE false
  END AS can_be_accelerated,
  
  -- Classify acceleration category
  CASE
    WHEN nc.confidence_level_normalized IN ('Medium', 'High') THEN 
      COALESCE(
        ai_classify(
          CONCAT(
            'You are a Databricks Senior Solutions Architect.\n',
            'Classify the PRIMARY reason this use case could be accelerated.\n',
            'Choose exactly ONE label from:\n',
            '[Implementation Planning, Resource Allocation, Stakeholder Engagement, Technical Optimization, Partnership Leverage, Governance and Support, Innovation and Technology Enablement, Capability Development, Funding, Other]\n\n',
            'Decision rules:\n',
            '- Implementation Planning → clarifying the WHAT/WHEN/WHO (critical path, milestones, owners).\n',
            '- Resource Allocation → people/tools/compute NOW.\n',
            '- Stakeholder Engagement → alignment, cadence, or decisions.\n',
            '- Technical Optimization → performance, architecture, integration tuning.\n',
            '- Innovation and Technology Enablement → templates, automation, AI accelerators.\n',
            '- Governance and Support → steering, dashboards, SLAs, support.\n',
            '- Capability Development → training, upskilling, knowledge transfer.\n',
            '- Partnership Leverage → partner expertise/assets as key lever.\n',
            '- Funding → securing or expediting budget, approvals, resources.\n',
            '- Other → none of the above.\n\n',
            'Return exactly one of the labels above, with no punctuation or explanation.\n\n',
            'Context:\n',
            'Confidence Score Rationale: ', COALESCE(LEFT(nc.rationale_for_confidence_score, 200), 'N/A'), '\n',
            'Next Step: ', COALESCE(LEFT(nba.next_best_action_recommendation, 200), 'N/A')
          ),
          ARRAY(
            'Implementation Planning', 'Resource Allocation', 'Stakeholder Engagement', 'Technical Optimization', 
            'Partnership Leverage', 'Governance and Support', 'Innovation and Technology Enablement', 
            'Capability Development', 'Funding', 'Other'
          )
        ),
        'Unable to determine acceleration category'
      )
    ELSE 'N/A'
  END AS accel_category
FROM normalized_confidence nc
LEFT JOIN ai_next_best_action nba ON nc.usecase_id = nba.usecase_id;

-- SELECT likely_to_slip, slippage_category, can_be_accelerated, accel_category FROM ai_slippage_acceleration LIMIT 5;
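
The acceleration logic above is gated on the normalized confidence level: only Medium and High use cases are sent to the classifier, and a NULL classifier result falls back to a fixed string. A minimal Python sketch of that gating, for unit-testing the rule outside SQL. The `accel_fields` helper and the `classify` callable are hypothetical stand-ins for the `CASE` expressions and the `ai_classify` call; they are not part of the notebook.

```python
from typing import Callable, Optional, Tuple

FALLBACK = "Unable to determine acceleration category"


def accel_fields(
    confidence_level: Optional[str],
    classify: Callable[[], Optional[str]],
) -> Tuple[bool, str]:
    """Mirror the two CASE expressions: (can_be_accelerated, accel_category)."""
    # A NULL (None) level falls through to the ELSE branch, as in SQL.
    can_be_accelerated = confidence_level in ("Medium", "High")
    if not can_be_accelerated:
        # The classifier is never called for Low or missing confidence.
        return False, "N/A"
    label = classify()  # hypothetical stand-in for ai_classify(...)
    # COALESCE(label, fallback)
    return True, label if label is not None else FALLBACK


high = accel_fields("High", lambda: "Funding")
low = accel_fields("Low", lambda: "Funding")
no_label = accel_fields("Medium", lambda: None)
```

Because the classifier is only invoked inside the Medium/High branch, Low-confidence rows never incur an inference call in this sketch.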


In [0]:
%sql
-- Join all AI enrichments - keep only minimal columns
-- Can always re-join to source table on usecase_id for other fields
CREATE OR REPLACE TEMPORARY VIEW enriched_results AS
SELECT 
  -- Core identifiers and timestamps
  base.usecase_id,
  base.last_modified_date,
  CURRENT_DATE() AS piq_last_updated,
  
  -- Fields needed for AI inference (kept for reference)
  base.competition_string,
  base.num_of_blockers,

  -- AI enriched fields
  ctx.context,
  one.one_liner,
  ns.next_steps_summary,
  imp.implementation_notes_summary,
  blk.blockers_summary,
  uct.usecase_type,
  uct.business_usecase_type,
  sb.soundbyte,
  
  -- Confidence scoring
  nc.ai_confidence_score_advanced,
  nc.confidence_score,
  nc.confidence_level,
  nc.rationale_for_confidence_score,
  nc.pain_score,
  nc.champion_score,
  nc.implementationplan_score,
  nc.decisionprocess_score,
  nc.urgency_score,
  nc.competitionawareness_score,
  nc.majorblockers_score,
  nc.measurableimpact_score,
  nc.confidence_level_normalized,
  
  -- Recommendations and classifications
  nba.next_best_action_recommendation,
  sa.likely_to_slip,
  sa.slippage_category,
  sa.can_be_accelerated,
  sa.accel_category
FROM filtered_usecases base
LEFT JOIN ai_context ctx ON base.usecase_id = ctx.usecase_id
LEFT JOIN ai_oneliner one ON base.usecase_id = one.usecase_id
LEFT JOIN ai_nextsteps ns ON base.usecase_id = ns.usecase_id
LEFT JOIN ai_implementation imp ON base.usecase_id = imp.usecase_id
LEFT JOIN ai_blockers blk ON base.usecase_id = blk.usecase_id
LEFT JOIN ai_usecase_types uct ON base.usecase_id = uct.usecase_id
LEFT JOIN ai_soundbyte sb ON base.usecase_id = sb.usecase_id
LEFT JOIN normalized_confidence nc ON base.usecase_id = nc.usecase_id
LEFT JOIN ai_next_best_action nba ON base.usecase_id = nba.usecase_id
LEFT JOIN ai_slippage_acceleration sa ON base.usecase_id = sa.usecase_id;

-- Show count of enriched records
-- SELECT 
--   COUNT(*) AS total_enriched_records,
--   COUNT(context) AS with_context,
--   COUNT(confidence_score) AS with_confidence_score
-- FROM enriched_results;


In [0]:
%sql
-- Merge enriched results into target table
-- Matched rows are updated only when the source last_modified_date is newer;
-- piq_last_updated is therefore refreshed only on rows that are updated or inserted
MERGE INTO IDENTIFIER(:catalog || '.' || :schema || '.pipelineiq') AS target
USING enriched_results AS source
ON target.usecase_id = source.usecase_id
WHEN MATCHED AND source.last_modified_date > target.last_modified_date THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *;
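
The MERGE above touches a row only when it is new or when the source carries a later `last_modified_date`. A minimal Python sketch of those semantics, using in-memory dicts instead of Delta tables. The `merge_rows` helper and the sample rows are hypothetical, and dates are assumed to be non-null (SQL would treat a NULL comparison as not matched for update).

```python
from datetime import date


def make_row(usecase_id, last_modified, piq_last_updated):
    return {
        "usecase_id": usecase_id,
        "last_modified_date": last_modified,
        "piq_last_updated": piq_last_updated,
    }


def merge_rows(target, source):
    """target: dict keyed by usecase_id; source: list of row dicts."""
    merged = dict(target)
    for row in source:
        existing = merged.get(row["usecase_id"])
        if existing is None:
            # WHEN NOT MATCHED THEN INSERT *
            merged[row["usecase_id"]] = row
        elif row["last_modified_date"] > existing["last_modified_date"]:
            # WHEN MATCHED AND source is newer THEN UPDATE SET *
            merged[row["usecase_id"]] = row
        # Matched but not newer: the target row is left untouched.
    return merged


target = {
    1: make_row(1, date(2024, 1, 10), date(2024, 1, 11)),
    2: make_row(2, date(2024, 1, 5), date(2024, 1, 6)),
}
source = [
    make_row(1, date(2024, 1, 10), date(2024, 2, 1)),  # unchanged in source
    make_row(2, date(2024, 1, 20), date(2024, 2, 1)),  # modified in source
    make_row(3, date(2024, 1, 25), date(2024, 2, 1)),  # new use case
]
result = merge_rows(target, source)
```

In this sketch, use case 1 keeps its earlier `piq_last_updated` because its `last_modified_date` did not advance, while use cases 2 and 3 carry the new processing date.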


In [0]:
%sql
-- Add table properties for tracking AI model and processing metadata
ALTER TABLE IDENTIFIER(:catalog || '.' || :schema || '.pipelineiq') SET TBLPROPERTIES (
  'ai.model.primary' = 'databricks-gpt-oss-20b',
  'ai.model.confidence' = 'databricks-gemma-3-12b',
  'ai.model.business_usecase' = 'databricks-gemma-3-12b',
  -- 'ai.generation.timestamp' = CURRENT_DATE(), -- replaced by the piq_last_updated field
  'ai.prompt.version' = 'v1.4',
  'pipeline.type' = 'incremental',
  'quality' = 'gold'
);

-- SELECT 'Metadata added to table: ' || :catalog || '.' || :schema || '.pipelineiq' AS status;
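
The model name in the properties above repeats the `ai_model` value from `CONFIG`, so the two can drift apart. One possible way to keep them aligned is to render the statement from Python; the sketch below only builds the SQL text. The `tblproperties_sql` helper and the `my_catalog.my_schema` name are hypothetical, and values containing quotes or backslashes are rejected rather than escaped.

```python
def tblproperties_sql(table_name, props):
    """Render an ALTER TABLE ... SET TBLPROPERTIES statement from a dict."""
    lines = []
    for key, value in props.items():
        for text in (key, str(value)):
            # Refuse anything that would need escaping inside a SQL literal.
            if "'" in text or "\\" in text:
                raise ValueError(f"unsupported character in property: {text!r}")
        lines.append(f"  '{key}' = '{value}'")
    body = ",\n".join(lines)
    return f"ALTER TABLE {table_name} SET TBLPROPERTIES (\n{body}\n);"


# Only the key used here is copied from the notebook's CONFIG cell.
CONFIG = {"ai_model": "databricks-gpt-oss-20b"}

props = {
    "ai.model.primary": CONFIG["ai_model"],
    "ai.prompt.version": "v1.4",
    "pipeline.type": "incremental",
    "quality": "gold",
}
statement = tblproperties_sql("my_catalog.my_schema.pipelineiq", props)
```

The resulting string could then be executed from a Python cell in place of the hard-coded `%sql` statement.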


In [0]:
%sql
-- Verify the merge results
SELECT 
  'Total Records' AS metric,
  COUNT(*) AS count
FROM IDENTIFIER(:catalog || '.' || :schema || '.pipelineiq')

UNION ALL

SELECT 
  'Records with AI Enrichment' AS metric,
  COUNT(*) AS count
FROM IDENTIFIER(:catalog || '.' || :schema || '.pipelineiq')
WHERE context IS NOT NULL

UNION ALL

SELECT 
  'Records with Confidence Score' AS metric,
  COUNT(*) AS count
FROM IDENTIFIER(:catalog || '.' || :schema || '.pipelineiq')
WHERE confidence_score IS NOT NULL

UNION ALL

SELECT 
  'High Confidence' AS metric,
  COUNT(*) AS count
FROM IDENTIFIER(:catalog || '.' || :schema || '.pipelineiq')
WHERE confidence_level_normalized = 'High'

UNION ALL

SELECT 
  'Medium Confidence' AS metric,
  COUNT(*) AS count
FROM IDENTIFIER(:catalog || '.' || :schema || '.pipelineiq')
WHERE confidence_level_normalized = 'Medium'

UNION ALL

SELECT 
  'Low Confidence' AS metric,
  COUNT(*) AS count
FROM IDENTIFIER(:catalog || '.' || :schema || '.pipelineiq')
WHERE confidence_level_normalized = 'Low';
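
The six metrics above are independent counts over the same table. A small Python sketch that computes the same figures over a list of row dicts, which can be handy for checking the expected numbers on a hand-built sample. The `verification_metrics` helper and the sample rows are hypothetical.

```python
def verification_metrics(rows):
    """Compute the same six counts as the UNION ALL query, in one pass per metric."""

    def count(predicate):
        return sum(1 for r in rows if predicate(r))

    def level_is(level):
        return lambda r: r.get("confidence_level_normalized") == level

    return {
        "Total Records": len(rows),
        "Records with AI Enrichment": count(lambda r: r.get("context") is not None),
        "Records with Confidence Score": count(
            lambda r: r.get("confidence_score") is not None
        ),
        "High Confidence": count(level_is("High")),
        "Medium Confidence": count(level_is("Medium")),
        "Low Confidence": count(level_is("Low")),
    }


sample = [
    {"context": "c", "confidence_score": 80, "confidence_level_normalized": "High"},
    {"context": None, "confidence_score": 50, "confidence_level_normalized": "Medium"},
    {"context": "c", "confidence_score": None, "confidence_level_normalized": None},
]
metrics = verification_metrics(sample)
```

If the High, Medium, and Low counts do not add up to the total, the difference is the number of rows whose normalized level is NULL or an unexpected value.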


In [0]:
%sql
-- Count records per processing date (piq_last_updated watermark)
SELECT piq_last_updated, COUNT(*) AS record_count
FROM IDENTIFIER(:catalog || '.' || :schema || '.pipelineiq')
GROUP BY ALL;

In [0]:
%sql
-- Show sample of enriched records (recently processed)
-- Note: To see usecase_name and other source fields, join back to core_usecase_curated on usecase_id
SELECT 
  usecase_id,
  one_liner,
  confidence_score,
  confidence_level_normalized,
  usecase_type,
  business_usecase_type,
  likely_to_slip,
  slippage_category,
  can_be_accelerated,
  accel_category,
  last_modified_date,
  piq_last_updated
FROM IDENTIFIER(:catalog || '.' || :schema || '.pipelineiq')
WHERE piq_last_updated = CURRENT_DATE()
ORDER BY confidence_score DESC
LIMIT 10;


In [0]:
%sql
-- Create pipelineiq_view matching Confidence Analysis structure
-- This view joins PipelineIQ AI enrichments with source data and applies look-ahead filters
-- Filters are applied to exclude closed/lost use cases and focus on active pipeline
-- DEVELOPMENT NOTE: ⚠️ Variable values are not allowed in the view definition. If you change the minimum DBUs,
-- the excluded stages, or the date window in CONFIG, update the hard-coded values here too.
use catalog IDENTIFIER(:catalog);
use schema IDENTIFIER(:schema);
CREATE OR REPLACE VIEW IDENTIFIER(:catalog || '.' || :schema || '.pipelineiq_view') AS
SELECT 
  -- Original fields from source (with transformations)
  src.snapshot_date,
  COALESCE(src.field_manager, 'FLM not assigned') AS field_manager,
  src.account_name,
  piq.usecase_id,
  src.usecase_name,
  src.last_modified_date,
  src.account_executive,
  COALESCE(src.account_solution_architect, 'SA not assigned') AS account_solution_architect,
  src.stage,
  src.target_cloud,
  src.primary_competitor,
  src.target_live_date,
  src.target_onboarding_date,
  src.estimated_monthly_dollar_dbus,
  src.usecase_description,
  src.demand_plan_stage_next_steps,
  src.implementation_notes,
  -- Fall back to business_unit, then to a placeholder, when sales_region is missing
  COALESCE(src.sales_region, src.business_unit, 'Not assigned') AS sales_region,
  src.sales_subregion_level_1,
  src.sales_subregion_level_2,
  src.sales_subregion_level_3,
  src.sales_subregion_level_4,
  src.sales_manager,
  src.sales_leader,
  src.field_leader,
  src.use_case_product,
  src.blockers_last_modified_date,
  src.sdr_bdr_notes,
  src.business_impact,
  src.business_outcome_description,
  src.ae_authority,
  src.ae_money_budget,
  src.business_unit,
  piq.num_of_blockers,
  src.last_modified_blocker_category,
  src.last_modified_blocker_comment,
  src.competition_string,
  
  -- AI-generated fields
  piq.context,
  piq.one_liner,
  piq.next_steps_summary,
  piq.implementation_notes_summary,
  piq.blockers_summary,
  piq.usecase_type,
  piq.business_usecase_type,
  piq.soundbyte,
  
  -- Computed staleness flag (always relative to current date)
  CASE 
    WHEN src.last_modified_date < DATE_SUB(CURRENT_DATE(), 14) 
    THEN true 
    ELSE false 
  END AS is_stale,
  
  piq.ai_confidence_score_advanced,
  piq.confidence_score,
  piq.confidence_level,
  piq.rationale_for_confidence_score,
  piq.pain_score,
  piq.champion_score,
  piq.implementationplan_score,
  piq.decisionprocess_score,
  piq.urgency_score,
  piq.competitionawareness_score,
  piq.majorblockers_score,
  piq.measurableimpact_score,
  piq.confidence_level_normalized,
  piq.next_best_action_recommendation,
  piq.likely_to_slip,
  piq.slippage_category,
  piq.can_be_accelerated,
  piq.accel_category,
  
  -- Set vertical to 'Not specified' if null
  COALESCE(src.vertical, 'Not specified') AS vertical
FROM pipelineiq piq
INNER JOIN main.gtm_data.core_usecase_curated src 
  ON piq.usecase_id = src.usecase_id
WHERE 
  -- Apply look-ahead filters to focus on active pipeline
  -- These filters prevent closed/lost use cases from appearing in the view
  src.estimated_monthly_dollar_dbus >= 1000
  AND src.stage NOT IN ('U1', 'Lost', 'Closed', 'Disqualified')
  AND src.target_live_date BETWEEN CURRENT_DATE() AND ADD_MONTHS(CURRENT_DATE(), 6);

-- Add a view-level comment
COMMENT ON VIEW IDENTIFIER(:catalog || '.' || :schema || '.pipelineiq_view') IS 'Business-facing view of PipelineIQ with cleaned recommendation labels, normalized verticals, and presentation-ready AI enrichment. Applies look-ahead filters to focus on active pipeline use cases.';

-- Add view properties
ALTER VIEW IDENTIFIER(:catalog || '.' || :schema || '.pipelineiq_view') SET TBLPROPERTIES (
  'ai.pipeline.version' = 'v1.4',
  'ai.model.primary' = 'databricks-gpt-oss-20b',
  'ai.model.confidence' = 'databricks-gemma-3-12b',
  'ai.model.business_usecase' = 'databricks-gemma-3-12b',
  'ai.view.purpose' = 'Executive insights and reporting',
  'pipeline.type' = 'incremental',
  'quality' = 'gold'
);

-- Display the first 10 rows of the view
SELECT * FROM IDENTIFIER(:catalog || '.' || :schema || '.pipelineiq_view') LIMIT 10;
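
The DEVELOPMENT NOTE in the cell above asks for the view's hard-coded thresholds to be kept in step with `CONFIG` by hand. One possible alternative is to render the `WHERE` clause from `CONFIG` in Python and splice it into the view DDL. The sketch below only builds the predicate text; the `view_filter_sql` helper is hypothetical, and only the three `CONFIG` keys it reads are copied from the notebook.

```python
CONFIG = {
    "min_monthly_dbus": 1000,
    "excluded_stages": ["U1", "Lost", "Closed", "Disqualified"],
    "target_live_date_months_ahead": 6,
}


def view_filter_sql(config, alias="src"):
    """Render the look-ahead filter of pipelineiq_view from CONFIG values."""
    for stage in config["excluded_stages"]:
        # Only plain alphanumeric stage names (spaces allowed) are inlined.
        if not stage.replace(" ", "").isalnum():
            raise ValueError(f"unexpected stage name: {stage!r}")
    stages = ", ".join(f"'{s}'" for s in config["excluded_stages"])
    min_dbus = int(config["min_monthly_dbus"])
    months = int(config["target_live_date_months_ahead"])
    return (
        f"{alias}.estimated_monthly_dollar_dbus >= {min_dbus}\n"
        f"  AND {alias}.stage NOT IN ({stages})\n"
        f"  AND {alias}.target_live_date BETWEEN CURRENT_DATE() "
        f"AND ADD_MONTHS(CURRENT_DATE(), {months})"
    )


where_clause = view_filter_sql(CONFIG)
```

With the values above, the rendered text matches the three predicates currently hard-coded in the view, so a change to `CONFIG` would flow into the view the next time the DDL is generated.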


In [None]:
%sql
-- Create field manager summary view for executive reporting
-- Aggregates use case metrics, confidence distribution, and top categories by field manager
CREATE OR REPLACE VIEW IDENTIFIER(:catalog || '.' || :schema || '.field_manager_summary_view') (
  snapshot_date,
  field_manager,
  sales_region,
  sales_subregion_level_1,
  sales_subregion_level_2,
  sales_subregion_level_3,
  sales_subregion_level_4,
  total_use_cases,
  total_estimated_monthly_dbu,
  high_confidence_count,
  high_confidence_pct,
  medium_confidence_count,
  medium_confidence_pct,
  low_confidence_count,
  low_confidence_pct,
  slip_usecase_count,
  slip_total_monthly_dbu,
  accel_usecase_count,
  accel_total_monthly_dbu,
  stale_usecase_count,
  stale_total_monthly_dbu,
  u3_stale_usecase_count,
  u3_stale_total_monthly_dbu,
  u5_stale_usecase_count,
  u5_stale_total_monthly_dbu,
  one_blocker_usecase_count,
  one_blocker_total_monthly_dbu,
  azure_medhigh_usecase_count,
  azure_medhigh_total_monthly_dbu,
  threeplus_blocker_usecase_count,
  threeplus_blocker_total_monthly_dbu,
  top1_usecase_type,
  top1_usecase_type_count,
  top1_usecase_type_monthly_dbu,
  top2_usecase_type,
  top2_usecase_type_count,
  top2_usecase_type_monthly_dbu,
  top3_usecase_type,
  top3_usecase_type_count,
  top3_usecase_type_monthly_dbu,
  top1_slippage_category,
  top1_slip_usecase_count,
  top1_slippage_category_description,
  top2_slippage_category,
  top2_slip_usecase_count,
  top2_slippage_category_description,
  top3_slippage_category,
  top3_slip_usecase_count,
  top3_slippage_category_description,
  top1_accel_category,
  top1_accel_usecase_count,
  top1_accel_category_description,
  top2_accel_category,
  top2_accel_usecase_count,
  top2_accel_category_description,
  top3_accel_category,
  top3_accel_usecase_count,
  top3_accel_category_description,
  top1_vertical,
  top1_vertical_usecase_count,
  top2_vertical,
  top2_vertical_usecase_count,
  top3_vertical,
  top3_vertical_usecase_count,
  name,
  surname,
  generated_email,
  email,
  email_status
) AS
WITH field_manager_summary AS (
  SELECT
    any_value(snapshot_date) as snapshot_date,
    field_manager,
    any_value(sales_region) as sales_region,
    any_value(sales_subregion_level_1) as sales_subregion_level_1,
    any_value(sales_subregion_level_2) as sales_subregion_level_2,
    any_value(sales_subregion_level_3) as sales_subregion_level_3,
    any_value(sales_subregion_level_4) as sales_subregion_level_4,
    COUNT(*) AS total_use_cases,
    SUM(estimated_monthly_dollar_dbus) AS total_estimated_monthly_dbu,
    SUM(CASE WHEN confidence_level_normalized = 'High' THEN 1 ELSE 0 END) AS high_confidence_count,
    ROUND(100.0 * SUM(CASE WHEN confidence_level_normalized = 'High' THEN 1 ELSE 0 END) / COUNT(*), 2) AS high_confidence_pct,
    SUM(CASE WHEN confidence_level_normalized = 'Medium' THEN 1 ELSE 0 END) AS medium_confidence_count,
    ROUND(100.0 * SUM(CASE WHEN confidence_level_normalized = 'Medium' THEN 1 ELSE 0 END) / COUNT(*), 2) AS medium_confidence_pct,
    SUM(CASE WHEN confidence_level_normalized = 'Low' THEN 1 ELSE 0 END) AS low_confidence_count,
    ROUND(100.0 * SUM(CASE WHEN confidence_level_normalized = 'Low' THEN 1 ELSE 0 END) / COUNT(*), 2) AS low_confidence_pct,
    SUM(CASE WHEN likely_to_slip = TRUE THEN 1 ELSE 0 END) AS slip_usecase_count,
    SUM(CASE WHEN likely_to_slip = TRUE THEN estimated_monthly_dollar_dbus ELSE 0 END) AS slip_total_monthly_dbu,
    SUM(CASE WHEN can_be_accelerated = TRUE THEN 1 ELSE 0 END) AS accel_usecase_count,
    SUM(CASE WHEN can_be_accelerated = TRUE THEN estimated_monthly_dollar_dbus ELSE 0 END) AS accel_total_monthly_dbu,
    SUM(CASE WHEN stage IN ('U3', 'U5') AND last_modified_date < date_sub(current_date(), 14) THEN 1 ELSE 0 END) AS stale_usecase_count,
    SUM(CASE WHEN stage IN ('U3', 'U5') AND last_modified_date < date_sub(current_date(), 14) THEN estimated_monthly_dollar_dbus ELSE 0 END) AS stale_total_monthly_dbu,
    SUM(CASE WHEN stage = 'U3' AND last_modified_date < date_sub(current_date(), 14) THEN 1 ELSE 0 END) AS u3_stale_usecase_count,
    SUM(CASE WHEN stage = 'U3' AND last_modified_date < date_sub(current_date(), 14) THEN estimated_monthly_dollar_dbus ELSE 0 END) AS u3_stale_total_monthly_dbu,
    SUM(CASE WHEN stage = 'U5' AND last_modified_date < date_sub(current_date(), 14) THEN 1 ELSE 0 END) AS u5_stale_usecase_count,
    SUM(CASE WHEN stage = 'U5' AND last_modified_date < date_sub(current_date(), 14) THEN estimated_monthly_dollar_dbus ELSE 0 END) AS u5_stale_total_monthly_dbu,
    SUM(CASE WHEN num_of_blockers = 1 THEN 1 ELSE 0 END) AS one_blocker_usecase_count,
    SUM(CASE WHEN num_of_blockers = 1 THEN estimated_monthly_dollar_dbus ELSE 0 END) AS one_blocker_total_monthly_dbu,
    SUM(CASE WHEN stage IN ('U2', 'U4', 'U5') AND confidence_level_normalized IN ('Medium', 'High') AND lower(target_cloud) LIKE '%azure%' THEN 1 ELSE 0 END) AS azure_medhigh_usecase_count,
    SUM(CASE WHEN stage IN ('U2', 'U4', 'U5') AND confidence_level_normalized IN ('Medium', 'High') AND lower(target_cloud) LIKE '%azure%' THEN estimated_monthly_dollar_dbus ELSE 0 END) AS azure_medhigh_total_monthly_dbu,
    SUM(CASE WHEN num_of_blockers >= 3 THEN 1 ELSE 0 END) AS threeplus_blocker_usecase_count,
    SUM(CASE WHEN num_of_blockers >= 3 THEN estimated_monthly_dollar_dbus ELSE 0 END) AS threeplus_blocker_total_monthly_dbu
  FROM IDENTIFIER(:catalog || '.' || :schema || '.pipelineiq_view')
  GROUP BY field_manager
),
slip_categories AS (
  SELECT
    field_manager,
    sc.slippage_category,
    COUNT(*) AS slip_usecase_count,
    ROW_NUMBER() OVER (PARTITION BY field_manager ORDER BY COUNT(*) DESC) AS rn
  FROM IDENTIFIER(:catalog || '.' || :schema || '.pipelineiq_view') sc
  WHERE likely_to_slip = TRUE
  GROUP BY field_manager, slippage_category
),
accel_categories AS (
  SELECT
    field_manager,
    ac.accel_category,
    COUNT(*) AS accel_usecase_count,
    ROW_NUMBER() OVER (PARTITION BY field_manager ORDER BY COUNT(*) DESC) AS rn
  FROM IDENTIFIER(:catalog || '.' || :schema || '.pipelineiq_view') ac
  WHERE can_be_accelerated = TRUE
  GROUP BY field_manager, accel_category
),
usecase_type_ranked AS (
  SELECT
    field_manager,
    usecase_type,
    COUNT(*) AS usecase_type_count,
    SUM(estimated_monthly_dollar_dbus) AS usecase_type_monthly_dbu,
    ROW_NUMBER() OVER (PARTITION BY field_manager ORDER BY SUM(estimated_monthly_dollar_dbus) DESC) AS rn
  FROM IDENTIFIER(:catalog || '.' || :schema || '.pipelineiq_view')
  GROUP BY field_manager, usecase_type
),
vertical_categories AS (
  SELECT
    field_manager,
    vertical,
    COUNT(*) AS vertical_usecase_count,
    ROW_NUMBER() OVER (PARTITION BY field_manager ORDER BY COUNT(*) DESC) AS rn
  FROM IDENTIFIER(:catalog || '.' || :schema || '.pipelineiq_view')
  GROUP BY field_manager, vertical
)
SELECT
  fms.snapshot_date,
  fms.field_manager,
  fms.sales_region,
  fms.sales_subregion_level_1,
  fms.sales_subregion_level_2,
  fms.sales_subregion_level_3,
  fms.sales_subregion_level_4,
  fms.total_use_cases,
  fms.total_estimated_monthly_dbu,
  fms.high_confidence_count,
  fms.high_confidence_pct,
  fms.medium_confidence_count,
  fms.medium_confidence_pct,
  fms.low_confidence_count,
  fms.low_confidence_pct,
  fms.slip_usecase_count,
  fms.slip_total_monthly_dbu,
  fms.accel_usecase_count,
  fms.accel_total_monthly_dbu,
  fms.stale_usecase_count,
  fms.stale_total_monthly_dbu,
  fms.u3_stale_usecase_count,
  fms.u3_stale_total_monthly_dbu,
  fms.u5_stale_usecase_count,
  fms.u5_stale_total_monthly_dbu,
  fms.one_blocker_usecase_count,
  fms.one_blocker_total_monthly_dbu,
  fms.azure_medhigh_usecase_count,
  fms.azure_medhigh_total_monthly_dbu,
  fms.threeplus_blocker_usecase_count,
  fms.threeplus_blocker_total_monthly_dbu,
  coalesce(u1.usecase_type, '') AS top1_usecase_type,
  coalesce(u1.usecase_type_count, 0) AS top1_usecase_type_count,
  coalesce(u1.usecase_type_monthly_dbu, 0) AS top1_usecase_type_monthly_dbu,
  coalesce(u2.usecase_type, '') AS top2_usecase_type,
  coalesce(u2.usecase_type_count, 0) AS top2_usecase_type_count,
  coalesce(u2.usecase_type_monthly_dbu, 0) AS top2_usecase_type_monthly_dbu,
  coalesce(u3.usecase_type, '') AS top3_usecase_type,
  coalesce(u3.usecase_type_count, 0) AS top3_usecase_type_count,
  coalesce(u3.usecase_type_monthly_dbu, 0) AS top3_usecase_type_monthly_dbu,
  coalesce(sc1.slippage_category, '') AS top1_slippage_category,
  coalesce(sc1.slip_usecase_count, 0) AS top1_slip_usecase_count,
  coalesce(sc1desc.description, '') AS top1_slippage_category_description,
  coalesce(sc2.slippage_category, '') AS top2_slippage_category,
  coalesce(sc2.slip_usecase_count, 0) AS top2_slip_usecase_count,
  coalesce(sc2desc.description, '') AS top2_slippage_category_description,
  coalesce(sc3.slippage_category, '') AS top3_slippage_category,
  coalesce(sc3.slip_usecase_count, 0) AS top3_slip_usecase_count,
  coalesce(sc3desc.description, '') AS top3_slippage_category_description,
  coalesce(ac1.accel_category, '') AS top1_accel_category,
  coalesce(ac1.accel_usecase_count, 0) AS top1_accel_usecase_count,
  coalesce(ac1desc.description, '') AS top1_accel_category_description,
  coalesce(ac2.accel_category, '') AS top2_accel_category,
  coalesce(ac2.accel_usecase_count, 0) AS top2_accel_usecase_count,
  coalesce(ac2desc.description, '') AS top2_accel_category_description,
  coalesce(ac3.accel_category, '') AS top3_accel_category,
  coalesce(ac3.accel_usecase_count, 0) AS top3_accel_usecase_count,
  coalesce(ac3desc.description, '') AS top3_accel_category_description,
  coalesce(v1.vertical, '') AS top1_vertical,
  coalesce(v1.vertical_usecase_count, 0) AS top1_vertical_usecase_count,
  coalesce(v2.vertical, '') AS top2_vertical,
  coalesce(v2.vertical_usecase_count, 0) AS top2_vertical_usecase_count,
  coalesce(v3.vertical, '') AS top3_vertical,
  coalesce(v3.vertical_usecase_count, 0) AS top3_vertical_usecase_count,
  -- Name parts and e-mail are derived from the display name (first token, last token)
  split(fms.field_manager, ' ')[0] AS name,
  element_at(split(fms.field_manager, ' '), -1) AS surname,
  concat(
    lower(split(fms.field_manager, ' ')[0]),
    '.',
    lower(element_at(split(fms.field_manager, ' '), -1)),
    '@databricks.com'
  ) AS generated_email,
  concat(
    lower(split(fms.field_manager, ' ')[0]),
    '.',
    lower(element_at(split(fms.field_manager, ' '), -1)),
    '@databricks.com'
  ) AS email,
  'correct' AS email_status
FROM field_manager_summary fms
LEFT JOIN usecase_type_ranked u1
  ON fms.field_manager = u1.field_manager AND u1.rn = 1
LEFT JOIN usecase_type_ranked u2
  ON fms.field_manager = u2.field_manager AND u2.rn = 2
LEFT JOIN usecase_type_ranked u3
  ON fms.field_manager = u3.field_manager AND u3.rn = 3
LEFT JOIN slip_categories sc1
  ON fms.field_manager = sc1.field_manager AND sc1.rn = 1
LEFT JOIN slip_categories sc2
  ON fms.field_manager = sc2.field_manager AND sc2.rn = 2
LEFT JOIN slip_categories sc3
  ON fms.field_manager = sc3.field_manager AND sc3.rn = 3
LEFT JOIN IDENTIFIER(:catalog || '.' || :schema || '.slippage_categories') sc1desc
  ON sc1.slippage_category = sc1desc.type
LEFT JOIN IDENTIFIER(:catalog || '.' || :schema || '.slippage_categories') sc2desc
  ON sc2.slippage_category = sc2desc.type
LEFT JOIN IDENTIFIER(:catalog || '.' || :schema || '.slippage_categories') sc3desc
  ON sc3.slippage_category = sc3desc.type
LEFT JOIN accel_categories ac1
  ON fms.field_manager = ac1.field_manager AND ac1.rn = 1
LEFT JOIN accel_categories ac2
  ON fms.field_manager = ac2.field_manager AND ac2.rn = 2
LEFT JOIN accel_categories ac3
  ON fms.field_manager = ac3.field_manager AND ac3.rn = 3
LEFT JOIN IDENTIFIER(:catalog || '.' || :schema || '.acceleration_categories') ac1desc
  ON ac1.accel_category = ac1desc.type
LEFT JOIN IDENTIFIER(:catalog || '.' || :schema || '.acceleration_categories') ac2desc
  ON ac2.accel_category = ac2desc.type
LEFT JOIN IDENTIFIER(:catalog || '.' || :schema || '.acceleration_categories') ac3desc
  ON ac3.accel_category = ac3desc.type
LEFT JOIN vertical_categories v1
  ON fms.field_manager = v1.field_manager AND v1.rn = 1
LEFT JOIN vertical_categories v2
  ON fms.field_manager = v2.field_manager AND v2.rn = 2
LEFT JOIN vertical_categories v3
  ON fms.field_manager = v3.field_manager AND v3.rn = 3
ORDER BY fms.field_manager;
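
The slippage, acceleration, and vertical CTEs above all follow one pattern: count rows per (manager, category), rank the categories within each manager, and pivot the top three into `top1_*`, `top2_*`, `top3_*` columns with `''` and `0` as defaults. A minimal Python sketch of that pattern. The `top_categories` helper and the sample rows are hypothetical; categories are assumed to be non-null strings, and ties are broken alphabetically, which the SQL `ROW_NUMBER()` does not guarantee.

```python
from collections import Counter, defaultdict


def top_categories(rows, group_key, category_key, n=3):
    """Return {group: [(category, count), ...]} padded to exactly n entries."""
    counts = defaultdict(Counter)
    for row in rows:
        counts[row[group_key]][row[category_key]] += 1

    result = {}
    for group, counter in counts.items():
        # Highest count first; alphabetical order as a deterministic tie-break.
        ranked = sorted(counter.items(), key=lambda kv: (-kv[1], kv[0]))[:n]
        # Pad like coalesce(category, '') and coalesce(count, 0).
        ranked += [("", 0)] * (n - len(ranked))
        result[group] = ranked
    return result


rows = [
    {"field_manager": "Ana Diaz", "accel_category": "Funding"},
    {"field_manager": "Ana Diaz", "accel_category": "Funding"},
    {"field_manager": "Ana Diaz", "accel_category": "Other"},
    {"field_manager": "Bo Chen", "accel_category": "Funding"},
]
top = top_categories(rows, "field_manager", "accel_category")
```

The use case type ranking in the view differs slightly: it orders by summed monthly DBUs rather than by row count, so the sort key would be the DBU total there.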


In [None]:
%sql
-- Create sales manager summary view for executive reporting
-- Aggregates use case metrics, confidence distribution, and top categories by sales manager
CREATE OR REPLACE VIEW IDENTIFIER(:catalog || '.' || :schema || '.sales_manager_summary_view') (
  snapshot_date,
  sales_manager,
  sales_region,
  sales_subregion_level_1,
  sales_subregion_level_2,
  sales_subregion_level_3,
  sales_subregion_level_4,
  total_use_cases,
  total_estimated_monthly_dbu,
  high_confidence_count,
  high_confidence_pct,
  medium_confidence_count,
  medium_confidence_pct,
  low_confidence_count,
  low_confidence_pct,
  slip_usecase_count,
  slip_total_monthly_dbu,
  accel_usecase_count,
  accel_total_monthly_dbu,
  stale_usecase_count,
  stale_total_monthly_dbu,
  u3_stale_usecase_count,
  u3_stale_total_monthly_dbu,
  u5_stale_usecase_count,
  u5_stale_total_monthly_dbu,
  one_blocker_usecase_count,
  one_blocker_total_monthly_dbu,
  azure_medhigh_usecase_count,
  azure_medhigh_total_monthly_dbu,
  threeplus_blocker_usecase_count,
  threeplus_blocker_total_monthly_dbu,
  top1_usecase_type,
  top1_usecase_type_count,
  top1_usecase_type_monthly_dbu,
  top2_usecase_type,
  top2_usecase_type_count,
  top2_usecase_type_monthly_dbu,
  top3_usecase_type,
  top3_usecase_type_count,
  top3_usecase_type_monthly_dbu,
  top1_slippage_category,
  top1_slip_usecase_count,
  top1_slippage_category_description,
  top2_slippage_category,
  top2_slip_usecase_count,
  top2_slippage_category_description,
  top3_slippage_category,
  top3_slip_usecase_count,
  top3_slippage_category_description,
  top1_accel_category,
  top1_accel_usecase_count,
  top1_accel_category_description,
  top2_accel_category,
  top2_accel_usecase_count,
  top2_accel_category_description,
  top3_accel_category,
  top3_accel_usecase_count,
  top3_accel_category_description,
  top1_vertical,
  top1_vertical_usecase_count,
  top2_vertical,
  top2_vertical_usecase_count,
  top3_vertical,
  top3_vertical_usecase_count,
  name,
  surname,
  generated_email,
  email,
  email_status
) AS
WITH sales_manager_summary AS (
  SELECT
    any_value(snapshot_date) as snapshot_date,
    sales_manager,
    any_value(sales_region) as sales_region,
    any_value(sales_subregion_level_1) as sales_subregion_level_1,
    any_value(sales_subregion_level_2) as sales_subregion_level_2,
    any_value(sales_subregion_level_3) as sales_subregion_level_3,
    any_value(sales_subregion_level_4) as sales_subregion_level_4,
    COUNT(*) AS total_use_cases,
    SUM(estimated_monthly_dollar_dbus) AS total_estimated_monthly_dbu,
    SUM(CASE WHEN confidence_level_normalized = 'High' THEN 1 ELSE 0 END) AS high_confidence_count,
    ROUND(100.0 * SUM(CASE WHEN confidence_level_normalized = 'High' THEN 1 ELSE 0 END) / COUNT(*), 2) AS high_confidence_pct,
    SUM(CASE WHEN confidence_level_normalized = 'Medium' THEN 1 ELSE 0 END) AS medium_confidence_count,
    ROUND(100.0 * SUM(CASE WHEN confidence_level_normalized = 'Medium' THEN 1 ELSE 0 END) / COUNT(*), 2) AS medium_confidence_pct,
    SUM(CASE WHEN confidence_level_normalized = 'Low' THEN 1 ELSE 0 END) AS low_confidence_count,
    ROUND(100.0 * SUM(CASE WHEN confidence_level_normalized = 'Low' THEN 1 ELSE 0 END) / COUNT(*), 2) AS low_confidence_pct,
    SUM(CASE WHEN likely_to_slip = TRUE THEN 1 ELSE 0 END) AS slip_usecase_count,
    SUM(CASE WHEN likely_to_slip = TRUE THEN estimated_monthly_dollar_dbus ELSE 0 END) AS slip_total_monthly_dbu,
    SUM(CASE WHEN can_be_accelerated = TRUE THEN 1 ELSE 0 END) AS accel_usecase_count,
    SUM(CASE WHEN can_be_accelerated = TRUE THEN estimated_monthly_dollar_dbus ELSE 0 END) AS accel_total_monthly_dbu,
    SUM(CASE WHEN stage IN ('U3', 'U5') AND last_modified_date < date_sub(current_date(), 14) THEN 1 ELSE 0 END) AS stale_usecase_count,
    SUM(CASE WHEN stage IN ('U3', 'U5') AND last_modified_date < date_sub(current_date(), 14) THEN estimated_monthly_dollar_dbus ELSE 0 END) AS stale_total_monthly_dbu,
    SUM(CASE WHEN stage = 'U3' AND last_modified_date < date_sub(current_date(), 14) THEN 1 ELSE 0 END) AS u3_stale_usecase_count,
    SUM(CASE WHEN stage = 'U3' AND last_modified_date < date_sub(current_date(), 14) THEN estimated_monthly_dollar_dbus ELSE 0 END) AS u3_stale_total_monthly_dbu,
    SUM(CASE WHEN stage = 'U5' AND last_modified_date < date_sub(current_date(), 14) THEN 1 ELSE 0 END) AS u5_stale_usecase_count,
    SUM(CASE WHEN stage = 'U5' AND last_modified_date < date_sub(current_date(), 14) THEN estimated_monthly_dollar_dbus ELSE 0 END) AS u5_stale_total_monthly_dbu,
    SUM(CASE WHEN num_of_blockers = 1 THEN 1 ELSE 0 END) AS one_blocker_usecase_count,
    SUM(CASE WHEN num_of_blockers = 1 THEN estimated_monthly_dollar_dbus ELSE 0 END) AS one_blocker_total_monthly_dbu,
    SUM(CASE WHEN stage IN ('U2', 'U4', 'U5') AND confidence_level_normalized IN ('Medium', 'High') AND lower(target_cloud) LIKE '%azure%' THEN 1 ELSE 0 END) AS azure_medhigh_usecase_count,
    SUM(CASE WHEN stage IN ('U2', 'U4', 'U5') AND confidence_level_normalized IN ('Medium', 'High') AND lower(target_cloud) LIKE '%azure%' THEN estimated_monthly_dollar_dbus ELSE 0 END) AS azure_medhigh_total_monthly_dbu,
    SUM(CASE WHEN num_of_blockers >= 3 THEN 1 ELSE 0 END) AS threeplus_blocker_usecase_count,
    SUM(CASE WHEN num_of_blockers >= 3 THEN estimated_monthly_dollar_dbus ELSE 0 END) AS threeplus_blocker_total_monthly_dbu
  FROM IDENTIFIER(:catalog || '.' || :schema || '.pipelineiq_view')
  GROUP BY sales_manager
),
slip_categories AS (
  SELECT
    sales_manager,
    sc.slippage_category,
    COUNT(*) AS slip_usecase_count,
    ROW_NUMBER() OVER (PARTITION BY sales_manager ORDER BY COUNT(*) DESC) AS rn
  FROM IDENTIFIER(:catalog || '.' || :schema || '.pipelineiq_view') sc
  WHERE likely_to_slip = TRUE
  GROUP BY sales_manager, slippage_category
),
accel_categories AS (
  SELECT
    sales_manager,
    ac.accel_category,
    COUNT(*) AS accel_usecase_count,
    ROW_NUMBER() OVER (PARTITION BY sales_manager ORDER BY COUNT(*) DESC) AS rn
  FROM IDENTIFIER(:catalog || '.' || :schema || '.pipelineiq_view') ac
  WHERE can_be_accelerated = TRUE
  GROUP BY sales_manager, accel_category
),
usecase_type_ranked AS (
  SELECT
    sales_manager,
    usecase_type,
    COUNT(*) AS usecase_type_count,
    SUM(estimated_monthly_dollar_dbus) AS usecase_type_monthly_dbu,
    ROW_NUMBER() OVER (PARTITION BY sales_manager ORDER BY SUM(estimated_monthly_dollar_dbus) DESC) AS rn
  FROM IDENTIFIER(:catalog || '.' || :schema || '.pipelineiq_view')
  GROUP BY sales_manager, usecase_type
),
vertical_categories AS (
  SELECT
    sales_manager,
    vertical,
    COUNT(*) AS vertical_usecase_count,
    ROW_NUMBER() OVER (PARTITION BY sales_manager ORDER BY COUNT(*) DESC) AS rn
  FROM IDENTIFIER(:catalog || '.' || :schema || '.pipelineiq_view')
  GROUP BY sales_manager, vertical
)
SELECT
  sms.snapshot_date,
  sms.sales_manager,
  sms.sales_region,
  sms.sales_subregion_level_1,
  sms.sales_subregion_level_2,
  sms.sales_subregion_level_3,
  sms.sales_subregion_level_4,
  sms.total_use_cases,
  sms.total_estimated_monthly_dbu,
  sms.high_confidence_count,
  sms.high_confidence_pct,
  sms.medium_confidence_count,
  sms.medium_confidence_pct,
  sms.low_confidence_count,
  sms.low_confidence_pct,
  sms.slip_usecase_count,
  sms.slip_total_monthly_dbu,
  sms.accel_usecase_count,
  sms.accel_total_monthly_dbu,
  sms.stale_usecase_count,
  sms.stale_total_monthly_dbu,
  sms.u3_stale_usecase_count,
  sms.u3_stale_total_monthly_dbu,
  sms.u5_stale_usecase_count,
  sms.u5_stale_total_monthly_dbu,
  sms.one_blocker_usecase_count,
  sms.one_blocker_total_monthly_dbu,
  sms.azure_medhigh_usecase_count,
  sms.azure_medhigh_total_monthly_dbu,
  sms.threeplus_blocker_usecase_count,
  sms.threeplus_blocker_total_monthly_dbu,
  coalesce(u1.usecase_type, '') AS top1_usecase_type,
  coalesce(u1.usecase_type_count, 0) AS top1_usecase_type_count,
  coalesce(u1.usecase_type_monthly_dbu, 0) AS top1_usecase_type_monthly_dbu,
  coalesce(u2.usecase_type, '') AS top2_usecase_type,
  coalesce(u2.usecase_type_count, 0) AS top2_usecase_type_count,
  coalesce(u2.usecase_type_monthly_dbu, 0) AS top2_usecase_type_monthly_dbu,
  coalesce(u3.usecase_type, '') AS top3_usecase_type,
  coalesce(u3.usecase_type_count, 0) AS top3_usecase_type_count,
  coalesce(u3.usecase_type_monthly_dbu, 0) AS top3_usecase_type_monthly_dbu,
  coalesce(sc1.slippage_category, '') AS top1_slippage_category,
  coalesce(sc1.slip_usecase_count, 0) AS top1_slip_usecase_count,
  coalesce(sc1desc.description, '') AS top1_slippage_category_description,
  coalesce(sc2.slippage_category, '') AS top2_slippage_category,
  coalesce(sc2.slip_usecase_count, 0) AS top2_slip_usecase_count,
  coalesce(sc2desc.description, '') AS top2_slippage_category_description,
  coalesce(sc3.slippage_category, '') AS top3_slippage_category,
  coalesce(sc3.slip_usecase_count, 0) AS top3_slip_usecase_count,
  coalesce(sc3desc.description, '') AS top3_slippage_category_description,
  coalesce(ac1.accel_category, '') AS top1_accel_category,
  coalesce(ac1.accel_usecase_count, 0) AS top1_accel_usecase_count,
  coalesce(ac1desc.description, '') AS top1_accel_category_description,
  coalesce(ac2.accel_category, '') AS top2_accel_category,
  coalesce(ac2.accel_usecase_count, 0) AS top2_accel_usecase_count,
  coalesce(ac2desc.description, '') AS top2_accel_category_description,
  coalesce(ac3.accel_category, '') AS top3_accel_category,
  coalesce(ac3.accel_usecase_count, 0) AS top3_accel_usecase_count,
  coalesce(ac3desc.description, '') AS top3_accel_category_description,
  coalesce(v1.vertical, '') AS top1_vertical,
  coalesce(v1.vertical_usecase_count, 0) AS top1_vertical_usecase_count,
  coalesce(v2.vertical, '') AS top2_vertical,
  coalesce(v2.vertical_usecase_count, 0) AS top2_vertical_usecase_count,
  coalesce(v3.vertical, '') AS top3_vertical,
  coalesce(v3.vertical_usecase_count, 0) AS top3_vertical_usecase_count,
  -- First and last space-delimited tokens of the manager's display name
  split(sms.sales_manager, ' ')[0] AS name,
  element_at(split(sms.sales_manager, ' '), -1) AS surname,
  -- Derived address: first.last@databricks.com (any middle tokens are dropped)
  concat(
    lower(split(sms.sales_manager, ' ')[0]),
    '.',
    lower(element_at(split(sms.sales_manager, ' '), -1)),
    '@databricks.com'
  ) AS generated_email,
  -- email uses the same expression as generated_email
  concat(
    lower(split(sms.sales_manager, ' ')[0]),
    '.',
    lower(element_at(split(sms.sales_manager, ' '), -1)),
    '@databricks.com'
  ) AS email,
  -- Hard-coded literal: this query does not validate the derived address
  'correct' AS email_status
FROM sales_manager_summary sms
LEFT JOIN usecase_type_ranked u1
  ON sms.sales_manager = u1.sales_manager AND u1.rn = 1
LEFT JOIN usecase_type_ranked u2
  ON sms.sales_manager = u2.sales_manager AND u2.rn = 2
LEFT JOIN usecase_type_ranked u3
  ON sms.sales_manager = u3.sales_manager AND u3.rn = 3
LEFT JOIN slip_categories sc1
  ON sms.sales_manager = sc1.sales_manager AND sc1.rn = 1
LEFT JOIN slip_categories sc2
  ON sms.sales_manager = sc2.sales_manager AND sc2.rn = 2
LEFT JOIN slip_categories sc3
  ON sms.sales_manager = sc3.sales_manager AND sc3.rn = 3
LEFT JOIN IDENTIFIER(:catalog || '.' || :schema || '.slippage_categories') sc1desc
  ON sc1.slippage_category = sc1desc.type
LEFT JOIN IDENTIFIER(:catalog || '.' || :schema || '.slippage_categories') sc2desc
  ON sc2.slippage_category = sc2desc.type
LEFT JOIN IDENTIFIER(:catalog || '.' || :schema || '.slippage_categories') sc3desc
  ON sc3.slippage_category = sc3desc.type
LEFT JOIN accel_categories ac1
  ON sms.sales_manager = ac1.sales_manager AND ac1.rn = 1
LEFT JOIN accel_categories ac2
  ON sms.sales_manager = ac2.sales_manager AND ac2.rn = 2
LEFT JOIN accel_categories ac3
  ON sms.sales_manager = ac3.sales_manager AND ac3.rn = 3
LEFT JOIN IDENTIFIER(:catalog || '.' || :schema || '.acceleration_categories') ac1desc
  ON ac1.accel_category = ac1desc.type
LEFT JOIN IDENTIFIER(:catalog || '.' || :schema || '.acceleration_categories') ac2desc
  ON ac2.accel_category = ac2desc.type
LEFT JOIN IDENTIFIER(:catalog || '.' || :schema || '.acceleration_categories') ac3desc
  ON ac3.accel_category = ac3desc.type
LEFT JOIN vertical_categories v1
  ON sms.sales_manager = v1.sales_manager AND v1.rn = 1
LEFT JOIN vertical_categories v2
  ON sms.sales_manager = v2.sales_manager AND v2.rn = 2
LEFT JOIN vertical_categories v3
  ON sms.sales_manager = v3.sales_manager AND v3.rn = 3
ORDER BY sms.sales_manager;

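The query above derives each manager's email from the first and last space-delimited tokens of `sales_manager`. The following is a minimal Python sketch of that same rule, useful for spot-checking names before trusting the derived address. The function name `derive_email` and the `domain` parameter are hypothetical and not part of the notebook; only the first-token/last-token logic comes from the SQL.

```python
def derive_email(sales_manager: str, domain: str = "databricks.com") -> str:
    """Mirror the SQL rule: lower(first token) + '.' + lower(last token) + '@' + domain.

    Hypothetical helper for illustration; the notebook itself does this in SQL.
    """
    # Split on single spaces, as split(sales_manager, ' ') does in the query
    parts = sales_manager.split(" ")
    first, last = parts[0], parts[-1]
    return f"{first.lower()}.{last.lower()}@{domain}"


if __name__ == "__main__":
    # A single-token name yields the same token on both sides of the dot,
    # and middle names are dropped; both cases may need manual review.
    for name in ["Jane Doe", "Mary Ann Smith", "Cher"]:
        print(name, "->", derive_email(name))
```

Because the rule keeps only the first and last tokens, names with middle names, single-word names, or stray spaces can produce addresses that do not exist, so the hard-coded `email_status` value is best treated as a default rather than a verified result.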

In [0]:
# %sql
# insert into users.sam_lecorre.pipelineiq 
#   SELECT 
#     -- Core identifiers and timestamps
#     usecase_id,
#     last_modified_date,
#     CURRENT_DATE() - interval 1 day AS piq_last_updated,
    
#     -- Fields needed for AI inference (kept for reference)
#     competition_string,
    
#     -- Number of blockers (used in reporting view)
#     num_of_blockers,
    
#     -- AI enriched fields
#     context,
#     one_liner,
#     next_steps_summary,
#     implementation_notes_summary,
#     blockers_summary,
#     usecase_type,
#     business_usecase_type,
    
#     -- Confidence scoring
#     ai_confidence_score_advanced,
#     confidence_score,
#     confidence_level,
#     rationale_for_confidence_score,
#     pain_score,
#     champion_score,
#     implementationplan_score,
#     decisionprocess_score,
#     urgency_score,
#     competitionawareness_score,
#     majorblockers_score,
#     measurableimpact_score,
#     confidence_level_normalized,
    
#     -- Recommendations and classifications
#     next_best_action_recommendation,
#     likely_to_slip,
#     slippage_category,
#     can_be_accelerated,
#     accel_category
#   FROM users.luis_herrera.pipelineiq

In [None]:
%sql
use catalog IDENTIFIER(:catalog);
use schema IDENTIFIER(:schema);
CREATE OR REPLACE TABLE slippage_categories
USING DELTA
AS SELECT * FROM VALUES
  ('Technical', 'Slippage stems from missing features, technical blockers, or unresolved bugs'),
  ('Business', 'Slippage is due to shifting business priorities, unclear value, or deprioritization'),
  ('Stakeholder', 'Slippage results from lack of stakeholder buy-in, engagement, or sponsor turnover'),
  ('Budget', 'Slippage is caused by funding gaps, delayed approvals, or budget freezes'),
  ('Project Timelines', 'Slippage is driven by unrealistic schedules, missed milestones, or dependencies'),
  ('Data', 'Slippage comes from data issues (access, quality, privacy, or integration)'),
  ('Integration', 'Slippage is due to system connectivity, API, or tooling integration problems'),
  ('Partner', 'Slippage originates from partner-side limitations, bandwidth, or enablement gaps'),
  ('Hyperscaler', 'Slippage stems from hyperscaler constraints (region, quota, roadmap, or policies)'),
  ('Competition', 'Slippage is triggered by market shifts or competitive reprioritization'),
  ('External dependencies', 'Slippage comes from external third parties or customer-side dependencies'),
  ('Other', 'Slippage cause is unclear')
AS t(type, description);

In [None]:
%sql 
use catalog IDENTIFIER(:catalog);
use schema IDENTIFIER(:schema);
CREATE OR REPLACE TABLE acceleration_categories 
USING DELTA
AS SELECT * FROM VALUES
  ('Implementation Planning', 'Acceleration comes from clarifying the WHAT/WHEN/WHO (critical path, milestones, owners)'),
  ('Resource Allocation', 'The plan exists but we need people/tools/compute NOW'),
  ('Stakeholder Engagement', 'Acceleration is through stakeholder alignment, exec cadence, or clearing decisions'),
  ('Technical Optimization', 'Acceleration is via performance, architecture, or integration tuning'),
  ('Innovation and Technology Enablement', 'Acceleration is via templates, automation, or accelerators (including AI assistants)'),
  ('Governance and Support', 'Acceleration is via operating rhythm (steering, dashboards, SLAs, support processes)'),
  ('Capability Development', 'Acceleration is via training/upskilling/knowledge transfer'),
  ('Partnership Leverage', 'Acceleration comes when a partner''s expertise/assets are the key lever'),
  ('Funding', 'Acceleration is due to securing or expediting funding, budget approvals, or financial resources'),
  ('Other', 'Unclear or does not fit any other acceleration motion')
AS t(type, description);