In [0]:
-- Check that a catalog named skulytics_dev is visible to the current session.
SHOW CATALOGS LIKE 'skulytics_dev';

In [0]:
-- List the schemas defined inside the skulytics_dev catalog.
SHOW SCHEMAS IN skulytics_dev;


In [0]:
-- Inspect dim_product (all columns).
-- NOTE(review): there is no LIMIT, so the full table is requested; consider
-- adding one if this table is large.
SELECT *
FROM skulytics_dev.default.dim_product


In [0]:
-- Inspect fact_retail_summary_tbl (all columns).
-- NOTE(review): there is no LIMIT, so the full table is requested; consider
-- adding one if this table is large.
SELECT *
FROM skulytics_dev.default.fact_retail_summary_tbl


In [0]:
%python
# CELL 1: Imports and Global Helper Functions
# These functions are foundational for all subsequent scoring logic.
# -------------------------------------------------------------------------
import json
import re
from math import floor, ceil

# PySpark Imports required for table creation and persistence (Cell 9)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql import Row, SparkSession 

def clean_and_normalize(text):
    """Normalize free text so it can be used for substring/keyword matching.

    Every character that is neither a word character nor whitespace is
    deleted (not replaced by a space), so "12-Cup" becomes "12cup". The
    result is lowercased, each run of whitespace collapses to one space, and
    surrounding whitespace is trimmed. Falsy input (None, "") yields "".
    """
    if text:
        without_punctuation = re.sub(r'[^\w\s]', '', text)
        lowered = without_punctuation.lower()
        single_spaced = re.sub(r'\s+', ' ', lowered)
        return single_spaced.strip()
    return ""

def word_count(text):
    """Return how many whitespace-delimited tokens ``text`` contains (0 for falsy input)."""
    if not text:
        return 0
    tokens = text.split()
    return len(tokens)

print("Cell 1: Imports and Helper Functions defined.")

In [0]:
%python
# CELL 2: Configuration and Business Rules
# Defines all retailer-specific thresholds, keywords, and benefit lexicons.
# -------------------------------------------------------------------------

# Per-retailer thresholds, keyed by retailer_key.
#   'title'       -> character-length bands read by score_title
#   'description' -> word-count bands read by score_description
#   'image_count' -> image-count tiers
RETAILER_RULES = {
    'amazon': {
        'title': {
            'ideal_min': 80,
            'ideal_max': 150,
            'acceptable_min': 50,
            'acceptable_max': 200,
        },
        'description': {
            'ideal_min_words': 150,
            'ideal_max_words': 300,
            'acceptable_min_words': 80,
            'acceptable_max_words': 400,
            'too_short_max': 25,
            'way_too_long_min': 400,
        },
        'image_count': {
            'ideal': 6,
            'good_min': 4,
            'acceptable_min': 2,
        },
    },
    'walmart': {
        'title': {
            'ideal_min': 50,
            'ideal_max': 75,
            'acceptable_min': 40,
            'acceptable_max': 100,
        },
        'description': {
            'ideal_min_words': 50,
            'ideal_max_words': 150,
            'acceptable_min_words': 25,
            'acceptable_max_words': 250,
            'too_short_max': 25,
            'way_too_long_min': 250,
        },
        'image_count': {
            'ideal': 4,
            'good_min': 3,
            'acceptable_min': 2,
        },
    },
    'target': {
        'title': {
            'ideal_min': 21,
            'ideal_max': 100,
            'acceptable_min': 1,
            'acceptable_max': 150,
        },
        'description': {
            'ideal_min_words': 50,
            'ideal_max_words': 200,
            'acceptable_min_words': 25,
            'acceptable_max_words': 300,
            'too_short_max': 25,
            'way_too_long_min': 300,
        },
        'image_count': {
            'ideal': 4,
            'good_min': 4,
            'acceptable_min': 2,
        },
    },
}

# Target search keywords, keyed by (major category, minor category, retailer_key).
CATEGORY_KEYWORDS = {
    ('Appliances', 'Coffee', 'amazon'): [
        "Coffee Maker",
        "programmable",
        "thermal carafe",
        "12-cup",
        "auto shut-off",
        "brew strength",
    ],
    ('Appliances', 'Coffee', 'walmart'): [
        "coffee maker",
        "programmable",
        "stainless steel",
        "carafe",
        "auto shut-off",
    ],
    ('Snacks', 'Granola', 'amazon'): [
        "granola bars",
        "organic",
        "oats",
        "honey",
        "snack",
        "gluten-free",
    ],
    ('Snacks', 'Granola', 'walmart'): [
        "granola bars",
        "organic",
        "oats",
        "honey",
        "snack",
        "gluten-free",
    ],
}

