In [0]:
# # Run cell on cluster restart or if receiving error: 
# # AttributeError: 'EMR' object has no attribute 'create_persistent_app_ui'

# %pip install --upgrade boto3 botocore
# %restart_python

## EMR Spark Event Log Analyzer
 
This notebook extracts Spark performance metrics from Amazon EMR clusters. It discovers EMR clusters, connects to each cluster's persistent Spark History Server UI, fetches application, job, stage, and SQL query data, and then loads this information into Spark DataFrames for performance analysis and optimization insights.

## Required IAM Permissions
#### EMR
  - elasticmapreduce:ListClusters
  - elasticmapreduce:DescribeCluster
  - elasticmapreduce:ListSteps
  - elasticmapreduce:DescribeStep
  - elasticmapreduce:CreatePersistentAppUI
  - elasticmapreduce:GetPersistentAppUIPresignedURL
  - elasticmapreduce:ListInstanceGroups
  - elasticmapreduce:ListInstanceFleets
#### S3
  - s3:PutObject
  - s3:GetObject
  - s3:ListBucket
#### STS
  - sts:GetCallerIdentity



In [0]:
import datetime
from datetime import timedelta
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# ----------------------------------------------------------------------
# Configuration Parameters
# ----------------------------------------------------------------------
# Defaults for the integer widgets. Each value is used both as the widget's
# default text AND as the fallback when the widget is left blank, so a blank
# widget always means "use the documented default".
_INT_WIDGET_DEFAULTS = {
    "timeout_seconds": 300,
    "max_applications": 50,
    "max_clusters": 5,
    "persistent_ui_timeout_seconds": 180,
    "batch_size": 50,
    "batch_delay_seconds": 1800,
}

# Maps each cluster_states dropdown option to the EMR cluster states it selects.
# Insertion order defines the order of the dropdown choices.
_CLUSTER_STATE_OPTIONS = {
    "TERMINATED": ['TERMINATED'],
    "WAITING": ['WAITING'],
    "TERMINATED,WAITING": ['TERMINATED', 'WAITING'],
    # Comprehensive list of all EMR cluster states
    "ALL": [
        'STARTING', 'BOOTSTRAPPING', 'RUNNING', 'WAITING',
        'TERMINATING', 'TERMINATED', 'TERMINATED_WITH_ERRORS'
    ],
}

dbutils.widgets.text("aws_region", "us-east-1", "AWS Region")
dbutils.widgets.text("emr_cluster_arn", "", "EMR Cluster ARN (optional - leave blank to discover clusters)")
dbutils.widgets.text("timeout_seconds", str(_INT_WIDGET_DEFAULTS["timeout_seconds"]), "Request Timeout (seconds)")
dbutils.widgets.text("max_applications", str(_INT_WIDGET_DEFAULTS["max_applications"]), "Max Applications to Analyze per Cluster")
dbutils.widgets.dropdown("environment", "dev", ["dev", "prod"], "Environment (dev/prod)")
dbutils.widgets.text("s3_output_path", "", "S3 Output Path (prod only)")

# Which EMR cluster states to analyze (see _CLUSTER_STATE_OPTIONS).
dbutils.widgets.dropdown("cluster_states", "TERMINATED,WAITING", list(_CLUSTER_STATE_OPTIONS), "EMR Cluster States to Analyze")

dbutils.widgets.text("cluster_name_filter", "", "Cluster Name Filter (optional - partial name match)")
dbutils.widgets.text("max_clusters", str(_INT_WIDGET_DEFAULTS["max_clusters"]), "Max Clusters to Analyze")

# Cluster creation-date window (dev only). Format: YYYY-MM-DD. Leave blank for no date filter.
dbutils.widgets.text("created_after_date", "", "EMR Clusters Created After (YYYY-MM-DD)")
dbutils.widgets.text("created_before_date", "", "EMR Clusters Created Before (YYYY-MM-DD)")

# Persistent App UI readiness timeout and batch processing parameters.
dbutils.widgets.text("persistent_ui_timeout_seconds", str(_INT_WIDGET_DEFAULTS["persistent_ui_timeout_seconds"]), "Persistent App UI Timeout (seconds)")
dbutils.widgets.text("batch_size", str(_INT_WIDGET_DEFAULTS["batch_size"]), "Batch Size (clusters to process concurrently)")
dbutils.widgets.text("batch_delay_seconds", str(_INT_WIDGET_DEFAULTS["batch_delay_seconds"]), "Delay Between Batches (seconds)")


def _get_int_widget(name: str, minimum: int) -> int:
    """
    Read an integer widget, falling back to its default when left blank.

    :param name: Widget name; must be a key of _INT_WIDGET_DEFAULTS
    :param minimum: Smallest accepted value
    :returns: The parsed integer value
    :raises ValueError: If the value is not an integer or is below ``minimum``
    """
    raw = dbutils.widgets.get(name).strip()
    if not raw:
        return _INT_WIDGET_DEFAULTS[name]
    try:
        value = int(raw)
    except ValueError as e:
        raise ValueError(f"Widget '{name}' must be an integer, got {raw!r}.") from e
    if value < minimum:
        raise ValueError(f"Widget '{name}' must be >= {minimum}, got {value}.")
    return value


def _parse_date_widget(name: str, value: str) -> "datetime.datetime":
    """
    Parse a YYYY-MM-DD widget value into a naive datetime at midnight.

    :param name: Widget name, used in the error message
    :param value: Non-empty date string
    :returns: Parsed datetime
    :raises ValueError: If ``value`` is not in YYYY-MM-DD format
    """
    try:
        return datetime.datetime.strptime(value, "%Y-%m-%d")
    except ValueError as e:
        logger.error("❌ Invalid format for %s: %s. Expected YYYY-MM-DD.", name, value, exc_info=True)
        raise ValueError(f"Invalid format for {name}: {value}. Expected YYYY-MM-DD.") from e


AWS_REGION = dbutils.widgets.get("aws_region").strip() or "us-east-1"
EMR_CLUSTER_ARN = dbutils.widgets.get("emr_cluster_arn").strip()
TIMEOUT_SECONDS = _get_int_widget("timeout_seconds", minimum=1)
MAX_APPLICATIONS = _get_int_widget("max_applications", minimum=1)
CLUSTER_STATES = dbutils.widgets.get("cluster_states").strip()
CLUSTER_NAME_FILTER = dbutils.widgets.get("cluster_name_filter").strip()
MAX_CLUSTERS = _get_int_widget("max_clusters", minimum=1)
ENVIRONMENT = dbutils.widgets.get("environment").strip()
S3_OUTPUT_PATH = dbutils.widgets.get("s3_output_path").strip()
PERSISTENT_UI_TIMEOUT_SECONDS = _get_int_widget("persistent_ui_timeout_seconds", minimum=1)
BATCH_SIZE = _get_int_widget("batch_size", minimum=1)
# A zero delay (no pause between batches) is allowed.
BATCH_DELAY_SECONDS = _get_int_widget("batch_delay_seconds", minimum=0)


# Date parameters
if ENVIRONMENT == "dev":
    CREATED_AFTER_DATE_STR = dbutils.widgets.get("created_after_date").strip()
    CREATED_BEFORE_DATE_STR = dbutils.widgets.get("created_before_date").strip()
    PARSED_CREATED_AFTER_DATE = (
        _parse_date_widget("created_after_date", CREATED_AFTER_DATE_STR)
        if CREATED_AFTER_DATE_STR else None
    )
    # Extend the "before" bound to 23:59:59 so the named day itself is included.
    PARSED_CREATED_BEFORE_DATE = (
        _parse_date_widget("created_before_date", CREATED_BEFORE_DATE_STR) + timedelta(days=1, seconds=-1)
        if CREATED_BEFORE_DATE_STR else None
    )
    # Validate date range
    if PARSED_CREATED_AFTER_DATE and PARSED_CREATED_BEFORE_DATE and PARSED_CREATED_AFTER_DATE >= PARSED_CREATED_BEFORE_DATE:
        # No exception is active here, so exc_info would only log "NoneType: None".
        logger.error("❌ created_after_date (%s) cannot be on or after created_before_date (%s).", CREATED_AFTER_DATE_STR, CREATED_BEFORE_DATE_STR)
        raise ValueError("created_after_date cannot be on or after created_before_date.")
else:
    if not S3_OUTPUT_PATH:
        logger.warning("⚠️ environment is '%s' but s3_output_path is empty.", ENVIRONMENT)
    # In prod, analyze only the last 24 hours.
    # NOTE(review): datetime.now() is a naive local timestamp — confirm the driver runs in UTC,
    # otherwise the 24h window may be shifted relative to EMR creation times.
    PARSED_CREATED_BEFORE_DATE = datetime.datetime.now()
    PARSED_CREATED_AFTER_DATE = PARSED_CREATED_BEFORE_DATE - timedelta(days=1)
    CREATED_AFTER_DATE_STR = PARSED_CREATED_AFTER_DATE.strftime("%Y-%m-%d")
    CREATED_BEFORE_DATE_STR = PARSED_CREATED_BEFORE_DATE.strftime("%Y-%m-%d")


# Resolve the dropdown option to a concrete list of EMR states (copied so later
# mutation cannot alter the option table).
if CLUSTER_STATES in _CLUSTER_STATE_OPTIONS:
    CLUSTER_STATES_LIST = list(_CLUSTER_STATE_OPTIONS[CLUSTER_STATES])
else:
    # Default to TERMINATED and WAITING for any unexpected value
    CLUSTER_STATES_LIST = ['TERMINATED', 'WAITING']
    logger.warning("Invalid cluster_states value '%s'. Defaulting to ['TERMINATED', 'WAITING'].", CLUSTER_STATES)


print("Configuration:")
print(f"  Environment: {ENVIRONMENT}")
print(f"  AWS Region: {AWS_REGION}")
print(f"  EMR Cluster ARN: {EMR_CLUSTER_ARN or 'Auto-discover clusters'}")
print(f"  Timeout: {TIMEOUT_SECONDS} seconds")
print(f"  Max Applications per Cluster: {MAX_APPLICATIONS}")
print(f"  Cluster States to Analyze: {CLUSTER_STATES_LIST}")
print(f"  Cluster Name Filter: {CLUSTER_NAME_FILTER or 'None'}")
print(f"  Max Clusters to Analyze: {MAX_CLUSTERS}")
print(f"  Created After Date: {CREATED_AFTER_DATE_STR or 'None'}")
print(f"  Created Before Date: {CREATED_BEFORE_DATE_STR or 'None'}")
print(f"  S3 Output Path: {S3_OUTPUT_PATH or 'None (dev mode or not specified)'}")
print(f"  Persistent UI Timeout: {PERSISTENT_UI_TIMEOUT_SECONDS} seconds")
print(f"  Batch Size: {BATCH_SIZE} clusters")
print(f"  Batch Delay: {BATCH_DELAY_SECONDS} seconds")

Configuration:
  Environment: dev
  AWS Region: us-east-1
  EMR Cluster ARN: Auto-discover clusters
  Timeout: 300 seconds
  Max Applications per Cluster: 50
  Cluster States to Analyze: ['TERMINATED']
  Cluster Name Filter: None
  Max Clusters to Analyze: 100
  Created After Date: None
  Created Before Date: None
  S3 Output Path: s3://ttd-vertica-backups/env=test/vertica-ext/databricks-stage/sparkmetrics
  Persistent UI Timeout: 30 seconds
  Batch Size: 100 clusters
  Batch Delay: 30 seconds


### AWS Boto3 Helper Functions

In [0]:
"""
EMR Persistent App UI Client
This module provides functionality to create an EMR Persistent App UI, retrieve its details and presigned URL,
and establish an HTTP session with proper cookie management for Spark History Server access.
"""

import logging
import time
from typing import Dict, Optional, Tuple
from urllib.parse import urlparse

import boto3
import requests
from botocore.exceptions import ClientError

# Configure logging
# Configure logging: INFO level with timestamped records. basicConfig does
# nothing if the root logger already has handlers, so re-running is harmless.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(name=__name__)


class ServerConfig:
    """Configuration class for EMR Persistent UI client."""

    def __init__(self, emr_cluster_arn: str, timeout: int = 300):
        """
        Initialize ServerConfig.

        :param emr_cluster_arn: The EMR cluster ARN, e.g.
            ``arn:aws:elasticmapreduce:<region>:<account>:cluster/<id>``. Any AWS
            partition (``aws``, ``aws-cn``, ``aws-us-gov``, ...) is accepted.
        :param timeout: Request timeout in seconds (must be positive if given)
        :raises ValueError: If emr_cluster_arn is invalid or timeout is not positive
        """
        import re

        # Require the partition, service and a non-empty region segment: the
        # region (field 3 of the ARN) is later used to build the boto3 client.
        arn_ok = isinstance(emr_cluster_arn, str) and re.match(
            r"arn:aws[a-z-]*:elasticmapreduce:[a-z0-9-]+:", emr_cluster_arn
        )
        if not arn_ok:
            raise ValueError("Invalid EMR cluster ARN format")
        if timeout is not None and timeout <= 0:
            raise ValueError("timeout must be a positive number of seconds")
        self.emr_cluster_arn = emr_cluster_arn
        self.timeout = timeout


