In [None]:
!pip install polars sentence-transformers

In [None]:
# Install RAPIDS cuDF/cuML from NVIDIA's package index for the GPU HDBSCAN step.
# NOTE(review): the "-cu12" packages are presumably the CUDA 12 builds, so a matching
# GPU runtime is expected — confirm against the Colab runtime before running.
print("--- Installing RAPIDS cuML for GPU Acceleration ---")
!pip install cudf-cu12 --extra-index-url=https://pypi.nvidia.com
!pip install cuml-cu12 --extra-index-url=https://pypi.nvidia.com

In [None]:
# --- COLAB SETUP ---
# Install the required libraries in the Colab environment
#!pip install polars sentence-transformers hdbscan scikit-learn

import json
import os
import random
import time
from collections import Counter

import polars as pl
from tqdm import tqdm
from sentence_transformers import SentenceTransformer
from sklearn.cluster import HDBSCAN
import hdbscan # Explicitly import hdbscan if needed, though sklearn.cluster is used for the HDBSCAN model

# --- Import RAPIDS Libraries ---
import cudf
from cuml.cluster import HDBSCAN as cuML_HDBSCAN
# We will rename the cuML HDBSCAN to avoid conflicts
# --- CONFIGURATION PATHS (Colab Edition) ---

# IMPORTANT: You must upload the file 'african_questions_TRANSLATED_SAVED.parquet'
# to your Colab session. The path below assumes the file is in the root directory.
INPUT_FILE_TRANSLATED = 'african_questions_TRANSLATED_SAVED.parquet'

# Output directory for the final clustered file
OUTPUT_FOLDER = './translated_data'
os.makedirs(OUTPUT_FOLDER, exist_ok=True)
OUTPUT_FILE_CLUSTERED = os.path.join(OUTPUT_FOLDER, 'african_questions_CLUSTERED.parquet')

# --- PROCESSING PARAMETERS ---
# HDBSCAN parameters set for robustness and meaningful clusters
MIN_CLUSTER_SIZE = 20 # Minimum size of a cluster (recommended for initial stability)
MIN_SAMPLES = 5       # How conservative the clustering should be

# --- MAIN EXECUTION ---
# Embeds every translated question, clusters the embeddings with cuML HDBSCAN on
# the GPU, and writes the input table plus a `cluster_id` column (-1 = noise).
print("--- STAGE: Thematic Clustering ---")

if not os.path.exists(INPUT_FILE_TRANSLATED):
    print(f"FATAL: Translated data not found at {INPUT_FILE_TRANSLATED}.")
    print("Please ensure you have uploaded 'african_questions_TRANSLATED_SAVED.parquet' to this Colab environment.")
else:
    # 1. Load the complete translated dataset. All columns are kept so the output
    #    file carries everything the input had, plus `cluster_id`.
    print(f"Loading complete translated dataset from: {INPUT_FILE_TRANSLATED}")
    df_translated = pl.read_parquet(INPUT_FILE_TRANSLATED)

    # 2. Initialize the Sentence Transformer model for generating semantic embeddings
    # 'all-MiniLM-L6-v2' provides a good balance of speed and semantic quality.
    embedder = SentenceTransformer('all-MiniLM-L6-v2')
    print("Sentence Transformer Model loaded for embedding.")

    # 3. Generate Semantic Embeddings from the English column.
    print("Generating Semantic Embeddings...")
    # A null translation would make `encode` fail; embed it as an empty string so
    # every row still gets exactly one embedding. Row order must stay aligned with
    # df_translated because the labels are attached positionally below.
    questions_en = df_translated['question_content_en'].fill_null("").to_list()
    embeddings_np = embedder.encode(questions_en, show_progress_bar=True)

    # CRITICAL STEP: Convert NumPy array to cuDF DataFrame for GPU processing
    embeddings_gpu = cudf.DataFrame(embeddings_np)
    print(f"Embeddings converted to GPU-accelerated cuDF DataFrame ({embeddings_gpu.shape}).")

    # 4. Perform HDBSCAN Clustering using cuML (GPU)
    print(f"Performing HDBSCAN Clustering using cuML (min_cluster_size={MIN_CLUSTER_SIZE})...")
    clusterer_gpu = cuML_HDBSCAN(
        min_cluster_size=MIN_CLUSTER_SIZE,
        min_samples=MIN_SAMPLES,
        metric='euclidean' # cuML is highly optimized for this distance metric
        # No need for n_jobs=-1, as cuML uses the entire GPU by default.
    )
    clusterer_gpu.fit(embeddings_gpu)
    # NOTE(review): labels_ presumably mirrors the cuDF input type; .to_numpy() copies it to host memory.
    cluster_ids_gpu = clusterer_gpu.labels_.to_numpy()

    # 5. Add the new cluster ID column to the DataFrame
    df_translated = df_translated.with_columns(
        pl.Series(name="cluster_id", values=cluster_ids_gpu)
    )

    # 6. Save the results
    df_translated.write_parquet(OUTPUT_FILE_CLUSTERED, compression='zstd')
    print(f"\n✅ GPU Clustering complete. Results saved to: {OUTPUT_FILE_CLUSTERED}")

    # --- Final Output Summary ---
    print("\n--- Summary of Top 5 Largest Clusters (Thematic Needs) ---")
    # Cluster ID -1 represents noise (unclassifiable questions).
    # group_by().len() replaces the deprecated GroupBy.count(); the column keeps the name "count".
    cluster_counts = (
        df_translated.filter(pl.col("cluster_id") != -1)
        .group_by('cluster_id')
        .len(name='count')
        .sort('count', descending=True)
        .head(5)
    )

    print(cluster_counts)

    # Next step reminder
    print("\nNext step: Run the 'financial_intent_mapper.py' (in memory) to analyze these clusters.")

