In [1]:
#!/usr/bin/env python
# coding: utf-8
import sys
import platform
import logging

sys.path.append('..')
from common import db_operations

import pandas as pd
import numpy as np
from datetime import datetime, date, timedelta
from common.db_operations import connect_to_trino, fetch_data_for_day, write_df_to_iceberg

In [2]:
# Root-logger setup for this business-logic notebook: INFO and above, each record prefixed
# with a second-resolution timestamp and its level name.
# (logging.basicConfig is a no-op if the root logger already has handlers.)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)

# Echo the interpreter version so the saved output records which Python ran the job.
print("Using Python version: " + platform.python_version())

Using Python version: 3.11.13


In [3]:
# ---- report configuration ----
# Iceberg table that receives the per-session charging summary (written via write_df_to_iceberg).
TABLE_NAME = "power_consumption_report"
# Table the raw CAN telemetry is read from (passed to fetch_data_for_day).
SOURCE_TABLE = "can_parsed_output_100"
# SELECT-list expressions for fetch_data_for_day: the vehicle id, the "timestamp" column
# converted to Asia/Kolkata and aliased as IST, then each raw signal as a double-quoted identifier.
COLUMNS_TO_FETCH = ['"id"', 'at_timezone("timestamp", \'Asia/Kolkata\') AS IST'] + [
    f'"{signal}"'
    for signal in (
        "BAT_SOC",
        "Bat_Voltage",
        "Total_Battery_Current",
        "GUN_Connection_Status",
        "OdoMeterReading",
        "Gear_Position",
        "Vehiclereadycondition",
        "Chargingcontactor1positive",
        "Chargingcontactor1negative",
        "Chargingcontactor2positive",
        "Chargingcontactor2negative",
    )
]

In [4]:
# --------------------
# Business Logic Functions
# --------------------
def _prepare_device_frame(device_df: pd.DataFrame, device_id, invalid_current: float) -> "pd.DataFrame | None":
    """
    Return a cleaned, time-sorted copy of one vehicle's rows, or None if no time column exists.

    Adds an 'ist_timestamp' column (from epoch-millisecond 'timestamp' when present, otherwise
    parsed from 'IST'). Drops rows whose |Total_Battery_Current| equals ``invalid_current`` and
    fills gaps in the signal columns.
    """
    device_df = device_df.copy()
    if 'timestamp' in device_df.columns:
        # Interpreted as epoch milliseconds in UTC, then shown in Asia/Kolkata.
        device_df['ist_timestamp'] = (
            pd.to_datetime(device_df['timestamp'], unit='ms')
            .dt.tz_localize('UTC')
            .dt.tz_convert('Asia/Kolkata')
        )
    elif 'IST' in device_df.columns:
        device_df['ist_timestamp'] = pd.to_datetime(device_df['IST'])
    else:
        logging.error(f"Neither 'timestamp' nor 'IST' column found for device {device_id}. Cannot proceed.")
        return None

    device_df = device_df.sort_values(by='ist_timestamp').reset_index(drop=True)

    # NOTE(review): a current magnitude of exactly `invalid_current` (3200 by default) is presumably
    # an invalid/sentinel reading — confirm. Dropping these rows leaves gaps in the 0..n-1 index
    # labels, which is fine because all later slicing is label-based (.loc).
    device_df = device_df[device_df['Total_Battery_Current'].abs() != invalid_current].copy()

    for col in ('BAT_SOC', 'Bat_Voltage', 'Chargingcontactor1positive',
                'Chargingcontactor1negative', 'Chargingcontactor2positive',
                'Chargingcontactor2negative', 'GUN_Connection_Status'):
        device_df[col] = device_df[col].fillna(0)

    # For SOC and voltage an exact 0 (including the NaNs just filled) is treated as missing and
    # replaced by the previous reading, or by the next one at the start of the series.
    for col in ('BAT_SOC', 'Bat_Voltage'):
        device_df[col] = device_df[col].replace(0.0, np.nan).ffill().bfill()

    device_df['Total_Battery_Current'] = device_df['Total_Battery_Current'].fillna(0)
    return device_df