class EMRPersistentUIClient:
    """
    Client for managing EMR Persistent App UI and HTTP sessions.

    The usual entry point is ``initialize(max_wait_time)``. It creates the
    persistent app UI and polls until EMR reports the presigned URL as ready.
    It then fetches the presigned URL and opens an HTTP session against it,
    and finally returns ``(base_url, session)``.
    """

    # Seconds between readiness polls in initialize().
    POLL_INTERVAL_SECONDS = 10

    def __init__(self, server_config: ServerConfig):
        """
        Initialize the EMR client.

        :param server_config: ServerConfig object; its cluster ARN also supplies the AWS region
        """
        self.emr_cluster_arn = server_config.emr_cluster_arn
        # ARN layout is arn:<partition>:elasticmapreduce:<region>:..., so field 3 is the region.
        self.region = self.emr_cluster_arn.split(":")[3]
        # No explicit credentials are passed; boto3 resolves them itself.
        self.emr_client = boto3.client(
            "emr",
            region_name=self.region
        )
        self.session = requests.Session()
        self.persistent_ui_id: Optional[str] = None
        self.presigned_url: Optional[str] = None
        self.base_url: Optional[str] = None
        self.timeout: int = server_config.timeout
        self.presigned_url_ready: bool = False

    def create_persistent_app_ui(self) -> Dict:
        """
        Create a persistent app UI for the given cluster.

        Stores the returned ``PersistentAppUIId`` on ``self.persistent_ui_id``.

        :returns: Response from create-persistent-app-ui API call
        :raises ClientError: If the API call fails
        """
        logger.info("Creating persistent app UI for cluster: %s", self.emr_cluster_arn)
        try:
            response = self.emr_client.create_persistent_app_ui(
                TargetResourceArn=self.emr_cluster_arn
            )
            self.persistent_ui_id = response.get("PersistentAppUIId")
            runtime_role_enabled = response.get("RuntimeRoleEnabledCluster", False)
            logger.info("✅ Persistent App UI created successfully")
            logger.info(" Persistent UI ID: %s", self.persistent_ui_id)
            logger.info(" Runtime Role Enabled: %s", runtime_role_enabled)
            return response
        except ClientError as e:
            error_code = e.response["Error"]["Code"]
            error_message = e.response["Error"]["Message"]
            logger.error(
                "❌ Failed to create persistent app UI: %s - %s", error_code, error_message, exc_info=True
            )
            raise
        except Exception as e:
            logger.error("❌ Unexpected error creating persistent app UI: %s", str(e), exc_info=True)
            raise

    def get_presigned_url(self, ui_type: str = "SHS") -> str:
        """
        Get presigned URL for the persistent app UI and derive the Spark History Server base URL.

        :param ui_type: Type of UI ('SHS' for Spark History Server)
        :returns: Presigned URL string
        :raises ValueError: If no persistent UI ID is available, or the response has no presigned URL
        :raises ClientError: If the API call fails
        """
        if not self.persistent_ui_id:
            raise ValueError("No persistent UI ID available. Create one first.")
        logger.info(
            "Getting presigned URL for persistent app UI: %s (type: %s)", self.persistent_ui_id, ui_type
        )
        try:
            response = self.emr_client.get_persistent_app_ui_presigned_url(
                PersistentAppUIId=self.persistent_ui_id,
                PersistentAppUIType=ui_type
            )
        except ClientError as e:
            error_code = e.response["Error"]["Code"]
            error_message = e.response["Error"]["Message"]
            logger.error(
                "❌ Failed to get presigned URL: %s - %s", error_code, error_message, exc_info=True
            )
            raise
        except Exception as e:
            logger.error("❌ Unexpected error getting presigned URL: %s", str(e), exc_info=True)
            raise

        self.presigned_url_ready = bool(response.get("PresignedURLReady", False))
        self.presigned_url = response.get("PresignedURL")
        if not self.presigned_url:
            # Fail loudly: urlparse on a missing URL would otherwise yield a
            # meaningless base URL instead of an error.
            logger.error(
                "❌ No presigned URL returned for persistent app UI: %s", self.persistent_ui_id
            )
            raise ValueError(
                f"No presigned URL returned for persistent app UI {self.persistent_ui_id}."
            )
        # Spark History Server endpoints live under /shs on the presigned URL's host.
        parsed_url = urlparse(self.presigned_url)
        self.base_url = f"{parsed_url.scheme}://{parsed_url.netloc}/shs"
        logger.info("✅ Presigned URL obtained successfully")
        logger.info(" Base URL: %s", self.base_url)
        return self.presigned_url

    def setup_http_session(self) -> requests.Session:
        """
        Set up HTTP session with proper headers and cookie management.

        Performs one GET on the presigned URL (following redirects) so the
        session's cookie jar holds whatever cookies the server sets.

        :returns: Configured requests.Session object
        :raises ValueError: If no presigned URL is available
        :raises requests.exceptions.RequestException: If the initial request fails
        """
        if not self.presigned_url:
            raise ValueError("No presigned URL available. Get one first.")
        logger.info("Setting up HTTP session with cookie management")
        # Configure session with appropriate headers
        self.session.headers.update(
            {
                "User-Agent": "EMR-Persistent-UI-Client/1.0",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                "Accept-Language": "en-US,en;q=0.5",
                "Accept-Encoding": "gzip, deflate",
                "Connection": "keep-alive",
                "Upgrade-Insecure-Requests": "1"
            }
        )
        try:
            # Make initial request to establish session and get cookies
            logger.info("Making initial request")
            response = self.session.get(
                self.presigned_url, timeout=self.timeout, allow_redirects=True
            )
            response.raise_for_status()
            logger.info("✅ HTTP session established successfully")
            logger.info(" Status Code: %s", response.status_code)
            logger.info(" Cookies: %s cookie(s) stored", len(self.session.cookies))
            # Log cookie details (without sensitive values)
            for cookie in self.session.cookies:
                logger.debug(" Cookie: %s (domain: %s)", cookie.name, cookie.domain)
            return self.session
        except requests.exceptions.RequestException as e:
            logger.error("❌ Failed to establish HTTP session: %s", str(e), exc_info=True)
            raise
        except Exception as e:
            logger.error("❌ Unexpected error setting up HTTP session: %s", str(e), exc_info=True)
            raise

    def _is_presigned_url_ready(self, ui_type: str) -> bool:
        """Probe readiness once; a ClientError is treated as 'not ready yet' and logged."""
        try:
            response = self.emr_client.get_persistent_app_ui_presigned_url(
                PersistentAppUIId=self.persistent_ui_id, PersistentAppUIType=ui_type
            )
        except ClientError as e:
            # This can happen if the UI is not yet fully initialized; we just wait and retry.
            logger.warning(
                "Could not check for presigned URL, will retry. Error: %s", str(e)
            )
            return False
        return bool(response.get("PresignedURLReady", False))

    def initialize(self, max_wait_time: int, ui_type: str = "SHS") -> Tuple[str, requests.Session]:
        """
        Initialize the EMR Persistent UI client by creating a persistent app UI,
        polling until it is ready, getting a presigned URL, and setting up an HTTP session.

        Readiness is always checked at least once, and once more after the final
        (possibly shortened) sleep, so the full ``max_wait_time`` is usable.

        :param max_wait_time: The maximum time in seconds to wait for the persistent UI to become ready.
        :param ui_type: Persistent app UI type to request ('SHS' for Spark History Server)
        :returns: Tuple containing the base URL and configured session
        :raises ValueError: If the persistent UI does not become ready within the timeout period
        """
        # Step 1: Create persistent app UI
        self.create_persistent_app_ui()

        # Step 2: Poll until EMR reports the presigned URL as ready, bounded by a deadline.
        logger.info("Waiting for Persistent App UI to become ready...")
        start = time.monotonic()
        deadline = start + max(max_wait_time, 0)
        while not self._is_presigned_url_ready(ui_type):
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise ValueError(
                    f"Persistent App UI did not become ready after waiting {round(time.monotonic() - start)} seconds."
                )
            # Never sleep past the deadline; the loop re-checks right after waking.
            sleep_for = min(self.POLL_INTERVAL_SECONDS, remaining)
            logger.info(
                "Persistent App UI not ready yet. Waiting %s seconds before retrying...",
                round(sleep_for, 1)
            )
            time.sleep(sleep_for)
        logger.info("✅ Persistent App UI is ready.")

        # Step 3: Get the actual presigned URL and setup its properties (now that we know it's ready)
        self.get_presigned_url(ui_type)

        # Step 4: Setup HTTP session
        self.setup_http_session()

        return self.base_url, self.session


### EMR Cluster Discovery

In [0]:
from datetime import datetime, timedelta
from typing import List, Dict, Optional # Added Optional
from botocore.exceptions import ClientError