# Single product-type phrase expected in the title, keyed by (major, minor).
PRIMARY_CATEGORY_KEYWORDS = {
    ('Appliances', 'Coffee'): "coffee maker",
    ('Snacks', 'Granola'): "granola bars",
    ('Beauty', 'Skincare'): "night cream",
}

# Words and phrases counted as benefit language in a description.
BENEFIT_VERBS = [
    "helps",
    "supports",
    "improves",
    "prevents",
    "protects",
    "ideal for",
    "perfect for",
    "enhances",
    "reduces",
    "increases",
]

print("Cell 2: Configuration loaded.")

In [0]:
%python
# CELL 3: Mock Data Setup (Explicitly defined input tables)
# This setup clearly defines the four data sources needed for the scoring engine.
# -------------------------------------------------------------------------

# Listing content rows to be scored; run_validation matches each record to the
# other mock tables by product_key (and retailer_key for the fact table).
# bullets_json and attributes_json are JSON-encoded strings (built here with
# json.dumps); the scoring functions decode them with json.loads.
MOCK_CONTENT_TBL = [
    # Amazon listing with a 19-character title, 7-word description, 1 bullet, 2 images.
    {
        'product_key': 'SKU015',
        'retailer_key': 'amazon',
        'title_text': "Coffee Maker 12 Cup",
        'description_text': "Makes coffee. 12 cup capacity. Brews fast.",
        'bullets_json': json.dumps(["12-cup capacity"]),
        'attributes_json': json.dumps({"capacity_cups": 12, "color": "Black", "material": "Stainless Steel"}),
        'image_count': 2
    },
    # Amazon listing with an 88-character title, 8 generated bullets, 7 images.
    {
        'product_key': 'SKU001',
        'retailer_key': 'amazon',
        'title_text': "BrewMaster Supreme 12-Cup Thermal Programmable Coffee Maker with Stainless Steel Accents",
        'description_text': "The BrewMaster Supreme helps you start your day right. This machine supports fast brewing and enhances the flavor of your favorite beans. It protects against spills and prevents over-extraction, ideal for busy mornings. It reduces waste and increases your enjoyment, making it perfect for every kitchen.",
        'bullets_json': json.dumps([f"Feature {i}" for i in range(1, 9)]),
        'attributes_json': json.dumps({"capacity_cups": 12, "color": "Stainless Steel", "model": "Supreme"}),
        'image_count': 7
    },
    # Walmart listing with a 5-word description, 3 bullets, 3 images.
    {
        'product_key': 'SKU003',
        'retailer_key': 'walmart',
        'title_text': "Organic Oats N' Honey Granola Bars (20 Count)",
        'description_text': "A delicious and healthy snack.",
        'bullets_json': json.dumps(["Organic ingredients", "Gluten-free", "Oats and Honey"]),
        'attributes_json': json.dumps({"flavor": "Honey", "count": 20, "Organic": "organic"}),
        'image_count': 3
    }
]

# Simulates fact_retail_summary_tbl
# One row per product_key + retailer_key; supplies the inputs for the rating
# and review scores (avg_rating, review_count, prior_month_review_count).
MOCK_FACT_RETAIL_SUMMARY_TBL = [
    {'product_key': 'SKU015', 'retailer_key': 'amazon', 'avg_rating': 3.5, 'review_count': 15, 'prior_month_review_count': 14},
    {'product_key': 'SKU001', 'retailer_key': 'amazon', 'avg_rating': 4.7, 'review_count': 250, 'prior_month_review_count': 230},
    {'product_key': 'SKU003', 'retailer_key': 'walmart', 'avg_rating': 4.1, 'review_count': 80, 'prior_month_review_count': 85}
]

# Simulates dim_brand
# Maps product_key to the brand name that score_title looks for in the title.
MOCK_DIM_BRAND = [
    {'product_key': 'SKU015', 'brand_name': 'BrewMaster'},
    {'product_key': 'SKU001', 'brand_name': 'BrewMaster'},
    {'product_key': 'SKU003', 'brand_name': 'Oatfield'},
]

# Simulates dim_product
# Maps product_key to the (major, minor) category pair used for keyword lookups.
MOCK_DIM_PRODUCT = [
    {'product_key': 'SKU015', 'major': 'Appliances', 'minor': 'Coffee'},
    {'product_key': 'SKU001', 'major': 'Appliances', 'minor': 'Coffee'},
    {'product_key': 'SKU003', 'major': 'Snacks', 'minor': 'Granola'},
]

