In [3]:
import logging
import configparser
import psycopg2
from psycopg2 import extras
import os
from datetime import datetime

# --- Logging ---
def setup_logging(log_file):
    """Configures root logging to write to ``log_file`` and to the console.

    Args:
        log_file: Path of the log file. Its parent directory is created when
            missing. A bare file name (no directory part) is written to the
            current working directory.

    Returns:
        The logger named after this module (``logging.getLogger(__name__)``).
    """
    log_dir = os.path.dirname(log_file)
    # dirname() is '' for a bare file name and os.makedirs('') raises
    # FileNotFoundError, so only create a directory when there is one.
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)

    # Note: logging.basicConfig() is a no-op when the root logger already has
    # handlers, so only the first call in a process decides the destinations.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler()
        ]
    )
    return logging.getLogger(__name__)

# --- Database Connection ---
def get_db_connection(config_file='config.ini'):
    """Opens a PostgreSQL connection using the [DATABASE] section of a config file.

    Args:
        config_file: Path to an INI file with a ``DATABASE`` section providing
            ``DB_HOST``, ``DB_NAME``, ``DB_USER``, ``DB_PASSWORD`` and ``DB_PORT``.

    Returns:
        The connection object returned by ``psycopg2.connect``. The caller is
        responsible for closing it.

    Raises:
        KeyError: If the file cannot be read, has no ``DATABASE`` section, or
            the section lacks one of the required keys.
        psycopg2.Error: If the connection attempt fails (logged, then re-raised).
    """
    config = configparser.ConfigParser()
    # ConfigParser.read() silently skips files it cannot open and returns the
    # list of files it parsed; use that to give an actionable error.
    loaded_files = config.read(config_file)
    if 'DATABASE' not in config:
        reason = "has no [DATABASE] section" if loaded_files else "could not be read"
        raise KeyError(f"DATABASE configuration unavailable: '{config_file}' {reason}")
    db_config = config['DATABASE']

    try:
        conn = psycopg2.connect(
            host=db_config['DB_HOST'],
            database=db_config['DB_NAME'],
            user=db_config['DB_USER'],
            password=db_config['DB_PASSWORD'],
            port=db_config['DB_PORT']
        )
        return conn
    except psycopg2.Error as e:
        logger.error(f"Error connecting to database: {e}")
        raise

# --- Config Loader ---
def load_config(config_file='config.ini'):
    """Parses ``config_file`` as INI and returns the resulting ConfigParser.

    A file that cannot be read is skipped by ``ConfigParser.read``, in which
    case the returned parser simply has no sections.
    """
    parser = configparser.ConfigParser()
    parser.read(config_file)
    return parser

# --- Timestamp Management ---
def get_last_run_timestamp(file_path):
    """Returns the stored last-run time, or None when nothing is stored.

    The file is expected to hold a single ISO-8601 timestamp. A missing file
    or a file containing only whitespace yields None.
    """
    if not os.path.exists(file_path):
        return None

    with open(file_path, 'r') as handle:
        stored_value = handle.read().strip()

    if not stored_value:
        return None
    return datetime.fromisoformat(stored_value)

def set_last_run_timestamp(file_path, timestamp):
    """Persists ``timestamp`` (ISO-8601) as the last successful ETL run.

    Args:
        file_path: Destination file. Its parent directory is created when
            missing; a bare file name is written to the working directory.
        timestamp: Object exposing ``isoformat()`` (e.g. ``datetime``).
    """
    target_dir = os.path.dirname(file_path)
    # dirname() is '' for a bare file name; os.makedirs('') would raise.
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)
    with open(file_path, 'w') as f:
        f.write(timestamp.isoformat())

logger = setup_logging('logs/etl_pipeline.log') # Initialize the module-level logger here

In [2]:
pip install psycopg2

Collecting psycopg2
  Downloading psycopg2-2.9.11-cp313-cp313-win_amd64.whl.metadata (5.1 kB)
Downloading psycopg2-2.9.11-cp313-cp313-win_amd64.whl (2.7 MB)
   ---------------------------------------- 0.0/2.7 MB ? eta -:--:--
   --- ------------------------------------ 0.3/2.7 MB ? eta -:--:--
   ----------- ---------------------------- 0.8/2.7 MB 2.1 MB/s eta 0:00:01
   ----------------------- ---------------- 1.6/2.7 MB 2.7 MB/s eta 0:00:01
   ------------------------------ --------- 2.1/2.7 MB 2.8 MB/s eta 0:00:01
   ---------------------------------------- 2.7/2.7 MB 2.9 MB/s  0:00:00
Installing collected packages: psycopg2
Successfully installed psycopg2-2.9.11
Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 25.2 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [6]:
import pandas as pd
from utils import get_db_connection, get_last_run_timestamp, set_last_run_timestamp, logger
from datetime import datetime, timedelta

def extract_data(table_name, last_run_timestamp=None, batch_size=None, id_column=None, timestamp_column=None):
    """Reads a table into a DataFrame, optionally only rows newer than a watermark.

    An incremental load is performed when both ``last_run_timestamp`` and
    ``timestamp_column`` are given; otherwise the whole table is read.

    Args:
        table_name: Table to read. Interpolated into the SQL text as an
            identifier, so it must come from trusted code, never user input.
        last_run_timestamp: Watermark; only rows with
            ``timestamp_column > last_run_timestamp`` are returned.
        batch_size: Currently only affects the wording of the log message;
            all matching rows are always fetched in one query.
        id_column: Column used for ordering when no timestamp column is given.
            Trusted identifier, same caveat as ``table_name``.
        timestamp_column: Column compared against the watermark and used for
            ordering. Trusted identifier, same caveat as ``table_name``.

    Returns:
        The extracted rows, or an empty column-less ``pd.DataFrame()`` if any
        error occurred (the error is logged, not raised).
    """
    conn = None
    try:
        conn = get_db_connection()
        incremental = bool(last_run_timestamp and timestamp_column)

        query = f"SELECT * FROM {table_name}"
        query_params = None
        if incremental:
            # Bind the watermark as a parameter (psycopg2 '%s' paramstyle)
            # instead of splicing its text into the statement.
            query += f" WHERE {timestamp_column} > %s"
            query_params = [last_run_timestamp]
            logger.info(f"Performing incremental load for {table_name} since {last_run_timestamp.isoformat()}")
        else:
            logger.info(f"Performing full load for {table_name}")

        # Order by timestamp, else id, else the first selected column.
        order_by = timestamp_column or id_column or "1"
        query += f" ORDER BY {order_by} ASC"

        # True batching (LIMIT/OFFSET or id paging) is not implemented; every
        # matching row is fetched at once regardless of batch_size.
        df = pd.read_sql(query, conn, params=query_params)
        record_kind = "new records" if (batch_size and incremental) else "records"
        logger.info(f"Extracted {len(df)} {record_kind} from {table_name}.")

        return df

    except Exception as e:
        logger.error(f"Error during data extraction from {table_name}: {e}")
        return pd.DataFrame() # Return empty DataFrame on error
    finally:
        if conn:
            conn.close()