def _detect_charging_sessions(device_df: pd.DataFrame) -> list:
    """
    Return (start_label, end_label) pairs of raw plug-in sessions, in time order.

    A start is a 0→1 step of GUN_Connection_Status and an end is the first row after a 1→0 step
    (assumes the status is 0/1 — confirm against the CAN spec).
    """
    gun = device_df['GUN_Connection_Status']
    transitions = gun.diff()
    starts = device_df[transitions == 1].index.tolist()
    ends = device_df[transitions == -1].index.tolist()

    # diff() is NaN on the first row, so a session already running when the data begins never
    # produces a +1 step. Prepend it whenever the first row is plugged in — even if later sessions
    # exist — otherwise that session's end would be paired with the next session's start.
    if not device_df.empty and gun.iloc[0] == 1:
        starts.insert(0, device_df.index[0])

    # A session still running at the last row is closed at the last row.
    if len(ends) < len(starts):
        ends.append(device_df.index[-1])

    return list(zip(starts, ends))


def _merge_sessions(device_df: pd.DataFrame, sessions: list, max_gap_s: float = 5 * 60,
                    max_soc_delta: float = 1.0, always_merge_gap_s: float = 60) -> list:
    """
    Merge a session into the previous one when it resumes shortly after the previous one ended.

    A session is merged when the gap since the previous raw end is <= ``max_gap_s`` and SOC moved
    by <= ``max_soc_delta``, or unconditionally when the gap is <= ``always_merge_gap_s``.
    """
    merged = []
    for i, (start_index, end_index) in enumerate(sessions):
        if i > 0:
            prev_end = sessions[i - 1][1]
            gap_s = (device_df.loc[start_index, 'ist_timestamp']
                     - device_df.loc[prev_end, 'ist_timestamp']).total_seconds()
            soc_delta = abs(device_df.loc[start_index, 'BAT_SOC'] - device_df.loc[prev_end, 'BAT_SOC'])
            if (gap_s <= max_gap_s and soc_delta <= max_soc_delta) or gap_s <= always_merge_gap_s:
                merged[-1] = (merged[-1][0], end_index)
                continue
        merged.append((start_index, end_index))
    return merged


def _summarize_session(device_df: pd.DataFrame, device_id, start_index, end_index,
                       battery_capacity_kwh: float) -> "dict | None":
    """
    Build the summary row for one (possibly merged) session covering labels start..end inclusive.

    Duration and energy use only rows with GUN_Connection_Status == 1. Energy is the trapezoidal
    integral of Bat_Voltage * |Total_Battery_Current| over time.
    NOTE(review): for a merged session the duration and the integral also span the short unplugged
    gap between the merged pieces — confirm this is intended.
    Returns None (after a warning) if the slice is empty.
    """
    event_df = device_df.loc[start_index:end_index]
    if event_df.empty:
        logging.warning(f"Warning: Empty event data found for device {device_id}. Skipping.")
        return None

    charging_periods = event_df[event_df['GUN_Connection_Status'] == 1]

    total_duration = 0
    energy_Wh = 0.0
    if not charging_periods.empty:
        # Elapsed seconds from the first plugged-in row. Unlike astype(int64) / 1e9, this does not
        # assume nanosecond-resolution timestamps.
        elapsed_s = (
            charging_periods['ist_timestamp'] - charging_periods['ist_timestamp'].iloc[0]
        ).dt.total_seconds().to_numpy(dtype=float)
        power_W = (
            charging_periods['Bat_Voltage'] * charging_periods['Total_Battery_Current'].abs()
        ).to_numpy(dtype=float)
        total_duration = int(elapsed_s[-1])
        # Trapezoidal rule written out so it works on NumPy 1.x (no np.trapezoid) and 2.x alike.
        energy_Wh = float(np.sum((power_W[1:] + power_W[:-1]) / 2.0 * np.diff(elapsed_s))) / 3600

    total_kwh_consumed_tpc = energy_Wh / 1000

    start_row = event_df.iloc[0]
    end_row = event_df.iloc[-1]
    total_kwh_consumed_soc = abs((end_row['BAT_SOC'] - start_row['BAT_SOC']) * battery_capacity_kwh / 100)

    # Symmetric percentage difference between the two energy estimates.
    percent_diff = 0
    if total_kwh_consumed_tpc + total_kwh_consumed_soc != 0:
        percent_diff = (abs(total_kwh_consumed_tpc - total_kwh_consumed_soc) /
                        ((total_kwh_consumed_tpc + total_kwh_consumed_soc) / 2)) * 100

    return {
        'vehicle_id': device_id,
        'start_time': start_row['ist_timestamp'],
        'end_time': end_row['ist_timestamp'],
        'charge_dur': round(total_duration, 2),
        'soc_start': start_row['BAT_SOC'],
        'soc_end': end_row['BAT_SOC'],
        'tpc_kwh': round(total_kwh_consumed_tpc, 2),
        'soc_kwh': round(total_kwh_consumed_soc, 2),
        'diff_kwh_percent': round(percent_diff, 2),
    }