print("Cell 3: Mock Data initialized explicitly for all four tables.")

In [0]:
%python
# CELL 4: Dimension 1: Title Score Logic (titlescore)
# -------------------------------------------------------------------------

def score_title(title_text, retailer_key, brand_name, category, attributes_json):
    """Dimension 1: Title Score (titlescore).

    Awards up to 100 points across four checks:
      1. Length compliance (0/20/40): title character count against the
         retailer's 'title' bands in RETAILER_RULES.
      2. Brand presence (0/20): normalized brand name found in the normalized title.
      3. Category keyword (0/20): the PRIMARY_CATEGORY_KEYWORDS phrase for
         ``category`` found in the normalized title.
      4. Key attributes (0/10/20): one, or two-or-more, attributes from
         ``attributes_json`` found in the normalized title.

    Args:
        title_text: Listing title; None is treated as "".
        retailer_key: Key into RETAILER_RULES. An unknown retailer earns no
            length points instead of raising KeyError.
        brand_name: Brand to look for; None/blank earns no brand points.
        category: (major, minor) tuple used to look up the primary keyword.
        attributes_json: JSON object string of attribute name -> value.
            None means "no attributes"; unparseable or non-object JSON prints
            a warning and is treated as no attributes.

    Returns:
        (titlescore, diag) where diag holds the title length, the four
        component point values and a comma-separated list of attribute names
        that were not found in the title.
    """
    title_text = title_text or ""
    # Normalize once; every substring check below runs against this value.
    clean_title = clean_and_normalize(title_text)

    # 1. Length compliance (0–40 pts)
    length = len(title_text)
    rules = RETAILER_RULES.get(retailer_key, {}).get('title')
    if rules is None:
        length_pts = 0
    elif rules['ideal_min'] <= length <= rules['ideal_max']:
        length_pts = 40
    elif rules['acceptable_min'] <= length <= rules['acceptable_max']:
        length_pts = 20
    else:
        length_pts = 0

    # 2. Brand presence (0–20 pts)
    # Require a non-empty normalized brand: "" is a substring of every title.
    clean_brand = clean_and_normalize(brand_name)
    brand_pts = 20 if clean_brand and clean_brand in clean_title else 0

    # 3. Product category/type keyword (0–20 pts)
    # The keyword is normalized the same way as the title so the two always compare like-for-like.
    primary_keyword = clean_and_normalize(PRIMARY_CATEGORY_KEYWORDS.get(category))
    category_pts = 20 if primary_keyword and primary_keyword in clean_title else 0

    # 4. Key attributes present (0–20 pts)
    attributes = {}
    if attributes_json is not None:
        try:
            parsed = json.loads(attributes_json)
        except (TypeError, ValueError):
            # json.JSONDecodeError subclasses ValueError; TypeError covers non-string input.
            print(f"Warning: Failed to parse attributes_json for {retailer_key}.")
        else:
            if isinstance(parsed, dict):
                attributes = parsed
            else:
                print(f"Warning: attributes_json is not a JSON object for {retailer_key}.")

    matched_attributes_count = 0
    missing_attributes_list = []
    for attr_key, attr_value in attributes.items():
        if attr_value is None:
            # Nothing to search for; avoid matching the literal text "none".
            missing_attributes_list.append(attr_key)
            continue

        if isinstance(attr_value, bool) or str(attr_value).lower() in ('true', 'false'):
            # Boolean-style attributes (e.g. organic=true) are matched by their name.
            raw_term = str(attr_key).replace('_', ' ')
        else:
            # Value attributes (e.g. color, capacity) are matched by their value.
            raw_term = str(attr_value)

        # clean_and_normalize deletes hyphens ("12-Cup" -> "12cup"), so try both
        # the hyphen-as-space and hyphen-removed spellings of the term.
        candidates = {
            clean_and_normalize(raw_term.replace('-', ' ')),
            clean_and_normalize(raw_term),
        }
        if any(term and term in clean_title for term in candidates):
            matched_attributes_count += 1
        else:
            missing_attributes_list.append(attr_key)

    if matched_attributes_count >= 2:
        attribute_pts = 20
    elif matched_attributes_count == 1:
        attribute_pts = 10
    else:
        attribute_pts = 0

    titlescore = min(100, length_pts + brand_pts + category_pts + attribute_pts)

    # Diagnostics
    diag = {
        'title_length': length,
        'title_length_pts': length_pts,
        'title_brand_pts': brand_pts,
        'title_category_pts': category_pts,
        'title_attribute_pts': attribute_pts,
        'missing_attributes_list': ", ".join(missing_attributes_list)
    }

    return titlescore, diag