def handle_data_quality(df, table_name):
    """Applies basic data-quality rules and returns the cleaned frame.

    The input frame is never modified; every step produces a new frame.
    Rules, each applied only when the relevant columns exist:

    1. Drop rows where ``product_id`` or ``quantity`` is missing.
    2. Coerce ``quantity`` to int and ``unit_price`` to numeric, replacing
       unparseable or missing values with 0.
    3. For the ``stocks`` table, keep the first row per
       ``(product_id, warehouse_id)``.

    Args:
        df: Extracted data for one table.
        table_name: Name of the table, used for logging and rule selection.

    Returns:
        The cleaned DataFrame (``df`` itself when it is empty).
    """
    initial_rows = len(df)
    if df.empty:
        logger.warning(f"No data to process for {table_name}. Skipping quality checks.")
        return df

    cleaned = df

    # Rule 1: rows without a product or quantity cannot be used downstream.
    if 'product_id' in cleaned.columns and 'quantity' in cleaned.columns:
        cleaned = cleaned.dropna(subset=['product_id', 'quantity'])
        if len(cleaned) < initial_rows:
            logger.warning(f"Dropped {initial_rows - len(cleaned)} rows due to critical NaNs in {table_name}.")
            initial_rows = len(cleaned)

    # Rule 2: normalise numeric columns; assign() builds a new frame, so the
    # caller's data is left untouched.
    if 'quantity' in cleaned.columns:
        cleaned = cleaned.assign(
            quantity=pd.to_numeric(cleaned['quantity'], errors='coerce').fillna(0).astype(int)
        )
    if 'unit_price' in cleaned.columns:
        cleaned = cleaned.assign(
            unit_price=pd.to_numeric(cleaned['unit_price'], errors='coerce').fillna(0.0)
        )

    # Rule 3: for stocks, product_id + warehouse_id is treated as the natural key.
    if table_name == 'stocks' and 'product_id' in cleaned.columns and 'warehouse_id' in cleaned.columns:
        cleaned = cleaned.drop_duplicates(subset=['product_id', 'warehouse_id'])
        if len(cleaned) < initial_rows:
            logger.warning(f"Dropped {initial_rows - len(cleaned)} duplicate stock records.")
            initial_rows = len(cleaned)

    logger.info(f"Data quality check completed for {table_name}. Remaining rows: {len(cleaned)}")
    return cleaned

def run_extraction(config):
    """Extracts and cleans every source table, then advances the watermark.

    Tables that declare a ``ts_col`` are loaded incrementally from the last
    stored run timestamp; the others are fully reloaded. The stored timestamp
    is only advanced when every table was extracted without error, so a failed
    run is retried from the same watermark next time.

    Args:
        config: Mapping providing ``config['ETL']['LAST_RUN_FILE']``.

    Returns:
        Dict mapping table name to its cleaned DataFrame (empty for tables
        whose extraction failed).
    """
    last_run_timestamp_file = config['ETL']['LAST_RUN_FILE']
    last_run_ts = get_last_run_timestamp(last_run_timestamp_file)

    extracted_data = {}
    tables_to_extract = {
        'warehouses': {'id_col': 'warehouse_id'},
        'products': {'id_col': 'product_id'},
        'categories': {'id_col': 'category_id'},
        'suppliers': {'id_col': 'supplier_id'},
        'stocks': {'id_col': 'stock_id', 'ts_col': 'last_updated'},
        'stock_movements': {'id_col': 'movement_id', 'ts_col': 'movement_date'},
        'stock_receipt_costs': {'id_col': 'receipt_id', 'ts_col': 'receipt_date'},
        'reorder_points': {'id_col': 'reorder_point_id'},
        # Add other tables like purchase_orders, sales_orders if needed for specific metrics
        'purchase_orders': {'id_col': 'id', 'ts_col': 'order_date'}, # Assuming 'id' is PK and 'order_date' is timestamp
        'purchase_order_details': {'id_col': 'id'},
        'sales_orders': {'id_col': 'id', 'ts_col': 'order_date'},
        'sales_order_details': {'id_col': 'id'},
    }

    # Captured before extraction starts so rows written while this run is in
    # progress are picked up by the next incremental load.
    current_run_timestamp = datetime.now()

    failed_tables = []
    for table_name, params in tables_to_extract.items():
        df = extract_data(
            table_name=table_name,
            last_run_timestamp=last_run_ts if params.get('ts_col') else None,
            id_column=params.get('id_col'),
            timestamp_column=params.get('ts_col')
        )
        # extract_data() reports an error by returning a column-less
        # pd.DataFrame(); a query that merely matched no rows is expected to
        # still carry the table's columns.
        if df.empty and len(df.columns) == 0:
            failed_tables.append(table_name)
        df = handle_data_quality(df, table_name)
        extracted_data[table_name] = df

    if failed_tables:
        # Advancing the watermark now would permanently skip the rows that
        # could not be read, so keep the previous one.
        logger.error(
            f"Extraction failed for: {', '.join(failed_tables)}. "
            "Last run timestamp was not updated."
        )
    else:
        set_last_run_timestamp(last_run_timestamp_file, current_run_timestamp)
    return extracted_data

ImportError: cannot import name 'get_db_connection' from 'utils' (C:\Users\62818\AppData\Local\Programs\Python\Python313\Lib\site-packages\utils\__init__.py)

In [5]:
pip install utils

Collecting utils
  Downloading utils-1.0.2.tar.gz (13 kB)
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
  Preparing metadata (pyproject.toml): started
  Preparing metadata (pyproject.toml): finished with status 'done'