def process_soc_charging_data(df: pd.DataFrame, battery_capacity_kwh: float = 423,
                              invalid_current: float = 3200) -> pd.DataFrame:
    """
    Detect charging sessions per vehicle and compare two energy estimates for each one.

    Args:
        df: Raw telemetry with an 'id' column, a time column ('timestamp' in epoch ms, or 'IST'),
            and the BAT_SOC / Bat_Voltage / Total_Battery_Current / GUN_Connection_Status /
            Chargingcontactor* signal columns.
        battery_capacity_kwh: Pack capacity used to convert the SOC delta (in percent) to kWh.
        invalid_current: Rows whose |Total_Battery_Current| equals this value are discarded.

    Returns:
        One row per session with columns vehicle_id, start_time, end_time, charge_dur (seconds
        plugged in), soc_start, soc_end, tpc_kwh (integrated V*|I|), soc_kwh (SOC-based) and
        diff_kwh_percent. The result is an empty DataFrame if nothing was found or the input
        is empty or has no 'id'.
    """
    logging.info("⚙️ STEP 3a: Starting data processing...")

    if df.empty or 'id' not in df.columns:
        logging.warning("Input DataFrame is empty or does not contain an 'id' column. Skipping processing.")
        return pd.DataFrame()

    all_summary_data = []
    for device_id in df['id'].unique().tolist():
        logging.info(f"⚙️ STEP 3b: Processing charging data for vehicle ID: {device_id}")
        device_rows = df[df['id'] == device_id]

        # Reachable e.g. for a NaN id, since NaN never compares equal to itself.
        if device_rows.empty:
            logging.warning(f"No charging events were detected for device {device_id}. Skipping.")
            continue

        device_df = _prepare_device_frame(device_rows, device_id, invalid_current)
        if device_df is None:
            continue

        sessions = _merge_sessions(device_df, _detect_charging_sessions(device_df))
        if not sessions:
            logging.warning(f"No charging events were detected for device {device_id}.")
            continue

        for start_index, end_index in sessions:
            summary = _summarize_session(device_df, device_id, start_index, end_index, battery_capacity_kwh)
            if summary is not None:
                all_summary_data.append(summary)

    logging.info("✅ STEP 3c: All vehicle data processed.")
    return pd.DataFrame(all_summary_data)

In [5]:
# --------------------
# Main execution logic
# --------------------
def _run_report_for_day(conn, date_str: str, vehicle_ids_for_report: list) -> None:
    """Fetch, process and write the charging summary for one ISO date; exceptions propagate."""
    logging.info(f"▶️ Starting daily report job for {date_str}")
    logging.info("\n--- Processing specific vehicle IDs ---")
    df_raw_specific = fetch_data_for_day(conn, date_str, COLUMNS_TO_FETCH, SOURCE_TABLE, vehicle_ids_for_report)

    if df_raw_specific.empty:
        logging.info("Raw DataFrame is empty. No processing needed.")
        return

    df_processed_specific = process_soc_charging_data(df_raw_specific)
    if df_processed_specific.empty:
        logging.info("Processed DataFrame is empty. No data to write.")
        return

    write_df_to_iceberg(df_processed_specific, TABLE_NAME)
    logging.info("✅ Processing and write for specific IDs complete.")