In [None]:

# --- CONFIGURATION ---
CLUSTERED_DATA_PATH = './translated_data/african_questions_CLUSTERED.parquet'
OUTPUT_TAXONOMY_PATH = './financial_taxonomy_SYNC.json'
SAMPLE_SIZE_PER_CLUSTER = 10
# Clusters with fewer questions than this are pooled into one virtual cluster
# (VIRTUAL_CLUSTER_ID) and classified with a single LLM request.
MIN_CLUSTER_COUNT_FOR_SEPARATE_ANALYSIS = 200
VIRTUAL_CLUSTER_ID = -999

# --- API KEY (single key for synchronous requests) ---
# Read the key from the environment only. Never put a literal key in the notebook:
# it ends up in every saved copy and in version control. An empty value is caught
# by the check at the start of the main block.
API_KEY = os.environ.get("GEMINI_API_KEY", "")

MODEL_NAME = "gemini-2.5-flash-preview-09-2025"
# Single-request generateContent endpoint for MODEL_NAME.
BASE_API_URL = f"https://generativelanguage.googleapis.com/v1beta/models/{MODEL_NAME}:generateContent?key="

# --- LLM UTILITY FUNCTIONS ---

def create_single_request_payload(cluster_id, sample_questions):
    """
    Build the Gemini generateContent request body for classifying one cluster.

    Args:
        cluster_id: ID of the cluster being analysed. VIRTUAL_CLUSTER_ID marks the
            pooled batch of small clusters and switches to a diversity-aware instruction.
        sample_questions: iterable of translated question texts. None and blank
            entries are skipped (None would break the join; blanks add empty bullets).

    Returns:
        dict with `contents`, `systemInstruction` and `generationConfig`. The
        generation config requests JSON output matching a schema whose
        `clusterId`, `financialNeed` and `productOpportunity` fields are required.
    """
    # Single source for the allowed labels so the prompt and the schema cannot drift apart.
    categories = (
        "Working Capital",
        "Asset Financing",
        "Risk Mitigation/Insurance",
        "Liquidity/Savings",
        "Market Access",
        "None/Information Need",
    )
    categories_text = ", ".join(categories[:-1]) + ", or " + categories[-1]

    questions = [str(q) for q in sample_questions if q is not None and str(q).strip()]

    system_prompt = (
        "You are a Senior Financial Product Analyst for a non-profit serving smallholder farmers. "
        "Your task is to analyze a sample of farmer questions from a single thematic cluster and assign a primary financial need "
        "based on the underlying problem. Use only the provided JSON schema for output."
    )

    if cluster_id == VIRTUAL_CLUSTER_ID:
        # State the ID explicitly: the schema asks the model to echo `clusterId`,
        # which it cannot do for the virtual batch unless the prompt contains it.
        cluster_description = f"from a combined batch of many small, diverse clusters (Cluster ID {cluster_id})"
        need_instruction = (
            "Determine the single, most common financial need and product idea that represents the DIVERSITY of the questions, "
            "or assign 'None/Information Need' if the questions are too varied."
        )
    else:
        cluster_description = f"from Cluster ID {cluster_id}"
        need_instruction = "Determine the single, most relevant financial product category, the underlying financial need, and a simple product idea."

    sample_text = "\n- " + "\n- ".join(questions)
    user_query = (
        f"Analyze the following {len(questions)} translated farmer questions {cluster_description}. "
        f"{need_instruction} "
        f"Financial Need categories must ONLY be one of: {categories_text}. "
        f"Questions:\n{sample_text}"
    )

    response_schema = {
        "type": "OBJECT",
        "properties": {
            "clusterId": {"type": "INTEGER", "description": "The input cluster ID."},
            "financialNeed": {
                "type": "STRING",
                "description": f"The core financial problem. Choose one of: {categories_text}."
            },
            "productOpportunity": {
                "type": "STRING",
                "description": "A specific, concise financial product idea to solve this need (e.g., 'Emergency Micro-loan for inputs', 'Crop-failure insurance')."
            }
        },
        # Without this list every property is optional and the model may omit fields.
        "required": ["clusterId", "financialNeed", "productOpportunity"],
    }

    return {
        "contents": [{"parts": [{"text": user_query}]}],
        "systemInstruction": {"parts": [{"text": system_prompt}]},
        "generationConfig": {
            "responseMimeType": "application/json",
            "responseSchema": response_schema
        }
    }