class EMRClusterDiscovery:
    """
    Discovery and management of EMR clusters.

    Wraps a boto3 EMR client in order to:
    - list clusters by state, name and creation date;
    - fetch per-cluster details, including instance group or instance fleet info;
    - decide whether a cluster is a candidate for Spark History Server analysis.
    """

    # States requested from ListClusters when the caller passes none.
    DEFAULT_STATES = ('TERMINATED', 'WAITING')
    # States in which validate_cluster_for_analysis accepts a cluster.
    ANALYZABLE_STATES = ('RUNNING', 'WAITING', 'TERMINATED')
    # Message fragment that identifies the ListInstanceGroups error returned for fleet-based clusters.
    _FLEET_CLUSTER_ERROR_FRAGMENT = "Instance fleets and instance groups are mutually exclusive"

    def __init__(self, region: str):
        """
        Initialize the EMR cluster discovery client.

        :param region: AWS region
        :raises TypeError: If region is not a non-empty string.
        """
        if not isinstance(region, str) or not region:
            raise TypeError("AWS region must be a non-empty string.")
        self.region = region
        self.emr_client = boto3.client("emr", region_name=region)

    def discover_clusters(self,
                          states: Optional[List[str]] = None,
                          name_filter: Optional[str] = None,
                          max_clusters: int = 10,
                          created_after: Optional[datetime] = None,
                          created_before: Optional[datetime] = None) -> List[Dict]:
        """
        Discover EMR clusters based on criteria.

        :param states: List of cluster states to filter by. None or empty defaults to ['TERMINATED', 'WAITING'].
                       Refer to boto3 EMR documentation for valid states.
        :param name_filter: Case-insensitive partial name to filter clusters. Optional.
        :param max_clusters: Maximum number of clusters to return. Defaults to 10.
        :param created_after: datetime object. Only return clusters created after this time. Optional.
        :param created_before: datetime object. Only return clusters created before this time. Optional.
        :returns: List of cluster summaries, each a dictionary containing cluster metadata.
        :raises TypeError: If inputs are not of the correct type.
        :raises ValueError: If input values are out of acceptable ranges (e.g., max_clusters < 1).
        :raises ClientError: If AWS API call fails.
        """
        # Resolve the datetime *class* locally. The configuration cell binds the
        # global name `datetime` to the module (`import datetime`), so relying on
        # the global would make the isinstance checks below raise TypeError
        # after that cell is re-run.
        from datetime import datetime as _datetime

        # Input validation
        if not isinstance(states, (list, type(None))):
            raise TypeError(":param states: must be a list of strings or None.")
        if states:
            for state in states:
                if not isinstance(state, str):
                    raise TypeError("Each state in the :param states: list must be a string.")
        if not isinstance(name_filter, (str, type(None))):
            raise TypeError(":param name_filter: must be a string or None.")
        if not isinstance(max_clusters, int):
            raise TypeError(":param max_clusters: must be an integer.")
        if max_clusters < 1:
            raise ValueError(":param max_clusters: must be at least 1.")
        if created_after is not None and not isinstance(created_after, _datetime):
            raise TypeError(":param created_after: must be a datetime object or None.")
        if created_before is not None and not isinstance(created_before, _datetime):
            raise TypeError(":param created_before: must be a datetime object or None.")
        if created_after and created_before and created_after >= created_before:
            raise ValueError(":param created_after: date cannot be on or after :param created_before: date.")

        # The states actually sent to ListClusters (log these, not the raw argument).
        effective_states = list(states) if states else list(self.DEFAULT_STATES)

        logger.info("🔍 Discovering EMR clusters in region: %s", self.region)
        logger.info(" States: %s", effective_states)
        logger.info(" Name filter: %s", name_filter or "None")
        logger.info(" Max clusters: %s", max_clusters)
        logger.info(" Created After: %s", created_after or "None")
        logger.info(" Created Before: %s", created_before or "None")

        try:
            # Build parameters for list_clusters
            list_clusters_params = {'ClusterStates': effective_states}
            if created_after:
                list_clusters_params['CreatedAfter'] = created_after
            if created_before:
                list_clusters_params['CreatedBefore'] = created_before

            # Use paginator to handle large cluster lists
            paginator = self.emr_client.get_paginator('list_clusters')
            needle = name_filter.lower() if name_filter else None

            discovered_clusters: List[Dict] = []
            for page in paginator.paginate(**list_clusters_params):
                for cluster in page.get('Clusters', []):
                    # Apply name filter if specified
                    if needle and needle not in (cluster.get('Name') or '').lower():
                        continue

                    status = cluster.get('Status', {})
                    discovered_clusters.append({
                        'cluster_id': cluster.get('Id'),
                        'cluster_name': cluster.get('Name'),
                        'cluster_arn': cluster.get('ClusterArn'),
                        'status': status.get('State'),
                        'creation_time': status.get('Timeline', {}).get('CreationDateTime'),
                        'normalized_instance_hours': cluster.get('NormalizedInstanceHours', 0)
                    })

                    # Stop if we've reached max clusters
                    if len(discovered_clusters) >= max_clusters:
                        break
                if len(discovered_clusters) >= max_clusters:
                    break

            logger.info("✅ Discovered %s clusters", len(discovered_clusters))
            # Log cluster details
            for i, cluster in enumerate(discovered_clusters, 1):
                logger.info(" %d. %s (%s) - %s", i, cluster['cluster_name'], cluster['cluster_id'], cluster['status'])
            return discovered_clusters

        except ClientError as e:
            logger.error("❌ Failed to discover clusters: %s - %s", e.response["Error"]["Code"], e.response["Error"]["Message"], exc_info=True)
            raise
        except Exception as e:
            logger.error("❌ Unexpected error during cluster discovery: %s", str(e), exc_info=True)
            raise

    def get_cluster_details(self, cluster_id: str) -> Dict:
        """
        Get detailed information about a specific cluster.

        Instance information comes from ListInstanceFleets when DescribeCluster
        reports ``InstanceCollectionType == 'INSTANCE_FLEET'``. Otherwise it comes
        from ListInstanceGroups, which falls back to fleets if EMR reports the
        cluster uses them. Failures while fetching fleet details, and non-AWS
        errors, are logged and leave the instance fields at their defaults.

        :param cluster_id: EMR cluster ID.
        :returns: Detailed cluster information dictionary.
        :raises TypeError: If cluster_id is not a string.
        :raises ClientError: If DescribeCluster fails, or ListInstanceGroups fails for a reason other than the cluster using instance fleets.
        """
        if not isinstance(cluster_id, str) or not cluster_id:
            raise TypeError("Cluster ID must be a non-empty string.")
        try:
            response = self.emr_client.describe_cluster(ClusterId=cluster_id)
            cluster = response.get('Cluster', {})
            status = cluster.get('Status', {})
            timeline = status.get('Timeline', {})

            cluster_details = {
                'cluster_id': cluster.get('Id'),
                'cluster_name': cluster.get('Name'),
                'cluster_arn': cluster.get('ClusterArn'),
                'status': status.get('State'),
                'emr_release_label': cluster.get('ReleaseLabel'),
                'applications': [app.get('Name') for app in cluster.get('Applications', [])],
                'instance_count': 0,
                'master_instance_type': None,
                'core_instance_type': None,
                'log_uri': cluster.get('LogUri'),
                'ec2_attributes': cluster.get('Ec2InstanceAttributes', {}),
                'creation_time': timeline.get('CreationDateTime'),
                'ready_time': timeline.get('ReadyDateTime'),
                'normalized_instance_hours': cluster.get('NormalizedInstanceHours', 0)
            }

            try:
                if cluster.get('InstanceCollectionType') == 'INSTANCE_FLEET':
                    # Skip the ListInstanceGroups call that would fail for fleet clusters.
                    logger.info("Cluster %s uses instance fleets. Fetching instance fleet details.", cluster_id)
                    self._add_instance_fleet_info(cluster_id, cluster_details)
                else:
                    self._add_instance_group_info(cluster_id, cluster_details)
            except ClientError:
                # Already logged by _add_instance_group_info; propagate to the outer handler.
                raise
            except Exception as e:
                # Catch any other general exceptions during instance group/fleet fetching
                logger.warning("Unexpected error during instance details fetching for %s: %s",
                               cluster_id, str(e), exc_info=True)

            return cluster_details

        except ClientError as e:
            logger.error("❌ Failed to describe cluster %s: %s - %s",
                         cluster_id, e.response["Error"]["Code"],
                         e.response["Error"]["Message"], exc_info=True)
            raise
        except Exception as e:
            logger.error("❌ Unexpected error getting cluster details for %s: %s",
                         cluster_id, str(e), exc_info=True)
            raise

    def _add_instance_group_info(self, cluster_id: str, cluster_details: Dict) -> None:
        """Add instance-group counts/types into cluster_details; defer to fleets if EMR says the cluster uses them."""
        try:
            instance_groups = self.emr_client.list_instance_groups(ClusterId=cluster_id)
        except ClientError as ce:
            error = ce.response["Error"]
            if error["Code"] == "InvalidRequestException" and self._FLEET_CLUSTER_ERROR_FRAGMENT in error["Message"]:
                logger.info("Cluster %s uses instance fleets. Fetching instance fleet details.", cluster_id)
                self._add_instance_fleet_info(cluster_id, cluster_details)
                return
            logger.warning("Could not get instance group details for %s: %s - %s",
                           cluster_id, error["Code"], error["Message"], exc_info=True)
            raise  # Re-raise original ClientError

        for group in instance_groups.get('InstanceGroups', []):
            instance_type = group.get('InstanceType')
            # NOTE(review): running count may be 0 for TERMINATED clusters — confirm whether
            # RequestedInstanceCount is the intended measure for historical analysis.
            cluster_details['instance_count'] += group.get('RunningInstanceCount', 0)
            group_type = group.get('InstanceGroupType')
            if group_type == 'MASTER':
                cluster_details['master_instance_type'] = instance_type
            elif group_type == 'CORE':
                cluster_details['core_instance_type'] = instance_type
        logger.info("Successfully retrieved instance group info for %s", cluster_id)

    def _add_instance_fleet_info(self, cluster_id: str, cluster_details: Dict) -> None:
        """Best-effort: add instance-fleet capacity/types into cluster_details; errors are logged, not raised."""
        try:
            instance_fleets = self.emr_client.list_instance_fleets(ClusterId=cluster_id)
            for fleet in instance_fleets.get('InstanceFleets', []):
                # NOTE(review): target capacities are capacity units; with weighted instance
                # types they may not equal instance counts — confirm intended semantics.
                cluster_details['instance_count'] += (
                    fleet.get('TargetOnDemandCapacity', 0) + fleet.get('TargetSpotCapacity', 0)
                )
                # InstanceTypeSpecifications is a LIST; the first entry names the fleet's type.
                instance_type_specs = fleet.get('InstanceTypeSpecifications', [])
                if not instance_type_specs:
                    continue
                fleet_type = fleet.get('InstanceFleetType')
                if fleet_type == 'MASTER':
                    cluster_details['master_instance_type'] = instance_type_specs[0].get('InstanceType')
                elif fleet_type == 'CORE':
                    cluster_details['core_instance_type'] = instance_type_specs[0].get('InstanceType')
            logger.info("Successfully retrieved instance fleet info for %s", cluster_id)
        except ClientError as fleet_ce:
            logger.warning("Could not get instance fleet details for %s: %s - %s",
                           cluster_id, fleet_ce.response["Error"]["Code"],
                           fleet_ce.response["Error"]["Message"], exc_info=True)
        except Exception as fleet_e:
            logger.warning("Unexpected error getting instance fleet details for %s: %s",
                           cluster_id, str(fleet_e), exc_info=True)

    def validate_cluster_for_analysis(self, cluster_info: Dict) -> bool:
        """
        Validate if a cluster is suitable for Spark History Server analysis.

        A cluster is suitable if it has an application whose name contains
        'Spark', and its status is one of ANALYZABLE_STATES. TERMINATED clusters
        are included because they can still have history data.

        :param cluster_info: Cluster information dictionary (as built by get_cluster_details).
        :returns: True if cluster is suitable for analysis, False otherwise.
        :raises TypeError: If cluster_info is not a dictionary or has incorrect types.
        """
        if not isinstance(cluster_info, dict):
            raise TypeError(":param cluster_info: must be a dictionary.")

        cluster_id = cluster_info.get('cluster_id')
        status = cluster_info.get('status')
        applications = cluster_info.get('applications', [])

        if not isinstance(cluster_id, str):
            raise TypeError("Cluster ID in cluster_info must be a string.")
        if not isinstance(status, str):
            raise TypeError("Status in cluster_info must be a string.")
        if not isinstance(applications, list):
            raise TypeError("Applications in cluster_info must be a list.")
        if not all(isinstance(app, str) for app in applications):
            raise TypeError("Each application name in cluster_info must be a string.")

        # Check if cluster has Spark application
        has_spark = any('Spark' in app for app in applications)
        if not has_spark:
            logger.warning("⚠️ Cluster %s does not have Spark installed - skipping", cluster_id)
            return False

        # Exclude clusters not in RUNNING, WAITING, or TERMINATED
        if status not in self.ANALYZABLE_STATES:
            logger.warning("⚠️ Cluster %s is in state '%s' - may not have history data or is not a target state for analysis", cluster_id, status)
            return False

        logger.info("✅ Cluster %s is valid for analysis (Spark: %s, State: %s)", cluster_id, has_spark, status)
        return True


### Spark History Server REST Interaction

In [0]:
import json
from datetime import datetime
from typing import List, Any, Dict, Optional

import requests
import logging

logger = logging.getLogger(__name__)

