In [104]:
from google.cloud import storage
import pandas as pd
import matplotlib.pyplot as plt
import json
from datetime import datetime
from urllib.parse import urlparse
import plotly.express as px
import seaborn as sns



def read_logs_from_gs_uri(gs_uri):
    """
    Read JSON logs from a Google Cloud Storage object given a gs:// URI.

    Args:
        gs_uri (str): Object location of the form 'gs://<bucket>/<object path>'.

    Returns:
        Whatever json.loads produces for the object's content (the callers in
        this notebook pass it straight to pd.DataFrame).

    Raises:
        ValueError: If the URI scheme is not 'gs', or if the bucket name or
            the object path is missing from the URI.
    """
    # Parse the gs:// URI
    parsed = urlparse(gs_uri)
    if parsed.scheme != "gs":
        raise ValueError(f"Invalid URI scheme: {gs_uri}. Must start with 'gs://'.")

    bucket_name = parsed.netloc
    file_path = parsed.path.lstrip('/')

    # Fail early with a clear message instead of letting the storage client
    # reject an empty bucket or object name further down.
    if not bucket_name or not file_path:
        raise ValueError(
            f"Invalid GCS URI: {gs_uri}. Expected 'gs://<bucket>/<object path>'."
        )

    # Initialize GCS client
    client = storage.Client()
    blob = client.bucket(bucket_name).blob(file_path)

    # download_as_string() is a deprecated alias of download_as_bytes();
    # json.loads accepts the returned bytes directly.
    logs_json = blob.download_as_bytes()

    # Parse the JSON payload
    return json.loads(logs_json)

def analyze_message_flow(logs,desc = ''):
    """
    Plot message state transitions over time as an interactive Plotly figure.

    Each log entry is drawn at (timestamp, message_id); entries are grouped
    and coloured by their 'state' value.

    Args:
        logs: Records accepted by pd.DataFrame that provide the 'timestamp',
            'message_id' and 'state' columns.
        desc (str): Prefix prepended verbatim to the plot title.
    """
    # Create a DataFrame from the logs
    df = pd.DataFrame(logs)

    # Convert the 'timestamp' column to datetime type
    df['timestamp'] = pd.to_datetime(df['timestamp'])

    # Sort by 'timestamp' so each state's line is drawn in chronological order
    df = df.sort_values('timestamp')

    # Create an interactive line plot (with markers) of message states over time
    fig = px.line(
        df,
        x='timestamp',
        y='message_id',
        color='state',  # Color by the state to differentiate states visually
        title= desc +  "Plot for Message Transitions Over Time",
        # Keys must be column names; the previous "message id" key matched no
        # column, so its label was never applied.
        labels={"timestamp": "Time", "message_id": "Message ID"},
        hover_data=["message_id"],
        template="plotly_dark",  # Optional: dark theme
        markers=True
    )

    # Customize layout for better readability
    fig.update_layout(
        hovermode="closest",
        xaxis=dict(showgrid=True, title="Timestamp"),
        # The y axis carries message ids (state is encoded by colour).
        yaxis=dict(showgrid=True, title="Message ID"),
        margin=dict(t=40, b=40, l=40, r=40),  # Adjust margins to make space for labels
    )

    # Show the plot
    fig.show()




In [67]:
def calculate_latency_per_messages(logs):
  """
  Compute the pending -> completed latency of every message.

  Args:
      logs: Records accepted by pd.DataFrame that provide the 'timestamp',
          'message_id' and 'state' columns.

  Returns:
      pd.DataFrame: Columns 'message_id', 'latency' (timedelta) and
      'latency_seconds' (float). A message that lacks either a 'pending' or a
      'completed' entry gets NaT / NaN. Empty input yields an empty frame with
      the same columns.
  """
  df = pd.DataFrame(logs)
  if df.empty:
    # No records means no 'timestamp' column to index; return a typed empty result.
    return pd.DataFrame({
      'message_id': pd.Series(dtype='object'),
      'latency': pd.Series(dtype='timedelta64[ns]'),
      'latency_seconds': pd.Series(dtype='float64'),
    })

  df['timestamp'] = pd.to_datetime(df['timestamp'])  # Convert to datetime

  # Collapse repeated entries per message (first 'pending', last 'completed')
  # so both indexes are unique and the subtraction aligns one row per message.
  # sort=False keeps messages in order of first appearance.
  pending_times = (
    df.loc[df['state'] == 'pending']
    .groupby('message_id', sort=False)['timestamp']
    .min()
  )
  completed_times = (
    df.loc[df['state'] == 'completed']
    .groupby('message_id', sort=False)['timestamp']
    .max()
  )

  latency = completed_times - pending_times
  latency_df = latency.reset_index(name='latency')
  latency_df['latency_seconds'] = latency_df['latency'].dt.total_seconds()
  return latency_df