Building wheels for collected packages: utils
  Building wheel for utils (pyproject.toml): started
  Building wheel for utils (pyproject.toml): finished with status 'done'
  Created wheel for utils: filename=utils-1.0.2-py2.py3-none-any.whl size=14012 sha256=ecc3b2e681e38d2283f43af46890f57278b693eedb650d905fab1916711722f5
  Stored in directory: c:\users\62818\appdata\local\pip\cache\wheels\cd\95\b5\2513b327cdc0ef5f9b087f0596bff56100e7ec84e0d0f4ed18
Successfully built utils
Installing collected packages: utils
Successfully installed utils-1.0.2
Note: you may need to restart the kernel to use updated packages.


[notice] A new release of pip is available: 25.2 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [7]:
import pandas as pd
from datetime import datetime, timedelta
from utils import logger
import numpy as np

def calculate_inventory_metrics(data, config):
    """Calculates per-product inventory metrics.

    Metrics produced (each only when stock movements are available):

    * ``stock_turnover_ratio_product``: total OUT quantity divided by current
      stock quantity (current stock is used as a stand-in for average stock).
    * ``days_of_inventory_on_hand_product``: current quantity divided by the
      average daily OUT quantity since the earliest movement.
    * ``dead_stock_identification``: stock rows whose product has not moved
      for more than ``DEAD_STOCK_DAYS`` days, or has stock but no movement.

    Division by zero yields NaN in both ratio metrics.

    Args:
        data: Dict of DataFrames; reads ``stocks``, ``products`` and
            ``stock_movements``.
        config: Mapping providing ``config['METRICS']['DEAD_STOCK_DAYS']``.

    Returns:
        Dict of metric name to DataFrame; empty when ``stocks`` or
        ``products`` is missing.
    """
    logger.info("Calculating Inventory Metrics...")
    df_stocks = data.get('stocks', pd.DataFrame())
    df_products = data.get('products', pd.DataFrame())
    df_movements = data.get('stock_movements', pd.DataFrame())

    if df_stocks.empty or df_products.empty:
        logger.warning("Missing 'stocks' or 'products' data for inventory metrics.")
        return {}

    # Merge product info for unit price
    df_stocks = pd.merge(df_stocks, df_products[['product_id', 'unit_price']], on='product_id', how='left')

    metrics = {}
    has_movements = not df_movements.empty

    if has_movements:
        # Parsed once and kept local so the caller's frame is not modified.
        movement_dates = pd.to_datetime(df_movements['movement_date'])

        # Total OUT quantity per product, shared by both ratio metrics. The
        # column is renamed so it cannot collide with the stock 'quantity'
        # column when merged.
        is_out = df_movements['movement_type'].str.contains('OUT', na=False)
        out_qty_per_product = (
            df_movements[is_out]
            .groupby('product_id')['quantity']
            .sum()
            .reset_index()
            .rename(columns={'quantity': 'total_out_qty'})
        )

        # Stock turnover ratio per product.
        df_turnover = pd.merge(df_stocks, out_qty_per_product, on='product_id', how='left')
        df_turnover['total_out_qty'] = df_turnover['total_out_qty'].fillna(0)
        df_turnover['avg_inventory_qty'] = df_turnover['quantity'] # Simplification: use current as avg
        df_turnover['turnover_ratio'] = (
            df_turnover['total_out_qty'] / df_turnover['avg_inventory_qty']
        ).replace([np.inf, -np.inf], np.nan) # Zero stock -> undefined ratio

        metrics['stock_turnover_ratio_product'] = df_turnover[['product_id', 'turnover_ratio']]
        logger.info("Calculated stock turnover ratio per product.")

        # Days of inventory on hand, using average daily OUT quantity as a
        # proxy for cost of goods sold per day.
        period_days = (datetime.now() - movement_dates.min()).days
        if period_days <= 0:
            period_days = 1 # Avoid division by zero

        daily_out = out_qty_per_product.assign(
            avg_daily_out_qty=out_qty_per_product['total_out_qty'] / period_days
        )
        df_days_on_hand = pd.merge(
            df_stocks,
            daily_out[['product_id', 'avg_daily_out_qty']],
            on='product_id',
            how='left'
        )
        df_days_on_hand['avg_daily_out_qty'] = df_days_on_hand['avg_daily_out_qty'].fillna(0)
        df_days_on_hand['days_of_inventory_on_hand'] = (
            df_days_on_hand['quantity'] / df_days_on_hand['avg_daily_out_qty']
        ).replace([np.inf, -np.inf], np.nan) # No OUT movement -> undefined

        metrics['days_of_inventory_on_hand_product'] = df_days_on_hand[['product_id', 'days_of_inventory_on_hand']]
        logger.info("Calculated days of inventory on hand per product.")

    # Dead stock identification (no movement for more than the configured days)
    dead_stock_days = int(config['METRICS']['DEAD_STOCK_DAYS'])
    threshold_date = datetime.now() - timedelta(days=dead_stock_days)

    if has_movements:
        last_movement_date = (
            df_movements.assign(movement_date=movement_dates)
            .groupby('product_id')['movement_date']
            .max()
            .reset_index()
            .rename(columns={'movement_date': 'last_movement_date'})
        )

        df_dead_stock = pd.merge(df_stocks, last_movement_date, on='product_id', how='left')

        # Products with no movement history count as dead only if they hold stock.
        is_dead_stock = (
            (df_dead_stock['last_movement_date'].isna() & (df_dead_stock['quantity'] > 0)) |
            (df_dead_stock['last_movement_date'] < threshold_date)
        )
        metrics['dead_stock_identification'] = df_dead_stock.loc[
            is_dead_stock, ['product_id', 'warehouse_id', 'quantity', 'last_movement_date']
        ]
        logger.info(f"Identified dead stock (no movement > {dead_stock_days} days).")

    # Stock accuracy (physical vs system) - requires external "physical count" data, so skipped for now.

    return metrics

