In [1]:
from google.cloud import storage
import pandas as pd
import matplotlib.pyplot as plt
import json
from datetime import datetime
from urllib.parse import urlparse
import plotly.express as px


def read_logs_from_gs_uri(gs_uri):
    """
    Read logs from a Google Cloud Storage file given a gs:// URI.

    Args:
        gs_uri: Location of a JSON object, written as
            ``gs://<bucket>/<object path>``.

    Returns:
        The decoded JSON document, i.e. whatever ``json.loads`` produces for
        the object's content. The notebook treats it as a list of log-record
        dictionaries; that shape is not enforced here.

    Raises:
        ValueError: If the URI does not use the ``gs`` scheme, or if it has
            no bucket name or no object path.

    Note:
        ``urlparse`` treats ``?`` and ``#`` as query/fragment delimiters, so
        anything after them is not part of the object path used here.
    """
    parsed = urlparse(gs_uri)
    if parsed.scheme != "gs":
        raise ValueError(f"Invalid URI scheme: {gs_uri}. Must start with 'gs://'.")

    bucket_name = parsed.netloc
    file_path = parsed.path.lstrip('/')

    # Fail early with a clear message instead of letting the GCS client raise
    # an opaque error for an empty bucket or blob name.
    if not bucket_name or not file_path:
        raise ValueError(
            f"Invalid GCS URI: {gs_uri}. Expected 'gs://<bucket>/<object path>'."
        )

    # Uses the ambient default credentials/project of the environment.
    client = storage.Client()
    blob = client.bucket(bucket_name).blob(file_path)

    # download_as_string() is a deprecated alias of download_as_bytes();
    # json.loads accepts the raw bytes directly.
    logs_json = blob.download_as_bytes()
    return json.loads(logs_json)

def analyze_message_flow(logs):
    """
    Plot message state transitions over time as an interactive Plotly chart.

    One line (with markers) is drawn per distinct ``state``; the x-axis is the
    record timestamp and the y-axis is the ``message_id``.

    Args:
        logs: Log records in any form accepted by ``pd.DataFrame`` (the
            notebook passes a list of dicts). Each record must provide
            ``timestamp`` (parseable by ``pd.to_datetime``), ``message_id``
            and ``state``.

    Returns:
        None. The figure is rendered via ``fig.show()``; it is deliberately
        not returned so a notebook cell does not display it a second time.

    Raises:
        KeyError: If the records have no ``timestamp`` field (this includes
            an empty ``logs``).
    """
    df = pd.DataFrame(logs)

    # Parse timestamps and order the rows chronologically so each line is
    # drawn left to right.
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    df = df.sort_values('timestamp')

    # Line chart with markers: one trace per state.
    fig = px.line(
        df,
        x='timestamp',
        y='message_id',
        color='state',  # Color by the state to differentiate states visually
        title="Message Transitions Over Time",
        labels={"timestamp": "Time", "state": "State", "message_id": "Message ID"},
        hover_data=["message_id"],
        template="plotly_dark",  # Optional: dark theme
        markers=True
    )

    # Customize layout for better readability. The y-axis shows message ids
    # (state is encoded by colour), so it is titled accordingly.
    fig.update_layout(
        hovermode="closest",
        xaxis=dict(showgrid=True, title="Timestamp"),
        yaxis=dict(showgrid=True, title="Message ID"),
        margin=dict(t=40, b=40, l=40, r=40),  # Adjust margins to make space for labels
    )

    fig.show()



In [35]:
# Load the 10-task runs from GCS.
# Configuration for both: workers 2, tasks 5*2 = 10, batch_size 5.
logs10 = read_logs_from_gs_uri(
    gs_uri="gs://cameltrain-sight/doing_mq_analysis/2024-12-20T09:00:43.541533.json"
)
# Same configuration, "_fast" run.
logs10_fast = read_logs_from_gs_uri(
    gs_uri="gs://cameltrain-sight/doing_mq_analysis/2024-12-31T09:40:23.962527.json"
)