def _parse_gemini_json(raw_response):
    """
    Decode the JSON value the model returned in a generateContent response body.

    Takes the text of the first part of the first candidate, removes a Markdown
    code fence (```json ... ``` or ``` ... ```) if the model wrapped its answer in
    one, and parses the remainder with json.loads.

    Raises:
        ValueError: no candidates, or the text is not valid JSON
            (json.JSONDecodeError is a ValueError subclass).
        KeyError / IndexError / TypeError: the candidate lacks the expected structure.
    """
    candidates = raw_response.get('candidates') if isinstance(raw_response, dict) else None
    if not candidates:
        raise ValueError("Model returned no candidates or empty content.")

    text = candidates[0]['content']['parts'][0].get('text', '{}').strip()
    if text.startswith("```"):
        text = text[3:]
        if text[:4].lower() == "json":
            text = text[4:]
        text = text.strip()
        # Only drop a closing fence that is actually there.
        if text.endswith("```"):
            text = text[:-3]
        text = text.strip()
    return json.loads(text)


def call_gemini_api_with_retry(payload, api_key, max_retries=5, timeout=120):
    """
    POST `payload` to the Gemini generateContent endpoint and return the parsed JSON answer.

    Retry policy (at most `max_retries` attempts):
      * HTTP 429/500/502/503/504: exponential backoff with jitter (10s * 2**attempt + 0-5s).
      * HTTP 400/401/403/404: not retried; raises immediately (bad key, permissions,
        payload or model name).
      * Network errors, other non-2xx statuses and unparseable model output:
        retried after 2**attempt seconds.

    Args:
        payload: request body, e.g. from create_single_request_payload().
        api_key: Gemini API key. It is sent in the `x-goog-api-key` header, not the URL.
        max_retries: maximum number of attempts.
        timeout: per-request timeout in seconds, so a stalled connection cannot hang the run.

    Returns:
        The decoded JSON value from the model's first candidate.

    Raises:
        Exception: on a non-retryable status, or when every attempt failed.
    """
    import requests  # not imported at the top of the notebook

    # BASE_API_URL ends in "?key="; drop the query string and authenticate via a header
    # so the key cannot leak into exception text (requests puts the full URL in its
    # error messages, and the caller stores str(e) in the saved taxonomy file).
    api_url = BASE_API_URL.split("?", 1)[0]
    headers = {'Content-Type': 'application/json', 'x-goog-api-key': api_key}

    retryable_statuses = {429, 500, 502, 503, 504}
    fatal_statuses = {400, 401, 403, 404}

    for attempt in range(max_retries):
        try:
            response = requests.post(api_url, headers=headers, json=payload, timeout=timeout)
        except requests.exceptions.RequestException as e:
            # Connection/timeout failure: there is no response object to inspect.
            print(f"[Failure] API Call failed (Attempt {attempt+1}/{max_retries}): {e}. Retrying...")
            time.sleep(2 ** attempt)
            continue

        status_code = response.status_code
        if status_code in retryable_statuses:
            wait_time = 10 * (2 ** attempt) + random.uniform(0, 5)  # jitter spreads out retries
            print(f"\n[API ERROR {status_code}] Rate limit hit or transient error. Waiting {wait_time:.2f}s before retrying...")
            time.sleep(wait_time)
            continue

        if status_code in fatal_statuses:
            print(f"\n[CRITICAL API ERROR {status_code}] Check API key, permissions, or payload structure.")
            raise Exception(f"Failed to connect: {status_code}. Aborting.")

        if not response.ok:
            print(f"[Failure] API Call failed (Attempt {attempt+1}/{max_retries}): HTTP {status_code}. Retrying...")
            time.sleep(2 ** attempt)
            continue

        try:
            return _parse_gemini_json(response.json())
        except (ValueError, KeyError, IndexError, TypeError) as e:
            # Malformed or empty model output is usually transient; try again.
            print(f"[Failure] Could not parse model output (Attempt {attempt+1}/{max_retries}): {e}. Retrying...")
            time.sleep(2 ** attempt)

    raise Exception(f"Failed to get successful API response after {max_retries} retries for cluster.")

