# IntelliStore ML: Data Collection and Preprocessing

This notebook demonstrates how to collect and preprocess access log data from Kafka for training the hot/cold tiering model.

In [None]:
import pandas as pd
import numpy as np
import json
import time
from datetime import datetime, timedelta
from kafka import KafkaConsumer
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, List, Any

# Set style
# Apply a matplotlib style sheet and a seaborn colour palette to every plot
# drawn later in the notebook.
# NOTE(review): the 'seaborn-v0_8' style name is presumably only available in
# newer matplotlib releases — confirm against the pinned version.
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")

# Reaching this line means every import above resolved.
print("Libraries imported successfully!")

## 1. Kafka Consumer Setup

Connect to Kafka and consume access logs from the `access-logs` topic.

In [None]:
# Kafka configuration
# These module-level constants are read by create_kafka_consumer().
KAFKA_BROKERS = ['localhost:9092']  # Update for your environment
ACCESS_LOGS_TOPIC = 'access-logs'  # Topic the consumer subscribes to
GROUP_ID = 'ml-data-collection'  # Consumer group id passed to KafkaConsumer

def create_kafka_consumer():
    """Build a KafkaConsumer subscribed to ACCESS_LOGS_TOPIC.

    The consumer connects to KAFKA_BROKERS as part of GROUP_ID, decodes every
    message value from UTF-8 JSON, is configured with
    auto_offset_reset='earliest' and auto-commit enabled, and uses a
    10 second consumer timeout.

    Returns:
        The configured KafkaConsumer instance. The caller is responsible for
        closing it.
    """
    def _decode_json(raw):
        # Message payloads are UTF-8 encoded JSON documents.
        return json.loads(raw.decode('utf-8'))

    settings = {
        'bootstrap_servers': KAFKA_BROKERS,
        'group_id': GROUP_ID,
        'value_deserializer': _decode_json,
        'auto_offset_reset': 'earliest',
        'enable_auto_commit': True,
        'consumer_timeout_ms': 10000,  # 10 second timeout
    }
    return KafkaConsumer(ACCESS_LOGS_TOPIC, **settings)

print("Kafka consumer configuration ready")

## 2. Data Collection

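Collect access logs and convert them to a structured format. The cell below defines a Kafka-backed collector and, so the notebook can run without a broker, a synthetic log generator that is used for the rest of this demo.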

In [None]:
def collect_access_logs(max_messages=1000):
    """Collect up to ``max_messages`` access-log records from Kafka.

    A consumer is created with create_kafka_consumer() and iterated until
    either ``max_messages`` records have been read or the iteration ends.
    Collection is best-effort: any exception raised while consuming is
    reported and the records gathered so far are returned.

    Args:
        max_messages: Upper bound on the number of records to return. Zero or
            a negative value returns an empty list without connecting.

    Returns:
        A list of deserialized message values, at most ``max_messages`` long.
    """
    # Without this guard the limit is only checked after the first append, so
    # a limit of 0 would still connect and return one record.
    if max_messages <= 0:
        print("Collected 0 total messages")
        return []

    consumer = create_kafka_consumer()
    messages = []

    print(f"Collecting up to {max_messages} messages...")

    try:
        for message in consumer:
            messages.append(message.value)

            if len(messages) >= max_messages:
                break

            # Progress report every 100 records.
            if len(messages) % 100 == 0:
                print(f"Collected {len(messages)} messages...")

    except Exception as e:
        # Deliberately broad: a consumer or deserialization failure should
        # not discard the records already collected.
        print(f"Collection stopped: {e}")

    finally:
        # Always close the consumer, even on error or interruption.
        consumer.close()

    print(f"Collected {len(messages)} total messages")
    return messages

# For demo purposes, let's create synthetic data
def generate_synthetic_access_logs(num_records=5000, seed=42):
    """Generate synthetic access-log records for demonstration.

    Each record is a dict with the keys ``timestamp``, ``user``, ``action``,
    ``bucket``, ``object``, ``size``, ``tier``, ``success`` and ``metadata``.
    All values are native Python types, so the records are JSON-serializable.

    Timestamps fall within the last 7 days and are weighted toward the
    present: the age of each access is drawn from an exponential distribution
    with a 2-day scale and clipped to the 7-day window.

    Args:
        num_records: Number of records to generate.
        seed: Seed for the private random generator. The default of 42 keeps
            the random draws reproducible between runs.

    Returns:
        A list of ``num_records`` log-entry dicts.
    """
    # A private RandomState keeps results reproducible without reseeding the
    # global numpy generator as a side effect.
    rng = np.random.RandomState(seed)

    users = [f"user_{i}" for i in range(1, 101)]  # 100 users
    buckets = [f"bucket_{i}" for i in range(1, 21)]  # 20 buckets
    actions = ['upload_object', 'download_object', 'delete_object']
    content_types = ['image/jpeg', 'video/mp4', 'application/pdf', 'text/plain', 'application/zip']
    extensions = ['jpg', 'mp4', 'pdf', 'txt', 'zip']

    window_seconds = 7 * 24 * 3600  # 7 days
    now = time.time()

    logs = []
    for _ in range(num_records):
        # Age of the access: small ages (recent accesses) are most likely.
        # Clipping keeps every timestamp inside [now - 7 days, now], so no
        # record is dated in the future.
        age_seconds = min(rng.exponential(scale=2 * 24 * 3600), window_seconds)
        timestamp = float(now - age_seconds)

        # NOTE(review): the object number and the extension are drawn
        # independently, so there are up to 5000 distinct keys and most
        # objects are accessed only about once at the default record count.
        object_key = f"object_{rng.randint(1, 1001)}.{rng.choice(extensions)}"

        # Log-normal size in bytes: the median is about 3 MB, and the mean is
        # considerably larger because of the heavy right tail.
        size = int(rng.lognormal(mean=15, sigma=2))

        # Tier (80% hot, 20% cold initially)
        tier = str(rng.choice(['hot', 'cold'], p=[0.8, 0.2]))

        log_entry = {
            'timestamp': timestamp,
            'user': str(rng.choice(users)),
            'action': str(rng.choice(actions, p=[0.3, 0.6, 0.1])),  # More downloads
            'bucket': str(rng.choice(buckets)),
            'object': object_key,
            'size': size,
            'tier': tier,
            # bool() because numpy's bool_ is not JSON-serializable.
            'success': bool(rng.choice([True, False], p=[0.95, 0.05])),
            'metadata': {
                'content_type': str(rng.choice(content_types)),
                'user_agent': 'intellistore-client/1.0'
            }
        }

        logs.append(log_entry)

    return logs

# Generate synthetic data for demo
# The demo uses generated records rather than calling collect_access_logs(),
# so no Kafka broker is needed to run the notebook.
print("Generating synthetic access logs...")
access_logs = generate_synthetic_access_logs(5000)
print(f"Generated {len(access_logs)} synthetic access logs")

## 3. Data Preprocessing

Convert raw access logs into a structured DataFrame with features for ML training.

In [None]:
def _extract_content_type(metadata: Any) -> str:
    """Return the content type stored in a metadata dict, else 'unknown'."""
    if isinstance(metadata, dict):
        content_type = metadata.get('content_type')
        if isinstance(content_type, str):
            return content_type
    return 'unknown'


def preprocess_access_logs(logs: List[Dict]) -> pd.DataFrame:
    """Convert raw access-log records to a DataFrame with derived features.

    Args:
        logs: Log-entry dicts. The keys ``timestamp`` (epoch seconds),
            ``user``, ``bucket``, ``object`` and ``size`` are required;
            ``metadata`` is optional and may be missing or None per record.

    Returns:
        A DataFrame holding the original fields plus ``datetime``,
        ``hour_of_day``, ``day_of_week``, ``is_weekend``,
        ``is_business_hours``, ``object_age_days``, ``size_category``,
        ``content_type``, ``is_media``, ``user_activity_level`` and
        ``bucket_popularity``.
    """
    df = pd.DataFrame(logs)

    # Convert timestamp to datetime
    df['datetime'] = pd.to_datetime(df['timestamp'], unit='s')

    # Extract time-based features
    df['hour_of_day'] = df['datetime'].dt.hour
    df['day_of_week'] = df['datetime'].dt.dayofweek
    df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
    # Hours 9 through 17 inclusive, i.e. 09:00-17:59.
    df['is_business_hours'] = df['hour_of_day'].between(9, 17).astype(int)

    # Object age (days since the object's first access in this batch)
    first_access = df.groupby('object')['datetime'].transform('min')
    df['object_age_days'] = (df['datetime'] - first_access).dt.total_seconds() / (24 * 3600)

    # Size categories. include_lowest=True places zero-byte objects in
    # 'small'; otherwise they fall outside the first bin and become NaN.
    df['size_category'] = pd.cut(
        df['size'],
        bins=[0, 1024, 1024 * 1024, 100 * 1024 * 1024, float('inf')],
        labels=['small', 'medium', 'large', 'xlarge'],
        include_lowest=True,
    )

    # Content type categories. Records without usable metadata are reported
    # as 'unknown' rather than raising.
    if 'metadata' in df.columns:
        df['content_type'] = df['metadata'].apply(_extract_content_type)
    else:
        df['content_type'] = 'unknown'
    df['is_media'] = df['content_type'].str.startswith(('image/', 'video/', 'audio/')).astype(int)

    # User activity and bucket popularity: number of records per user/bucket.
    # NOTE(review): these counts span the whole batch, so each row also sees
    # accesses that happen after it — check for leakage before training.
    df['user_activity_level'] = df['user'].map(df['user'].value_counts())
    df['bucket_popularity'] = df['bucket'].map(df['bucket'].value_counts())

    return df

# Preprocess the data
# Build the feature-enriched DataFrame from the generated records and report
# its shape and column names.
print("Preprocessing access logs...")
df = preprocess_access_logs(access_logs)
print(f"Preprocessed DataFrame shape: {df.shape}")
print(f"Columns: {list(df.columns)}")

## 4. Feature Engineering

Create features that will help predict whether an object will be "hot" (frequently accessed) in the next 24 hours.

In [None]:
def create_ml_features(df: pd.DataFrame, prediction_window_hours=24, hot_threshold=3) -> pd.DataFrame:
    """Build one training sample per access event.

    For every access of every object, features are computed from that
    object's accesses in the preceding 7 days (the current access included),
    and the label is derived from downloads that happen strictly after the
    access, within the next ``prediction_window_hours``.

    Accesses whose look-ahead window extends past the newest record in ``df``
    are skipped: their future is only partially observed, so they cannot be
    labelled reliably.

    Args:
        df: Output of preprocess_access_logs(). Needs the columns
            ``datetime``, ``object``, ``action``, ``user``, ``tier``,
            ``size`` and the derived feature columns copied below.
        prediction_window_hours: Length of the look-ahead label window.
        hot_threshold: Minimum number of downloads inside the look-ahead
            window for a sample to be labelled hot.

    Returns:
        A DataFrame with one row per retained access, holding the feature
        columns, ``future_access_count`` and the binary ``is_hot`` label.
        It has no columns when no access is retained.
    """
    lookback_days = 7
    lookback_window = timedelta(days=lookback_days)
    recent_window = timedelta(hours=24)
    lookahead_window = timedelta(hours=prediction_window_hours)

    # Sort once; groupby keeps the row order inside each group.
    df = df.sort_values('datetime').reset_index(drop=True)
    data_end = df['datetime'].max()

    features_list = []

    for object_key, object_df in df.groupby('object'):
        object_df = object_df.reset_index(drop=True)
        times = object_df['datetime']

        for i, row in object_df.iterrows():
            current_time = row['datetime']

            # Skip samples whose label window is not fully covered by the data.
            lookahead_end = current_time + lookahead_window
            if lookahead_end > data_end:
                continue

            # Look-back window (past 7 days, current access included).
            lookback_start = current_time - lookback_window
            lookback_data = object_df[times.between(lookback_start, current_time)]

            # Look-ahead window: strictly after the current access, so the
            # access being scored does not leak into its own label.
            lookahead_data = object_df[(times > current_time) & (times <= lookahead_end)]

            # Hours since the previous access of this object. 999 is the
            # sentinel for "no earlier access inside the look-back window".
            last_access_hours_ago = 999
            if i > 0:
                previous_time = times.iloc[i - 1]
                if previous_time >= lookback_start:
                    last_access_hours_ago = (current_time - previous_time).total_seconds() / 3600

            access_count = len(lookback_data)
            recent_count = int((lookback_data['datetime'] >= current_time - recent_window).sum())

            features = {
                'object_key': object_key,
                'timestamp': current_time,
                'hour_of_day': row['hour_of_day'],
                'day_of_week': row['day_of_week'],
                'is_weekend': row['is_weekend'],
                'is_business_hours': row['is_business_hours'],
                'object_age_days': row['object_age_days'],
                'size': row['size'],
                'size_category': row['size_category'],
                'is_media': row['is_media'],
                'current_tier': row['tier'],
                'user_activity_level': row['user_activity_level'],
                'bucket_popularity': row['bucket_popularity'],

                # Historical access patterns (past 7 days)
                'access_count_7d': access_count,
                'download_count_7d': int((lookback_data['action'] == 'download_object').sum()),
                'unique_users_7d': lookback_data['user'].nunique(),
                'avg_daily_access': access_count / lookback_days,
                'last_access_hours_ago': last_access_hours_ago,

                # Recent trend: accesses in the past 24 hours relative to the
                # 7-day daily average (denominator floored at 1).
                'recent_access_trend': recent_count / max(1, access_count / lookback_days),
            }

            # Label: at least hot_threshold downloads in the look-ahead window.
            future_access_count = int((lookahead_data['action'] == 'download_object').sum())
            features['future_access_count'] = future_access_count
            features['is_hot'] = int(future_access_count >= hot_threshold)

            features_list.append(features)

    return pd.DataFrame(features_list)

# Create ML features
# Build the per-access training samples and report the share labelled hot.
print("Creating ML features...")
ml_df = create_ml_features(df)
print(f"ML DataFrame shape: {ml_df.shape}")
# is_hot is a 0/1 column, so its sum is the hot count and its mean the hot share.
print(f"Hot objects: {ml_df['is_hot'].sum()} ({ml_df['is_hot'].mean():.2%})")

## 5. Exploratory Data Analysis

Analyze the data to understand patterns and relationships.

In [None]:
# Basic statistics
is_cold = ml_df['is_hot'] == 0
print("=== Dataset Overview ===")
print(f"Total samples: {len(ml_df)}")
print(f"Hot objects: {ml_df['is_hot'].sum()} ({ml_df['is_hot'].mean():.2%})")
print(f"Cold objects: {is_cold.sum()} ({is_cold.mean():.2%})")
print(f"Date range: {ml_df['timestamp'].min()} to {ml_df['timestamp'].max()}")

# Feature distributions
fig, axes = plt.subplots(2, 3, figsize=(15, 10))

# Grouped bar charts: number of samples per time bucket, split by label.
# The label columns are renamed explicitly so the legend stays correct even
# when only one class is present in the data.
class_names = {0: 'Cold', 1: 'Hot'}
bar_specs = [
    (axes[0, 0], 'hour_of_day', 'Access Patterns by Hour', 'Hour of Day'),
    (axes[0, 1], 'day_of_week', 'Access Patterns by Day of Week', 'Day of Week (0=Monday)'),
]
for ax, column, title, xlabel in bar_specs:
    counts = (
        ml_df.groupby([column, 'is_hot'])
        .size()
        .unstack(fill_value=0)
        .rename(columns=class_names)
    )
    counts.plot(kind='bar', ax=ax)
    ax.set_title(title)
    ax.set_xlabel(xlabel)
    ax.legend()  # rebuild the legend without the 'is_hot' column-name title

# Box plots: distribution of each numeric feature, split by label.
box_specs = [
    (axes[0, 2], 'object_age_days', 'Object Age Distribution', False),
    (axes[1, 0], 'size', 'Object Size Distribution', True),
    (axes[1, 1], 'access_count_7d', '7-Day Access Count Distribution', False),
    (axes[1, 2], 'recent_access_trend', 'Recent Access Trend Distribution', False),
]
for ax, column, title, log_scale in box_specs:
    ml_df.boxplot(column=column, by='is_hot', ax=ax)
    ax.set_title(title)
    ax.set_xlabel('Is Hot')
    if log_scale:
        ax.set_yscale('log')

# Set the figure title last: DataFrame.boxplot(by=...) replaces the figure's
# suptitle with its own "Boxplot grouped by ..." text on every call.
fig.suptitle('Feature Distributions', fontsize=16)

plt.tight_layout()
plt.show()

# Correlation matrix
numeric_cols = ml_df.select_dtypes(include=[np.number]).columns
correlation_matrix = ml_df[numeric_cols].corr()

plt.figure(figsize=(12, 10))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0, fmt='.2f')
plt.title('Feature Correlation Matrix')
plt.tight_layout()
plt.show()

## 6. Data Export

Save the processed data for model training.

In [None]:
import json
import os

# Save processed data
output_file = '../data/processed_access_logs.csv'
metadata_file = '../data/feature_metadata.json'

# Create the output directory first so a fresh checkout does not fail here.
os.makedirs(os.path.dirname(output_file), exist_ok=True)

ml_df.to_csv(output_file, index=False)
print(f"Saved processed data to {output_file}")

# Save feature metadata
# The counts are converted to plain ints: pandas returns numpy integers,
# which json cannot encode and which default=str would write out as strings.
feature_info = {
    'total_samples': len(ml_df),
    'hot_samples': int(ml_df['is_hot'].sum()),
    'cold_samples': int((ml_df['is_hot'] == 0).sum()),
    'features': list(ml_df.columns),
    'numeric_features': list(ml_df.select_dtypes(include=[np.number]).columns),
    'categorical_features': list(ml_df.select_dtypes(include=['object', 'category']).columns),
    'target_column': 'is_hot',
    'date_range': {
        'start': str(ml_df['timestamp'].min()),
        'end': str(ml_df['timestamp'].max())
    }
}

with open(metadata_file, 'w', encoding='utf-8') as f:
    # default=str remains as a fallback for any other non-JSON value.
    json.dump(feature_info, f, indent=2, default=str)

print("Data collection and preprocessing completed!")
print(f"Next step: Run the model training notebook with {len(ml_df)} samples")