print("Cell 4: Title Score logic defined.")

In [0]:
%python
# CELL 5: Dimension 2: Description Score Logic (descscore)
# Measures word count compliance, bullet point structure, and benefit language.
# -------------------------------------------------------------------------

def score_description(description_text, bullets_json, retailer_key):
    """Dimension 2: Description Score (descscore).

    Awards up to 100 points across three checks:
      1. Word count (0/10/25/40) against the retailer's 'description' bands
         in RETAILER_RULES. Descriptions longer than the acceptable maximum,
         and empty descriptions, earn 0.
      2. Bullet structure (0/15/25/30) for 2+, 5+ and 8+ bullets.
      3. Benefit language (0/15/25/30) for 1+, 3+ and 5+ distinct
         BENEFIT_VERBS phrases found in the normalized description.

    Args:
        description_text: Long description; None is treated as "".
        bullets_json: JSON array string of bullet texts. None, unparseable
            JSON, or JSON that is not a list all count as zero bullets.
        retailer_key: Key into RETAILER_RULES. An unknown retailer earns no
            word-count points instead of raising KeyError.

    Returns:
        (descscore, diag) where diag holds the word count, bullet count and
        number of distinct benefit phrases matched.
    """
    description_text = description_text or ""

    # 1. Word count (0–40 pts)
    word_ct = word_count(description_text)
    length_pts = 0
    rules = RETAILER_RULES.get(retailer_key, {}).get('description')
    if rules is not None:
        if rules['ideal_min_words'] <= word_ct <= rules['ideal_max_words']:
            length_pts = 40
        elif rules['acceptable_min_words'] <= word_ct <= rules['acceptable_max_words']:
            length_pts = 25
        elif 0 < word_ct < rules['acceptable_min_words']:
            length_pts = 10  # Too short (but not empty)

    # 2. Bullet structure (0–30 pts)
    bullet_count = 0
    if bullets_json is not None:
        try:
            bullets = json.loads(bullets_json)
        except (TypeError, ValueError):
            # json.JSONDecodeError subclasses ValueError; TypeError covers non-string input.
            bullets = None
        if isinstance(bullets, list):
            bullet_count = len(bullets)

    if bullet_count >= 8:
        bullets_pts = 30
    elif bullet_count >= 5:
        bullets_pts = 25
    elif bullet_count >= 2:
        bullets_pts = 15
    else:
        bullets_pts = 0

    # 3. Benefit language (0–30 pts)
    normalized_desc = clean_and_normalize(description_text)
    matched_benefits = {verb for verb in BENEFIT_VERBS if verb in normalized_desc}
    distinct_benefit_count = len(matched_benefits)
    if distinct_benefit_count >= 5:
        benefit_pts = 30
    elif distinct_benefit_count >= 3:
        benefit_pts = 25
    elif distinct_benefit_count >= 1:
        benefit_pts = 15
    else:
        benefit_pts = 0

    descscore = min(100, length_pts + bullets_pts + benefit_pts)

    # Diagnostics
    diag = {
        'desc_word_count': word_ct,
        'desc_bullets_count': bullet_count,  # Internal diagnostic
        'desc_benefits_count': distinct_benefit_count,  # Internal diagnostic
    }

    return descscore, diag

print("Cell 5: Description Score logic defined.")

In [0]:
%python
# CELL 6: Dimension 3: Image Score Logic (imagescore)
# Measures image count compliance against retailer expectations and variety proxy.
# -------------------------------------------------------------------------