def calculate_movement_analytics(data):
    """Calculates movement volume aggregates from ``stock_movements``.

    The caller's DataFrame is not modified; helper columns are added to a
    derived frame.

    Produces average daily movement per product, totals per hour / date /
    ISO week / month (both unsorted "trend" and descending "peak" variants)
    and the mean movement quantity per month.

    Note: ``week`` and ``month`` are calendar numbers without the year, so
    data spanning several years is pooled into the same buckets.

    Args:
        data: Dict of DataFrames; reads ``stock_movements`` (needs
            ``product_id``, ``movement_date`` and ``quantity``).

    Returns:
        Dict of metric name to DataFrame; empty when there are no movements.
    """
    logger.info("Calculating Movement Analytics...")
    df_movements = data.get('stock_movements', pd.DataFrame())

    if df_movements.empty:
        logger.warning("Missing 'stock_movements' data for movement analytics.")
        return {}

    metrics = {}

    # assign() returns a new frame, leaving data['stock_movements'] untouched.
    movement_dates = pd.to_datetime(df_movements['movement_date'])
    df_movements = df_movements.assign(
        movement_date=movement_dates,
        date=movement_dates.dt.date,
        week=movement_dates.dt.isocalendar().week,
        month=movement_dates.dt.month,
        hour=movement_dates.dt.hour,
        movement_value=df_movements['quantity'], # For sum/avg calculations
    )

    def _total_by(keys):
        """Sums movement_value per group and returns a flat frame."""
        return df_movements.groupby(keys)['movement_value'].sum().reset_index()

    def _ranked(totals):
        """Returns the totals ordered from highest to lowest volume."""
        return totals.sort_values(by='movement_value', ascending=False)

    # Average daily movement per product
    daily_movement_product = _total_by(['product_id', 'date'])
    avg_daily_movement_product = (
        daily_movement_product.groupby('product_id')['movement_value']
        .mean()
        .reset_index()
        .rename(columns={'movement_value': 'avg_daily_movement'})
    )
    metrics['avg_daily_movement_product'] = avg_daily_movement_product
    logger.info("Calculated average daily movement per product.")

    # Each total is computed once and reused for the peak and trend views.
    hourly_totals = _total_by('hour')
    daily_totals = _total_by('date')
    weekly_totals = _total_by('week')
    monthly_totals = _total_by('month')

    # Peak periods identification (hourly, daily, weekly, monthly)
    metrics['peak_hourly_movement'] = _ranked(hourly_totals)
    metrics['peak_daily_movement'] = _ranked(daily_totals)
    metrics['peak_weekly_movement'] = _ranked(weekly_totals)
    metrics['peak_monthly_movement'] = _ranked(monthly_totals)
    logger.info("Identified peak movement periods.")

    # Movement trends (daily, weekly, monthly) - Aggregations
    metrics['daily_movement_trend'] = daily_totals
    metrics['weekly_movement_trend'] = weekly_totals
    metrics['monthly_movement_trend'] = monthly_totals
    logger.info("Calculated movement trends.")

    # Seasonal patterns detection (basic: avg movement per month)
    metrics['seasonal_monthly_avg_movement'] = df_movements.groupby('month')['movement_value'].mean().reset_index()
    logger.info("Detected basic seasonal patterns.")

    return metrics

def calculate_warehouse_performance(data):
    """Calculates per-warehouse stock and movement metrics.

    Metrics produced:

    * ``warehouse_total_stock_qty``: total stored quantity per warehouse
      (a stand-in for utilization, since capacity data is not available).
    * ``warehouse_in_out_efficiency``: total IN and OUT quantity per warehouse
      and their ratio (NaN when there is no OUT quantity). A movement counts
      as IN/OUT when its ``movement_type`` contains that text.
    * ``total_transfer_out_qty`` / ``total_transfer_in_qty``: quantity of
      movements typed exactly ``OUT`` / ``IN`` whose ``notes`` mention
      ``Transfer OUT`` / ``Transfer IN``.

    Args:
        data: Dict of DataFrames; reads ``warehouses``, ``stocks`` and
            ``stock_movements``.

    Returns:
        Dict of metric name to DataFrame; empty when ``warehouses`` is missing.
    """
    logger.info("Calculating Warehouse Performance...")
    df_warehouses = data.get('warehouses', pd.DataFrame())
    df_stocks = data.get('stocks', pd.DataFrame())
    df_movements = data.get('stock_movements', pd.DataFrame())

    if df_warehouses.empty:
        logger.warning("Missing 'warehouses' data for warehouse performance.")
        return {}

    metrics = {}

    # Utilization rate per warehouse (requires warehouse capacity data, which we don't have)
    # Placeholder: total quantity stored. Guarded because a missing 'stocks'
    # table arrives as a column-less frame that cannot be grouped.
    if 'warehouse_id' in df_stocks.columns and 'quantity' in df_stocks.columns:
        metrics['warehouse_total_stock_qty'] = df_stocks.groupby('warehouse_id')['quantity'].sum().reset_index()
        logger.info("Calculated total stock quantity per warehouse (utilization proxy).")
    else:
        logger.warning("Missing 'stocks' data for warehouse stock totals.")

    if not df_movements.empty:
        movement_type = df_movements['movement_type']

        def _qty_per_warehouse(row_mask, qty_column):
            """Sums quantity per warehouse over the selected movements."""
            return (
                df_movements[row_mask]
                .groupby('warehouse_id')['quantity']
                .sum()
                .reset_index()
                .rename(columns={'quantity': qty_column})
            )

        # In/Out efficiency. Totals are aggregated per warehouse before the
        # merge so several matching movement types (e.g. 'IN' and
        # 'TRANSFER_IN') cannot multiply rows.
        in_totals = _qty_per_warehouse(movement_type.str.contains('IN', na=False), 'total_in_qty')
        out_totals = _qty_per_warehouse(movement_type.str.contains('OUT', na=False), 'total_out_qty')

        df_efficiency = pd.merge(in_totals, out_totals, on='warehouse_id', how='outer').fillna(0)
        df_efficiency['in_out_ratio'] = (
            df_efficiency['total_in_qty'] / df_efficiency['total_out_qty']
        ).replace([np.inf, -np.inf], np.nan)

        metrics['warehouse_in_out_efficiency'] = df_efficiency
        logger.info("Calculated warehouse In/Out efficiency.")

        # Transfer patterns between warehouses.
        # To properly track transfers, a specific 'TRANSFER' type with FROM/TO
        # fields would be better; a robust solution would pair IN/OUT rows by
        # product_id, quantity and close timestamps. As a simplification,
        # transfer quantities are just aggregated per warehouse.
        if 'notes' in df_movements.columns:
            # astype(str) keeps the match working when notes are all missing.
            notes = df_movements['notes'].astype(str)
            transfer_outs = df_movements[(movement_type == 'OUT') & notes.str.contains('Transfer OUT', na=False)]
            transfer_ins = df_movements[(movement_type == 'IN') & notes.str.contains('Transfer IN', na=False)]

            metrics['total_transfer_out_qty'] = transfer_outs.groupby('warehouse_id')['quantity'].sum().reset_index()
            metrics['total_transfer_in_qty'] = transfer_ins.groupby('warehouse_id')['quantity'].sum().reset_index()
            logger.info("Calculated basic transfer patterns.")
        else:
            logger.warning("No 'notes' column in 'stock_movements'; skipping transfer patterns.")

    # Geographic distribution optimization - requires geographic data (lat/long) and demand points. Skipped for now.

    return metrics