In [119]:

def calculate_throughput(logs, time_interval='10s'):
  """
  Calculate throughput (tasks completed per time interval) from a list of logs.

  Args:
      logs (list): List of dictionaries containing log data. Each log should include 'timestamp' and 'state'.
      time_interval (str): Pandas offset string for time intervals (e.g., '10s' for
          10 seconds, '1min' for 1 minute). The lowercase spellings are used because
          the 'S' / 'T' aliases are deprecated in recent pandas releases.

  Returns:
      pd.DataFrame: DataFrame with 'timestamp' (start of each interval) and
      'throughput' (number of 'completed' entries in that interval). Empty
      input yields an empty frame with the same columns.
  """
  # Convert logs to a DataFrame
  df = pd.DataFrame(logs)

  if df.empty:
    # No records means no 'timestamp' column to resample; return a typed empty result.
    return pd.DataFrame({
      'timestamp': pd.Series(dtype='datetime64[ns]'),
      'throughput': pd.Series(dtype='int64'),
    })

  # Ensure 'timestamp' is in datetime format
  df['timestamp'] = pd.to_datetime(df['timestamp'])

  # Filter logs for the 'completed' state
  completed_logs = df[df['state'] == 'completed']

  # Count completions per interval; intervals with no completion are reported as 0
  throughput = completed_logs.set_index('timestamp').resample(time_interval).size()

  print(f"throughput => {throughput}")

  # Reset index and rename the throughput column
  return throughput.reset_index(name='throughput')


In [96]:
import plotly.graph_objects as go

def plot_throughput_trends(throughput_df, desc=''):
    """
    Render an interactive Plotly chart of throughput against time.

    Args:
        throughput_df (pd.DataFrame): Frame providing the `timestamp` and
            `throughput` columns that are plotted on x and y respectively.
        desc (str): Text placed in front of the plot title.
    """
    trace_colour = 'green'

    # Single green series drawn as a line with point markers
    throughput_trace = go.Scatter(
        x=throughput_df['timestamp'],
        y=throughput_df['throughput'],
        mode='lines+markers',
        marker={'color': trace_colour},
        line={'color': trace_colour},
        name='Throughput',
    )

    # Titles, theme, legend and grid settings applied in one call below
    layout_options = {
        'title': f'{desc} Plot for Throughput Trends Over Time',
        'xaxis_title': 'Time',
        'yaxis_title': 'Tasks Completed (Throughput)',
        'template': 'plotly_dark',
        'legend': {'title': 'Legend'},
        'xaxis': {'showgrid': True},
        'yaxis': {'showgrid': True},
    }

    fig = go.Figure()
    fig.add_trace(throughput_trace)
    fig.update_layout(**layout_options)
    fig.show()

def plot_latency_trends(latency_df, desc=''):
    """
    Render an interactive Plotly chart of per-message latency.

    Args:
        latency_df (pd.DataFrame): Frame providing the `message_id` column
            (x axis) and the `latency_seconds` column (y axis).
        desc (str): Text placed in front of the plot title.
    """
    trace_colour = 'blue'

    # Single blue series drawn as a line with point markers
    latency_trace = go.Scatter(
        x=latency_df['message_id'],
        y=latency_df['latency_seconds'],
        mode='lines+markers',
        marker={'color': trace_colour},
        line={'color': trace_colour},
        name='Latency',
    )

    # Titles, theme, legend and grid settings applied in one call below
    layout_options = {
        'title': f'{desc} Plot for Latency Trends Over Time',
        'xaxis_title': 'Message ID',
        'yaxis_title': 'Latency (seconds)',
        'template': 'plotly_dark',
        'legend': {'title': 'Legend'},
        'xaxis': {'showgrid': True},
        'yaxis': {'showgrid': True},
    }

    fig = go.Figure()
    fig.add_trace(latency_trace)
    fig.update_layout(**layout_options)
    fig.show()


In [120]:


def analyze_lentency(logs,desc):
  """
  Compute per-message latency for `logs` and plot it.

  Args:
      logs: Log records forwarded unchanged to calculate_latency_per_messages.
      desc (str): Title prefix forwarded to plot_latency_trends.
  """
  plot_latency_trends(calculate_latency_per_messages(logs), desc)
  