def score_images(image_count, retailer_key):
    """Dimension 3: Image Score (imagescore).

    Combines a retailer-specific image-count tier (0/35/55/70) with a
    retailer-independent variety proxy (0/10/20/30 for 2, 3 and 4+ images),
    capped at 100.

    Args:
        image_count: Number of images on the listing. None is scored as 0
            images instead of raising TypeError on comparison.
        retailer_key: 'amazon', 'walmart' or 'target'. Any other retailer
            earns no count points (variety points still apply).

    Returns:
        (imagescore, diag) where diag is {'image_count': <count used for scoring>}.
    """
    count = 0 if image_count is None else image_count

    # 1. Image count (0–70 pts)
    count_pts = 0
    if retailer_key == 'amazon':
        if count >= 6:
            count_pts = 70
        elif count >= 4:
            count_pts = 55
        elif count >= 2:
            count_pts = 35
    elif retailer_key == 'walmart':
        if count >= 4:
            count_pts = 70
        elif count == 3:
            count_pts = 55
        elif count == 2:
            count_pts = 35
    elif retailer_key == 'target':
        # NOTE(review): more than 6 images earns 0 count points for Target
        # because the top tier is bounded at 6 — confirm this is intended.
        if 4 <= count <= 6:
            count_pts = 70
        elif count == 3:
            count_pts = 55
        elif count == 2:
            count_pts = 35

    # 2. Variety proxy (0–30 pts)
    if count >= 4:
        variety_pts = 30
    elif count == 3:
        variety_pts = 20
    elif count == 2:
        variety_pts = 10
    else:
        variety_pts = 0

    imagescore = min(100, count_pts + variety_pts)

    # Diagnostics
    diag = {'image_count': count}

    return imagescore, diag

print("Cell 6: Image Score logic defined.")

In [0]:
%python
# CELL 7: Dimension 4: Keyword Score Logic (keywordscore)
# -------------------------------------------------------------------------

def score_keywords(title_text, description_text, bullets_json, category, retailer_key):
    """Dimension 4: Keyword Score (keywordscore).

    Looks up the target keyword list for (major, minor, retailer) in
    CATEGORY_KEYWORDS and measures what share of those keywords appears in
    the combined, normalized title + description + bullets.

    Coverage maps to a score of 100 (>=80%), 75 (>=60%), 50 (>=40%),
    30 (>=20%) or 10 (below 20%). When no keyword list exists for the lookup
    key the score is 0.

    Args:
        title_text: Listing title (may be None).
        description_text: Long description (may be None).
        bullets_json: JSON array string of bullets. None, unparseable JSON or
            non-list JSON contributes no text.
        category: (major, minor) sequence.
        retailer_key: Retailer identifier appended to ``category`` to form the lookup key.

    Returns:
        (keywordscore, diag) where diag holds 'keyword_coverage_pct' and
        'matched_keywords'.
    """
    # Build the flat (major, minor, retailer) key; tuple() also accepts a list-typed category.
    if category and retailer_key:
        lookup_key = tuple(category) + (retailer_key,)
    else:
        lookup_key = None

    target_keywords = CATEGORY_KEYWORDS.get(lookup_key)
    if not target_keywords:
        return 0, {'keyword_coverage_pct': 0.0, 'matched_keywords': 0}

    # Combine content
    combined_content = [title_text, description_text]
    if bullets_json is not None:
        try:
            bullets = json.loads(bullets_json)
        except (TypeError, ValueError):
            # Only decoding failures are tolerated; anything else should surface.
            bullets = None
        if isinstance(bullets, list):
            combined_content.extend(bullets)

    # str() guards against non-string bullet entries (numbers, nested values),
    # which would otherwise make " ".join raise TypeError.
    searchable_string = clean_and_normalize(
        " ".join(str(part) for part in combined_content if part)
    )

    # Count matches; each keyword is normalized the same way as the content.
    matched_keywords = set()
    for keyword in target_keywords:
        normalized_keyword = clean_and_normalize(keyword)
        if normalized_keyword and normalized_keyword in searchable_string:
            matched_keywords.add(keyword)

    matched_count = len(matched_keywords)
    coverage_pct = (matched_count / len(target_keywords)) * 100

    # Map to score
    if coverage_pct >= 80:
        keywordscore = 100
    elif coverage_pct >= 60:
        keywordscore = 75
    elif coverage_pct >= 40:
        keywordscore = 50
    elif coverage_pct >= 20:
        keywordscore = 30
    else:
        keywordscore = 10

    # Diagnostics
    diag = {
        'keyword_coverage_pct': round(coverage_pct, 1),
        'matched_keywords': matched_count  # Internal diagnostic
    }

    return keywordscore, diag

print("Cell 7: Keyword Score logic defined.")

In [0]:
%python
# CELL 8: Dimensions 5 & 6: Rating and Review Score Logic (ratingscore, reviewscore)
# Measures customer satisfaction (rating) and social proof (review volume/recency).
# -------------------------------------------------------------------------