# --- MAIN EXECUTION ---

if __name__ == "__main__":
    # Abort by raising SystemExit: exit() is an interactive-shell helper, whereas
    # raising stops both a script and the current notebook cell deterministically.
    if not API_KEY or API_KEY.startswith("YOUR_"):
        print("\n--- WARNING: API Key Not Provided ---")
        print("Please replace the placeholder string in the API_KEY variable with your actual Gemini API key.")
        raise SystemExit(1)

    print(f"Starting synchronous analysis using one API key ({API_KEY[-4:]})...")
    print(f"Minimum cluster size for separate analysis set to: {MIN_CLUSTER_COUNT_FOR_SEPARATE_ANALYSIS}")

    if not os.path.exists(CLUSTERED_DATA_PATH):
        print(f"FATAL: Clustered data not found at {CLUSTERED_DATA_PATH}. Please ensure the previous step was completed successfully.")
        raise SystemExit(1)

    print("Loading clustered data for Synchronous Financial Intent Mapping...")
    df = pl.read_parquet(CLUSTERED_DATA_PATH, columns=['question_content_en', 'cluster_id'])

    # 1. Filter out noise (-1) and count cluster sizes.
    #    group_by().len() replaces the deprecated GroupBy.count(); the column stays "count".
    df_clusters = df.filter(pl.col("cluster_id") != -1)
    cluster_counts = df_clusters.group_by('cluster_id').len(name='count').sort("cluster_id")

    # 2. Large clusters are analysed one by one; small ones are pooled into VIRTUAL_CLUSTER_ID.
    is_large = pl.col("count") >= MIN_CLUSTER_COUNT_FOR_SEPARATE_ANALYSIS
    large_cluster_counts_df = cluster_counts.filter(is_large)
    large_cluster_ids = large_cluster_counts_df['cluster_id'].to_list()
    small_cluster_ids = cluster_counts.filter(~is_large)['cluster_id'].to_list()

    # 3. Identify all clusters to analyze (a fresh list, so appending is safe).
    clusters_to_analyze = list(large_cluster_ids)
    if small_cluster_ids:
        clusters_to_analyze.append(VIRTUAL_CLUSTER_ID)

    print(f"Total clusters to process: {len(clusters_to_analyze)} out of {df_clusters.select(pl.col('cluster_id').unique().count()).item()} total clusters.")
    print(f"Confirmed clusters to process: {clusters_to_analyze}")

    # 4. Draw a reproducible (seed=42) sample of questions per analysed cluster.
    cluster_samples = {}
    for row in tqdm(large_cluster_counts_df.rows(named=True), total=len(large_cluster_ids), desc="Sampling Large Clusters"):
        cluster_id = row['cluster_id']
        n_sample = min(SAMPLE_SIZE_PER_CLUSTER, row['count'])
        sample_df = df_clusters.filter(pl.col("cluster_id") == cluster_id).sample(n=n_sample, with_replacement=False, seed=42)
        cluster_samples[cluster_id] = sample_df['question_content_en'].to_list()

    if small_cluster_ids:
        df_small_clusters = df_clusters.filter(pl.col("cluster_id").is_in(small_cluster_ids))
        n_sample_virtual = min(SAMPLE_SIZE_PER_CLUSTER, df_small_clusters.height)
        sample_df_virtual = df_small_clusters.sample(n=n_sample_virtual, with_replacement=False, seed=42)
        cluster_samples[VIRTUAL_CLUSTER_ID] = sample_df_virtual['question_content_en'].to_list()

    # 5. Perform the Synchronous LLM Analysis
    taxonomy_results = []
    success_count = 0
    error_count = 0

    print(f"\nStarting Synchronous Classification for {len(clusters_to_analyze)} clusters...")

    for cluster_id in tqdm(clusters_to_analyze, desc="Classifying Financial Intent"):
        # Record a placeholder instead of crashing if a cluster was never sampled.
        if cluster_id not in cluster_samples:
            error_count += 1
            print(f"\n[SKIP ERROR] Cluster ID {cluster_id} was identified for processing but not found in samples. Skipping.")
            taxonomy_results.append({
                "clusterId": cluster_id,
                "financialNeed": "SETUP_ERROR",
                "productOpportunity": "Missing cluster sample due to setup error."
            })
            continue

        request_payload = create_single_request_payload(cluster_id, cluster_samples[cluster_id])

        try:
            result = call_gemini_api_with_retry(request_payload, API_KEY)
        except Exception as e:
            error_count += 1
            # Redact the key in case the error text embeds the request URL; this
            # message is written to the taxonomy file.
            error_text = str(e).replace(API_KEY, "***")
            taxonomy_results.append({
                "clusterId": cluster_id,
                "financialNeed": "API_FAILURE",
                "productOpportunity": f"Synchronous API call failed: {error_text}"
            })
            continue

        # Accept the answer when both descriptive fields are present. The echoed
        # clusterId is not trusted: each request covers exactly one cluster, so the
        # ID we sent is authoritative and is written into the record.
        if (
            isinstance(result, dict)
            and isinstance(result.get('financialNeed'), str)
            and result['financialNeed'].strip()
            and isinstance(result.get('productOpportunity'), str)
        ):
            result['clusterId'] = cluster_id
            taxonomy_results.append(result)
            success_count += 1
        else:
            error_count += 1
            taxonomy_results.append({
                "clusterId": cluster_id,
                "financialNeed": "VALIDATION_ERROR",
                "productOpportunity": "Parsed JSON missing required fields."
            })

    # 6. Save the final results
    if taxonomy_results:
        output_dir = os.path.dirname(OUTPUT_TAXONOMY_PATH)
        if output_dir:  # os.makedirs('') raises when the path has no directory part
            os.makedirs(output_dir, exist_ok=True)
        with open(OUTPUT_TAXONOMY_PATH, 'w', encoding='utf-8') as f:
            json.dump(taxonomy_results, f, indent=4)

        print(f"\n✅ Synchronous Financial Taxonomy Attempt Complete! Results saved to: {OUTPUT_TAXONOMY_PATH}")
        print(f"   Successes: {success_count}, Failures: {error_count}")

        # 7. Print a summary of the top financial needs (placeholders excluded)
        failure_labels = {'API_FAILURE', 'VALIDATION_ERROR', 'SETUP_ERROR'}
        needs_count = Counter(
            res['financialNeed'] for res in taxonomy_results if res['financialNeed'] not in failure_labels
        ).most_common()

        print("\n--- Summary of Top Financial Needs Mapped to Clusters ---")
        for need, count in needs_count:
            print(f"- {need}: {count} Clusters")

    print("\n**CRITICAL ADVICE:**")
    print("This synchronous approach is highly vulnerable to rate limits and will be slow.")
    print("To guarantee completion, please use the provided **offline_financial_intent_mapper.py** file.")