def calculate_financial_metrics(data, annual_holding_cost_rate=0.20):
    """Calculates inventory valuation, holding cost and an ABC classification.

    Valuation uses the weighted-average unit cost of the remaining receipt
    quantities per (product, warehouse). Stock rows without any remaining
    receipt quantity are valued at 0.

    Metrics produced:

    * ``inventory_value_overview``: quantity, average unit cost and value per
      stock row.
    * ``total_inventory_value``: sum of all row values (scalar).
    * ``estimated_annual_holding_cost``: total value times
      ``annual_holding_cost_rate`` (scalar).
    * ``abc_analysis``: products ranked by value with cumulative share and
      class (A up to 80 %, B up to 95 %, otherwise C).

    Args:
        data: Dict of DataFrames; reads ``stocks``, ``stock_receipt_costs``
            and ``products``.
        annual_holding_cost_rate: Fraction of inventory value assumed as
            yearly holding cost. Defaults to 0.20.

    Returns:
        Dict of metric name to DataFrame or scalar; empty when any of the
        three inputs is missing.
    """
    logger.info("Calculating Financial Metrics...")
    df_stocks = data.get('stocks', pd.DataFrame())
    df_receipts = data.get('stock_receipt_costs', pd.DataFrame())
    df_products = data.get('products', pd.DataFrame())

    metrics = {}

    if df_stocks.empty or df_products.empty or df_receipts.empty:
        logger.warning("Missing 'stocks', 'products', or 'stock_receipt_costs' data for financial metrics.")
        return {}

    # Merge current stock with product details
    df_current_stock_details = pd.merge(df_stocks, df_products[['product_id', 'name']], on='product_id', how='left')

    # Weighted-average cost per product and warehouse from remaining receipts.
    # Built with groupby/agg so the result always has its key columns, even
    # when no group has remaining quantity (the later merge depends on them).
    receipt_lines = df_receipts.assign(
        _remaining_cost=df_receipts['remaining_quantity'] * df_receipts['unit_cost']
    )
    cost_totals = receipt_lines.groupby(['product_id', 'warehouse_id'], as_index=False).agg(
        remaining_qty=('remaining_quantity', 'sum'),
        remaining_cost=('_remaining_cost', 'sum'),
    )
    cost_totals = cost_totals[cost_totals['remaining_qty'] > 0]
    df_avg_costs = cost_totals.assign(
        avg_unit_cost=cost_totals['remaining_cost'] / cost_totals['remaining_qty']
    )[['product_id', 'warehouse_id', 'avg_unit_cost']]

    df_inventory_value = pd.merge(
        df_current_stock_details,
        df_avg_costs,
        on=['product_id', 'warehouse_id'],
        how='left'
    )
    # Only the numeric inputs of the valuation are defaulted to 0; identifier
    # and name columns keep their missing markers.
    df_inventory_value = df_inventory_value.fillna({'quantity': 0, 'avg_unit_cost': 0})

    df_inventory_value['item_value'] = df_inventory_value['quantity'] * df_inventory_value['avg_unit_cost']
    metrics['inventory_value_overview'] = df_inventory_value[['product_id', 'warehouse_id', 'quantity', 'avg_unit_cost', 'item_value']]
    metrics['total_inventory_value'] = df_inventory_value['item_value'].sum()
    logger.info("Calculated inventory value (using AVG method).")

    # Holding cost: a fixed share of inventory value per year.
    metrics['estimated_annual_holding_cost'] = metrics['total_inventory_value'] * annual_holding_cost_rate
    logger.info("Calculated estimated holding cost.")

    # Stock-out cost estimation requires sales data and margins. Skipped for now.

    # ABC analysis (Pareto classification) by contribution to inventory value.
    if not metrics['inventory_value_overview'].empty:
        df_abc = metrics['inventory_value_overview'].groupby('product_id')['item_value'].sum().reset_index()
        df_abc = df_abc.sort_values(by='item_value', ascending=False).reset_index(drop=True)
        df_abc['cumulative_value'] = df_abc['item_value'].cumsum()
        df_abc['cumulative_percent'] = (df_abc['cumulative_value'] / df_abc['item_value'].sum()) * 100

        # NOTE(review): classes use the cumulative share *including* the
        # product itself, so a product that alone exceeds 80 % of the value is
        # not class A. Confirm this is the intended rule.
        cumulative_percent = df_abc['cumulative_percent']
        df_abc['abc_class'] = np.select(
            [cumulative_percent <= 80, cumulative_percent <= 95], # 80 + 15 = 95
            ['A', 'B'],
            default='C'
        )
        metrics['abc_analysis'] = pd.merge(df_abc, df_products[['product_id', 'name']], on='product_id', how='left')
        logger.info("Performed ABC analysis.")

    return metrics

def run_transformations(extracted_data, config):
    """Runs every metric calculation and merges the results into one dict.

    The stages run in a fixed order (inventory, movement, warehouse,
    financial); a later stage overwrites any metric name an earlier one
    already produced.
    """
    logger.info("Starting data transformations...")

    # Deferred calls so each stage runs only when the loop reaches it.
    stages = (
        lambda: calculate_inventory_metrics(extracted_data, config),
        lambda: calculate_movement_analytics(extracted_data),
        lambda: calculate_warehouse_performance(extracted_data),
        lambda: calculate_financial_metrics(extracted_data),
    )

    combined_metrics = {}
    for run_stage in stages:
        combined_metrics.update(run_stage())

    logger.info("Data transformations completed.")
    return combined_metrics

ImportError: cannot import name 'logger' from 'utils' (C:\Users\62818\AppData\Local\Programs\Python\Python313\Lib\site-packages\utils\__init__.py)

