**Extract spark log and analyze**

- Pass notebook id, app id, livy id
- mount a lakehouse to save the file to a location as log is a zip file thats typically 100s of MB

In [1]:
def get_spark_app_logs(client, workspace_id, notebook_id, livy_id, app_id, output_path):
    """
    Retrieves Spark application logs from Microsoft Fabric and saves them to the specified path.
    
    Parameters:
    -----------
    client : FabricRestClient
        An authenticated Fabric REST client
    workspace_id : str
        The ID of the workspace
    notebook_id : str
        The ID of the notebook
    livy_id : str
        The ID of the Livy session
    app_id : str
        The Spark application ID
    output_path : str
        Path where the compressed log file will be saved
    
    Returns:
    --------
    str
        Path to the saved file or error message
    """
    import os
       
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    endpoint = f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/notebooks/{notebook_id}/livySessions/{livy_id}/applications/{app_id}/1/logs"
    
    try:
        
        response = client.get(endpoint)
        if response.status_code != 200:
            return f"Error: Received status code {response.status_code}. Response: {response.text}"
        with open(output_path, "wb") as f:
            f.write(response.content)
        
        file_size = len(response.content)
        return f"Successfully saved logs ({file_size/1024/1024:.2f} MB) to {output_path}"
    
    except Exception as e:
        return f"Error retrieving logs: {str(e)}"


def extract_and_analyze_spark_logs(compressed_log_path, extract_path):
    """
    Extracts and analyzes Spark application logs, saving the extracted content
    and providing a basic analysis.

    """
    import os
    import json
    import zipfile
    import gzip
    import io
    
    os.makedirs(extract_path, exist_ok=True)
    
    results = {
        "success": False,
        "format": "unknown",
        "extracted_files": [],
        "log_summary": {},
        "error": None
    }
    
    try:
        with open(compressed_log_path, "rb") as f:
            header = f.read(16)       

        if header.startswith(b'PK\x03\x04'):
            results["format"] = "zip"
            
            
            with zipfile.ZipFile(compressed_log_path, 'r') as zip_ref:
                file_list = zip_ref.namelist()
                results["extracted_files"] = file_list
                zip_ref.extractall(extract_path)
                if file_list:
                    log_file_path = os.path.join(extract_path, file_list[0])
                    results["primary_log"] = log_file_path
                    results["log_summary"] = analyze_spark_log(log_file_path)
        
        elif header.startswith(b'\x1f\x8b\x08'):
            results["format"] = "gzip"
            with gzip.open(compressed_log_path, 'rb') as f:
                decompressed_content = f.read()
            extracted_file = os.path.join(extract_path, "application_log.txt")
            with open(extracted_file, "wb") as f:
                f.write(decompressed_content)
            
            results["extracted_files"] = [extracted_file]
            results["primary_log"] = extracted_file
            results["log_summary"] = analyze_spark_log(extracted_file)
        
        else:

            results["format"] = "uncompressed"
            results["extracted_files"] = [compressed_log_path]
            results["primary_log"] = compressed_log_path
            results["log_summary"] = analyze_spark_log(compressed_log_path)
        
        results["success"] = True
        
    except Exception as e:
        results["error"] = str(e)
    
    return results