def score_rating(avg_rating):
    """Dimension 5: Rating Score (ratingscore).

    Maps an average star rating (0–5) to a 0–100 score:
      * >= 4.5        -> rating / 5 * 100 (90–100)
      * 4.0 – <4.5    -> 80 rising to 89
      * 3.1 – <4.0    -> 60 rising to 79
      * below 3.1     -> rating / 5 * 100, capped at 60

    The position inside the two middle bands is capped at 1.0 so a rating
    just under a band boundary can never outscore the boundary itself
    (for example 4.49 must not score higher than 4.5).

    Args:
        avg_rating: Average rating; any value accepted by float(). None or
            NaN scores 0. Values outside 0–5 are clamped into that range.

    Returns:
        (ratingscore, {}) — the score as an int plus an empty diagnostics dict.
    """
    if avg_rating is None:
        return 0, {}

    rating = float(avg_rating)  # also accepts Decimal / numeric strings
    if rating != rating:  # NaN is the only value not equal to itself
        return 0, {}
    rating = min(max(rating, 0.0), 5.0)

    raw_score = (rating / 5.0) * 100

    if rating >= 4.5:
        bucketed_score = raw_score
    elif rating >= 4.0:
        band_position = min(1.0, (rating - 4.0) / 0.4)
        bucketed_score = 80 + (band_position * 9)
    elif rating >= 3.1:
        band_position = min(1.0, (rating - 3.1) / 0.8)
        bucketed_score = 60 + (band_position * 19)
    else:
        bucketed_score = min(raw_score, 60)

    ratingscore = round(bucketed_score)

    return ratingscore, {}

def score_reviews(review_count, prior_month_review_count):
    """Dimension 6: Review Score (reviewscore).

    Combines review volume (0/10/30/60/80 for 1+, 10+, 50+ and 200+ reviews)
    with a momentum component (0/10/20), capped at 100.

    Momentum compares ``review_count`` with ``prior_month_review_count``:
    20 for growth of at least 10%, 0 for a drop of more than 10%, otherwise
    the neutral default of 10. A listing with no reviews never earns the
    growth bonus.

    Args:
        review_count: Current review count; None is treated as 0.
        prior_month_review_count: Prior-period count, or None when unknown
            (momentum then stays at the neutral default).

    Returns:
        (reviewscore, diag) where diag echoes 'prior_month_review_count'.
    """
    count = review_count or 0

    # 1. Volume (0–80 pts)
    if count >= 200:
        volume_pts = 80
    elif count >= 50:
        volume_pts = 60
    elif count >= 10:
        volume_pts = 30
    elif count > 0:
        volume_pts = 10
    else:
        volume_pts = 0

    # 2. Recency / momentum (0–20 pts)
    recency_pts = 10  # Default
    if prior_month_review_count is not None:
        # Compare as count*10 vs prior*11 (and prior*9) rather than against
        # prior*1.10 / prior*0.90: the float product can land just above the
        # exact threshold (50 * 1.10 > 55), which misclassifies exact 10% growth.
        if count > 0 and count * 10 >= prior_month_review_count * 11:
            recency_pts = 20  # 10%+ growth
        elif count * 10 < prior_month_review_count * 9:
            recency_pts = 0  # Declining

    reviewscore = min(100, volume_pts + recency_pts)

    return reviewscore, {'prior_month_review_count': prior_month_review_count}  # Internal diagnostic

print("Cell 8: Rating and Review Score logic defined.")

In [0]:
%python
# CELL 9: Overall Scoring Engine and Validation (Updated Lookups)
# Combines all subscores into the weighted contentscore and runs the test data.
# -------------------------------------------------------------------------