Key Strategic Takeaway for Producers Direct
The data divides the user base into two distinct groups based on their need frequency:

The Information Majority (The Bulk): Focus on information-delivery channels. These users need rapid, accurate, and localized advice.

The Financial Minority (The High-Value Opportunity): Focus financial product development on the specific themes represented by the 5 financially classified clusters (2 Working Capital, 2 Risk Mitigation/Insurance, 1 Asset Financing). Do not try to solve the financial needs of the entire user base; target the small, specific, high-intent groups.

*Possible strategic implications of the number of clusters in each financial-need category*

**None/Information Need: 11 Clusters**	Core Value Proposition: The most frequent demand from farmers, by a wide margin, is for **advice, knowledge, and technical guidance** (e.g., pest control, planting methods, soil health). This suggests that scaling the quality and speed of technical responses is the organization's current core mandate, based on user behavior.

**Working Capital: 2 Clusters**	High-Demand, Transactional Need: These clusters represent **immediate operational needs**, likely for inputs like seeds and fertilizer. This is an immediate, high-priority product opportunity for a **short-term micro-loan or credit facility**.

**Risk Mitigation/Insurance: 2 Clusters**	Protection Need: These clusters reflect **farmer concerns about external threats** (weather, pests, disease). This signals a strong opportunity for developing **targeted, parametric insurance products** (like weather-indexed insurance).

