In [0]:
import ast
import calendar
import pandas as pd
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number, lit, lower, to_timestamp
from pyspark.sql.window import Window
from yipit_databricks_utils.helpers.gsheets import read_gsheet
from concurrent.futures import ThreadPoolExecutor, as_completed
import traceback
from yipit_databricks_utils import send_email
import asyncio

In [0]:
%run "/Workspace/Users/pfisch@yipitdata.com/corporate_transformation_blueprints/corporate_transformation_blueprints/retail_analytics_platform/freeport/module_template_function"

In [0]:
def execute_with_error_handling(func, func_name, *args, **kwargs):
    """
    Run ``func(*args, **kwargs)`` and report the outcome instead of raising.

    A start and a completion line are printed around the call. Any ``Exception``
    raised (including while printing progress) is caught, printed together with
    its traceback, and returned to the caller rather than propagated.

    Args:
        func: Callable to invoke.
        func_name: Label used in the progress / error messages.
        *args: Positional arguments forwarded to ``func`` (also echoed in the log).
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        tuple: ``(True, result, None)`` when ``func`` returned normally, or
        ``(False, None, error_message)`` when an exception was raised.
    """
    try:
        print(f"🔄 Starting: {func_name} : {args}")
        outcome = func(*args, **kwargs)
        print(f"✅ Completed: {func_name} : {args}")
    except Exception as exc:
        failure = f"❌ Error in {func_name}: {str(exc)}\n{traceback.format_exc()}"
        print(failure)
        return False, None, failure
    return True, outcome, None

In [0]:
def _freeport_market_share_columns(special_attribute_column_original):
    """Return a NEW list of columns for the market-share loop, always including "null".

    The caller's list is copied, never mutated, so re-running the notebook cell does
    not accumulate extra "null" entries. ``True``, ``None`` and empty values mean
    "no special attribute columns"; a single string is treated as one column name.
    Duplicates are dropped (first occurrence wins) so no column is built twice.
    """
    if special_attribute_column_original is True or not special_attribute_column_original:
        columns = []
    elif isinstance(special_attribute_column_original, str):
        columns = [special_attribute_column_original]
    else:
        columns = list(special_attribute_column_original)
    # "null" is always passed as one extra column value; presumably freeport_module
    # treats it as the "no special attribute" run — confirm in module_template_function.
    return list(dict.fromkeys([*columns, "null"]))


def _freeport_parallel_module_names(pro_module, shopper_insights, pricing_n_promo):
    """Return, in submission order, the independent module types run in the thread pool."""
    # CORE TABLES: foundation tables used across all analysis modules.
    names = ["filter_items", "client_specs", "panel_stats", "sample_size_guardrail"]

    # GEO: geographic analysis.
    names.append("geographic_analysis")

    # MARKET SHARE tables that loop over special-attribute columns are NOT listed
    # here; freeport_all_tables runs them sequentially, once per column.

    # PRO INSIGHTS
    if pro_module:
        names.append("pro_insights")

    # PRODUCT ANALYSIS (SKU)
    names += ["sku_analysis", "sku_time_series", "sku_detail"]

    if shopper_insights:
        # RET LEAKAGE (retail leakage analysis)
        names += ["leakage_users", "leakage_retailer", "category_closure", "leakage_product", "market_share"]
        # SHOPPER INSIGHTS
        names.append("shopper_filter_items")

    if pricing_n_promo:
        # TARIFFS
        names += [
            "tariffs_month_grouping",
            "tariffs_month_stable_products_list",
            "tariffs_month_product_filled_prices",
            "tariffs_week_grouping",
            "tariffs_week_stable_products_list",
            "tariffs_week_product_filled_prices",
        ]
    return names


def _freeport_summary_email(demo_name, sol_owner, results):
    """Build ``(subject, recipients, html_body)`` for the run-summary email from ``results``."""
    import html

    total_tasks = len(results)
    successful_tasks = sum(1 for r in results.values() if r["success"])
    failed_tasks = total_tasks - successful_tasks

    summary = [
        "=" * 80,
        f"📈 EXECUTION SUMMARY: {demo_name}",
        "=" * 80,
        f"✅ Successful: {successful_tasks}/{total_tasks}",
        f"❌ Failed: {failed_tasks}/{total_tasks}",
        "\n",
    ]
    recipient_emails = ["pfisch@yipitdata.com"]

    if failed_tasks > 0:
        email_title = f"PROBLEM IN FREEPORT : {demo_name}"
        # Only notify the solution owner on failure, and skip empty/duplicate addresses.
        if sol_owner and sol_owner not in recipient_emails:
            recipient_emails.append(sol_owner)
        summary.append("❌ Failed Modules:")
        for module, result in results.items():
            if not result["success"]:
                summary.append(f"  - {html.escape(module)}")
                if result["error"]:
                    # The body is HTML: escape tracebacks so text like "<module>" is not
                    # swallowed as a tag.
                    summary.append(f"    Error: {html.escape(result['error'][:200])}...")
    else:
        email_title = f"success in freeport : {demo_name}"
        summary.append("hash_for_email_filtering : aRiSb8fh8GXsg1upJqQ92P4SPwFmS8IIC49vVKQS01X3cPio02")

    summary.append("=" * 80)
    body = "".join(f"{line}<br />" for line in summary)
    return email_title, recipient_emails, body