In [8]:
import os
from datetime import datetime

import jinja2  # For the HTML template
import pandas as pd
# import pdfkit  # For PDF output; requires wkhtmltopdf to be installed

from utils import get_db_connection, load_config, logger

def _pg_column_type(dtype):
    """Maps a pandas dtype to the PostgreSQL column type used for summary tables."""
    if pd.api.types.is_bool_dtype(dtype):
        return 'BOOLEAN'
    if pd.api.types.is_integer_dtype(dtype):
        return 'BIGINT'
    if pd.api.types.is_float_dtype(dtype):
        return 'DOUBLE PRECISION'
    if pd.api.types.is_datetime64_any_dtype(dtype):
        return 'TIMESTAMP'
    # Anything else (strings, mixed objects) is stored as text.
    return 'TEXT'


def _replace_summary_table(cursor, table_name, df):
    """Drops, recreates and fills ``table_name`` from ``df`` using ``cursor``."""
    from psycopg2 import extras, sql

    column_names = [str(col) for col in df.columns]
    column_types = [_pg_column_type(dtype) for dtype in df.dtypes]
    table = sql.Identifier(table_name)

    cursor.execute(sql.SQL("DROP TABLE IF EXISTS {}").format(table))
    column_defs = sql.SQL(', ').join(
        sql.SQL("{} {}").format(sql.Identifier(name), sql.SQL(pg_type))
        for name, pg_type in zip(column_names, column_types)
    )
    cursor.execute(sql.SQL("CREATE TABLE {} ({})").format(table, column_defs))

    if df.empty:
        return

    # astype(object) turns numpy scalars into plain Python values the driver
    # can adapt; missing values become SQL NULL, TEXT columns are stringified.
    rows = [
        tuple(
            None if pd.isna(value) else (str(value) if pg_type == 'TEXT' else value)
            for value, pg_type in zip(record, column_types)
        )
        for record in df.astype(object).itertuples(index=False, name=None)
    ]
    insert_stmt = sql.SQL("INSERT INTO {} ({}) VALUES %s").format(
        table,
        sql.SQL(', ').join(sql.Identifier(name) for name in column_names),
    )
    extras.execute_values(cursor, insert_stmt.as_string(cursor), rows)


def create_summary_tables(transformed_data, db_conn):
    """Loads selected metrics into summary tables, replacing previous contents.

    ``inventory_value_overview`` is written to
    ``analytics_inventory_value_summary`` and ``abc_analysis`` to
    ``analytics_abc_analysis``. Each table is dropped and recreated with
    column types derived from the DataFrame dtypes. All tables are written in
    one transaction: on any error the transaction is rolled back and the
    error is logged (not raised).

    ``DataFrame.to_sql`` is not used because it only supports SQLAlchemy
    connectables and sqlite3 connections, not the DBAPI connection handled
    here.

    Args:
        transformed_data: Dict of metric name to DataFrame or scalar.
        db_conn: Open psycopg2 connection; it is committed or rolled back but
            not closed.
    """
    logger.info("Creating summary tables/materialized views in database...")
    summary_tables = (
        ('inventory_value_overview', 'analytics_inventory_value_summary'),
        ('abc_analysis', 'analytics_abc_analysis'),
        # Add other summary tables as needed
    )
    cursor = db_conn.cursor()

    try:
        for metric_name, table_name in summary_tables:
            if metric_name not in transformed_data:
                continue
            df_summary = transformed_data[metric_name]
            logger.info(f"Loading '{table_name}' with {len(df_summary)} records.")
            _replace_summary_table(cursor, table_name, df_summary)
            logger.info(f"Successfully loaded {table_name}.")

        db_conn.commit()
        logger.info("Summary tables/materialized views creation complete.")
    except Exception as e:
        db_conn.rollback()
        logger.error(f"Error creating summary tables: {e}")
    finally:
        cursor.close()

def export_to_analytics_ready_format(transformed_data, export_dir):
    """Writes every non-empty DataFrame metric to ``<export_dir>/<metric>.csv``.

    Scalar metrics are only logged; empty DataFrames are skipped silently.
    The export directory is created when it does not exist.
    """
    logger.info(f"Exporting analytics-ready data to {export_dir}...")
    if not os.path.exists(export_dir):
        os.makedirs(export_dir)

    for metric_name in transformed_data:
        payload = transformed_data[metric_name]

        if not isinstance(payload, pd.DataFrame):
            # Single values have no tabular form to export.
            logger.info(f"Skipping export for scalar metric '{metric_name}': {payload}")
            continue
        if payload.empty:
            continue

        # CSV only; a Parquet export could be added alongside if needed.
        csv_path = os.path.join(export_dir, f"{metric_name}.csv")
        payload.to_csv(csv_path, index=False)
        logger.info(f"Exported {metric_name} to {csv_path}.")

    logger.info("Data export complete.")

def generate_automated_reports(transformed_data, output_dir):
    """Renders the HTML inventory report from ``./templates/report_template.html``.

    The report is written as UTF-8 to
    ``<output_dir>/inventory_report_<YYYYmmdd_HHMMSS>.html``. When the
    template is missing, an error is logged and nothing is written.

    Args:
        transformed_data: Dict of metric name to DataFrame or scalar. Missing
            metrics render as an empty table or 0.
        output_dir: Directory for the generated report; created if needed.
    """
    logger.info(f"Generating automated reports to {output_dir}...")
    os.makedirs(output_dir, exist_ok=True)

    template_loader = jinja2.FileSystemLoader(searchpath="./templates") # Assume templates folder exists
    template_env = jinja2.Environment(loader=template_loader)

    # Load HTML template
    try:
        template = template_env.get_template("report_template.html")
    except jinja2.exceptions.TemplateNotFound:
        logger.error("report_template.html not found in ./templates directory. Skipping HTML report generation.")
        return

    # One timestamp so the date shown in the report matches the file name.
    generated_at = datetime.now()

    def _table_html(metric_name, max_rows=None):
        """Renders a metric as an HTML table, optionally limited to the first rows."""
        frame = transformed_data.get(metric_name, pd.DataFrame())
        if max_rows is not None:
            frame = frame.head(max_rows)
        return frame.to_html(index=False)

    # Prepare data for template
    report_data = {
        'report_date': generated_at.strftime("%Y-%m-%d %H:%M:%S"),
        'total_inventory_value': transformed_data.get('total_inventory_value', 0),
        'estimated_annual_holding_cost': transformed_data.get('estimated_annual_holding_cost', 0),
        'abc_analysis': _table_html('abc_analysis'),
        'stock_turnover_ratio_product': _table_html('stock_turnover_ratio_product', max_rows=10),
        'dead_stock_identification': _table_html('dead_stock_identification'),
        'peak_daily_movement': _table_html('peak_daily_movement', max_rows=10),
        # Add more dataframes as needed
    }

    # Render template
    html_report = template.render(report_data)

    report_filename = os.path.join(output_dir, f"inventory_report_{generated_at.strftime('%Y%m%d_%H%M%S')}.html")
    # Explicit UTF-8: the platform default (e.g. cp1252 on Windows) cannot
    # encode every character that may appear in product names.
    with open(report_filename, "w", encoding="utf-8") as f:
        f.write(html_report)
    logger.info(f"HTML report generated at {report_filename}")

    # Optional PDF output is not implemented; it would need the pdfkit
    # library and a wkhtmltopdf installation (pdfkit.from_file on the HTML).

    logger.info("Report generation complete.")