**Asset Financing: 1 Cluster**	Growth Need: This cluster likely relates to **needs for high-value equipment** like solar pumps, storage facilities, or machinery. This points toward opportunities for **group loans or lease-to-own programs designed for capital investment and long-term farm growth.**

In [None]:

# --- CONFIGURATION ---
CLUSTERED_DATA_PATH = "./translated_data/african_questions_CLUSTERED.parquet"
TAXONOMY_PATH = "./financial_taxonomy_SYNC.json"  # JSON list written by the synchronous intent-mapping cell
OUTPUT_DF_PATH = "./analysis_data/farmer_questions_FOR_ANALYSIS.parquet"
VIRTUAL_CLUSTER_ID = -999  # ID under which the pooled small clusters were classified

# --- SEVERITY MAPPING ---
# Weight (0.0 to 1.0) per financial-need label. Direct financial needs outrank the
# information-only label; the error placeholders written by the intent-mapping step
# get 0.0 so their clusters drop out of the ranking.
#   severity_score = cluster_frequency * severity_weight
SEVERITY_WEIGHTS = dict([
    ("Working Capital", 0.9),            # High urgency (inputs needed NOW)
    ("Risk Mitigation/Insurance", 0.8),  # High importance (livelihood protection)
    ("Asset Financing", 0.7),            # Medium importance (long-term growth)
    ("Liquidity/Savings", 0.5),          # Medium importance (financial resilience)
    ("Market Access", 0.4),              # Lower importance (often informational, but affects income)
    ("None/Information Need", 0.2),      # Lowest urgency/financial impact
    ("API_FAILURE", 0.0),                # Failure placeholders carry no weight
    ("VALIDATION_ERROR", 0.0),
    ("SETUP_ERROR", 0.0),
])


