<center>

# **View Pipeline Metadata**

</center>

# **Purpose**
This notebook reads the `pipeline_metadata` Delta table from the `Base` lakehouse and summarizes pipeline execution history, allowing you to:
- Track pipeline runs and their status
- Monitor row counts for each table
- Identify failed runs and view their error messages
- Analyze execution durations and performance

# **Install Dependencies**

In [None]:
!pip install --quiet deltalake==0.18.2
!pip install --quiet pandas
!pip install --quiet matplotlib

# **Load Libraries and Functions**

In [None]:
%run AquaQuiver_functions

In [None]:
from deltalake import DeltaTable
import pyarrow.compute as pc
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime, timedelta

# **Read Metadata Table**

In [None]:
# Read the pipeline_metadata Delta table from the "Base" lakehouse into pandas.
# On any read failure the error is printed and metadata_df falls back to an
# empty DataFrame, so every downstream cell takes its "no data" branch.
lakehouse_properties = notebookutils.lakehouse.get("Base")
abfss_path = lakehouse_properties["properties"]["abfsPath"]
delta_table_path = f"{abfss_path}/Tables/pipeline_metadata"

# Bearer token used by the Delta reader to authenticate to lakehouse storage.
aadToken = notebookutils.credentials.getToken('storage')
storage_options = {"bearer_token": aadToken, "use_fabric_endpoint": "true"}

try:
    dt = DeltaTable(delta_table_path, storage_options=storage_options)
    metadata_arrow = dt.to_pyarrow_table()
    metadata_df = metadata_arrow.to_pandas()
    print(f"Total metadata records: {len(metadata_df)}")
except Exception as e:
    print(f"Error reading metadata table: {str(e)}")
    metadata_df = pd.DataFrame()

# Normalize run_timestamp to a datetime dtype once, at load time. Sorting,
# min() and per-day grouping in later cells then behave the same regardless
# of which cells have already been run (no-op if it is already datetime).
if 'run_timestamp' in metadata_df.columns:
    metadata_df['run_timestamp'] = pd.to_datetime(metadata_df['run_timestamp'])

# **Recent Pipeline Runs**

In [None]:
# Show the ten most recent metadata records, newest first.
if metadata_df.empty:
    print("No metadata records found")
else:
    recent_runs = (
        metadata_df
        .sort_values('run_timestamp', ascending=False)
        .head(10)
    )
    display(recent_runs[[
        'run_timestamp', 'notebook_name', 'table_name', 'layer',
        'row_count', 'status', 'duration_seconds',
    ]])

# **Pipeline Run Summary by Session**

In [None]:
# Roll metadata up to one row per pipeline session (run_id): earliest
# timestamp, number of records, total rows, an overall status that is FAILED
# if any record in the session failed, and the largest record duration.
# The ten most recently started sessions are displayed.
if metadata_df.empty:
    print("No session data available")
else:
    session_summary = (
        metadata_df
        .groupby('run_id')
        .agg(
            session_start=('run_timestamp', 'min'),
            tables_processed=('notebook_name', 'count'),
            total_rows=('row_count', 'sum'),
            session_status=('status', lambda statuses: 'FAILED' if 'FAILED' in statuses.values else 'SUCCESS'),
            # NOTE(review): this is the MAX of per-record durations. It equals the
            # session total only if durations are cumulative or records ran in
            # parallel; if they are sequential per-table timings, 'sum' may be
            # what was intended. Confirm against the writer of pipeline_metadata.
            total_duration=('duration_seconds', 'max'),
        )
        .sort_values('session_start', ascending=False)
        .head(10)
    )
    print("\nRecent Pipeline Sessions:")
    display(session_summary)

# **Failed Runs**

In [None]:
# List every FAILED record with its error message, newest first.
if metadata_df.empty:
    print("No metadata records found")
else:
    failed_runs = metadata_df.loc[metadata_df['status'].eq('FAILED')]
    if failed_runs.empty:
        print("No failed runs found!")
    else:
        print(f"Found {len(failed_runs)} failed runs:")
        failed_columns = ['run_timestamp', 'notebook_name', 'table_name', 'error_message']
        display(failed_runs[failed_columns].sort_values('run_timestamp', ascending=False))

# **Performance Metrics**

In [None]:
# Summarize processing time per table over SUCCESS records only, then chart
# average duration and average row count per table side by side.
if metadata_df.empty:
    print("No metadata records found")
else:
    # Filter once: both the summary table and the charts use successful runs only.
    successful_runs = metadata_df[metadata_df['status'] == 'SUCCESS']
    by_table = successful_runs.groupby('table_name')

    avg_duration = (
        by_table['duration_seconds']
        .agg(avg_seconds='mean', min_seconds='min', max_seconds='max', run_count='count')
        .sort_values('avg_seconds', ascending=False)
    )

    print("\nProcessing Time by Table:")
    display(avg_duration)

    # Plot only if at least one table has successful runs
    if len(avg_duration) > 0:
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

        # Average processing time per table
        avg_duration['avg_seconds'].plot(kind='barh', ax=ax1)
        ax1.set_title('Average Processing Time by Table')
        ax1.set_xlabel('Seconds')

        # Average row count per table
        avg_rows = by_table['row_count'].mean().sort_values(ascending=False)
        avg_rows.plot(kind='barh', ax=ax2)
        ax2.set_title('Average Row Count by Table')
        ax2.set_xlabel('Rows')

        # barh draws the first row at the bottom; invert so the largest value
        # (first after the descending sort) is shown at the top of each chart.
        ax1.invert_yaxis()
        ax2.invert_yaxis()

        plt.tight_layout()
        plt.show()

# **Pipeline Execution Timeline**

In [None]:
# Plot runs per day (total vs failed) and total rows processed per day.
# Requires more than one metadata record.
if len(metadata_df) > 1:
    # Parse timestamps into a local Series instead of overwriting
    # metadata_df['run_timestamp']; mutating the shared frame here would make
    # earlier cells behave differently depending on execution order.
    run_dates = pd.to_datetime(metadata_df['run_timestamp']).dt.date

    # One row per calendar day
    daily_runs = metadata_df.groupby(run_dates).agg(
        total_runs=('notebook_name', 'count'),
        total_rows=('row_count', 'sum'),
        failed_runs=('status', lambda statuses: (statuses == 'FAILED').sum()),
    )

    if len(daily_runs) > 0:
        fig, axes = plt.subplots(2, 1, figsize=(14, 8))

        # Runs per day
        daily_runs[['total_runs', 'failed_runs']].plot(ax=axes[0], kind='bar', stacked=False)
        axes[0].set_title('Pipeline Executions Per Day')
        axes[0].set_xlabel('Date')
        axes[0].set_ylabel('Number of Runs')
        axes[0].legend(['Total Runs', 'Failed Runs'])

        # Rows processed per day
        daily_runs['total_rows'].plot(ax=axes[1], kind='line', marker='o')
        axes[1].set_title('Total Rows Processed Per Day')
        axes[1].set_xlabel('Date')
        axes[1].set_ylabel('Row Count')
        axes[1].grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()
else:
    print("Insufficient data for timeline visualization")