def run_loading(transformed_data, config):
    """Runs the three load steps: database tables, CSV export, HTML report.

    Each step is isolated: a failure is logged and the remaining steps still
    run, so an unavailable database does not prevent the file outputs.

    Args:
        transformed_data: Dict of metric name to DataFrame or scalar.
        config: Mapping providing ``config['ETL']['EXPORT_CSV_DIR']`` and
            ``config['ETL']['OUTPUT_DIR']``.
    """
    # Step 1: summary tables (the only step that needs the database).
    db_conn = None
    try:
        db_conn = get_db_connection(config_file='config.ini')
        create_summary_tables(transformed_data, db_conn)
    except Exception as e:
        logger.error(f"Error during data loading: {e}")
    finally:
        if db_conn is not None:
            db_conn.close()

    # Step 2: export to analytics-ready format.
    try:
        export_to_analytics_ready_format(transformed_data, config['ETL']['EXPORT_CSV_DIR'])
    except Exception as e:
        logger.error(f"Error during data export: {e}")

    # Step 3: generate automated reports.
    try:
        generate_automated_reports(transformed_data, config['ETL']['OUTPUT_DIR'])
    except Exception as e:
        logger.error(f"Error during report generation: {e}")

ImportError: cannot import name 'logger' from 'utils' (C:\Users\62818\AppData\Local\Programs\Python\Python313\Lib\site-packages\utils\__init__.py)

In [9]:
from src.utils import setup_logging, load_config, logger, get_db_connection
from src.extract import run_extraction
from src.transform import run_transformations
from src.load import run_loading
import sys
import os

def main():
    """Main function to run the ETL pipeline.

    Loads the configuration, sets up logging, creates the output directories,
    and then runs extraction, transformation and loading in order.
    Transformation and loading are skipped (with a warning) when the extracted
    'stocks' or 'stock_movements' data is missing or empty.

    Any exception raised by a stage is logged at CRITICAL level with its
    traceback and the process exits with status 1.
    """
    config = load_config()
    setup_logging(config['ETL']['LOG_FILE']) # Setup logging globally
    logger.info("ETL Pipeline Started.")

    try:
        etl_config = config['ETL']

        # Create necessary directories
        os.makedirs(etl_config['EXPORT_CSV_DIR'], exist_ok=True)
        os.makedirs(etl_config['OUTPUT_DIR'], exist_ok=True)
        # A bare filename has an empty dirname, and os.makedirs('') raises
        # FileNotFoundError, so only create the directory when there is one.
        last_run_dir = os.path.dirname(etl_config['LAST_RUN_FILE'])
        if last_run_dir:
            os.makedirs(last_run_dir, exist_ok=True)

        # 1. Extraction
        extracted_data = run_extraction(config)

        # Check if any critical data is missing after extraction.
        # The frames are inspected directly rather than defaulting to an empty
        # pandas DataFrame, because pandas is not imported alongside this function.
        critical_frames = [extracted_data.get(key) for key in ('stocks', 'stock_movements')]
        has_critical_data = all(
            frame is not None and not frame.empty for frame in critical_frames
        )

        if has_critical_data:
            # 2. Transformation
            transformed_data = run_transformations(extracted_data, config)

            # 3. Loading
            run_loading(transformed_data, config)
        else:
            logger.warning("Skipping transformation and loading due to insufficient extracted data.")

    except Exception as e:
        logger.critical(f"ETL Pipeline Failed: {e}", exc_info=True)
        sys.exit(1)

    logger.info("ETL Pipeline Completed Successfully.")

if __name__ == "__main__":
    # Ensure the report template exists before the pipeline runs.
    # The check is on the template file itself, not just the directory, so a
    # pre-existing 'templates' directory without the file still gets a default.
    template_dir = 'templates'
    template_path = os.path.join(template_dir, 'report_template.html')
    os.makedirs(template_dir, exist_ok=True)
    if not os.path.exists(template_path):
        # Create a dummy template file
        with open(template_path, 'w') as f:
            f.write("""
<!DOCTYPE html>
<html>
<head>
    <title>Inventory Analytics Report</title>
    <style>
        body { font-family: Arial, sans-serif; margin: 20px; }
        h1, h2 { color: #333; }
        table { width: 100%; border-collapse: collapse; margin-bottom: 20px; }
        th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
        th { background-color: #f2f2f2; }
    </style>
</head>
<body>
    <h1>Inventory Analytics Report</h1>
    <p>Generated On: {{ report_date }}</p>

    <h2>Overall Financial Metrics</h2>
    <p>Total Inventory Value: <strong>{{ "Rp{:,.2f}".format(total_inventory_value) }}</strong></p>
    <p>Estimated Annual Holding Cost: <strong>{{ "Rp{:,.2f}".format(estimated_annual_holding_cost) }}</strong></p>

    <h2>ABC Analysis (Top Products by Value)</h2>
    {{ abc_analysis | safe }}

    <h2>Stock Turnover Ratio (Top 10 Products)</h2>
    {{ stock_turnover_ratio_product | safe }}

    <h2>Dead Stock Identification</h2>
    {{ dead_stock_identification | safe }}

    <h2>Peak Daily Movement (Top 10 Days)</h2>
    {{ peak_daily_movement | safe }}

    </body>
</html>
            """)
        # NOTE(review): setup_logging() is only called inside main(), so this
        # message is emitted before logging is configured here and may not
        # reach the log file - confirm how `logger` is set up in src.utils.
        logger.info("Created a default 'report_template.html' in 'templates' directory.")


    main()