def calculate_cluster_metrics():
    """
    Build per-cluster prioritisation metrics and save them to OUTPUT_DF_PATH.

    Steps:
      1. Load `cluster_id` from CLUSTERED_DATA_PATH and the LLM taxonomy (a JSON list
         of records keyed by `clusterId`) from TAXONOMY_PATH.
      2. Drop noise rows (cluster_id == -1); compute each cluster's size and its share
         of all non-noise questions (`cluster_frequency`).
      3. Attach `financial_need` / `product_opportunity` from the taxonomy. Clusters
         without their own entry (the small clusters pooled by the intent mapper)
         inherit the VIRTUAL_CLUSTER_ID entry; if that is missing too they are
         labelled "UNCATEGORIZED".
      4. `severity_weight` = SEVERITY_WEIGHTS[financial_need] (0.0 for unknown labels),
         `severity_score` = cluster_frequency * severity_weight.

    Returns:
        The metrics DataFrame sorted by severity_score (descending), or None when an
        input file is missing.
    """
    print("Starting Severity Analysis Preprocessing...")

    # 1. Load inputs; only the cluster assignment is needed from the question file.
    if not os.path.exists(CLUSTERED_DATA_PATH):
        print(f"FATAL: Clustered data not found at {CLUSTERED_DATA_PATH}. Aborting.")
        return None
    df_questions = pl.read_parquet(CLUSTERED_DATA_PATH, columns=["cluster_id"])

    if not os.path.exists(TAXONOMY_PATH):
        print(f"FATAL: Financial taxonomy not found at {TAXONOMY_PATH}. Aborting.")
        return None
    with open(TAXONOMY_PATH, 'r', encoding='utf-8') as f:
        taxonomy_data = json.load(f)

    # 2. Cluster size and frequency, excluding HDBSCAN noise (-1).
    df_filtered = df_questions.filter(pl.col("cluster_id") != -1)
    total_questions = df_filtered.height
    # group_by().len() replaces the deprecated GroupBy.count().
    cluster_size_df = (
        df_filtered.group_by('cluster_id')
        .len(name="cluster_size")
        .with_columns((pl.col("cluster_size") / total_questions).alias("cluster_frequency"))
    )

    # 3. Taxonomy lookup. Records without a clusterId cannot be matched and are skipped.
    taxonomy_map = {
        item["clusterId"]: item
        for item in taxonomy_data
        if isinstance(item, dict) and "clusterId" in item
    }
    fallback_entry = taxonomy_map.get(
        VIRTUAL_CLUSTER_ID,
        {"financialNeed": "UNCATEGORIZED", "productOpportunity": "Data missing."},
    )

    def _field(entry, key, default):
        """Read a taxonomy field as text; a missing/null value becomes `default`."""
        value = entry.get(key)
        return default if value is None else str(value)

    entries = [taxonomy_map.get(cid, fallback_entry) for cid in cluster_size_df["cluster_id"].to_list()]
    needs = [_field(entry, "financialNeed", "UNCATEGORIZED") for entry in entries]
    opportunities = [_field(entry, "productOpportunity", "Data missing.") for entry in entries]

    # 4. Severity weight (unknown labels -> 0.0) and score = frequency * weight.
    cluster_metrics = cluster_size_df.with_columns(
        pl.Series("financial_need", needs, dtype=pl.String),
        pl.Series("product_opportunity", opportunities, dtype=pl.String),
        pl.Series("severity_weight", [SEVERITY_WEIGHTS.get(need, 0.0) for need in needs], dtype=pl.Float64),
    ).with_columns(
        (pl.col("cluster_frequency") * pl.col("severity_weight")).alias("severity_score")
    )

    # 5. Finalize and save, highest-priority clusters first.
    final_df = cluster_metrics.sort("severity_score", descending=True)

    os.makedirs(os.path.dirname(OUTPUT_DF_PATH) or ".", exist_ok=True)
    final_df.write_parquet(OUTPUT_DF_PATH)

    print(f"\n✅ Severity Analysis Preprocessing Complete! Metrics saved to: {OUTPUT_DF_PATH}")
    print("\n--- Top 5 Clusters by Severity Score ---")
    print(final_df.head(5))

    print("\nNext Steps:")
    print("1. Review the final sorted table to identify the highest priority clusters.")
    print("2. The next file will present the final strategic report and data visualization.")

    return final_df

if __name__ == "__main__":
    # polars is installed by the first cell and imported at the top of the notebook;
    # re-running an unpinned pip install on every execution is unnecessary.
    calculate_cluster_metrics()

In [None]:
import matplotlib.pyplot as plt
import os
import textwrap

# --- CONFIGURATION ---
# Scored per-cluster metrics produced by the severity preprocessing cell.
INPUT_DF_PATH = "./analysis_data/farmer_questions_FOR_ANALYSIS.parquet"
# Outputs of this reporting step (written next to the input).
REPORT_FILENAME = "./analysis_data/top_priority_clusters_report.txt"
PLOT_FILENAME = "./analysis_data/severity_score_bar_chart.png"
# How many of the highest-scoring clusters appear in the report and the chart.
TOP_N_CLUSTERS = 10


