Data Engineering Pipeline

In [1]:
# Advanced ETL Pipeline

import pandas as pd
import numpy as np
import os
from datetime import datetime
import logging
from functools import lru_cache

In [2]:

# Logging Setup
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

In [3]:
# ETL Class Design

class APILogETLPipeline:
    def __init__(self, input_path, output_path):
        self.input_path = input_path
        self.output_path = output_path
        self.df = None
    
    def extract(self):
        try:
            logging.info("Extracting data from CSV...")
            self.df = pd.read_csv(self.input_path)
            logging.info(f"Loaded {len(self.df)} records.")
        except Exception as e:
            logging.error(f"Extraction failed: {e}")
            raise

    def transform(self):
        try:
            logging.info("Starting transformation...")

            # Clean timestamp
            self.df['timestamp'] = pd.to_datetime(self.df['timestamp'], utc=True)

            # Drop duplicates
            self.df.drop_duplicates(subset='request_id', inplace=True)

            # Fill missing values
            self.df['response_time_ms'].fillna(self.df['response_time_ms'].median(), inplace=True)

            # Add temporal features
            self.df['date'] = self.df['timestamp'].dt.date
            self.df['hour'] = self.df['timestamp'].dt.hour
            self.df['day_of_week'] = self.df['timestamp'].dt.dayofweek
            self.df['week_of_year'] = self.df['timestamp'].dt.isocalendar().week
            self.df['is_weekend'] = self.df['day_of_week'].isin([5, 6])
            self.df['is_peak_hour'] = self.df['hour'].between(9, 18)

            # Performance indicators
            self.df['latency_zscore'] = (
                (self.df['response_time_ms'] - self.df['response_time_ms'].mean()) / self.df['response_time_ms'].std()
            )
            self.df['is_latency_outlier'] = self.df['latency_zscore'].abs() > 3
            self.df['latency_category'] = pd.qcut(self.df['response_time_ms'], q=4, labels=['low', 'medium', 'high', 'critical'])

            self.df['cpu_load_category'] = pd.cut(self.df['cpu_usage'], bins=[0, 50, 75, 100], labels=['normal', 'elevated', 'high'])
            self.df['memory_load_category'] = pd.cut(self.df['memory_usage'], bins=[0, 50, 75, 100], labels=['normal', 'elevated', 'high'])

            # Status-related features
            self.df['is_error'] = self.df['status_code'] >= 400
            self.df['error_type'] = self.df['status_code'].apply(
                lambda x: 'client_error' if 400 <= x < 500 else 'server_error' if x >= 500 else 'success'
            )

            # Behavior-based features
            self.df['api_signature'] = self.df['api_endpoint'] + "_" + self.df['status_code'].astype(str)

            # Group-level aggregation
            self.df['request_count'] = self.df.groupby(['consumer_id', 'date'])['request_id'].transform('count')

            logging.info("Transformation complete.")
        except Exception as e:
            logging.error(f"Transformation failed: {e}")
            raise

    def load(self):
        try:
            os.makedirs(os.path.dirname(self.output_path), exist_ok=True)
            self.df.to_csv(self.output_path, index=False)
            logging.info(f" Data saved to {self.output_path}")
        except Exception as e:
            logging.error(f"Load failed: {e}")
            raise

    def run(self):
        self.extract()
        self.transform()
        self.load()

In [4]:
# Run Pipeline

if __name__ == "__main__":
    pipeline = APILogETLPipeline(
        input_path="../data/api_logs_simulated.csv",
        output_path="../data/api_logs_engineered.csv"
    )
    pipeline.run()


2025-03-29 22:06:57,457 - INFO - Extracting data from CSV...
2025-03-29 22:06:57,685 - INFO - Loaded 100000 records.
2025-03-29 22:06:57,686 - INFO - Starting transformation...
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  self.df['response_time_ms'].fillna(self.df['response_time_ms'].median(), inplace=True)
2025-03-29 22:06:57,904 - INFO - Transformation complete.
2025-03-29 22:06:57,913 - ERROR - Load failed: [Errno 13] Permission denied: '../data/api_logs_engineered.csv'


PermissionError: [Errno 13] Permission denied: '../data/api_logs_engineered.csv'