ModuleNotFoundError: No module named 'src'

In [10]:
import unittest
import pandas as pd
from datetime import datetime, timedelta
from src.transform import calculate_inventory_metrics, calculate_movement_analytics, calculate_financial_metrics
from src.utils import load_config

class TestTransformations(unittest.TestCase):

    def setUp(self):
        # Setup dummy dataframes for testing
        self.config = load_config(config_file='config.ini') # Ensure config is loaded for metrics

        self.df_products = pd.DataFrame({
            'product_id': [1, 2, 3, 4],
            'name': ['Product A', 'Product B', 'Product C', 'Product D'],
            'unit_price': [10.0, 20.0, 5.0, 15.0]
        })
        self.df_warehouses = pd.DataFrame({
            'warehouse_id': [1, 2],
            'name': ['WH A', 'WH B']
        })
        self.df_stocks = pd.DataFrame({
            'stock_id': [101, 102, 103, 104],
            'product_id': [1, 2, 3, 4],
            'warehouse_id': [1, 1, 2, 2],
            'quantity': [100, 50, 200, 0],
            'last_updated': [datetime.now(), datetime.now(), datetime.now(), datetime.now()]
        })
        self.df_movements = pd.DataFrame({
            'movement_id': [1, 2, 3, 4, 5, 6, 7],
            'product_id': [1, 1, 2, 2, 3, 3, 4],
            'warehouse_id': [1, 1, 1, 1, 2, 2, 2],
            'movement_type': ['IN', 'OUT', 'IN', 'OUT', 'IN', 'OUT', 'IN'],
            'quantity': [10, 5, 20, 10, 30, 15, 5],
            'movement_date': [
                datetime.now() - timedelta(days=200),
                datetime.now() - timedelta(days=100),
                datetime.now() - timedelta(days=250),
                datetime.now() - timedelta(days=50),
                datetime.now() - timedelta(days=300),
                datetime.now() - timedelta(days=10),
                datetime.now() - timedelta(days=5) # Recent movement for Product D
            ],
            'notes': ['Bought', 'Sold', 'Bought', 'Sold', 'Bought', 'Sold', 'Bought']
        })
        self.df_receipts = pd.DataFrame({
            'receipt_id': [1, 2, 3, 4],
            'product_id': [1, 1, 2, 3],
            'warehouse_id': [1, 1, 1, 2],
            'quantity_received': [50, 60, 70, 80],
            'unit_cost': [9.0, 9.5, 19.0, 4.5],
            'receipt_date': [
                datetime.now() - timedelta(days=200),
                datetime.now() - timedelta(days=150),
                datetime.now() - timedelta(days=250),
                datetime.now() - timedelta(days=300)
            ],
            'remaining_quantity': [50, 60, 70, 80] # Initial state, needs to be updated by OUT movements
        })
        
        self.extracted_data = {
            'products': self.df_products,
            'warehouses': self.df_warehouses,
            'stocks': self.df_stocks,
            'stock_movements': self.df_movements,
            'stock_receipt_costs': self.df_receipts
        }

    def test_calculate_inventory_metrics(self):
        metrics = calculate_inventory_metrics(self.extracted_data, self.config)
        self.assertIn('stock_turnover_ratio_product', metrics)
        self.assertIn('days_of_inventory_on_hand_product', metrics)
        self.assertIn('dead_stock_identification', metrics)
        
        # Test dead stock logic
        dead_stock = metrics['dead_stock_identification']
        # Product A: last movement 100 days ago (not dead)
        # Product B: last movement 50 days ago (not dead)
        # Product C: last movement 10 days ago (not dead)
        # Product D: stock 0, but if it had stock and last movement > 180 days ago, it would be dead.
        # With current data, product D has quantity 0, so it won't appear.
        # Let's adjust Product A's last movement to be very old
        self.df_movements.loc[self.df_movements['product_id'] == 1, 'movement_date'] = datetime.now() - timedelta(days=300)
        self.extracted_data['stock_movements'] = self.df_movements # Update data
        metrics_recalculated = calculate_inventory_metrics(self.extracted_data, self.config)
        dead_stock_recalculated = metrics_recalculated['dead_stock_identification']
        self.assertTrue(1 in dead_stock_recalculated['product_id'].values) # Product 1 should now be dead stock
        self.assertFalse(2 in dead_stock_recalculated['product_id'].values)


    def test_calculate_movement_analytics(self):
        metrics = calculate_movement_analytics(self.extracted_data)
        self.assertIn('avg_daily_movement_product', metrics)
        self.assertIn('peak_hourly_movement', metrics)
        self.assertIn('daily_movement_trend', metrics)

    def test_calculate_financial_metrics(self):
        metrics = calculate_financial_metrics(self.extracted_data)
        self.assertIn('inventory_value_overview', metrics)
        self.assertIn('total_inventory_value', metrics)
        self.assertIn('estimated_annual_holding_cost', metrics)
        self.assertIn('abc_analysis', metrics)

        # Basic check for ABC analysis (Product B (20*50=1000) should be A, Product A (10*100=1000) A, Product C (5*200=1000) A, Product D (0*15=0) C)
        # This needs more realistic values to get proper ABC distribution.
        # Assuming our example products all have relatively high values, they might all be 'A' or 'B' depending on thresholds.
        # Let's check if the ABC analysis dataframe is created.
        self.assertIsInstance(metrics['abc_analysis'], pd.DataFrame)
        self.assertIn('abc_class', metrics['abc_analysis'].columns)

if __name__ == '__main__':
    import sys

    if 'ipykernel' in sys.modules:
        # Inside a Jupyter kernel sys.argv carries the kernel's own arguments,
        # which unittest.main() would try to parse, and its default exit=True
        # calls sys.exit() after the run. Pass only the program name and
        # disable the exit so the cell runs the tests and returns normally.
        unittest.main(argv=[sys.argv[0]], exit=False)
    else:
        # Plain script execution: keep the default behaviour so the process
        # exit status still reflects test failures.
        unittest.main()

ModuleNotFoundError: No module named 'src'