def main(start_date_str: str = None, end_date_str: str = None, vehicle_ids: list = None):
    """
    Run the charging-summary report for one or more days and write the results to TABLE_NAME.

    Args:
        start_date_str: First day to process (ISO ``YYYY-MM-DD``), inclusive.
        end_date_str: Last day to process (ISO ``YYYY-MM-DD``), inclusive.
            With neither date given, only yesterday (``date.today() - 1``) is processed. With
            only one given, that single day is processed.
        vehicle_ids: Optional ids forwarded to fetch_data_for_day. Defaults to the empty list the
            job has always passed (presumably meaning "all vehicles" — confirm in db_operations).

    Errors are logged, not raised. A failure on one day is logged with its traceback and the
    remaining days still run. The Trino connection is always closed.
    """
    # Treat a half-specified range as a single day instead of silently falling back to yesterday.
    if bool(start_date_str) != bool(end_date_str):
        start_date_str = end_date_str = start_date_str or end_date_str
    vehicle_ids_for_report = list(vehicle_ids) if vehicle_ids is not None else []

    conn = connect_to_trino()
    if not conn:
        logging.critical("❌ Failed to establish a database connection. Exiting.")
        return

    failed_dates = []
    try:
        if start_date_str and end_date_str:
            start_date = date.fromisoformat(start_date_str)
            end_date = date.fromisoformat(end_date_str)
            if end_date < start_date:
                logging.warning(f"End date {end_date} is before start date {start_date}; nothing to process.")
            date_range = [start_date + timedelta(days=i) for i in range((end_date - start_date).days + 1)]
        else:
            # Default to processing yesterday's data.
            date_range = [date.today() - timedelta(days=1)]

        for single_date in date_range:
            date_str = single_date.isoformat()
            try:
                _run_report_for_day(conn, date_str, vehicle_ids_for_report)
            except Exception:
                # Isolate failures per day so one bad day does not abort a multi-day backfill.
                logging.exception(f"❌ Report job failed for {date_str}; continuing with the next day.")
                failed_dates.append(date_str)

        if failed_dates:
            logging.error(f"❌ {len(failed_dates)} day(s) failed: {', '.join(failed_dates)}")

    except Exception as e:
        logging.critical(f"❌ A critical error occurred in the main script: {e}", exc_info=True)

    finally:
        logging.info("🔒 STEP 5: Closing Trino connection...")
        conn.close()
        logging.info("✅ STEP 5: Connection closed.")

In [6]:

if __name__ == "__main__":
    # NOTE(review): a one-time manual backfill (2025-07-24 .. 2025-09-15, inclusive) is what runs
    # here. For the daily automated run (yesterday only), call `main()` with no arguments instead.
    # The saved log output below starts at 2025-09-14, i.e. it came from a `main()` run.
    main(
        start_date_str='2025-07-24',
        end_date_str='2025-09-15',
    )

2025-09-15 21:09:06 - INFO - 🔌 STEP 1: Connecting to Trino...
2025-09-15 21:09:06 - INFO - ✅ STEP 1: Connected to Trino
2025-09-15 21:09:06 - INFO - ▶️ Starting daily report job for 2025-09-14
2025-09-15 21:09:06 - INFO - 
--- Processing specific vehicle IDs ---
2025-09-15 21:09:06 - INFO - 📥 STEP 2a: Validating and fetching data for 2025-09-14...
2025-09-15 21:09:06 - INFO - ⚙️ Executing query...
2025-09-15 21:09:27 - INFO - ✅ Query executed successfully!
2025-09-15 21:09:27 - INFO - ✅ STEP 2d: Data fetching for 2025-09-14 completed, Rows fetched: 361976
2025-09-15 21:09:27 - INFO - ⚙️ STEP 3a: Starting data processing...
2025-09-15 21:09:27 - INFO - ⚙️ STEP 3b: Processing charging data for vehicle ID: 11
2025-09-15 21:09:27 - INFO - ⚙️ STEP 3b: Processing charging data for vehicle ID: 13
2025-09-15 21:09:27 - INFO - ⚙️ STEP 3b: Processing charging data for vehicle ID: 14
2025-09-15 21:09:27 - INFO - ⚙️ STEP 3b: Processing charging data for vehi

In [7]:
# df_combined.groupby(['date','id']).size().reset_index(name='count_of_instances')
# global_df_raw.loc[:, 'date'] = global_df_raw['IST'].dt.date
# global_df_raw[global_df_raw['id'].isin(['3','16'])].groupby(['id', 'date']).size().reset_index(name='count_of_instances')

In [8]:
# global_df_processed