def analyze_spark_log(log_file_path):
    """
    Analyzes a Spark log file, extracting key events and information.
    Intentionaly limited to 10 executors and jobs

    """
    import json
    import os
    from datetime import datetime
    summary = {
        "file_size_mb": os.path.getsize(log_file_path) / 1024 / 1024,
        "application": {},
        "environment": {},
        "jobs": [],
        "stages": [],
        "exceptions": [],
        "executors": []
    }
    
    event_counts = {}
    
    try:
        with open(log_file_path, 'r') as f:
            line_count = 0
            for line in f:
                line_count += 1
                if line_count % 100000 == 0:
                    print(f"Analyzed {line_count} lines...")
                
                try:
                    event = json.loads(line.strip())
                    event_type = event.get("Event", "Unknown")

                    event_counts[event_type] = event_counts.get(event_type, 0) + 1

                    if event_type == "SparkListenerApplicationStart":
                        summary["application"] = {
                            "name": event.get("App Name", "Unknown"),
                            "id": event.get("App ID", "Unknown"),
                            "start_time": event.get("Timestamp", 0),
                            "start_time_formatted": datetime.fromtimestamp(
                                event.get("Timestamp", 0)/1000
                            ).strftime('%Y-%m-%d %H:%M:%S') if event.get("Timestamp") else "Unknown"
                        }

                    elif event_type == "SparkListenerApplicationEnd":
                        summary["application"]["end_time"] = event.get("Timestamp", 0)
                        end_time = event.get("Timestamp", 0)
                        start_time = summary["application"].get("start_time", 0)
                        
                        if end_time and start_time:
                            duration_sec = (end_time - start_time) / 1000
                            summary["application"]["duration_sec"] = duration_sec
                            summary["application"]["duration_formatted"] = f"{duration_sec//60:.0f}m {duration_sec%60:.1f}s"
                            summary["application"]["end_time_formatted"] = datetime.fromtimestamp(
                                end_time/1000
                            ).strftime('%Y-%m-%d %H:%M:%S')

                    elif event_type == "SparkListenerEnvironmentUpdate":
                        if "JVM Information" in event:
                            summary["environment"]["jvm"] = event["JVM Information"]
                        if "Spark Properties" in event:
                            key_props = ["spark.app.name", "spark.driver.memory", "spark.executor.memory", 
                                        "spark.executor.cores", "spark.executor.instances", 
                                        "spark.serializer", "spark.sql.shuffle.partitions"] #add more as needed
                            summary["environment"]["properties"] = {
                                k: v for k, v in event.get("Spark Properties", {}).items() 
                                if any(key in k for key in key_props)
                            }
                    
                    # Extract job info
                    elif event_type == "SparkListenerJobEnd":
                        if len(summary["jobs"]) < 10:  # Limit to first 10 jobs
                            summary["jobs"].append({
                                "id": event.get("Job ID"),
                                "result": event.get("Job Result", {}).get("Result", "Unknown"),
                                "completion_time": event.get("Completion Time")
                            })
                    
                    # Extract stage info
                    elif event_type == "SparkListenerStageCompleted":
                        if len(summary["stages"]) < 10:  # Limit to first 10 stages
                            stage_info = event.get("Stage Info", {})
                            summary["stages"].append({
                                "id": stage_info.get("Stage ID"),
                                "name": stage_info.get("Stage Name"),
                                "tasks": stage_info.get("Number of Tasks"),
                                "successful_tasks": stage_info.get("Number of Complete Tasks"),
                                "failed_tasks": stage_info.get("Number of Failed Tasks", 0)
                            })
                    
                    # Extract executor info
                    elif event_type == "SparkListenerExecutorAdded":
                        if len(summary["executors"]) < 10:  # Limit to first 10 executors
                            summary["executors"].append({
                                "id": event.get("Executor ID"),
                                "added_time": event.get("Timestamp"),
                                "resources": event.get("Resources", {})
                            })
                    
                    if "Exception" in event_type or any(err in json.dumps(event) for err in ["Error", "error", "exception", "Exception", "Failed", "failed"]):
                        if len(summary["exceptions"]) < 20:  # Limit to first 20 exceptions
                            summary["exceptions"].append(event)
                    
                except json.JSONDecodeError:
                    continue
        
        summary["event_counts"] = event_counts
        summary["total_events"] = sum(event_counts.values())
        
    except Exception as e:
        summary["error"] = str(e)
    
    return summary

In [2]:

import sempy.fabric as fabric

client = fabric.FabricRestClient()

workspaceid = "91eb8bf4-20c3-47d1-bf43-2dd8dafec93f"
notebookid = "996b5d64-bee7-4aaa-92b6-ff2ba76d0fc8"
livyid = "fbd4b8a6-0461-4d15-9501-cd2acc4180d2"
appid = "application_1747317648565_0001"

compressed_log_path = "/lakehouse/default/Files/spark_logs.zip"
extract_dir = "/lakehouse/default/Files/spark_logs_extracted"

result = get_spark_app_logs(client, workspaceid, notebookid, livyid, appid, compressed_log_path)
print(result)

if "Successfully" in result:
    analysis = extract_and_analyze_spark_logs(compressed_log_path, extract_dir)
    
    if analysis["success"]:
        print(f"Log analysis complete. Format: {analysis['format']}")
        print(f"Extracted files: {analysis['extracted_files']}")       

        app_info = analysis["log_summary"]["application"]
        print("\nApplication Summary:")
        print(f"Name: {app_info.get('name', 'Unknown')}")
        print(f"ID: {app_info.get('id', 'Unknown')}")
        print(f"Duration: {app_info.get('duration_formatted', 'Unknown')}")

        exceptions = analysis["log_summary"]["exceptions"]
        if exceptions:
            print(f"\nFound {len(exceptions)} exceptions/errors")
            for i, exc in enumerate(exceptions[:3]): 
                print(f"Exception {i+1}: {exc.get('Event', 'Unknown')}")
        else:
            print("\nNo exceptions found in logs")

        import json
        with open(f"{extract_dir}/log_analysis.json", "w") as f:
            json.dump(analysis["log_summary"], f, indent=2)
        print(f"\nDetailed analysis saved to {extract_dir}/log_analysis.json")
    else:
        print(f"Error analyzing logs: {analysis['error']}")
else:
    print("Log retrieval failed, cannot proceed with analysis")

Successfully saved logs (30.08 MB) to /lakehouse/default/Files/spark_logs.zip
Log analysis complete. Format: zip
Extracted files: ['application_1747317648565_0001_1']

Application Summary:
Name: nee_cgov_fbd4b8a6-0461-4d15-9501-cd2acc4180d2
ID: application_1747317648565_0001
Duration: 45m 41.0s

Found 20 exceptions/errors
Exception 1: SparkListenerEnvironmentUpdate
Exception 2: SparkListenerTaskStart
Exception 3: SparkListenerTaskEnd

Detailed analysis saved to /lakehouse/default/Files/spark_logs_extracted/log_analysis.json
