# HYDATIS Cluster Data Exploration

Initial exploratory data analysis of the HYDATIS cluster metrics for ML scheduler development.

## Objectives
- Analyze 30 days of historical cluster metrics
- Understand node utilization patterns
- Identify scheduling bottlenecks
- Establish baseline performance metrics

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

import sys
sys.path.append('/home/jovyan/work/src')
from data_collection.prometheus_collector import PrometheusCollector
from data_collection.data_processor import DataProcessor

# Plot appearance: the 'seaborn-v0_8' matplotlib style with seaborn's 'husl' palette.
plt.style.use('seaborn-v0_8')
sns.set_palette('husl')

# Banner identifying this notebook run and the moment it was executed.
print(
    "HYDATIS Cluster Analysis - Week 2",
    f"Analysis Date: {datetime.now()}",
    sep="\n",
)

## 1. Data Collection Setup

In [None]:
# Create the Prometheus client for the HYDATIS cluster endpoint and the
# companion data processor.
collector = PrometheusCollector(
    prometheus_url="http://10.110.190.83:9090",
)
processor = DataProcessor()

# Analysis window: the 30 days ending at the moment this cell runs.
end_time = datetime.now()
start_time = end_time - timedelta(days=30)

print(
    f"Collecting data from {start_time} to {end_time}",
    f"Analysis period: {(end_time - start_time).days} days",
    sep="\n",
)

## 2. Node Resource Analysis

In [None]:
# Fetch node-level metrics for the analysis window (HYDATIS 6-node cluster)
# and summarise CPU usage per node.
print("Collecting node metrics...")
node_metrics = collector.collect_node_metrics(start_time, end_time)

if 'cpu_usage' not in node_metrics:
    print("No CPU metrics collected")
else:
    cpu_df = node_metrics['cpu_usage']
    print(f"CPU data points collected: {len(cpu_df)}")
    print(f"Nodes analyzed: {cpu_df['instance'].nunique()}")

    # One row per instance; each keyword names an output column and the
    # aggregation that fills it.
    cpu_summary = (
        cpu_df.groupby('instance')['value']
        .agg(
            avg_cpu='mean',
            max_cpu='max',
            min_cpu='min',
            std_cpu='std',
        )
        .round(3)
    )

    print("\nNode CPU Utilization Summary:")
    print(cpu_summary)

In [None]:
# Memory analysis: per-node memory utilisation summary, compared against the
# range recorded in the cluster audit.
if 'memory_usage' in node_metrics:
    memory_df = node_metrics['memory_usage']

    # Convert to percentage.
    # NOTE(review): assumes 'value' is a 0-1 fraction — confirm against the
    # collector's query.
    # The column is added even when the frame is empty because the baseline
    # metrics cell further down reads memory_df['memory_pct'].
    memory_df['memory_pct'] = memory_df['value'] * 100

    # One row per instance; each keyword names an output column.
    memory_summary = memory_df.groupby('instance')['memory_pct'].agg(
        avg_memory='mean',
        max_memory='max',
        min_memory='min',
    ).round(2)

    if memory_df.empty:
        # Without this guard the comparison below would print "nan%".
        print("No memory data available")
    else:
        print("\nNode Memory Utilization Summary (%):")
        print(memory_summary)

        # Expected vs Actual (from cluster audit: workers 36-43% memory)
        # NOTE(review): actual_avg is the mean over every instance in the
        # summary, not only the workers — confirm the comparison is intended.
        expected_memory_range = (36, 43)
        actual_avg = memory_summary['avg_memory'].mean()
        print(f"\nExpected memory range: {expected_memory_range[0]}-{expected_memory_range[1]}%")
        print(f"Actual average: {actual_avg:.1f}%")
else:
    # Mirror the CPU cell so a missing metric is reported instead of silent.
    print("No memory metrics collected")

## 3. Cluster Load Patterns

In [None]:
# Temporal load patterns: mean CPU usage by hour of day and by day of week.
if 'cpu_usage' in node_metrics and not cpu_df.empty:
    # Tag every sample with its hour and weekday index so it can be grouped.
    sample_times = cpu_df['timestamp'].dt
    cpu_df['hour'] = sample_times.hour
    cpu_df['day_of_week'] = sample_times.dayofweek

    cpu_values = cpu_df['value']
    hourly_pattern = cpu_values.groupby(cpu_df['hour']).mean()
    daily_pattern = cpu_values.groupby(cpu_df['day_of_week']).mean()

    print("Hourly CPU Usage Pattern:")
    for hour_of_day, mean_usage in hourly_pattern.items():
        print(f"{hour_of_day:02d}:00 - {mean_usage:.3f}")

    print("\nDaily CPU Usage Pattern:")
    # The weekday index is used directly as a position in this list (0 -> 'Mon').
    days = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
    for weekday, mean_usage in daily_pattern.items():
        print(f"{days[weekday]} - {mean_usage:.3f}")

## 4. Scheduler Performance Analysis

In [None]:
# Fetch scheduler metrics for the analysis window, then report scheduling
# latency and pod queue depth.
print("Collecting scheduler performance metrics...")
scheduler_metrics = collector.collect_scheduler_metrics(start_time, end_time)