def generate_strategic_report():
    """
    Write a ranked text report and a bar chart of the highest-priority clusters.

    Reads the per-cluster metrics saved by the severity preprocessing step
    (INPUT_DF_PATH), keeps clusters with severity_score > 0 and takes the top
    TOP_N_CLUSTERS by score. Outputs:
      * REPORT_FILENAME: plain-text ranking with need, size/frequency and opportunity.
      * PLOT_FILENAME: horizontal bar chart of severity scores, coloured by
        financial vs. information need.
    Prints a message and returns early if the input is missing or nothing scores above 0.
    """
    from matplotlib.lines import Line2D

    print("Starting Final Strategic Report Generation...")

    if not os.path.exists(INPUT_DF_PATH):
        print(f"FATAL: Input data for analysis not found at {INPUT_DF_PATH}. Please run the Preprocessor step first.")
        return

    # 1. Load the final scored data. Weight-0 labels (API/validation/setup failures,
    #    UNCATEGORIZED) score 0 and are excluded from the ranking.
    df = pl.read_parquet(INPUT_DF_PATH)
    df_ranked = df.filter(pl.col("severity_score") > 0).sort("severity_score", descending=True)
    df_top = df_ranked.head(TOP_N_CLUSTERS)

    if df_top.is_empty():
        print("WARNING: No clusters found with a severity score greater than zero.")
        return

    # Fewer than TOP_N_CLUSTERS rows may survive the filter; headings use the real count.
    n_shown = df_top.height

    # 2. Prepare the Text Report
    report_content = [
        "=" * 80,
        f"PRODUCERS DIRECT: TOP {n_shown} PRIORITIZED NEEDS ANALYSIS",
        "=" * 80,
        f"Total Clusters Analyzed: {df.height} (excluding noise)",
        f"Total Questions Analyzed: {df['cluster_size'].sum()}",
        "\nThis report ranks clusters by 'Severity Score' (Cluster Frequency * Financial Urgency Weight).",
        "It identifies the most impactful problems requiring immediate strategic attention.",
        "=" * 80,
    ]

    for rank, row in enumerate(df_top.rows(named=True), start=1):
        frequency_pct = row['cluster_frequency'] * 100
        score_pct = row['severity_score'] * 100  # display score as a percentage for readability

        report_content.append(f"\nRANK #{rank} | SCORE: {score_pct:.4f}% | Cluster ID: {row['cluster_id']}")
        report_content.append(f"  > Primary Need: {row['financial_need']}")
        report_content.append(f"  > Size/Frequency: {row['cluster_size']} questions ({frequency_pct:.2f}% of total)")
        # Wrap the (LLM-generated) opportunity text; the first line lines up with the
        # other "  > " lines and continuation lines are indented a little further.
        report_content.append(textwrap.fill(
            f"> Strategic Opportunity: {row['product_opportunity']}",
            width=75,
            initial_indent="  ",
            subsequent_indent="    ",
        ))
        report_content.append("-" * 40)

    os.makedirs(os.path.dirname(REPORT_FILENAME) or ".", exist_ok=True)
    # LLM text may contain non-ASCII characters; don't rely on the platform default encoding.
    with open(REPORT_FILENAME, 'w', encoding='utf-8') as f:
        f.write('\n'.join(report_content))
    print(f"\n✅ Strategic Text Report saved to: {REPORT_FILENAME}")

    # 3. Generate Visualization. barh draws the first item at the bottom, so sort
    #    ascending to put the highest score on top.
    plot_data = df_top.sort("severity_score", descending=False)
    scores = plot_data['severity_score'].to_list()
    labels = [
        f"ID {row['cluster_id']} ({row['financial_need']})"
        for row in plot_data.rows(named=True)
    ]
    colors = [
        '#ff7f0e' if 'None/Information' in str(need) else '#1f77b4'
        for need in plot_data['financial_need'].to_list()
    ]

    # style.context keeps 'ggplot' local to this figure instead of changing the
    # style of every later plot in the session.
    with plt.style.context('ggplot'):
        fig, ax = plt.subplots(figsize=(10, 6))
        ax.barh(labels, scores, color=colors)

        ax.set_title(f"Top {n_shown} Priority Clusters Ranked by Severity Score", fontsize=14)
        ax.set_xlabel("Severity Score (Frequency × Weight)", fontsize=12)
        ax.set_ylabel("Cluster ID and Primary Need", fontsize=12)

        legend_elements = [
            Line2D([0], [0], color='#1f77b4', lw=4, label='Direct Financial Need'),
            Line2D([0], [0], color='#ff7f0e', lw=4, label='Information Need')
        ]
        ax.legend(handles=legend_elements, loc='lower right')

        fig.tight_layout()
        os.makedirs(os.path.dirname(PLOT_FILENAME) or ".", exist_ok=True)
        fig.savefig(PLOT_FILENAME)
    print(f"✅ Visualization saved to: {PLOT_FILENAME}")

    print("\nFinal Analysis Complete.")

if __name__ == "__main__":
    # polars is normally imported at the top of the notebook; this fallback only
    # matters when the cell is run on its own.
    try:
        import polars as pl
    except ImportError:
        import subprocess
        import sys
        # Install into the interpreter running this code; a bare `pip` on PATH may
        # belong to a different environment than the kernel.
        subprocess.check_call([sys.executable, "-m", "pip", "install", "polars", "matplotlib"])
        import polars as pl

    generate_strategic_report()