class SparkHistoryServerClient:
    """Client for interacting with Spark History Server REST API.

    Every public method issues a GET request against ``{base_url}/api/v1/...`` through
    the supplied session and returns the decoded JSON body.
    """

    def __init__(self, base_url: str, session: requests.Session, timeout: Optional[float] = 30) -> None:
        """
        Initialize the Spark History Server client.

        :param base_url: Base URL for the Spark History Server (a trailing slash is tolerated)
        :param session: Configured HTTP session with authentication
        :param timeout: Per-request timeout in seconds passed to ``session.get``
            (defaults to 30, the value that used to be hard-coded; ``None`` disables it)
        """
        self.base_url = base_url
        self.session = session
        self.timeout = timeout
        # Strip a trailing slash so ".../" never becomes ".../" + "/api/v1" (double slash).
        self.api_base = f"{base_url.rstrip('/')}/api/v1"

    @staticmethod
    def _app_endpoint(app_id: str, attempt_id: Optional[str] = None) -> str:
        """
        Build the ``applications/{app_id}[/{attempt_id}]`` path prefix.

        Spark's REST API addresses a specific attempt as ``[base-app-id]/[attempt-id]``.
        """
        endpoint = f"applications/{app_id}"
        if attempt_id:
            endpoint += f"/{attempt_id}"
        return endpoint

    @staticmethod
    def _add_task_status(params: Dict, task_status: Optional[List[str]]) -> None:
        """
        Add ``taskStatus`` to ``params`` as a repeated query parameter.

        ``requests`` encodes a list value as ``taskStatus=A&taskStatus=B``. Assigning each
        status to the same dict key one at a time (the previous behaviour) kept only the last one.
        """
        if task_status:
            params['taskStatus'] = list(task_status)

    def _make_request(self, endpoint: str, params: Optional[Dict] = None) -> Any:
        """
        Make a REST API request to the Spark History Server.

        :param endpoint: API endpoint (relative to /api/v1)
        :param params: Query parameters
        :returns: JSON response
        :raises requests.exceptions.RequestException: If the request fails or returns an HTTP error status
        :raises ValueError: If the body is not valid JSON (``json.JSONDecodeError`` and requests'
            own ``JSONDecodeError`` are both ``ValueError`` subclasses)
        """
        url = f"{self.api_base}/{endpoint}"
        # Transport/HTTP errors and JSON decoding errors are handled in separate try blocks.
        # Recent requests versions raise a JSONDecodeError that is *also* a RequestException,
        # so a single combined try would report bad JSON as a failed request.
        try:
            response = self.session.get(url, params=params, timeout=self.timeout)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            logger.error("❌ Failed to make request to %s: %s", url, str(e), exc_info=True)
            raise
        try:
            return response.json()
        except ValueError as e:
            logger.error("❌ Failed to parse JSON response from %s: %s", url, str(e), exc_info=True)
            raise

    def get_applications(self, status: Optional[str] = None, limit: int = 100) -> List[Dict]:
        """
        Get list of Spark applications.

        :param status: Filter by status (Spark accepts ``completed`` or ``running``)
        :param limit: Maximum number of applications to return (falsy = no limit parameter)
        :returns: List of application metadata
        """
        logger.info("Fetching applications (status: %s, limit: %s)", status, limit)
        params = {}
        if status:
            params['status'] = status
        if limit:
            params['limit'] = limit
        applications = self._make_request("applications", params)
        logger.info("✅ Retrieved %s applications", len(applications))
        return applications

    def get_application_details(self, app_id: str) -> Dict:
        """
        Get detailed information about a specific application (including all its attempts).

        :param app_id: Application ID
        :returns: Application details
        """
        logger.info("Fetching application details for: %s", app_id)
        details = self._make_request(f"applications/{app_id}")
        logger.info("✅ Retrieved details for application: %s", app_id)
        return details

    def get_application_jobs(self, app_id: str, attempt_id: Optional[str] = None, status: Optional[str] = None) -> List[Dict]:
        """
        Get jobs for a specific application.

        :param app_id: Application ID
        :param attempt_id: The application attempt ID. If None, uses the base application endpoint.
        :param status: Filter by job status (running, succeeded, failed, unknown)
        :returns: List of jobs with all available fields
        """
        logger.info("Fetching jobs for application: %s (Attempt ID: %s, Status: %s)", app_id, attempt_id or "N/A", status or "All")
        endpoint = f"{self._app_endpoint(app_id, attempt_id)}/jobs"

        params = {}
        if status:
            params['status'] = status

        jobs = self._make_request(endpoint, params)
        logger.info("✅ Retrieved %s jobs for application: %s", len(jobs), app_id)
        return jobs

    def get_job_details(self, app_id: str, job_id: int, attempt_id: Optional[str] = None) -> Dict:
        """
        Get detailed information about a specific job.

        :param app_id: Application ID
        :param job_id: Job ID
        :param attempt_id: The application attempt ID
        :returns: Job details with all available fields
        """
        logger.info("Fetching job details for job %s in application: %s", job_id, app_id)
        endpoint = f"{self._app_endpoint(app_id, attempt_id)}/jobs/{job_id}"

        job_details = self._make_request(endpoint)
        logger.info("✅ Retrieved job details for job %s", job_id)
        return job_details

    def get_application_stages(self, app_id: str, attempt_id: Optional[str] = None,
                               status: Optional[str] = None, details: bool = True,
                               with_summaries: bool = True, task_status: Optional[List[str]] = None,
                               quantiles: str = "0.0,0.25,0.5,0.75,1.0") -> List[Dict]:
        """
        Get stages for a specific application.

        :param app_id: Application ID
        :param attempt_id: The application attempt ID
        :param status: Filter by stage status (active, complete, pending, failed)
        :param details: Include task data
        :param with_summaries: Include task metrics distribution and executor metrics distribution
        :param task_status: Task statuses to filter (RUNNING, SUCCESS, FAILED, KILLED, PENDING);
            every value is sent as its own ``taskStatus`` parameter
        :param quantiles: Quantiles for metric summaries (only sent with ``with_summaries``)
        :returns: List of stages with all available fields
        """
        logger.info("Fetching stages for application: %s (Attempt ID: %s)", app_id, attempt_id or "N/A")
        endpoint = f"{self._app_endpoint(app_id, attempt_id)}/stages"

        params = {}
        if status:
            params['status'] = status
        if details:
            params['details'] = 'true'
        if with_summaries:
            params['withSummaries'] = 'true'
            params['quantiles'] = quantiles
        self._add_task_status(params, task_status)

        stages = self._make_request(endpoint, params)
        logger.info("✅ Retrieved %s stages for application: %s", len(stages), app_id)
        return stages

    def get_stage_details(self, app_id: str, stage_id: int, stage_attempt_id: int = 0,
                          details: bool = True, with_summaries: bool = True,
                          task_status: Optional[List[str]] = None,
                          quantiles: str = "0.0,0.25,0.5,0.75,1.0",
                          attempt_id: Optional[str] = None) -> Dict:
        """
        Get detailed information about a specific stage attempt.

        :param app_id: Application ID
        :param stage_id: Stage ID
        :param stage_attempt_id: Stage attempt ID
        :param details: Include task data
        :param with_summaries: Include metrics distribution
        :param task_status: Task statuses to filter (each sent as its own ``taskStatus`` parameter)
        :param quantiles: Quantiles for metric summaries
        :param attempt_id: Optional application attempt ID
        :returns: Stage details with all available fields
        """
        logger.info("Fetching stage details for stage %s/%s in application: %s", stage_id, stage_attempt_id, app_id)
        endpoint = f"{self._app_endpoint(app_id, attempt_id)}/stages/{stage_id}/{stage_attempt_id}"

        params = {}
        if details:
            params['details'] = 'true'
        if with_summaries:
            params['withSummaries'] = 'true'
            params['quantiles'] = quantiles
        self._add_task_status(params, task_status)

        stage_details = self._make_request(endpoint, params)
        logger.info("✅ Retrieved stage details for stage %s/%s", stage_id, stage_attempt_id)
        return stage_details

    def get_stage_tasks(self, app_id: str, stage_id: int, stage_attempt: int = 0,
                        offset: Optional[int] = None, length: Optional[int] = None,
                        sort_by: Optional[str] = None, status: Optional[str] = None,
                        attempt_id: Optional[str] = None) -> List[Dict]:
        """
        Get tasks for a specific stage attempt.

        :param app_id: Application ID
        :param stage_id: Stage ID
        :param stage_attempt: Stage attempt number
        :param offset: Offset for pagination
        :param length: Number of tasks to return
        :param sort_by: Sort criteria (runtime, -runtime, etc.)
        :param status: Filter by task status (running, success, killed, failed, unknown)
        :param attempt_id: Optional application attempt ID
        :returns: List of tasks with all available fields
        """
        logger.info("Fetching tasks for stage %s (attempt %s) in application: %s", stage_id, stage_attempt, app_id)
        endpoint = f"{self._app_endpoint(app_id, attempt_id)}/stages/{stage_id}/{stage_attempt}/taskList"

        params = {}
        if offset is not None:
            params['offset'] = offset
        if length is not None:
            params['length'] = length
        if sort_by:
            params['sortBy'] = sort_by
        if status:
            params['status'] = status

        tasks = self._make_request(endpoint, params)
        logger.info("✅ Retrieved %s tasks for stage %s", len(tasks), stage_id)
        return tasks

    def get_stage_task_summary(self, app_id: str, stage_id: int, stage_attempt: int = 0,
                               quantiles: str = "0.0,0.25,0.5,0.75,1.0",
                               attempt_id: Optional[str] = None) -> Dict:
        """
        Get task summary metrics for a specific stage attempt.

        :param app_id: Application ID
        :param stage_id: Stage ID
        :param stage_attempt: Stage attempt number
        :param quantiles: Quantiles for metric summaries
        :param attempt_id: Optional application attempt ID
        :returns: Task summary with all available metrics
        """
        logger.info("Fetching task summary for stage %s (attempt %s) in application: %s", stage_id, stage_attempt, app_id)
        endpoint = f"{self._app_endpoint(app_id, attempt_id)}/stages/{stage_id}/{stage_attempt}/taskSummary"

        task_summary = self._make_request(endpoint, {'quantiles': quantiles})
        logger.info("✅ Retrieved task summary for stage %s", stage_id)
        return task_summary

    def get_application_executors(self, app_id: str, all_executors: bool = True,
                                  attempt_id: Optional[str] = None) -> List[Dict]:
        """
        Get executors for a specific application.

        :param app_id: Application ID
        :param all_executors: If True, hit ``allexecutors`` (active and dead), otherwise ``executors`` (active only)
        :param attempt_id: Optional application attempt ID
        :returns: List of executors with all available fields
        """
        logger.info("Fetching executors for application: %s (all: %s)", app_id, all_executors)
        resource = "allexecutors" if all_executors else "executors"
        executors = self._make_request(f"{self._app_endpoint(app_id, attempt_id)}/{resource}")
        logger.info("✅ Retrieved %s executors for application: %s", len(executors), app_id)
        return executors

    def get_application_sql_queries(self, app_id: str, min_duration_minutes: int = 0,
                                    attempt_id: Optional[str] = None, details: bool = True,
                                    plan_description: bool = True, offset: Optional[int] = None,
                                    length: Optional[int] = None) -> List[Dict]:
        """
        Get SQL queries for a specific application, keeping only those at least
        ``min_duration_minutes`` long.

        :param app_id: Application ID
        :param min_duration_minutes: Minimum duration in minutes for SQL queries to include
        :param attempt_id: The application attempt ID
        :param details: Include details of Spark plan nodes (only ``false`` is sent explicitly)
        :param plan_description: Enable physical plan description (only ``false`` is sent explicitly)
        :param offset: Offset for pagination
        :param length: Number of queries to return
        :returns: List of SQL query data (full JSON objects) passing the duration filter
        """
        logger.info("Fetching SQL queries for application: %s (min_duration_minutes: %s, Attempt ID: %s)",
                    app_id, min_duration_minutes, attempt_id or "N/A")
        endpoint = f"{self._app_endpoint(app_id, attempt_id)}/sql"

        params = {}
        if not details:
            params['details'] = 'false'
        if not plan_description:
            params['planDescription'] = 'false'
        if offset is not None:
            params['offset'] = offset
        if length is not None:
            params['length'] = length

        sql_queries = self._make_request(endpoint, params)
        logger.info("✅ Retrieved %s raw SQL queries for application: %s", len(sql_queries), app_id)

        threshold_ms = min_duration_minutes * 60 * 1000
        # "duration" may be missing or null; treat it as 0 rather than letting
        # `None >= int` raise a TypeError and abort the whole fetch.
        filtered_queries = [query for query in sql_queries if (query.get("duration") or 0) >= threshold_ms]

        logger.info("✅ Filtered to %s SQL queries with duration >= %s minutes for application: %s",
                    len(filtered_queries), min_duration_minutes, app_id)
        return filtered_queries

    def get_sql_query_details(self, app_id: str, execution_id: int, details: bool = True,
                              plan_description: bool = True, attempt_id: Optional[str] = None) -> Dict:
        """
        Get detailed information about a specific SQL query.

        :param app_id: Application ID
        :param execution_id: SQL execution ID
        :param details: Include metric details (only ``false`` is sent explicitly)
        :param plan_description: Enable physical plan description (only ``false`` is sent explicitly)
        :param attempt_id: Optional application attempt ID
        :returns: SQL query details with all available fields
        """
        logger.info("Fetching SQL query details for execution %s in application: %s", execution_id, app_id)
        endpoint = f"{self._app_endpoint(app_id, attempt_id)}/sql/{execution_id}"

        params = {}
        if not details:
            params['details'] = 'false'
        if not plan_description:
            params['planDescription'] = 'false'

        query_details = self._make_request(endpoint, params)
        logger.info("✅ Retrieved SQL query details for execution %s", execution_id)
        return query_details

    def get_application_environment(self, app_id: str, attempt_id: Optional[str] = None) -> Dict:
        """
        Get environment details for a specific application.

        :param app_id: Application ID
        :param attempt_id: Optional application attempt ID
        :returns: Environment details with all available fields
        """
        logger.info("Fetching environment details for application: %s", app_id)
        environment = self._make_request(f"{self._app_endpoint(app_id, attempt_id)}/environment")
        logger.info("✅ Retrieved environment details for application: %s", app_id)
        return environment

    def get_application_storage_rdd(self, app_id: str, attempt_id: Optional[str] = None) -> List[Dict]:
        """
        Get RDD storage information for a specific application.

        :param app_id: Application ID
        :param attempt_id: Optional application attempt ID
        :returns: List of stored RDDs with all available fields
        """
        logger.info("Fetching RDD storage info for application: %s", app_id)
        rdd_storage = self._make_request(f"{self._app_endpoint(app_id, attempt_id)}/storage/rdd")
        logger.info("✅ Retrieved RDD storage info for application: %s", app_id)
        return rdd_storage

    def get_rdd_storage_details(self, app_id: str, rdd_id: int, attempt_id: Optional[str] = None) -> Dict:
        """
        Get detailed storage information for a specific RDD.

        :param app_id: Application ID
        :param rdd_id: RDD ID
        :param attempt_id: Optional application attempt ID
        :returns: RDD storage details with all available fields
        """
        logger.info("Fetching RDD storage details for RDD %s in application: %s", rdd_id, app_id)
        rdd_details = self._make_request(f"{self._app_endpoint(app_id, attempt_id)}/storage/rdd/{rdd_id}")
        logger.info("✅ Retrieved RDD storage details for RDD %s", rdd_id)
        return rdd_details

### Metrics Analysis Functions

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, ArrayType, MapType, IntegerType, BooleanType
from typing import List, Dict, Tuple, Optional, Any
import pandas as pd
import logging
import json

logger = logging.getLogger(__name__)

class SparkMetricsAnalyzer:
    """Analyzer for Spark application metrics and performance data.

    The ``analyze_*`` methods flatten Spark History Server JSON records into flat dicts.
    Nested dicts become ``parent_child`` keys and lists become JSON strings. Each record
    is prefixed with empty placeholder metadata columns and a ``raw_json`` copy of the
    source record. ``create_dynamic_dataframes`` converts those records into Spark DataFrames.
    """

    # Placeholder metadata columns (filled with '') that precede every flattened record.
    _BASE_PLACEHOLDERS: Tuple[str, ...] = ('cluster_id', 'cluster_name', 'application_id')
    _TASK_PLACEHOLDERS: Tuple[str, ...] = _BASE_PLACEHOLDERS + ('stage_id', 'stage_attempt_id')

    def __init__(self, spark: SparkSession) -> None:
        """
        Initialize the metrics analyzer.
        :param spark: Spark session
        """
        self.spark = spark

    def _flatten_dict(self, d: Dict, parent_key: str = '', sep: str = '_') -> Dict:
        """
        Flatten a nested dictionary.

        Nested dicts are merged in with ``parent{sep}child`` keys. Non-empty lists are stored
        as JSON strings and empty lists as None. Other values are kept as-is.

        :param d: Dictionary to flatten
        :param parent_key: Parent key prefix
        :param sep: Separator for nested keys
        :returns: Flattened dictionary
        """
        flat: Dict = {}
        for key, value in d.items():
            new_key = f"{parent_key}{sep}{key}" if parent_key else key
            if isinstance(value, dict):
                flat.update(self._flatten_dict(value, new_key, sep=sep))
            elif isinstance(value, list):
                flat[new_key] = json.dumps(value) if value else None
            else:
                flat[new_key] = value
        return flat

    def _analyze_records(self, records: Optional[List[Dict]], placeholders: Tuple[str, ...]) -> List[Dict]:
        """
        Flatten each record and prefix it with placeholder columns plus ``raw_json``.

        Flattened fields are applied last, so they override a placeholder of the same name.

        :param records: Records from the History Server (None is treated as empty)
        :param placeholders: Metadata column names initialised to ''
        :returns: One flat dict per record
        """
        results = []
        for record in records or []:
            analysis = {name: '' for name in placeholders}
            analysis['raw_json'] = json.dumps(record)  # complete JSON kept for reference
            analysis.update(self._flatten_dict(record))
            results.append(analysis)
        return results

    def analyze_application_performance(self, app_data: Dict) -> List[Dict]:
        """
        Flatten every attempt of a single application into one record per attempt.

        :param app_data: Application data from Spark History Server
        :returns: One record per attempt (empty list if there are no attempts)
        """
        app_id = app_data.get('id')
        app_name = app_data.get('name') or 'Unknown'
        attempts = app_data.get('attempts') or []
        # The same application JSON is attached to every attempt row; serialise it once.
        raw_json = json.dumps(app_data)

        all_attempts_analysis = []
        for attempt in attempts:
            analysis = {
                'application_id': app_id,
                'application_name': app_name,
                'cluster_id': '',
                'cluster_name': '',
                'raw_json': raw_json,
            }
            analysis.update(self._flatten_dict(attempt))
            all_attempts_analysis.append(analysis)
        return all_attempts_analysis

    def analyze_job_performance(self, jobs: List[Dict]) -> List[Dict]:
        """
        Flatten job records.

        :param jobs: List of job data
        :returns: One flat record per job
        """
        return self._analyze_records(jobs, self._BASE_PLACEHOLDERS)

    def analyze_stage_performance(self, stages: List[Dict]) -> List[Dict]:
        """
        Flatten stage records.

        :param stages: List of stage data
        :returns: One flat record per stage
        """
        return self._analyze_records(stages, self._BASE_PLACEHOLDERS)

    def analyze_task_performance(self, tasks: List[Dict]) -> List[Dict]:
        """
        Flatten task records (with extra stage placeholder columns).

        :param tasks: List of task data
        :returns: One flat record per task
        """
        return self._analyze_records(tasks, self._TASK_PLACEHOLDERS)

    def analyze_executor_performance(self, executors: List[Dict]) -> List[Dict]:
        """
        Flatten executor records.

        :param executors: List of executor data
        :returns: One flat record per executor
        """
        return self._analyze_records(executors, self._BASE_PLACEHOLDERS)

    def analyze_sql_queries(self, sql_queries: List[Dict]) -> List[Dict]:
        """
        Flatten SQL query records.

        :param sql_queries: List of SQL query data
        :returns: One flat record per SQL query
        """
        return self._analyze_records(sql_queries, self._BASE_PLACEHOLDERS)

    @staticmethod
    def _records_to_rows_and_schema(data: List[Dict]) -> Tuple[List[tuple], StructType]:
        """
        Build row tuples and an explicit Spark schema for a list of flat dicts.

        Spark's own inference from dicts fails for the whole DataFrame when a column is
        null in every row (common here, because empty lists flatten to None) or holds
        mixed scalar types. This mirrors inference for the cases that do work: columns
        appear in first-seen order with keys sorted per record, bool->Boolean, int->Long,
        float->Double, str->String. It also resolves the failing cases: an all-null
        column becomes String, int+float becomes Double, and any other mix becomes String.

        :param data: Flat records (missing keys become null)
        :returns: (rows as tuples in column order, StructType)
        """
        columns: List[str] = []
        seen = set()
        for record in data:
            for key in sorted(record):
                if key not in seen:
                    seen.add(key)
                    columns.append(key)

        fields = []
        converters = []
        for column in columns:
            kinds = {type(record[column]) for record in data if record.get(column) is not None}
            if not kinds:
                spark_type, convert = StringType(), None
            elif kinds == {bool}:
                spark_type, convert = BooleanType(), None
            elif kinds == {int}:
                spark_type, convert = LongType(), None
            elif kinds <= {int, float}:
                # DoubleType's verifier rejects Python ints, so widen them explicitly.
                spark_type, convert = DoubleType(), float
            elif kinds == {str}:
                spark_type, convert = StringType(), None
            else:
                spark_type, convert = StringType(), str
            fields.append(StructField(column, spark_type, True))
            converters.append(convert)

        rows = []
        for record in data:
            row = []
            for column, convert in zip(columns, converters):
                value = record.get(column)
                if value is not None and convert is not None:
                    value = convert(value)
                row.append(value)
            rows.append(tuple(row))
        return rows, StructType(fields)

    def create_dynamic_dataframes(
        self,
        applications_analysis: List[Dict],
        jobs_analysis: List[Dict],
        stages_analysis: List[Dict],
        tasks_analysis: List[Dict],
        sql_analysis: List[Dict],
        executors_analysis: List[Dict]
        ) -> Tuple[Optional[Any], Optional[Any], Optional[Any], Optional[Any], Optional[Any], Optional[Any]]:
        """
        Create Spark DataFrames from analysis results with schemas derived from the actual data.

        An empty input, or a failure while building a DataFrame, yields None for that slot;
        failures are logged.

        :param applications_analysis: Application analysis results
        :param jobs_analysis: Job analysis results
        :param stages_analysis: Stage analysis results
        :param tasks_analysis: Task analysis results
        :param sql_analysis: SQL analysis results
        :param executors_analysis: Executor analysis results
        :returns: Tuple of (apps, jobs, stages, tasks, sql, executors) DataFrames
        """
        def create_df_from_data(data: List[Dict], df_name: str):
            if not data:
                return None
            try:
                rows, schema = self._records_to_rows_and_schema(data)
                df = self.spark.createDataFrame(rows, schema)
                # len(rows) equals df.count() here and avoids launching a Spark job just to log.
                logger.info("✅ Created %s DataFrame with %s rows and %s columns",
                            df_name, len(rows), len(df.columns))
                return df
            except Exception as e:
                logger.error("❌ Failed to create %s DataFrame: %s", df_name, str(e), exc_info=True)
                return None

        apps_df = create_df_from_data(applications_analysis, "applications")
        jobs_df = create_df_from_data(jobs_analysis, "jobs")
        stages_df = create_df_from_data(stages_analysis, "stages")
        tasks_df = create_df_from_data(tasks_analysis, "tasks")
        sql_df = create_df_from_data(sql_analysis, "sql")
        executors_df = create_df_from_data(executors_analysis, "executors")

        return apps_df, jobs_df, stages_df, tasks_df, sql_df, executors_df