def analyze_throughtput(logs,desc):
  """
  Compute throughput for `logs` (default interval) and plot it.

  Args:
      logs: Log records forwarded unchanged to calculate_throughput.
      desc (str): Title prefix forwarded to plot_throughput_trends.
  """
  plot_throughput_trends(calculate_throughput(logs), desc)
  


# Experiments to analyse: (description used as plot-title prefix, gs:// URI of the log file).
# Entries that are commented out are skipped by the loop below.
exps = [
  
  # ("10 tasks 2 workers batch-size => 5 : slow ","gs://cameltrain-sight/doing_mq_analysis/2024-12-20T09:00:43.541533.json"),
  # ("50 tasks 2 workers batch-size => 5 : slow ","gs://cameltrain-sight/doing_mq_analysis/2024-12-23T15:15:49.032019.json"),
  # ("100 tasks 2 workers batch-size => 5 : slow ","gs://cameltrain-sight/doing_mq_analysis/2024-12-23T19:26:50.355211.json"),
  # ("500 tasks 2 workers batch-size => 5 : slow ","gs://cameltrain-sight/doing_mq_analysis/2024-12-22T15:48:18.956719.json"),
  
  # ("50 tasks 4 workers batch-size => 5 : slow ","gs://cameltrain-sight/doing_mq_analysis/2024-12-27T09:26:30.675248.json"),
  # ("100 tasks 4 workers batch-size => 5 : slow ","gs://cameltrain-sight/doing_mq_analysis/2024-12-27T15:29:03.534160.json"),
  # ("500 tasks 4 workers batch-size => 5 : slow ","gs://cameltrain-sight/doing_mq_analysis/2024-12-22T15:48:18.956719.json"),
  # NOTE(review): the disabled "500 tasks 4 workers : slow" entry reuses the URI of the
  # disabled "500 tasks 2 workers : slow" entry -- confirm before re-enabling.
  
  ("10 tasks 1 workers batch-size => 1 : fast ","gs://cameltrain-sight/doing_mq_analysis/2025-01-04T19:48:06.083435.json"),
  
  ("10 tasks 2 workers batch-size => 5 : fast ","gs://cameltrain-sight/doing_mq_analysis/2024-12-31T09:40:23.962527.json"),
  ("50 tasks 2 workers batch-size => 5 : fast ","gs://cameltrain-sight/doing_mq_analysis/2024-12-31T10:27:32.520731.json"),
  ("100 tasks 2 workers batch-size => 5 : fast ","gs://cameltrain-sight/doing_mq_analysis/2024-12-31T10:52:02.779873.json"),
  ("500 tasks 2 workers batch-size => 5 : fast ","gs://cameltrain-sight/doing_mq_analysis/2024-12-31T12:06:54.093683.json"), 
  
  ("50 tasks 4 workers batch-size => 5 : fast ","gs://cameltrain-sight/doing_mq_analysis/2024-12-31T08:09:08.847832.json"), 
  ("100 tasks 4 workers batch-size => 5 : fast ","gs://cameltrain-sight/doing_mq_analysis/2024-12-31T09:08:46.723731.json"), 
  # NOTE(review): the entry below uses the same URI as the "500 tasks 2 workers : fast"
  # entry above, so both experiments are plotted from the same log file -- looks like a
  # copy-paste slip; confirm the intended 4-worker log file.
  ("500 tasks 4 workers batch-size => 5 : fast ","gs://cameltrain-sight/doing_mq_analysis/2024-12-31T12:06:54.093683.json")
]


# For every experiment: download its logs once, then render the message-flow,
# latency and throughput plots with the description as the title prefix.
for (desc,uri) in exps:
  logsx = read_logs_from_gs_uri(gs_uri = uri )
  analyze_message_flow(logsx,desc)
  analyze_lentency(logsx,desc)
  analyze_throughtput(logsx,desc)


throughput => timestamp
2025-01-04 19:46:20    1
2025-01-04 19:46:30    1
2025-01-04 19:46:40    1
2025-01-04 19:46:50    1
2025-01-04 19:47:00    1
2025-01-04 19:47:10    1
2025-01-04 19:47:20    1
2025-01-04 19:47:30    1
2025-01-04 19:47:40    1
2025-01-04 19:47:50    1
Freq: 10S, dtype: int64


throughput => timestamp
2024-12-31 09:40:00    10
Freq: 10S, dtype: int64