In [42]:
# Plot the 10-task runs: the regular run first, then the "_fast" run.
analyze_message_flow(logs=logs10)
analyze_message_flow(logs=logs10_fast)

In [43]:
# Load the 50-task runs from GCS.
# Configuration for both: workers 2, tasks 25*2 = 50, batch_size 5.
logs50 = read_logs_from_gs_uri(
    gs_uri="gs://cameltrain-sight/doing_mq_analysis/2024-12-23T15:15:49.032019.json"
)
# Same configuration, "_fast" run.
logs50_fast = read_logs_from_gs_uri(
    gs_uri="gs://cameltrain-sight/doing_mq_analysis/2024-12-31T10:27:32.520731.json"
)


In [44]:
# Plot the 50-task runs: the regular run first, then the "_fast" run.
analyze_message_flow(logs=logs50)
analyze_message_flow(logs=logs50_fast)

In [37]:
# Load the 100-task runs from GCS.
# Configuration for both: workers 2, tasks 50*2 = 100, batch_size 5.
logs100 = read_logs_from_gs_uri(
    gs_uri="gs://cameltrain-sight/doing_mq_analysis/2024-12-23T19:26:50.355211.json"
)
# Same configuration, "_fast" run.
logs100_fast = read_logs_from_gs_uri(
    gs_uri="gs://cameltrain-sight/doing_mq_analysis/2024-12-31T10:52:02.779873.json"
)


In [38]:
# Plot the 100-task runs: the regular run first, then the "_fast" run.
analyze_message_flow(logs=logs100)
analyze_message_flow(logs=logs100_fast)

In [39]:
# Load the 500-task runs from GCS.
# Configuration for both: workers 2, tasks 250*2 = 500, batch_size 5.
logs500 = read_logs_from_gs_uri(
    gs_uri="gs://cameltrain-sight/doing_mq_analysis/2024-12-22T15:48:18.956719.json"
)
# Same configuration, "_fast" run.
logs500_fast = read_logs_from_gs_uri(
    gs_uri="gs://cameltrain-sight/doing_mq_analysis/2024-12-31T12:06:54.093683.json"
)


In [41]:
# Plot the 500-task runs: the regular run first, then the "_fast" run.
analyze_message_flow(logs=logs500)
analyze_message_flow(logs=logs500_fast)

In [24]:
# Load the 4-worker runs from GCS.
# NOTE: the 10-task and 500-task loads that used to sit in this cell were
# commented out and pointed at the same files as the 2-worker `logs10` and
# `logs500` runs, so nothing is loaded for those sizes here.

# workers 4, tasks 25*2 = 50, batch_size 5
logs4_50 = read_logs_from_gs_uri(
    gs_uri="gs://cameltrain-sight/doing_mq_analysis/2024-12-27T09:26:30.675248.json"
)
# Same configuration, "_fast" run.
logs4_50_fast = read_logs_from_gs_uri(
    gs_uri="gs://cameltrain-sight/doing_mq_analysis/2024-12-31T08:09:08.847832.json"
)

# workers 4, tasks 50*2 = 100, batch_size 5
logs4_100 = read_logs_from_gs_uri(
    gs_uri="gs://cameltrain-sight/doing_mq_analysis/2024-12-27T15:29:03.534160.json"
)
# Same configuration, "_fast" run.
logs4_100_fast = read_logs_from_gs_uri(
    gs_uri="gs://cameltrain-sight/doing_mq_analysis/2024-12-31T09:08:46.723731.json"
)

In [25]:
# Plot the 4-worker, 50-task runs: the regular run first, then the "_fast" run.
analyze_message_flow(logs=logs4_50)
analyze_message_flow(logs=logs4_50_fast)


In [26]:
# 100 tasks, 4 workers, batch_size 5.
analyze_message_flow(logs=logs4_100)
# 100 tasks, 4 workers, batch_size 5, fast propose action.
analyze_message_flow(logs=logs4_100_fast)