if 'scheduling_duration' in scheduler_metrics:
    sched_df = scheduler_metrics['scheduling_duration']

    if sched_df.empty:
        print("No scheduling duration data available")
    else:
        durations = sched_df['value']
        # Each statistic is scaled by 1000 to convert to ms for display.
        avg_latency = durations.mean() * 1000
        p95_latency = durations.quantile(0.95) * 1000
        p99_latency = durations.quantile(0.99) * 1000

        # Tick when the P99 latency is under the 100 ms target, cross otherwise.
        if p99_latency < 100:
            p99_marker = '✓'
        else:
            p99_marker = '✗'

        print("\nScheduling Latency Analysis:")
        print(f"Average: {avg_latency:.2f}ms")
        print(f"P95: {p95_latency:.2f}ms")
        print(f"P99: {p99_latency:.2f}ms")
        print(f"Target: <100ms P99 (Current: {p99_marker})")

if 'pending_pods' in scheduler_metrics:
    pending_df = scheduler_metrics['pending_pods']
    if not pending_df.empty:
        pending_counts = pending_df['value']
        avg_pending = pending_counts.mean()
        max_pending = pending_counts.max()
        print("\nPod Queuing Analysis:")
        print(f"Average pending pods: {avg_pending:.2f}")
        print(f"Maximum pending pods: {max_pending:.0f}")

## 5. Baseline Performance Metrics

In [None]:
# Calculate current baseline metrics for improvement tracking and persist
# them as a JSON artifact for later comparison.
import json
from pathlib import Path

baseline_metrics = {
    'current_date': datetime.now().isoformat(),
    # Derived from the collection window so it cannot drift from
    # start_time / end_time.
    'analysis_period_days': (end_time - start_time).days,
    # Static cluster description; not derived from the collected metrics.
    'cluster_info': {
        'nodes': 6,
        'masters': 3,
        'workers': 3,
        'cpu_cores_per_node': 8,
        'memory_gb_per_node': 16
    }
}

if 'cpu_usage' in node_metrics:
    # Read from node_metrics directly so this cell does not depend on
    # variables created by the analysis cells. Dropping NaN first means an
    # empty or all-NaN series is skipped instead of writing NaN to the JSON.
    cpu_values = node_metrics['cpu_usage']['value'].dropna()
    if not cpu_values.empty:
        avg_cpu_utilization = float(cpu_values.mean())
        target_cpu_utilization = 0.65  # 65% target
        baseline_metrics['cpu'] = {
            'current_avg_utilization': avg_cpu_utilization,
            'target_utilization': target_cpu_utilization,
            # NOTE(review): this is current - target, so it is negative when
            # the cluster runs below target — confirm the intended sign.
            'improvement_needed': avg_cpu_utilization - target_cpu_utilization
        }

if 'memory_usage' in node_metrics:
    # Same percentage conversion as the memory analysis cell (value * 100),
    # recomputed here so the cell works even if that cell was not run.
    memory_pct = (node_metrics['memory_usage']['value'] * 100).dropna()
    # Guard matches the CPU branch: no data means no 'memory' entry rather
    # than NaN values in the artifact.
    if not memory_pct.empty:
        baseline_metrics['memory'] = {
            'current_avg_utilization': float(memory_pct.mean() / 100),
            'current_range': f"{memory_pct.min():.1f}-{memory_pct.max():.1f}%"
        }

# Save baseline for future comparison
baseline_path = Path('/home/jovyan/artifacts/week2_baseline_metrics.json')
# Create the artifact directory on first use instead of failing in open().
baseline_path.parent.mkdir(parents=True, exist_ok=True)
with baseline_path.open('w', encoding='utf-8') as f:
    json.dump(baseline_metrics, f, indent=2)

print("Baseline Metrics Established:")
print(json.dumps(baseline_metrics, indent=2))

## 6. Data Quality Assessment

In [None]:
# Assess data quality and completeness of the collected node metrics.
import json  # imported here so the cell also runs when the baseline cell was skipped

data_quality = {
    'collection_period': f"{start_time} to {end_time}",
    'metrics_collected': list(node_metrics.keys()),
    'total_data_points': sum(len(df) for df in node_metrics.values()),
    'data_completeness': {}
}

for metric_name, df in node_metrics.items():
    if df.empty:
        # A metric that returned nothing is the worst case, not a case to
        # ignore: record it as fully missing so it lowers the quality rate
        # instead of being silently left out of it.
        data_quality['data_completeness'][metric_name] = {
            'total_points': len(df),
            'missing_percentage': 100.0,
            'quality_status': 'Poor'
        }
        continue

    # Share of samples whose 'value' is NaN, as a plain Python float.
    missing_pct = float(df['value'].isna().mean() * 100)
    data_quality['data_completeness'][metric_name] = {
        'total_points': len(df),
        'missing_percentage': round(missing_pct, 2),
        'quality_status': 'Good' if missing_pct < 5 else 'Poor'
    }

print("Data Quality Assessment:")
print(json.dumps(data_quality, indent=2))

# Target: >95% data quality success rate
# (percentage of metrics whose status is 'Good'; 0 when no metric was collected)
completeness = data_quality['data_completeness']
good_quality_metrics = sum(
    1 for m in completeness.values() if m['quality_status'] == 'Good'
)
total_metrics = len(completeness)
quality_rate = good_quality_metrics / total_metrics * 100 if total_metrics > 0 else 0

print(f"\nOverall Data Quality: {quality_rate:.1f}% (Target: >95%)")
print(f"Status: {'✓ PASS' if quality_rate > 95 else '✗ NEEDS IMPROVEMENT'}")

## 7. Next Steps for Week 3

Based on this analysis, prepare for:
- Feature engineering pipeline development
- Temporal pattern extraction
- Node correlation analysis
- Feast feature store setup