throughput => timestamp
2024-12-31 10:23:50    10
2024-12-31 10:24:00     0
2024-12-31 10:24:10     0
2024-12-31 10:24:20     0
2024-12-31 10:24:30     0
2024-12-31 10:24:40    10
2024-12-31 10:24:50     0
2024-12-31 10:25:00     0
2024-12-31 10:25:10     0
2024-12-31 10:25:20     0
2024-12-31 10:25:30    10
2024-12-31 10:25:40     0
2024-12-31 10:25:50     0
2024-12-31 10:26:00     0
2024-12-31 10:26:10     0
2024-12-31 10:26:20    10
2024-12-31 10:26:30     0
2024-12-31 10:26:40     0
2024-12-31 10:26:50     0
2024-12-31 10:27:00     0
2024-12-31 10:27:10    10
Freq: 10S, dtype: int64


throughput => timestamp
2024-12-31 10:44:10    10
2024-12-31 10:44:20     0
2024-12-31 10:44:30     0
2024-12-31 10:44:40     0
2024-12-31 10:44:50     0
2024-12-31 10:45:00    10
2024-12-31 10:45:10     0
2024-12-31 10:45:20     0
2024-12-31 10:45:30     0
2024-12-31 10:45:40     0
2024-12-31 10:45:50    10
2024-12-31 10:46:00     0
2024-12-31 10:46:10     0
2024-12-31 10:46:20     0
2024-12-31 10:46:30     0
2024-12-31 10:46:40    10
2024-12-31 10:46:50     0
2024-12-31 10:47:00     0
2024-12-31 10:47:10     0
2024-12-31 10:47:20     0
2024-12-31 10:47:30    10
2024-12-31 10:47:40     0
2024-12-31 10:47:50     0
2024-12-31 10:48:00     0
2024-12-31 10:48:10     0
2024-12-31 10:48:20    10
2024-12-31 10:48:30     0
2024-12-31 10:48:40     0
2024-12-31 10:48:50     0
2024-12-31 10:49:00     0
2024-12-31 10:49:10    10
2024-12-31 10:49:20     0
2024-12-31 10:49:30     0
2024-12-31 10:49:40     0
2024-12-31 10:49:50     0
2024-12-31 10:50:00    10
2024-12-31 10:50:10     0
2024-12-31 10:

throughput => timestamp
2024-12-31 11:25:30    10
2024-12-31 11:25:40     0
2024-12-31 11:25:50     0
2024-12-31 11:26:00     0
2024-12-31 11:26:10     0
                       ..
2024-12-31 12:05:50     0
2024-12-31 12:06:00     0
2024-12-31 12:06:10     0
2024-12-31 12:06:20     0
2024-12-31 12:06:30    10
Freq: 10S, Length: 247, dtype: int64


throughput => timestamp
2024-12-31 08:07:10    20
2024-12-31 08:07:20     0
2024-12-31 08:07:30     0
2024-12-31 08:07:40     0
2024-12-31 08:07:50     0
2024-12-31 08:08:00    20
2024-12-31 08:08:10     0
2024-12-31 08:08:20     0
2024-12-31 08:08:30     0
2024-12-31 08:08:40     0
2024-12-31 08:08:50    10
Freq: 10S, dtype: int64


throughput => timestamp
2024-12-31 09:05:00    20
2024-12-31 09:05:10     0
2024-12-31 09:05:20     0
2024-12-31 09:05:30     0
2024-12-31 09:05:40     0
2024-12-31 09:05:50    20
2024-12-31 09:06:00     0
2024-12-31 09:06:10     0
2024-12-31 09:06:20     0
2024-12-31 09:06:30     0
2024-12-31 09:06:40    20
2024-12-31 09:06:50     0
2024-12-31 09:07:00     0
2024-12-31 09:07:10     0
2024-12-31 09:07:20     0
2024-12-31 09:07:30    20
2024-12-31 09:07:40     0
2024-12-31 09:07:50     0
2024-12-31 09:08:00     0
2024-12-31 09:08:10     0
2024-12-31 09:08:20    20
Freq: 10S, dtype: int64


throughput => timestamp
2024-12-31 11:25:30    10
2024-12-31 11:25:40     0
2024-12-31 11:25:50     0
2024-12-31 11:26:00     0
2024-12-31 11:26:10     0
                       ..
2024-12-31 12:05:50     0
2024-12-31 12:06:00     0
2024-12-31 12:06:10     0
2024-12-31 12:06:20     0
2024-12-31 12:06:30    10
Freq: 10S, Length: 247, dtype: int64