### Cluster Analyzer


In [0]:
import concurrent.futures
from typing import Dict, List, Any, Optional, Tuple
import time
import logging

logger = logging.getLogger(__name__)

def analyze_single_cluster(
    cluster_info: Dict,
    shs_client_class: Any,
    analyzer_class: Any,
    timeout_seconds: int,
    max_applications: int,
    spark_session: 'SparkSession',
    persistent_ui_timeout: int
) -> Dict[str, Any]:
    """
    Analyze a single EMR cluster, fetching application, job, stage, task, executor, and SQL query data.

    Applications are processed sequentially (not concurrently) to avoid resource limits.

    :param cluster_info: Cluster description. It must contain 'cluster_id', 'cluster_name' and
        'cluster_arn' (a missing key raises KeyError before analysis starts); 'status' is optional.
    :param shs_client_class: Factory called as ``shs_client_class(base_url, session)``
    :param analyzer_class: Factory called as ``analyzer_class(spark_session)``
    :param timeout_seconds: Timeout passed to ``ServerConfig``
    :param max_applications: Maximum number of applications requested from the history server
    :param spark_session: Spark session handed to the analyzer
    :param persistent_ui_timeout: ``max_wait_time`` for the persistent UI initialization
    :returns: Result dict containing:
        - the per-entity record lists and their running totals;
        - the endpoint success flags;
        - ``analysis_status``: 'COMPLETED', 'NO_APPLICATIONS' or 'FAILED';
        - ``error_message``: accumulated errors, including per-application failures.
        Failures after setup are captured in this dict rather than raised.
    """
    cluster_id = cluster_info['cluster_id']
    cluster_name = cluster_info['cluster_name']
    cluster_arn = cluster_info['cluster_arn']
    logger.info("🕵️ Starting analysis for cluster: %s (%s)", cluster_name, cluster_id)

    cluster_analysis_results = {
        'cluster_id': cluster_id, 'cluster_name': cluster_name,
        'status': cluster_info.get('status', 'UNKNOWN'),
        'total_applications': 0, 'total_jobs': 0, 'total_stages': 0, 'total_tasks': 0, 'total_sql_queries': 0, 'total_executors': 0,
        'analysis_status': 'FAILED', 'applications': [], 'jobs': [], 'stages': [], 'tasks': [],
        'sql_queries': [], 'executors': [], 'applications_endpoint_success': None,
        'jobs_endpoint_success': None, 'sql_endpoint_success': None, 'executors_endpoint_success': None, 'error_message': ""
    }

    # (record-list key, running-total key) pairs aggregated from each application's result.
    aggregate_keys = (
        ('jobs', 'total_jobs'),
        ('stages', 'total_stages'),
        ('tasks', 'total_tasks'),
        ('sql_queries', 'total_sql_queries'),
        ('executors', 'total_executors'),
    )

    try:
        server_config = ServerConfig(emr_cluster_arn=cluster_arn, timeout=timeout_seconds)
        emr_client = EMRPersistentUIClient(server_config)
        logger.info("🔑 Initializing EMR Persistent UI connection for %s...", cluster_name)
        base_url, session = emr_client.initialize(max_wait_time=persistent_ui_timeout)
        logger.info("📊 Setting up Spark History Server client for %s...", cluster_name)
        shs_client = shs_client_class(base_url, session)
        analyzer = analyzer_class(spark_session)

        logger.info("📄 Fetching Spark applications for %s...", cluster_name)
        try:
            applications = shs_client.get_applications(limit=max_applications)
            cluster_analysis_results['applications_endpoint_success'] = True
        except Exception as app_ex:
            cluster_analysis_results['applications_endpoint_success'] = False
            cluster_analysis_results['error_message'] += f"Applications endpoint error: {str(app_ex)}; "
            logger.error("❌ Failed to fetch applications for %s: %s", cluster_name, str(app_ex), exc_info=True)
            return cluster_analysis_results

        if not applications:
            logger.warning("⚠️ No applications found in Spark History Server for %s", cluster_name)
            cluster_analysis_results['analysis_status'] = 'NO_APPLICATIONS'
            return cluster_analysis_results

        logger.info("✅ Found %s applications to analyze in %s", len(applications), cluster_name)

        # Process applications sequentially instead of concurrently
        for app in applications:
            app_id = app.get('id')
            if not app_id:
                # Without an id every follow-up request would target "applications/None".
                logger.warning("⚠️ Skipping application without an id in cluster %s", cluster_name)
                continue
            try:
                app_data = process_single_application(app_id, shs_client, analyzer, cluster_analysis_results)
                if app_data:
                    cluster_analysis_results['applications'].extend(app_data['applications'])
                    for records_key, total_key in aggregate_keys:
                        records = app_data[records_key]
                        cluster_analysis_results[records_key].extend(records)
                        cluster_analysis_results[total_key] += len(records)
                    cluster_analysis_results['total_applications'] += 1
            except Exception as app_e:
                logger.error(" ❌ Error analyzing application %s in cluster %s: %s", app_id, cluster_name, str(app_e), exc_info=True)
                # Record the failure so a 'COMPLETED' status does not hide partial data loss.
                cluster_analysis_results['error_message'] += f"Application {app_id} error: {str(app_e)}; "
                continue  # Continue with next application

        cluster_analysis_results['analysis_status'] = 'COMPLETED'
        logger.info("✅ Completed analysis for cluster: %s", cluster_name)
        return cluster_analysis_results
    except Exception as e:
        logger.error("❌ Failed to analyze cluster %s: %s", cluster_name, str(e), exc_info=True)
        cluster_analysis_results['analysis_status'] = 'FAILED'
        cluster_analysis_results['error_message'] += f"Cluster analysis error: {str(e)}"
        return cluster_analysis_results

def process_single_application(app_id: str, shs_client: Any, analyzer: Any, cluster_results: Dict) -> Optional[Dict[str, Any]]:
    """
    Processes a single Spark application's data with comprehensive data extraction.
    """
    logger.info("🔍 Analyzing application: %s", app_id)
    app_results = {'applications': [], 'jobs': [], 'stages': [], 'tasks': [], 'sql_queries': [], 'executors': []}

    try:
        app_details = shs_client.get_application_details(app_id)
        latest_attempt_id_to_use = None
        attempts = app_details.get('attempts')

        if attempts and isinstance(attempts, list) and len(attempts) > 0:
            app_results['applications'] = analyzer.analyze_application_performance(app_details)
            attempt_ids = [int(a['attemptId']) for a in attempts if a.get('attemptId')]
            if attempt_ids:
                latest_attempt_id = max(attempt_ids)
                if latest_attempt_id > 1:
                    latest_attempt_id_to_use = str(latest_attempt_id)
        else:
            logger.warning("⚠️ Application %s has no attempts data. Skipping further analysis.", app_id)
            return None

        # Enhanced Jobs Analysis
        try:
            jobs = shs_client.get_application_jobs(app_id, attempt_id=latest_attempt_id_to_use)
            if jobs:
                app_results['jobs'] = analyzer.analyze_job_performance(jobs)
                # Get detailed job information for each job
                for job in jobs:
                    job_id = job.get('jobId')
                    if job_id is not None:
                        try:
                            job_details = shs_client.get_job_details(app_id, job_id, latest_attempt_id_to_use)
                            # Could extend job analysis with detailed data here
                        except Exception as job_detail_ex:
                            logger.warning("⚠️ Failed to fetch job details for job %s: %s", job_id, str(job_detail_ex))

            if cluster_results['jobs_endpoint_success'] is None:
                cluster_results['jobs_endpoint_success'] = True
        except Exception as job_ex:
            if cluster_results['jobs_endpoint_success'] is None:
                cluster_results['jobs_endpoint_success'] = False
            cluster_results['error_message'] += f"Jobs endpoint error: {str(job_ex)}; "
            logger.error(" ❌ Failed to fetch jobs for application %s: %s", app_id, str(job_ex), exc_info=True)

        # Enhanced Stages Analysis with comprehensive data
        try:
            stages = shs_client.get_application_stages(
                app_id,
                attempt_id=latest_attempt_id_to_use,
                details=True,
                with_summaries=True,
                quantiles="0.0,0.25,0.5,0.75,1.0"
            )

            if stages:
                app_results['stages'] = analyzer.analyze_stage_performance(stages)

                # Get tasks for each stage
                for stage in stages:
                    stage_id = stage.get('stageId')
                    stage_attempt_id = stage.get('attemptId', 0)
                    if stage_id is not None:
                        try:
                            # Get all tasks for this stage
                            tasks = shs_client.get_stage_tasks(app_id, stage_id, stage_attempt_id)
                            if tasks:
                                # Add stage context to tasks
                                for task in tasks:
                                    task['stage_id'] = stage_id
                                    task['stage_attempt_id'] = stage_attempt_id
                                task_analysis = analyzer.analyze_task_performance(tasks)
                                app_results['tasks'].extend(task_analysis)

                            # Get task summary
                            task_summary = shs_client.get_stage_task_summary(app_id, stage_id, stage_attempt_id)
                            # Could store task summary data separately if needed
                        except Exception as task_ex:
                            logger.warning("⚠️ Failed to fetch tasks for stage %s: %s", stage_id, str(task_ex))

        except Exception as stage_ex:
            logger.error(" ❌ Failed to fetch stages for application %s: %s", app_id, str(stage_ex), exc_info=True)

        # Enhanced SQL Analysis
        try:
            sql_queries = shs_client.get_application_sql_queries(
                app_id,
                attempt_id=latest_attempt_id_to_use,
                details=True,
                plan_description=True
            )

            if sql_queries:
                app_results['sql_queries'] = analyzer.analyze_sql_queries(sql_queries)

                # Get detailed information for each SQL query
                for query in sql_queries:
                    execution_id = query.get('id')
                    if execution_id is not None:
                        try:
                            query_details = shs_client.get_sql_query_details(app_id, execution_id)
                            # Could extend SQL analysis with detailed data here
                        except Exception as sql_detail_ex:
                            logger.warning("⚠️ Failed to fetch SQL query details for execution %s: %s", execution_id, str(sql_detail_ex))

            if cluster_results['sql_endpoint_success'] is None:
                cluster_results['sql_endpoint_success'] = True
        except Exception as sql_ex:
            if cluster_results['sql_endpoint_success'] is None:
                cluster_results['sql_endpoint_success'] = False
            cluster_results['error_message'] += f"SQL endpoint error: {str(sql_ex)}; "
            logger.error(" ❌ Failed to fetch SQL queries for application %s: %s", app_id, str(sql_ex), exc_info=True)

        # Enhanced Executor Analysis
        try:
            executors = shs_client.get_application_executors(app_id, all_executors=True)
            if executors:
                app_results['executors'] = analyzer.analyze_executor_performance(executors)
            if cluster_results['executors_endpoint_success'] is None:
                cluster_results['executors_endpoint_success'] = True
        except Exception as executor_ex:
            if cluster_results['executors_endpoint_success'] is None:
                cluster_results['executors_endpoint_success'] = False
            cluster_results['error_message'] += f"Executors endpoint error: {str(executor_ex)}; "
            logger.error(" ❌ Failed to fetch executors for application %s: %s", app_id, str(executor_ex), exc_info=True)

        return app_results

    except Exception as e:
        logger.error(" ❌ Failed to analyze application %s: %s", app_id, str(e), exc_info=True)
        return None

