# Data Processing Notebook

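This notebook loads the raw weather data that the previous pipeline step wrote to MinIO. It computes daily temperature, humidity, wind, and precipitation statistics and writes the results back to MinIO.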

In [None]:
# Cell 1: Import libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import boto3
from io import StringIO
import os

In [None]:
# Cell 2: Define input locations
# The raw CSV is read from MinIO object storage, not from the local filesystem.
# `directory` is the key prefix under which the upstream DAG step stores its
# output. It must match the directory in your DAG. It can be overridden with
# the WEATHER_DATA_DIR environment variable, so the notebook does not need
# editing for each run. The previous value is kept as the default.
input_file = 'raw_weather_data.csv'  # object name inside `directory`
directory = os.environ.get('WEATHER_DATA_DIR', 'weather-dataprocessing-0519233833')

In [None]:
# Cell 3: Read data from MinIO
# Load the raw CSV from the 'elyra-airflow' bucket into `df`. Locations are
# tried in this order:
#   1. `<directory>/<input_file>`, where the upstream step writes it;
#   2. `<input_file>` at the bucket root;
#   3. a small synthetic sample, so the remaining cells can still run.
# Before falling back to (3), the bucket contents are listed to help debug the
# missing key.
# NOTE: when the synthetic sample is used, every downstream output is fake data.


def _read_csv_from_minio(client, key):
    """Fetch object `key` from the 'elyra-airflow' bucket and parse it as UTF-8 CSV.

    Args:
        client: a boto3 S3 client pointed at MinIO.
        key: full object key inside the bucket.

    Returns:
        pandas.DataFrame with the parsed CSV contents.

    Raises:
        Whatever the client or pandas raises (missing key, bad CSV, ...).
        Callers decide how to fall back.
    """
    response = client.get_object(Bucket='elyra-airflow', Key=key)
    content = response['Body'].read().decode('utf-8')
    return pd.read_csv(StringIO(content))


try:
    # Connection settings can be supplied through environment variables, so the
    # secret does not have to live in the notebook. The previously hard-coded
    # values remain the defaults for backward compatibility.
    s3_client = boto3.client(
        's3',
        endpoint_url=os.environ.get(
            'MINIO_ENDPOINT', 'http://minio.minio-system.svc.cluster.local:9000'
        ),
        aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID', 'minio'),
        aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY', 'minio123'),
        region_name='us-east-1'  # Can be any region, doesn't matter for MinIO
    )

    print(f"Attempting to read {input_file} from MinIO bucket 'elyra-airflow'")
    df = _read_csv_from_minio(s3_client, f'{directory}/{input_file}')

    print(f"Successfully loaded {len(df)} rows of data from MinIO")
    print(df.head())

except Exception as e:
    print(f"Error reading data from MinIO: {e}")

    # Backup: perhaps the file was uploaded without the directory prefix.
    try:
        print("Trying alternate path...")
        df = _read_csv_from_minio(s3_client, input_file)
        print(f"Successfully loaded {len(df)} rows from alternate path")
    except Exception as e2:
        print(f"Backup method also failed: {e2}")

        # List the bucket's keys to help locate the expected file.
        try:
            print("Listing files in MinIO bucket:")
            listing = s3_client.list_objects_v2(Bucket='elyra-airflow')
            if 'Contents' in listing:
                for obj in listing['Contents']:
                    print(f" - {obj['Key']}")
            else:
                print("No files found in bucket")
        except Exception as e3:
            print(f"Could not list bucket contents: {e3}")

        # Last resort: a tiny synthetic frame. It must contain every column that
        # Cell 4 aggregates (temperature, humidity, wind_speed, precipitation).
        # Otherwise processing fails with a KeyError anyway.
        print("Creating sample data to avoid complete failure")
        df = pd.DataFrame({
            'date': pd.date_range(start='2024-01-01', periods=5),
            'temperature': [20, 22, 19, 23, 25],
            'humidity': [45, 47, 52, 55, 42],
            'wind_speed': [10, 12, 8, 15, 11],
            'precipitation': [0.0, 1.2, 0.0, 3.5, 0.4],
        })

In [None]:
# Cell 4: Process the data
# Collapse the raw readings into one row per calendar day.
# Named aggregation binds each output column to its source column and
# statistic explicitly. The schema therefore no longer depends on keeping a
# separate list of flattened column names in the same order as the agg spec.
print("Processing weather data...")

# Fail early with one clear message instead of a KeyError deep inside the
# aggregation.
required_columns = ['date', 'temperature', 'humidity', 'wind_speed', 'precipitation']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
    raise KeyError(f"Input data is missing required columns: {missing_columns}")

# read_csv is called without parse_dates, so 'date' may arrive as strings.
# Convert it to datetimes before using the .dt accessor.
df['date'] = pd.to_datetime(df['date'])

# Calculate daily statistics. The grouping key is named 'date', so
# reset_index() turns it back into the first column.
daily_stats = (
    df.groupby(df['date'].dt.date)
    .agg(
        avg_temp=('temperature', 'mean'),
        min_temp=('temperature', 'min'),
        max_temp=('temperature', 'max'),
        avg_humidity=('humidity', 'mean'),
        min_humidity=('humidity', 'min'),
        max_humidity=('humidity', 'max'),
        avg_wind=('wind_speed', 'mean'),
        max_wind=('wind_speed', 'max'),
        total_rain=('precipitation', 'sum'),
    )
    .reset_index()
)

# Round every statistic (all columns except the date) to one decimal place.
numeric_cols = daily_stats.columns.drop('date')
daily_stats[numeric_cols] = daily_stats[numeric_cols].round(1)

print("Daily weather statistics:")
print(daily_stats.head())

# Cell 5: Save processed data back to MinIO
# Serialize the daily statistics to CSV in memory and upload them under the same
# key prefix as the raw input. If anything in that path fails, the CSV is
# written to the current working directory instead, so the results are not lost.
try:
    payload = StringIO()
    daily_stats.to_csv(payload, index=False)

    output_key = f'{directory}/processed_weather_data.csv'
    s3_client.put_object(
        Bucket='elyra-airflow',
        Key=output_key,
        Body=payload.getvalue(),
    )
    print(f"Successfully saved processed data to MinIO at {output_key}")
except Exception as upload_error:
    print(f"Error saving processed data to MinIO: {upload_error}")
    # Local fallback copy.
    daily_stats.to_csv('processed_weather_data.csv', index=False)
    print("Saved processed data locally as fallback")