async def freeport_all_tables(demo_name, sandbox_schema, prod_schema, sol_owner, special_attribute_column_original, market_share=True, shopper_insights=True, pro_module=True, pricing_n_promo=True, max_workers=4):
    """
    Run every freeport module for one demo and email a run summary.

    Phase 1 (only when ``market_share`` is truthy) runs the three per-column
    market-share modules sequentially, once per special-attribute column plus a
    final "null" column. Phase 2 submits the independent modules to a thread pool.
    Every module is run through ``execute_with_error_handling``, so a failing
    module does not stop the others, and every failure is listed in the email.

    Args:
        demo_name: Demo identifier passed to ``freeport_module`` and used in the email subject.
        sandbox_schema: Passed through to ``freeport_module``.
        prod_schema: Passed through to ``freeport_module``.
        sol_owner: Extra recipient added to the failure email; ignored when empty.
        special_attribute_column_original: Iterable of column names for the
            market-share loop (``True``/``None``/empty means none). It is NOT mutated.
        market_share: Run phase 1 (per-column market-share modules).
        shopper_insights: Include the retail-leakage and shopper modules.
        pro_module: Include ``pro_insights``.
        pricing_n_promo: Include the tariffs modules.
        max_workers: Thread-pool size for phase 2.

    Returns:
        dict: task name -> ``{"success": bool, "error": str | None}``.

    Note:
        Declared ``async`` so existing ``await freeport_all_tables(...)`` calls keep
        working, but it contains no ``await``: it runs synchronously and blocks the
        event loop until all modules and the email are done.
    """
    results = {}

    # ============================================
    # PHASE 1: Market share modules, one sequential pass per column
    # ============================================
    print("=" * 80)
    if market_share:
        columns = _freeport_market_share_columns(special_attribute_column_original)
        print(f"🚀 PHASE 1: Running Market Share Modules for columns {columns}")
        print("=" * 80)
        for column in columns:
            for module_type in ("market_share_standard_calendar", "market_share_nrf_calendar", "market_share_for_column"):
                task_name = f"{module_type} [column={column}]"
                success, _, error = execute_with_error_handling(
                    freeport_module,
                    task_name,
                    sandbox_schema, prod_schema, demo_name, module_type,
                    use_sampling=False, sample_fraction=0.01, column=column,
                )
                results[task_name] = {"success": success, "error": error}
    else:
        print("⏭️ PHASE 1: Market Share Modules skipped (market_share=False)")
        print("=" * 80)

    # ============================================
    # PHASE 2: Parallel execution of independent modules
    # ============================================
    print("=" * 80)
    print("🚀 PHASE 2: Running Parallel Modules")
    print("=" * 80)

    module_names = _freeport_parallel_module_names(pro_module, shopper_insights, pricing_n_promo)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_name = {
            executor.submit(
                execute_with_error_handling,
                freeport_module,
                name,
                sandbox_schema, prod_schema, demo_name, name,
            ): name
            for name in module_names
        }
        for future in as_completed(future_to_name):
            # execute_with_error_handling catches Exception, so result() returns a tuple.
            success, _, error = future.result()
            results[future_to_name[future]] = {"success": success, "error": error}

    # ============================================
    # Summary report
    # ============================================
    email_title, recipient_emails, body = _freeport_summary_email(demo_name, sol_owner, results)
    print(email_title)
    send_email(subject=email_title, body=body, to_emails=recipient_emails)
    return results

In [0]:
# Run configuration for freeport_all_tables.
demo_name = "testesteelauder_v38"          # demo identifier passed to freeport_module
sandbox_schema = "ydx_internal_analysts_sandbox"
prod_schema = "ydx_internal_analysts_gold"
sol_owner = "pfisch+solowner@gmail.com"    # extra recipient of failure emails
# Extra columns for the market-share modules; a "null" column run is always added.
special_attribute_column_original = []

# Module toggles
market_share = True
shopper_insights = True
pro_module = True
pricing_n_promo = True
max_workers = 4  # thread-pool size for the parallel modules

In [0]:
await freeport_all_tables(demo_name, sandbox_schema, prod_schema, sol_owner, special_attribute_column_original, market_share=True, shopper_insights=True, pro_module=True, pricing_n_promo=True, max_workers=4)