def process_clusters_in_batches(
    clusters_to_analyze: List[Dict],
    batch_size: int,
    batch_delay_seconds: int,
    analyzer_instance: Any,
    spark_session: Any
) -> Tuple[List[Dict], List[Dict], List[Dict], List[Dict], List[Dict], List[Dict], List[Dict]]:
    """
    Analyze clusters in sequential batches to avoid hitting the 200 Persistent UI limit.

    Clusters inside a batch are analyzed concurrently, with one worker thread per cluster.
    Batches run one after another, with ``batch_delay_seconds`` of sleep between them.

    Args:
        clusters_to_analyze: cluster info dicts. Each needs 'cluster_id'; 'cluster_name' and
            'status' are used for the summary row if that cluster's analysis raises.
        batch_size: maximum number of clusters per batch; must be positive.
        batch_delay_seconds: sleep between batches (not after the last batch).
        analyzer_instance: not used by this function; analyze_single_cluster receives the
            SparkMetricsAnalyzer class instead. Kept for interface compatibility.
        spark_session: passed through to analyze_single_cluster.

    Returns:
        (applications, jobs, stages, tasks, sql_queries, executors, cluster_summaries).
        Every record in the first six lists is tagged with 'cluster_id' and 'cluster_name'.

    Raises:
        ValueError: if batch_size is not positive.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

    # cluster_results keys, in the same order as the lists in the returned tuple.
    result_keys = ('applications', 'jobs', 'stages', 'tasks', 'sql_queries', 'executors')
    collected: Dict[str, List[Dict]] = {key: [] for key in result_keys}
    cluster_summaries: List[Dict] = []
    summarized_cluster_ids = set()

    total_clusters = len(clusters_to_analyze)
    total_batches = (total_clusters + batch_size - 1) // batch_size

    for current_batch, batch_start in enumerate(range(0, total_clusters, batch_size), start=1):
        batch_clusters = clusters_to_analyze[batch_start:batch_start + batch_size]
        clusters_by_id = {c_info['cluster_id']: c_info for c_info in batch_clusters}

        logger.info("🔄 Processing batch %d/%d: clusters %d-%d",
                    current_batch, total_batches,
                    batch_start + 1, min(batch_start + batch_size, total_clusters))

        # One worker per cluster in the batch, so concurrency is bounded by batch_size.
        with concurrent.futures.ThreadPoolExecutor(max_workers=len(batch_clusters)) as executor:
            batch_futures = {
                executor.submit(
                    analyze_single_cluster,
                    c_info,
                    SparkHistoryServerClient,
                    SparkMetricsAnalyzer,
                    TIMEOUT_SECONDS,
                    MAX_APPLICATIONS,
                    spark_session,
                    PERSISTENT_UI_TIMEOUT_SECONDS
                ): c_info['cluster_id']
                for c_info in batch_clusters
            }

            for future in concurrent.futures.as_completed(batch_futures):
                cluster_id = batch_futures[future]
                try:
                    cluster_results = future.result()
                    if not cluster_results:
                        logger.warning("⚠️ No results returned for cluster %s; it is omitted from the summary", cluster_id)
                        continue

                    summary_row = {
                        'cluster_id': cluster_results['cluster_id'],
                        'cluster_name': cluster_results['cluster_name'],
                        'status': cluster_results['status'],
                        'analysis_status': cluster_results['analysis_status'],
                        'status_details': summarize_cluster_status(cluster_results),
                        'total_applications': cluster_results['total_applications'],
                        'total_jobs': cluster_results['total_jobs'],
                        'total_stages': cluster_results['total_stages'],
                        'total_tasks': cluster_results['total_tasks'],
                        'total_sql_queries': cluster_results['total_sql_queries'],
                        'total_executors': cluster_results['total_executors']
                    }

                    # Tag every record before publishing anything, so a malformed result
                    # cannot leave partial data next to a "failed" summary row.
                    tagged_records = {}
                    for key in result_keys:
                        records = cluster_results[key]
                        for record in records:
                            record['cluster_id'] = cluster_results['cluster_id']
                            record['cluster_name'] = cluster_results['cluster_name']
                        tagged_records[key] = records

                    cluster_summaries.append(summary_row)
                    summarized_cluster_ids.add(cluster_results['cluster_id'])
                    for key, records in tagged_records.items():
                        collected[key].extend(records)
                except Exception as e:
                    logger.error("❌ Error processing results for cluster %s: %s", cluster_id, str(e), exc_info=True)
                    if cluster_id not in summarized_cluster_ids:
                        c_info = clusters_by_id.get(cluster_id, {})
                        cluster_summaries.append({
                            'cluster_id': cluster_id,
                            'cluster_name': c_info.get('cluster_name', 'Unknown'),
                            # Keep the EMR cluster state here, as successful rows do; the
                            # failure itself is reported through analysis_status.
                            'status': c_info.get('status', 'UNKNOWN'),
                            'total_applications': 0,
                            'total_jobs': 0,
                            'total_stages': 0,
                            'total_tasks': 0,
                            'total_sql_queries': 0,
                            'total_executors': 0,
                            'analysis_status': 'FAILED',
                            'status_details': f"Error processing cluster results: {str(e)}"
                        })
                        summarized_cluster_ids.add(cluster_id)

        # Give AWS time to release Persistent UIs before the next batch (not after the last one).
        if current_batch < total_batches:
            logger.info("⏳ Waiting %d seconds between batches to allow AWS Persistent UI cleanup...", batch_delay_seconds)
            time.sleep(batch_delay_seconds)

    return (
        collected['applications'],
        collected['jobs'],
        collected['stages'],
        collected['tasks'],
        collected['sql_queries'],
        collected['executors'],
        cluster_summaries,
    )

def summarize_cluster_status(cluster_results: Dict) -> str:
    """
    Build the human-readable ``status_details`` text for one cluster's summary row.

    Args:
        cluster_results: per-cluster result dict; must contain 'analysis_status'. For FAILED
            clusters it reads 'error_message'; for COMPLETED clusters it reads the optional
            '*_endpoint_success' flags (True / False / None = not checked).

    Returns:
        - FAILED: the accumulated error message, or 'Unknown error - see logs' when it is
          missing or empty.
        - COMPLETED: '; '-joined "endpoint: OK|FAILED" entries for each endpoint whose flag
          is set, or 'Endpoints not checked' when none are.
        - NO_APPLICATIONS: a fixed explanatory message.
        - any other status: ''.
    """
    # (label shown in the summary, flag key in cluster_results), in display order.
    endpoint_flags = (
        ('applications', 'applications_endpoint_success'),
        ('jobs', 'jobs_endpoint_success'),
        ('sql', 'sql_endpoint_success'),
        ('executors', 'executors_endpoint_success'),
    )

    analysis_status = cluster_results['analysis_status']
    if analysis_status == 'FAILED':
        # error_message is built up with '+=', so the key usually exists but may be empty;
        # fall back in that case too, not only when the key is absent.
        return cluster_results.get('error_message') or 'Unknown error - see logs'
    if analysis_status == 'COMPLETED':
        endpoint_statuses = [
            f"{label}: {'OK' if cluster_results.get(flag_key) else 'FAILED'}"
            for label, flag_key in endpoint_flags
            if cluster_results.get(flag_key) is not None
        ]
        return "; ".join(endpoint_statuses) if endpoint_statuses else "Endpoints not checked"
    if analysis_status == 'NO_APPLICATIONS':
        return "No applications found in Spark History Server"
    return ""


### Main Execution

In [0]:
from pyspark.sql import SparkSession
# NOTE: `sum` here is pyspark.sql.functions.sum and shadows Python's builtin sum() in the
# notebook namespace for every cell run after this one. The example usage printed at the end
# of this cell calls sum('inputBytes') as the Spark aggregate, which is why it is imported.
from pyspark.sql.functions import col, sum
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from typing import Dict, Any
import logging

logger = logging.getLogger(__name__)

def main_analysis() -> Dict[str, Any]:
    """
    Main function to execute the complete Spark History Server analysis.
    Updated to use sequential batch processing to avoid 200 Persistent UI limit.
    """
    try:
        logger.info("🚀 Starting EMR Spark History Server Analysis")

        clusters_to_analyze = []
        if EMR_CLUSTER_ARN:
            logger.info("🎯 Single cluster mode - using provided EMR Cluster ARN")
            cluster_id = EMR_CLUSTER_ARN.split('/')[-1]
            clusters_to_analyze.append({'cluster_id': cluster_id, 'cluster_name': 'User-Specified', 'cluster_arn': EMR_CLUSTER_ARN, 'status': 'UNKNOWN'})
        else:
            logger.info("🌐 Multi-cluster discovery mode - searching for EMR clusters")
            discovery = EMRClusterDiscovery(AWS_REGION)
            discovered_clusters = discovery.discover_clusters(
                states=CLUSTER_STATES_LIST,
                name_filter=CLUSTER_NAME_FILTER,
                max_clusters=MAX_CLUSTERS,
                created_after=PARSED_CREATED_AFTER_DATE,
                created_before=PARSED_CREATED_BEFORE_DATE
            )

            if not discovered_clusters:
                logger.warning("⚠️ No clusters found matching criteria")
                return {'cluster_summaries_df': None, 'applications_df': None, 'jobs_df': None, 'stages_df': None, 'tasks_df': None, 'sql_df': None, 'executors_df': None, 'summary': {'total_clusters_analyzed': 0, 'total_applications': 0, 'total_jobs': 0, 'total_stages': 0, 'total_tasks': 0, 'total_sql_queries': 0, 'total_executors': 0}}

            for cluster_summary in discovered_clusters:
                try:
                    cluster_details = discovery.get_cluster_details(cluster_summary['cluster_id'])
                    if discovery.validate_cluster_for_analysis(cluster_details):
                        clusters_to_analyze.append({'cluster_id': cluster_details['cluster_id'], 'cluster_name': cluster_details['cluster_name'], 'cluster_arn': cluster_details['cluster_arn'], 'status': cluster_details['status'], 'details': cluster_details})
                except Exception as e:
                    logger.error("❌ Failed to validate cluster %s: %s", cluster_summary['cluster_id'], str(e), exc_info=True)
                    continue

        if not clusters_to_analyze:
            logger.error("❌ No valid clusters found for analysis")
            return {'cluster_summaries_df': None, 'applications_df': None, 'jobs_df': None, 'stages_df': None, 'tasks_df': None, 'sql_df': None, 'executors_df': None, 'summary': {'total_clusters_analyzed': 0, 'total_applications': 0, 'total_jobs': 0, 'total_stages': 0, 'total_tasks': 0, 'total_sql_queries': 0, 'total_executors': 0}}

        logger.info("📊 Will analyze %s cluster(s) in batches of %s", len(clusters_to_analyze), BATCH_SIZE)

        spark = SparkSession.builder.appName("EMR-Multi-Cluster-Spark-History-Analysis").getOrCreate()
        analyzer_instance = SparkMetricsAnalyzer(spark)

        # Process clusters in batches - UPDATED TO INCLUDE TASKS
        all_applications_analysis, all_jobs_analysis, all_stages_analysis, all_tasks_analysis, all_sql_analysis, all_executors_analysis, cluster_summaries = process_clusters_in_batches(
            clusters_to_analyze, BATCH_SIZE, BATCH_DELAY_SECONDS, analyzer_instance, spark
        )

        logger.info("📊 Creating analysis DataFrames...")
        # UPDATED TO INCLUDE TASKS
        apps_df, jobs_df, stages_df, tasks_df, sql_df, executors_df = analyzer_instance.create_dynamic_dataframes(
            all_applications_analysis, all_jobs_analysis, all_stages_analysis, all_tasks_analysis, all_sql_analysis, all_executors_analysis
        )

        cluster_summary_df = None
        if cluster_summaries:
            cluster_summary_schema = StructType([
                StructField('cluster_id', StringType(), True),
                StructField('cluster_name', StringType(), True),
                StructField('status', StringType(), True),
                StructField('analysis_status', StringType(), True),
                StructField('status_details', StringType(), True),
                StructField('total_applications', IntegerType(), True),
                StructField('total_jobs', IntegerType(), True),
                StructField('total_stages', IntegerType(), True),
                StructField('total_tasks', IntegerType(), True),  # ADDED TASKS
                StructField('total_sql_queries', IntegerType(), True),
                StructField('total_executors', IntegerType(), True)
            ])
            cluster_summary_df = spark.createDataFrame(cluster_summaries, schema=cluster_summary_schema)

        logger.info("📝 Multi-Cluster Analysis Complete - Summary Results:")
        print("\n" + "="*100 + "\n📊 CLUSTER ANALYSIS SUMMARY\n" + "="*100)
        if cluster_summary_df:
            print(f"Cluster Summary ({cluster_summary_df.count()} clusters):")
            cluster_summary_df.show(truncate=False)

        if apps_df and apps_df.count() > 0:
            logger.info(" • Total Applications analyzed: %s", apps_df.count())
            print("\n=== TOP 10 APPLICATIONS BY DURATION (ACROSS ALL CLUSTERS) ===")
            apps_df.select("cluster_name", "application_name", "attemptId", "duration").orderBy("duration", ascending=False).show(10, truncate=False)

        if jobs_df and jobs_df.count() > 0:
            logger.info(" • Total Jobs analyzed: %s", jobs_df.count())
            print("\n=== JOB SUCCESS RATES BY CLUSTER ===")
            jobs_df.select("cluster_name", "application_id", "name", "status", "numTasks", "numCompletedTasks").orderBy("numTasks", ascending=False).show(10, truncate=False)

        if stages_df and stages_df.count() > 0:
            logger.info(" • Total Stages analyzed: %s", stages_df.count())
            print("\n=== TOP 10 STAGES BY DATA PROCESSED (ACROSS ALL CLUSTERS) ===")
            stages_df.select("cluster_name", "application_id", "name", "inputBytes", "outputBytes").orderBy("inputBytes", ascending=False).show(10, truncate=False)

        # ADDED TASKS ANALYSIS
        if tasks_df and tasks_df.count() > 0:
            logger.info(" • Total Tasks analyzed: %s", tasks_df.count())
            print("\n=== TOP 10 TASKS BY RUNTIME (ACROSS ALL CLUSTERS) ===")
            tasks_df.select("cluster_name", "application_id", "stage_id", "taskId", "duration").orderBy("duration", ascending=False).show(10, truncate=False)

        if sql_df and sql_df.count() > 0:
            logger.info(" • Total SQL Queries analyzed: %s", sql_df.count())
            print("\n=== TOP 10 SQL QUERIES BY DURATION (ACROSS ALL CLUSTERS) ===")
            sql_df.select("cluster_name", "application_id", "id", "description", "duration").orderBy("duration", ascending=False).show(10, truncate=False)

        if executors_df and executors_df.count() > 0:
            logger.info(" • Total Executors analyzed: %s", executors_df.count())
            print("\n=== TOP 10 EXECUTORS BY MEMORY USAGE (ACROSS ALL CLUSTERS) ===")
            executors_df.select("cluster_name", "application_id", "id", "hostPort", "memoryUsed", "totalCores").orderBy("memoryUsed", ascending=False).show(10, truncate=False)

        logger.info("🎉 Multi-cluster analysis complete! All data returned as DataFrames.")
        
        # UPDATED SUMMARY TO INCLUDE TASKS
        summary = {
            'total_clusters_analyzed': len([c for c in cluster_summaries if c['analysis_status'] == 'COMPLETED']),
            'total_applications': len(all_applications_analysis),
            'total_jobs': len(all_jobs_analysis),
            'total_stages': len(all_stages_analysis),
            'total_tasks': len(all_tasks_analysis),  # ADDED TASKS
            'total_sql_queries': len(all_sql_analysis),
            'total_executors': len(all_executors_analysis)
        }

        # UPDATED RETURN TO INCLUDE TASKS
        return {'cluster_summaries_df': cluster_summary_df, 'applications_df': apps_df, 'jobs_df': jobs_df, 'stages_df': stages_df, 'tasks_df': tasks_df, 'sql_df': sql_df, 'executors_df': executors_df, 'summary': summary}

    except Exception as e:
        logger.error("❌ Multi-cluster analysis failed: %s", str(e), exc_info=True)
        raise

# Run the analysis, expose each result set as a notebook-level DataFrame, and (in prod with an
# S3 path configured) persist every non-empty result set as Parquet.
try:
    results = main_analysis()
    cluster_summaries_df = results['cluster_summaries_df']
    applications_df = results['applications_df']
    jobs_df = results['jobs_df']
    stages_df = results['stages_df']
    tasks_df = results['tasks_df']
    sql_df = results['sql_df']
    executors_df = results['executors_df']
    analysis_summary = results['summary']

    print("\n" + "="*100 + "\n✨ EMR MULTI-CLUSTER SPARK HISTORY SERVER ANALYSIS COMPLETED! ✨\n" + "="*100)
    print(f"✅ Analyzed {analysis_summary['total_clusters_analyzed']} clusters\n✅ Analyzed {analysis_summary['total_applications']} applications\n✅ Analyzed {analysis_summary['total_jobs']} jobs\n✅ Analyzed {analysis_summary['total_stages']} stages\n✅ Analyzed {analysis_summary['total_tasks']} tasks\n✅ Analyzed {analysis_summary['total_sql_queries']} SQL queries\n✅ Analyzed {analysis_summary['total_executors']} executors")

    print("\n📊 DataFrames available for analysis:\n • cluster_summaries_df\n • applications_df\n • jobs_df\n • stages_df\n • tasks_df\n • sql_df\n • executors_df\n • analysis_summary")

    print("\n💡 Example usage:\n applications_df.filter(col('duration') > 10000).show()\n stages_df.groupBy('cluster_name').agg(sum('inputBytes')).show()\n sql_df.select('description', 'raw_json').show(truncate=False)\n executors_df.select('cluster_name', 'id', 'memoryUsed', 'totalCores').show()\n tasks_df.select('cluster_name', 'taskId', 'duration', 'status').show()\n" + "="*100)

    if ENVIRONMENT == "prod" and S3_OUTPUT_PATH:
        logger.info("📤 Writing analysis results to S3: %s", S3_OUTPUT_PATH)
        # Tolerate a trailing '/' in the configured path so sub-paths are not built with '//'.
        _s3_root = S3_OUTPUT_PATH.rstrip("/")
        # (result DataFrame, sub-directory under the output root), written in this order.
        _s3_outputs = [
            (cluster_summaries_df, "cluster_summaries"),
            (applications_df, "applications"),
            (jobs_df, "jobs"),
            (stages_df, "stages"),
            (tasks_df, "tasks"),
            (sql_df, "sql"),
            (executors_df, "executors"),
        ]
        try:
            for _result_df, _subdir in _s3_outputs:
                # Skip missing or empty result sets rather than writing empty Parquet folders.
                if _result_df and _result_df.count() > 0:
                    _result_df.write.mode("overwrite").parquet(f"{_s3_root}/{_subdir}/")
            logger.info("✅ All analysis results successfully written to S3")
        except Exception as s3_error:
            # Deliberately best-effort: the in-memory DataFrames above remain usable.
            logger.error("❌ Failed to write results to S3: %s", str(s3_error), exc_info=True)
            print(f"⚠️ Warning: S3 write failed, but analysis completed successfully. Error: {str(s3_error)}")
    elif ENVIRONMENT == "dev":
        logger.info("🧪 Running in DEV mode - S3 output skipped")
        print("🧪 DEV Mode: Analysis results available in DataFrames only (no S3 output)")
    else:
        logger.info("🚫 PROD mode but no S3 output path specified - S3 output skipped")
        print("⚠️ PROD mode detected but no S3 output path provided - results available in DataFrames only")

except Exception as e:
    print(f"\n❌ ANALYSIS FAILED: {str(e)}\nPlease check the logs above for detailed error information.")
    # In prod (e.g. a scheduled job) re-raise so the run is reported as failed instead of
    # silently "succeeding" with no results; in dev keep only the friendly message.
    if ENVIRONMENT == "prod":
        raise


2025-08-29 19:15:04,436 - INFO - 🚀 Starting EMR Spark History Server Analysis
2025-08-29 19:15:04,437 - INFO - 🌐 Multi-cluster discovery mode - searching for EMR clusters
2025-08-29 19:15:04,439 - INFO - 🔍 Discovering EMR clusters in region: us-east-1
2025-08-29 19:15:04,440 - INFO -  States: ['TERMINATED']
2025-08-29 19:15:04,440 - INFO -  Name filter: None
2025-08-29 19:15:04,440 - INFO -  Max clusters: 100
2025-08-29 19:15:04,440 - INFO -  Created After: None
2025-08-29 19:15:04,440 - INFO -  Created Before: None
2025-08-29 19:15:04,537 - INFO - Received command c on object id p0
2025-08-29 19:15:04,681 - INFO - ✅ Discovered 100 clusters
2025-08-29 19:15:04,681 - INFO -  1. spark-canary-job-aws @ 2025-08-29T17:13:00+00:00 :: Airflow :: Test (j-2LDTC0KC4RWZR) - TERMINATED
2025-08-29 19:15:04,681 - INFO -  2. KongmingEtlCluster @ 2025-08-27T10:00:00+00:00 :: Airflow :: Test (j-1HX6NYOHT3PI3) - TERMINATED
2025-08-29 19:15:04,681 - INFO -  3. KongmingEtlCluster @ 2025-08-27T10:00:00+00:


📊 CLUSTER ANALYSIS SUMMARY
Cluster Summary (100 clusters):


2025-08-29 19:18:47,537 - INFO - Received command c on object id p0


+---------------+------------------------------------------------------------------------------------------+----------+---------------+---------------------------------------------------------------------------------------------------------------------------------+------------------+----------+------------+-----------+-----------------+---------------+
|cluster_id     |cluster_name                                                                              |status    |analysis_status|status_details                                                                                                                   |total_applications|total_jobs|total_stages|total_tasks|total_sql_queries|total_executors|
+---------------+------------------------------------------------------------------------------------------+----------+---------------+---------------------------------------------------------------------------------------------------------------------------------+------------------+------

2025-08-29 19:18:48,035 - INFO -  • Total Applications analyzed: 22



=== TOP 10 APPLICATIONS BY DURATION (ACROSS ALL CLUSTERS) ===


2025-08-29 19:18:48,341 - INFO - 🎉 Multi-cluster analysis complete! All data returned as DataFrames.
2025-08-29 19:18:48,344 - INFO - 🧪 Running in DEV mode - S3 output skipped


+-------------------------------------------------------------------+------------------------------------------------------+---------+--------+
|cluster_name                                                       |application_name                                      |attemptId|duration|
+-------------------------------------------------------------------+------------------------------------------------------+---------+--------+
|Gautam: CleanRoomUserIdSellerAgg                                   |CleanRoomUserIdSellerAgg                              |1        |2067182 |
|KongmingEtlCluster @ 2025-08-27T10:00:00+00:00 :: Airflow :: Test  |DailyScoreSet-class-job.DailyOfflineScoringSet        |1        |328898  |
|KongmingEtlCluster @ 2025-08-27T10:00:00+00:00 :: Airflow :: Test  |DailyScoreSet-class-job.DailyOfflineScoringSet        |1        |319869  |
|KongmingEtlCluster @ 2025-08-27T10:00:00+00:00 :: Airflow :: Test  |DailyScoreSet-class-job.DailyOfflineScoringSet        |1        |31

### Table Exploration

In [0]:
# Headline totals taken from the 'summary' entry returned by main_analysis().
display(analysis_summary)

{'total_clusters_analyzed': 16,
 'total_applications': 22,
 'total_jobs': 0,
 'total_stages': 0,
 'total_tasks': 0,
 'total_sql_queries': 0,
 'total_executors': 0}

In [0]:
# One row per analyzed cluster: EMR status, analysis outcome, endpoint/error details and record counts.
display(cluster_summaries_df)

cluster_id,cluster_name,status,analysis_status,status_details,total_applications,total_jobs,total_stages,total_tasks,total_sql_queries,total_executors
j-ZBIDJZSYAUC5,spark-canary-job-aws @ 2025-08-26T19:23:33.087323+00:00 :: Airflow :: Test,TERMINATED,FAILED,Cluster analysis error: 502 Server Error: Bad Gateway for url: https://p-zbidjzsyauc5.emrappui-prod.us-east-1.amazonaws.com/shs/,0,0,0,0,0,0
j-XWR1ATD9D25P,KongmingETLClusterCalibration @ 2025-08-25T10:00:00+00:00 :: Airflow :: Test,TERMINATED,NO_APPLICATIONS,No applications found in Spark History Server,0,0,0,0,0,0
j-2LDTC0KC4RWZR,spark-canary-job-aws @ 2025-08-29T17:13:00+00:00 :: Airflow :: Test,TERMINATED,COMPLETED,applications: OK; jobs: FAILED; sql: FAILED; executors: FAILED,1,0,0,0,0,0
j-CK04NK3092PH,spark-canary-job-aws @ 2025-08-25T17:15:55.695896+00:00 :: Airflow :: Test,TERMINATED,COMPLETED,applications: OK; jobs: FAILED; sql: FAILED; executors: FAILED,1,0,0,0,0,0
j-1SRRJD4VIGE8I,spark-canary-job-aws @ 2025-08-28T16:24:51.236322+00:00 :: Airflow :: Test,TERMINATED,COMPLETED,applications: OK; jobs: FAILED; sql: FAILED; executors: FAILED,1,0,0,0,0,0
j-1JGD3V2O0786C,PreprocessPreBidCluster @ 2025-08-18T08:00:00+00:00 :: Airflow :: Test,TERMINATED,FAILED,Cluster analysis error: 502 Server Error: Bad Gateway for url: https://p-1jgd3v2o0786c.emrappui-prod.us-east-1.amazonaws.com/shs/,0,0,0,0,0,0
j-1S25M8TOS4B9V,KongmingEtlCluster @ 2025-08-27T10:00:00+00:00 :: Airflow :: Test,TERMINATED,COMPLETED,applications: OK; jobs: FAILED; sql: FAILED; executors: FAILED,2,0,0,0,0,0
j-CJVZOKNNY5Y3,PreprocessPreBidCluster @ 2025-08-19T08:00:00+00:00 :: Airflow :: Test,TERMINATED,FAILED,Cluster analysis error: 502 Server Error: Bad Gateway for url: https://p-cjvzoknny5y3.emrappui-prod.us-east-1.amazonaws.com/shs/,0,0,0,0,0,0
j-2LQQYXCT2O98L,KongmingEtlCluster @ 2025-08-27T10:00:00+00:00 :: Airflow :: Test,TERMINATED,COMPLETED,applications: OK; jobs: FAILED; sql: FAILED; executors: FAILED,2,0,0,0,0,0
j-2BH4MH6ZYF1NN,ContainmentRecordsPreBid @ 2025-08-19T08:00:00+00:00 :: Airflow :: Test,TERMINATED,FAILED,Cluster analysis error: 502 Server Error: Bad Gateway for url: https://p-2bh4mh6zyf1nn.emrappui-prod.us-east-1.amazonaws.com/shs/,0,0,0,0,0,0


In [0]:
# Application-level records from analyze_application_performance, tagged with cluster_id/cluster_name.
display(applications_df)

appSparkVersion,application_id,application_name,attemptId,cluster_id,cluster_name,completed,duration,endTime,endTimeEpoch,lastUpdated,lastUpdatedEpoch,raw_json,sparkUser,startTime,startTimeEpoch
3.3.2-amzn-0.1,application_1756490232506_0001,CanaryPipeline-class-jobs.dataproc.canary.CanaryJob,1,j-2LDTC0KC4RWZR,spark-canary-job-aws @ 2025-08-29T17:13:00+00:00 :: Airflow :: Test,True,22520,2025-08-29T17:58:50.155GMT,1756490330155,2025-08-29T17:59:52.000GMT,1756490392000,"{""id"": ""application_1756490232506_0001"", ""name"": ""CanaryPipeline-class-jobs.dataproc.canary.CanaryJob"", ""attempts"": [{""attemptId"": ""1"", ""startTime"": ""2025-08-29T17:58:27.635GMT"", ""endTime"": ""2025-08-29T17:58:50.155GMT"", ""lastUpdated"": ""2025-08-29T17:59:52.000GMT"", ""duration"": 22520, ""sparkUser"": ""hadoop"", ""completed"": true, ""appSparkVersion"": ""3.3.2-amzn-0.1"", ""startTimeEpoch"": 1756490307635, ""lastUpdatedEpoch"": 1756490392000, ""endTimeEpoch"": 1756490330155}]}",hadoop,2025-08-29T17:58:27.635GMT,1756490307635
3.3.2-amzn-0.1,application_1756238221578_0001,CanaryPipeline-class-jobs.dataproc.canary.CanaryJob,1,j-CK04NK3092PH,spark-canary-job-aws @ 2025-08-25T17:15:55.695896+00:00 :: Airflow :: Test,True,21512,2025-08-26T19:58:44.682GMT,1756238324682,2025-08-26T19:59:39.000GMT,1756238379000,"{""id"": ""application_1756238221578_0001"", ""name"": ""CanaryPipeline-class-jobs.dataproc.canary.CanaryJob"", ""attempts"": [{""attemptId"": ""1"", ""startTime"": ""2025-08-26T19:58:23.170GMT"", ""endTime"": ""2025-08-26T19:58:44.682GMT"", ""lastUpdated"": ""2025-08-26T19:59:39.000GMT"", ""duration"": 21512, ""sparkUser"": ""hadoop"", ""completed"": true, ""appSparkVersion"": ""3.3.2-amzn-0.1"", ""startTimeEpoch"": 1756238303170, ""lastUpdatedEpoch"": 1756238379000, ""endTimeEpoch"": 1756238324682}]}",hadoop,2025-08-26T19:58:23.170GMT,1756238303170
3.3.2-amzn-0.1,application_1756398594692_0001,CanaryPipeline-class-jobs.dataproc.canary.CanaryJob,1,j-1SRRJD4VIGE8I,spark-canary-job-aws @ 2025-08-28T16:24:51.236322+00:00 :: Airflow :: Test,True,21735,2025-08-28T16:31:39.411GMT,1756398699411,2025-08-28T16:32:24.000GMT,1756398744000,"{""id"": ""application_1756398594692_0001"", ""name"": ""CanaryPipeline-class-jobs.dataproc.canary.CanaryJob"", ""attempts"": [{""attemptId"": ""1"", ""startTime"": ""2025-08-28T16:31:17.676GMT"", ""endTime"": ""2025-08-28T16:31:39.411GMT"", ""lastUpdated"": ""2025-08-28T16:32:24.000GMT"", ""duration"": 21735, ""sparkUser"": ""hadoop"", ""completed"": true, ""appSparkVersion"": ""3.3.2-amzn-0.1"", ""startTimeEpoch"": 1756398677676, ""lastUpdatedEpoch"": 1756398744000, ""endTimeEpoch"": 1756398699411}]}",hadoop,2025-08-28T16:31:17.676GMT,1756398677676
3.5.5-amzn-0,application_1756435989506_0001,DailyScoreSet-class-job.DailyOfflineScoringSet,1,j-1S25M8TOS4B9V,KongmingEtlCluster @ 2025-08-27T10:00:00+00:00 :: Airflow :: Test,True,319869,2025-08-29T03:00:16.549GMT,1756436416549,2025-08-29T03:01:30.000GMT,1756436490000,"{""id"": ""application_1756435989506_0001"", ""name"": ""DailyScoreSet-class-job.DailyOfflineScoringSet"", ""attempts"": [{""attemptId"": ""1"", ""startTime"": ""2025-08-29T02:54:56.680GMT"", ""endTime"": ""2025-08-29T03:00:16.549GMT"", ""lastUpdated"": ""2025-08-29T03:01:30.000GMT"", ""duration"": 319869, ""sparkUser"": ""hadoop"", ""completed"": true, ""appSparkVersion"": ""3.5.5-amzn-0"", ""startTimeEpoch"": 1756436096680, ""lastUpdatedEpoch"": 1756436490000, ""endTimeEpoch"": 1756436416549}]}",hadoop,2025-08-29T02:54:56.680GMT,1756436096680
3.5.5-amzn-0,application_1756435989506_0002,DailyAttributedEvents-class-job.DailyAttributedEvents,1,j-1S25M8TOS4B9V,KongmingEtlCluster @ 2025-08-27T10:00:00+00:00 :: Airflow :: Test,False,0,1969-12-31T23:59:59.999GMT,-1,2025-08-29T18:27:27.454GMT,1756492047454,"{""id"": ""application_1756435989506_0002"", ""name"": ""DailyAttributedEvents-class-job.DailyAttributedEvents"", ""attempts"": [{""attemptId"": ""1"", ""startTime"": ""2025-08-29T03:00:39.179GMT"", ""endTime"": ""1969-12-31T23:59:59.999GMT"", ""lastUpdated"": ""2025-08-29T18:27:27.454GMT"", ""duration"": 0, ""sparkUser"": ""hadoop"", ""completed"": false, ""appSparkVersion"": ""3.5.5-amzn-0"", ""startTimeEpoch"": 1756436439179, ""lastUpdatedEpoch"": 1756492047454, ""endTimeEpoch"": -1}]}",hadoop,2025-08-29T03:00:39.179GMT,1756436439179
3.5.5-amzn-0,application_1756435822132_0002,DailyAttributedEvents-class-job.DailyAttributedEvents,1,j-2LQQYXCT2O98L,KongmingEtlCluster @ 2025-08-27T10:00:00+00:00 :: Airflow :: Test,True,65920,2025-08-29T02:59:10.706GMT,1756436350706,2025-08-29T03:01:48.000GMT,1756436508000,"{""id"": ""application_1756435822132_0002"", ""name"": ""DailyAttributedEvents-class-job.DailyAttributedEvents"", ""attempts"": [{""attemptId"": ""1"", ""startTime"": ""2025-08-29T02:58:04.786GMT"", ""endTime"": ""2025-08-29T02:59:10.706GMT"", ""lastUpdated"": ""2025-08-29T03:01:48.000GMT"", ""duration"": 65920, ""sparkUser"": ""hadoop"", ""completed"": true, ""appSparkVersion"": ""3.5.5-amzn-0"", ""startTimeEpoch"": 1756436284786, ""lastUpdatedEpoch"": 1756436508000, ""endTimeEpoch"": 1756436350706}]}",hadoop,2025-08-29T02:58:04.786GMT,1756436284786
3.5.5-amzn-0,application_1756435822132_0001,DailyScoreSet-class-job.DailyOfflineScoringSet,1,j-2LQQYXCT2O98L,KongmingEtlCluster @ 2025-08-27T10:00:00+00:00 :: Airflow :: Test,True,310995,2025-08-29T02:57:36.755GMT,1756436256755,2025-08-29T03:01:43.000GMT,1756436503000,"{""id"": ""application_1756435822132_0001"", ""name"": ""DailyScoreSet-class-job.DailyOfflineScoringSet"", ""attempts"": [{""attemptId"": ""1"", ""startTime"": ""2025-08-29T02:52:25.760GMT"", ""endTime"": ""2025-08-29T02:57:36.755GMT"", ""lastUpdated"": ""2025-08-29T03:01:43.000GMT"", ""duration"": 310995, ""sparkUser"": ""hadoop"", ""completed"": true, ""appSparkVersion"": ""3.5.5-amzn-0"", ""startTimeEpoch"": 1756435945760, ""lastUpdatedEpoch"": 1756436503000, ""endTimeEpoch"": 1756436256755}]}",hadoop,2025-08-29T02:52:25.760GMT,1756435945760
3.3.2-amzn-0.1,application_1756142359759_0001,CanaryPipeline-class-jobs.dataproc.canary.CanaryJob,1,j-N11M3Y9IQDEM,spark-canary-job-aws @ 2025-08-25T17:15:55.695896+00:00 :: Airflow,True,20911,2025-08-25T17:20:54.060GMT,1756142454060,2025-08-25T17:22:01.000GMT,1756142521000,"{""id"": ""application_1756142359759_0001"", ""name"": ""CanaryPipeline-class-jobs.dataproc.canary.CanaryJob"", ""attempts"": [{""attemptId"": ""1"", ""startTime"": ""2025-08-25T17:20:33.149GMT"", ""endTime"": ""2025-08-25T17:20:54.060GMT"", ""lastUpdated"": ""2025-08-25T17:22:01.000GMT"", ""duration"": 20911, ""sparkUser"": ""hadoop"", ""completed"": true, ""appSparkVersion"": ""3.3.2-amzn-0.1"", ""startTimeEpoch"": 1756142433149, ""lastUpdatedEpoch"": 1756142521000, ""endTimeEpoch"": 1756142454060}]}",hadoop,2025-08-25T17:20:33.149GMT,1756142433149
3.3.2-amzn-0.1,application_1755540700327_0001,CanaryPipeline-class-jobs.dataproc.canary.CanaryJob,1,j-2C9L9Y9WYQ6H8,spark-canary-job-aws @ 2025-08-18T15:02:04.727576+00:00 :: Airflow,True,22014,2025-08-18T18:13:51.696GMT,1755540831696,2025-08-18T18:15:12.000GMT,1755540912000,"{""id"": ""application_1755540700327_0001"", ""name"": ""CanaryPipeline-class-jobs.dataproc.canary.CanaryJob"", ""attempts"": [{""attemptId"": ""1"", ""startTime"": ""2025-08-18T18:13:29.682GMT"", ""endTime"": ""2025-08-18T18:13:51.696GMT"", ""lastUpdated"": ""2025-08-18T18:15:12.000GMT"", ""duration"": 22014, ""sparkUser"": ""hadoop"", ""completed"": true, ""appSparkVersion"": ""3.3.2-amzn-0.1"", ""startTimeEpoch"": 1755540809682, ""lastUpdatedEpoch"": 1755540912000, ""endTimeEpoch"": 1755540831696}]}",hadoop,2025-08-18T18:13:29.682GMT,1755540809682
3.3.2-amzn-0.1,application_1755296864657_0001,CanaryPipeline-class-jobs.dataproc.canary.CanaryJob,1,j-137KTJ1T8J5TS,spark-canary-job-aws @ 2025-08-15T22:24:16.775314+00:00 :: Airflow,True,20543,2025-08-15T22:29:13.894GMT,1755296953894,2025-08-15T22:30:30.000GMT,1755297030000,"{""id"": ""application_1755296864657_0001"", ""name"": ""CanaryPipeline-class-jobs.dataproc.canary.CanaryJob"", ""attempts"": [{""attemptId"": ""1"", ""startTime"": ""2025-08-15T22:28:53.351GMT"", ""endTime"": ""2025-08-15T22:29:13.894GMT"", ""lastUpdated"": ""2025-08-15T22:30:30.000GMT"", ""duration"": 20543, ""sparkUser"": ""hadoop"", ""completed"": true, ""appSparkVersion"": ""3.3.2-amzn-0.1"", ""startTimeEpoch"": 1755296933351, ""lastUpdatedEpoch"": 1755297030000, ""endTimeEpoch"": 1755296953894}]}",hadoop,2025-08-15T22:28:53.351GMT,1755296933351


In [0]:
# Show the executor-level data collected earlier in the notebook.
# NOTE(review): clusters whose status summary reports "executors: FAILED"
# presumably contribute no rows here. Verify before treating absence as "no executors".
display(executors_df)

In [0]:
# Show the Spark job-level data collected earlier in the notebook.
# NOTE(review): clusters whose status summary reports "jobs: FAILED"
# presumably contribute no rows here. Verify before comparing job counts across clusters.
display(jobs_df)

In [0]:
# Show the Spark stage-level data collected earlier in the notebook.
display(stages_df)

In [0]:
# Show the Spark task-level data collected earlier in the notebook.
display(tasks_df)

In [0]:
# Show the Spark SQL query data collected earlier in the notebook.
# NOTE(review): clusters whose status summary reports "sql: FAILED"
# presumably contribute no rows here. Verify before drawing conclusions from missing queries.
display(sql_df)