def calculate_content_score(product_content, performance_data, brand_name, category):
    """Run every dimension scorer for one listing and compute the overall Content Score.

    The overall ``contentscore`` is the rounded weighted sum of the six
    subscores: title 25%, description 20%, images 20%, keywords 15%,
    rating 10%, reviews 10%.

    Args:
        product_content: Mapping with 'product_key', 'retailer_key',
            'title_text', 'description_text', 'bullets_json',
            'attributes_json' and 'image_count'.
        performance_data: Dict that may hold 'avg_rating', 'review_count' and
            'prior_month_review_count'. None or {} means no performance data.
        brand_name: Brand name passed to score_title.
        category: (major, minor) tuple passed to score_title and score_keywords.

    Returns:
        Dict with the identifying keys, the six subscores, 'contentscore' and
        the public diagnostics (title_length, desc_word_count, image_count,
        keyword_coverage_pct, missing_attributes_list). Component-level
        diagnostics are removed before returning.
    """
    performance_data = performance_data or {}
    retailer_key = product_content['retailer_key']

    # Initialize master results dictionary
    results = {
        'product_key': product_content['product_key'],
        'retailer_key': retailer_key,
        'titlescore': 0, 'descscore': 0, 'imagescore': 0,
        'keywordscore': 0, 'ratingscore': 0, 'reviewscore': 0,
        'contentscore': 0,
        'title_length': 0, 'desc_word_count': 0, 'image_count': 0,
        'keyword_coverage_pct': 0.0, 'missing_attributes_list': ""
    }

    # Execute dimension scoring; each scorer returns (score, diagnostics).
    title_score, title_diag = score_title(
        product_content['title_text'], retailer_key, brand_name, category,
        product_content['attributes_json'])
    results['titlescore'] = title_score
    results.update(title_diag)

    desc_score, desc_diag = score_description(
        product_content['description_text'], product_content['bullets_json'], retailer_key)
    results['descscore'] = desc_score
    results.update(desc_diag)

    image_score, image_diag = score_images(product_content['image_count'], retailer_key)
    results['imagescore'] = image_score
    results.update(image_diag)

    keyword_score, keyword_diag = score_keywords(
        product_content['title_text'], product_content['description_text'],
        product_content['bullets_json'], category, retailer_key)
    results['keywordscore'] = keyword_score
    results.update(keyword_diag)

    rating_score, _ = score_rating(performance_data.get('avg_rating'))
    results['ratingscore'] = rating_score

    # `.get(key, 0)` would still return None for a key stored with a NULL value,
    # so coerce explicitly before the numeric comparisons in score_reviews.
    review_count = performance_data.get('review_count') or 0
    review_score, _ = score_reviews(review_count, performance_data.get('prior_month_review_count'))
    results['reviewscore'] = review_score

    # D7: Overall Content Score (contentscore) - Weighted Average
    results['contentscore'] = round(
        0.25 * results['titlescore'] +
        0.20 * results['descscore'] +
        0.20 * results['imagescore'] +
        0.15 * results['keywordscore'] +
        0.10 * results['ratingscore'] +
        0.10 * results['reviewscore']
    )

    # Drop component-level diagnostics that are not part of the output table.
    internal_keys = {
        'title_length_pts', 'title_brand_pts', 'title_category_pts', 'title_attribute_pts',
        'desc_bullets_count', 'desc_benefits_count', 'prior_month_review_count', 'matched_keywords',
    }
    final_results = {k: v for k, v in results.items() if k not in internal_keys}

    return final_results

# ------------------------------------------------------
# Output Table Schema Definition and Name
# ------------------------------------------------------

# Fully qualified name of the table that receives the scores.
OUTPUT_TABLE_NAME = "skulytics_dev.default.product_content_scores_tbl"

# Output table schema, built from (column name, Spark type, nullable) triples
# listed in column order. Identifier and score columns are non-nullable;
# diagnostic columns are nullable.
OUTPUT_SCHEMA = StructType([
    StructField(column_name, spark_type(), is_nullable)
    for column_name, spark_type, is_nullable in (
        ("product_key", StringType, False),
        ("retailer_key", StringType, False),
        ("datekey", StringType, False),
        ("titlescore", IntegerType, False),
        ("descscore", IntegerType, False),
        ("imagescore", IntegerType, False),
        ("keywordscore", IntegerType, False),
        ("ratingscore", IntegerType, False),
        ("reviewscore", IntegerType, False),
        ("contentscore", IntegerType, False),
        ("title_length", IntegerType, True),
        ("desc_word_count", IntegerType, True),
        ("image_count", IntegerType, True),
        ("keyword_coverage_pct", DoubleType, True),
        ("missing_attributes_list", StringType, True),
    )
])


def run_validation(datekey='2025-12-01'):
    """Score every mock listing, print a report, and write the results to OUTPUT_TABLE_NAME.

    For each row of MOCK_CONTENT_TBL the matching performance, brand and
    category rows are looked up in the other mock tables (standing in for
    joins), calculate_content_score is called, and a per-listing report is
    printed. All rows are then written to OUTPUT_TABLE_NAME in overwrite mode
    through the notebook's ``spark`` session. If ``spark`` is undefined or
    the write fails, an error is printed and the function carries on.

    Args:
        datekey: Value stamped into the 'datekey' column of every output row.
            Defaults to the previously hard-coded '2025-12-01'.
    """
    print("--- Content Quality Scoring Engine Results ---")
    print("--- (Schema: product_content_scores_tbl) ---\n")

    all_scores_data = []

    # Field names in schema order, used to build positional rows for Spark.
    schema_field_names = [f.name for f in OUTPUT_SCHEMA.fields]

    # ITERATE OVER THE PRIMARY CONTENT TABLE
    for content in MOCK_CONTENT_TBL:
        sku = content['product_key']
        retailer = content['retailer_key']

        # 1. Lookup Performance Data (simulates JOIN with fact_retail_summary_tbl)
        perf_data = next((p for p in MOCK_FACT_RETAIL_SUMMARY_TBL if p['product_key'] == sku and p['retailer_key'] == retailer), {})

        # 2. Lookup Brand Name (simulates JOIN with dim_brand)
        brand_name = next((d['brand_name'] for d in MOCK_DIM_BRAND if d['product_key'] == sku), None)

        # 3. Lookup Category Data (simulates JOIN with dim_product)
        # Note: Category is passed as a tuple (major, minor)
        category_data = next(((d['major'], d['minor']) for d in MOCK_DIM_PRODUCT if d['product_key'] == sku), (None, None))

        # EXECUTE THE CORE SCORING ENGINE
        score_data = calculate_content_score(
            product_content=content,
            performance_data=perf_data,
            brand_name=brand_name,
            category=category_data
        )

        # Add datekey and store results
        score_data['datekey'] = datekey
        all_scores_data.append(score_data)

        # Print results in a readable format (for notebook output).
        # The title line no longer shows an "L.. + B.. + C.. + A.." breakdown:
        # calculate_content_score strips those component keys from its result,
        # so the breakdown could only ever display zeros.
        print(f"[{sku} @ {retailer.upper()}]")
        print("  Subscores:")
        print(f"    Title Score:    {score_data['titlescore']:>3}")
        print(f"    Description:    {score_data['descscore']:>3}")
        print(f"    Image Score:    {score_data['imagescore']:>3}")
        print(f"    Keyword Score:  {score_data['keywordscore']:>3}")
        print(f"    Rating Score:   {score_data['ratingscore']:>3}")
        print(f"    Review Score:   {score_data['reviewscore']:>3}")
        print(f"  --> OVERALL SCORE:  {score_data['contentscore']:>3}")
        print("  Diagnostics:")
        print(f"    Title Length: {score_data['title_length']} chars")
        print(f"    Desc Word Count: {score_data['desc_word_count']}")
        print(f"    Image Count: {score_data['image_count']}")
        print(f"    Keyword Coverage: {score_data['keyword_coverage_pct']}%")
        print(f"    Missing Attributes: {score_data['missing_attributes_list'] or 'None'}")
        print("-" * 40)

    if not all_scores_data:
        # Nothing was scored, so there is nothing to persist or show.
        return

    # ------------------------------------------------------
    # Create DataFrame and Write to Table
    # ------------------------------------------------------
    print("\n--- Persistence: Writing Results to Databricks Catalog ---")

    try:
        # 1. Convert each result dict to a list of values in OUTPUT_SCHEMA field order
        rows_of_values = [
            [row.get(name) for name in schema_field_names]
            for row in all_scores_data
        ]

        # 2. Create Spark DataFrame (using the list of values and the explicit schema)
        df_scores = spark.createDataFrame(rows_of_values, schema=OUTPUT_SCHEMA)

        # 3. Write DataFrame to the table (creating or overwriting it)
        df_scores.write \
            .mode("overwrite") \
            .option("mergeSchema", "true") \
            .saveAsTable(OUTPUT_TABLE_NAME)

        print(f"Successfully wrote {len(all_scores_data)} records to table: {OUTPUT_TABLE_NAME}")
        print(f"You can verify the table content using SQL: SELECT * FROM {OUTPUT_TABLE_NAME}")

    except NameError:
        print("ERROR: 'spark' session is not defined. Cannot write to table.")
        print("This script is ready for PySpark, but needs to be run inside a connected Databricks cluster.")
    except Exception as e:
        # Best effort: report the failure but still show the example row below.
        print(f"ERROR during table write: {e}")

    print("\n--- Final Output Table (product_content_scores_tbl) Structure Example ---")
    example_row = all_scores_data[0]
    print(json.dumps(example_row, indent=2))
        
# Execute the validation run when this cell is evaluated: scores the mock
# listings, prints the report, and attempts the table write.
run_validation()

In [0]:
-- Read back the rows stored in the content-score output table.
SELECT *
FROM skulytics_dev.default.product_content_scores_tbl