In [2]:
import psycopg2
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

In [11]:
DB_CONFIG = {
    'host': 'localhost',
    'port': 5432,
    'dbname': 'dev',
    'user': 'adm',
    'password': 'adm'
}

def get_connection():
    """
    Create and return a PostgreSQL connection
    """
    try:
        conn = psycopg2.connect(**DB_CONFIG)
        conn.autocommit = True
        return conn
    except Exception as e:
        print(f"❌ Connection error: {e}")
        return None
    
def execute_query(query, fetch=False):
    """
    Execute a query and optionally fetch results
    """
    conn = get_connection()
    if not conn:
        return None
    
    try:
        cursor = conn.cursor()
        cursor.execute(query)
        
        if fetch:
            results = cursor.fetchall()
            columns = [desc[0] for desc in cursor.description]
            return pd.DataFrame(results, columns=columns)
        else:
            print("✅ Query executed successfully")
            return True
    except Exception as e:
        print(f"❌ Query error: {e}")
        return None
    finally:
        conn.close()

conn = get_connection()
if conn:
    print("✅ Database connection successful")
    conn.close()
else:
    print("❌ Database connection failed")

✅ Database connection successful


In [None]:
print("\n🎯 QUESTION 1A")
print("=" * 15)

print("""
EXPLANATION OF THE RESOLUTION LOGIC:

1) TIMESTAMP NORMALIZATION:
    Convert all timestamps to 15-minute intervals, where each interval starts at :00, :15, :30, or :45 of each hour.

2) PERIOD SPLITTING:
    When a state event crosses the boundary of a 15-minute interval, it must be "split" into multiple records, one for each interval touched.

3) TIME CALCULATION PER INTERVAL:
    For each fragment, calculate how many seconds the agent remained in that state within that specific interval.

4) AGGREGATION:
    Sum all times by (agent_id, interval, state, domain_id).

5) PERSISTENCE:
    Insert or update the records in the agent_state_interval table with creation and update timestamps.
""")


In [4]:
interval_function_sql = """
-- Function to calculate 15-minute interval start time
CREATE OR REPLACE FUNCTION get_15min_interval(input_timestamp TIMESTAMP WITHOUT TIME ZONE)
RETURNS TIMESTAMP WITHOUT TIME ZONE AS $$
BEGIN
    RETURN DATE_TRUNC('hour', input_timestamp) + 
           INTERVAL '15 minutes' * FLOOR(EXTRACT(MINUTE FROM input_timestamp) / 15);
END;
$$ LANGUAGE plpgsql IMMUTABLE;
"""

result = execute_query(interval_function_sql)
if result:
    print("✅ Function get_15min_interval created successfully")

✅ Query executed successfully
✅ Function get_15min_interval created successfully


In [16]:
explantion01_procedure = """
    
    -- This SQL snippet uses a recursive query to split each agent event into 15-minute intervals. 
    -- First, it takes the event and calculates how long it lasted in the first 15-minute interval, 
    -- considering whether the event ended within that interval or is still ongoing. 
    -- Then, if the event lasts more than 15 minutes, it keeps creating new 15-minute intervals until the event ends, 
    -- always calculating how long the agent stayed in each interval.
    
    -- DROP TABLE IF EXISTS event_intervals_aux CASCADE;
    -- CREATE TABLE event_intervals_aux AS
        SELECT 
            agent_id,
            domain_id,
            event_id,
            state,
            state_start_datetime,
            state_end_datetime,
            get_15min_interval(state_start_datetime) as interval_start,
            get_15min_interval(state_start_datetime) + INTERVAL '15 minutes' as interval_end,

            -- This SQL CASE expression calculates the number of seconds an event occupies within its first 15-minute interval.
            CASE 

                -- Ongoing Event: If state_end_datetime is NULL, the event hasn't ended.
                WHEN state_end_datetime IS NULL THEN 
                    EXTRACT(EPOCH FROM (
                        get_15min_interval(state_start_datetime) + INTERVAL '15 minutes' - state_start_datetime
                    ))

                -- Event ends within the first interval: If the event ends before or exactly at the end of the 15-minute interval
                WHEN state_end_datetime <= get_15min_interval(state_start_datetime) + INTERVAL '15 minutes' THEN
                    EXTRACT(EPOCH FROM (state_end_datetime - state_start_datetime))

                ELSE 
                    -- Event spans multiple intervals:: If the event continues past the interval's end
                    EXTRACT(EPOCH FROM (
                        get_15min_interval(state_start_datetime) + INTERVAL '15 minutes' - state_start_datetime
                    ))
            END as time_in_interval_seconds
            
        FROM agent_event
        WHERE state_end_datetime IS NOT NULL;
"""

rollup_result_df = execute_query(explantion01_procedure, fetch=True)
print("📊 Resultado da execução do rollup:")
display(rollup_result_df)

# statements = [stmt.strip() for stmt in explantion01_procedure.split(';') if stmt.strip()]
# conn = get_connection()
# if conn:
#     try:
#         cursor = conn.cursor()
#         cursor.execute(explantion01_procedure)
#     except Exception as e:
#         print(f"❌ Error in statement {i+1}: {e}")
#     conn.close()
#     print("🎉 DDL script execution completed")

📊 Resultado da execução do rollup:


Unnamed: 0,agent_id,domain_id,event_id,state,state_start_datetime,state_end_datetime,interval_start,interval_end,time_in_interval_seconds
0,1001,1,EVT_001,LOGGED_IN,2024-09-03 08:00:00,2024-09-03 08:00:30,2024-09-03 08:00:00,2024-09-03 08:15:00,30.0
1,1001,1,EVT_002,NOT_READY,2024-09-03 08:00:30,2024-09-03 08:05:00,2024-09-03 08:00:00,2024-09-03 08:15:00,270.0
2,1001,1,EVT_003,READY,2024-09-03 08:05:00,2024-09-03 08:12:30,2024-09-03 08:00:00,2024-09-03 08:15:00,450.0
3,1001,1,EVT_004,ON_CALL,2024-09-03 08:12:30,2024-09-03 08:18:45,2024-09-03 08:00:00,2024-09-03 08:15:00,150.0
4,1001,1,EVT_005,ACW,2024-09-03 08:18:45,2024-09-03 08:20:00,2024-09-03 08:15:00,2024-09-03 08:30:00,75.0
5,1001,1,EVT_006,READY,2024-09-03 08:20:00,2024-09-03 08:25:15,2024-09-03 08:15:00,2024-09-03 08:30:00,315.0
6,1001,1,EVT_007,ON_CALL,2024-09-03 08:25:15,2024-09-03 08:35:30,2024-09-03 08:15:00,2024-09-03 08:30:00,285.0
7,1001,1,EVT_008,ON_HOLD,2024-09-03 08:35:30,2024-09-03 08:37:00,2024-09-03 08:30:00,2024-09-03 08:45:00,90.0
8,1001,1,EVT_009,ON_CALL,2024-09-03 08:37:00,2024-09-03 08:42:15,2024-09-03 08:30:00,2024-09-03 08:45:00,315.0
9,1001,1,EVT_010,ACW,2024-09-03 08:42:15,2024-09-03 08:45:00,2024-09-03 08:30:00,2024-09-03 08:45:00,165.0


In [None]:
explantion02_procedure = """

    SELECT 
        ei.agent_id,
        ei.domain_id,
        ei.event_id,
        ei.state,
        ei.state_start_datetime,
        ei.state_end_datetime,
        ei.interval_end as interval_start,
        ei.interval_end + INTERVAL '15 minutes' as interval_end,
        CASE 
            -- If the state ends less than your block of 15 minutes, calculate the exact time else assume full 15 minutes
            -------------------------------------------------------------------------------------------------------------
            WHEN ei.state_end_datetime <= ei.interval_end + INTERVAL '15 minutes' THEN
                EXTRACT(EPOCH FROM (ei.state_end_datetime - ei.interval_end))
            ELSE 
                900 -- 15 minutes in seconds
        END as time_in_interval_seconds
    FROM event_intervals_aux ei
    WHERE ei.interval_end < ei.state_end_datetime;
"""

rollup_result_df = execute_query(explantion02_procedure, fetch=True)
print("📊 Resultado da execução do rollup:")
display(rollup_result_df)

📊 Resultado da execução do rollup:


Unnamed: 0,agent_id,domain_id,event_id,state,state_start_datetime,state_end_datetime,interval_start,interval_end,time_in_interval_seconds
0,1001,1,EVT_004,ON_CALL,2024-09-03 08:12:30,2024-09-03 08:18:45,2024-09-03 08:15:00,2024-09-03 08:30:00,225.0
1,1001,1,EVT_007,ON_CALL,2024-09-03 08:25:15,2024-09-03 08:35:30,2024-09-03 08:30:00,2024-09-03 08:45:00,330.0
2,1001,1,EVT_013,ON_PARK,2024-09-03 09:12:30,2024-09-03 09:15:45,2024-09-03 09:15:00,2024-09-03 09:30:00,45.0
3,1001,1,EVT_014,READY,2024-09-03 09:15:45,2024-09-03 12:00:00,2024-09-03 09:30:00,2024-09-03 09:45:00,900.0
4,1001,1,EVT_015,NOT_READY,2024-09-03 12:00:00,2024-09-03 13:00:00,2024-09-03 12:15:00,2024-09-03 12:30:00,900.0
5,1001,1,EVT_016,READY,2024-09-03 13:00:00,2024-09-03 17:00:00,2024-09-03 13:15:00,2024-09-03 13:30:00,900.0
6,1001,1,EVT_019,READY,2024-09-04 08:00:15,2024-09-04 12:00:00,2024-09-04 08:15:00,2024-09-04 08:30:00,900.0
7,1002,1,EVT_022,ON_CALL,2024-09-03 09:08:30,2024-09-03 09:22:15,2024-09-03 09:15:00,2024-09-03 09:30:00,435.0
8,1002,1,EVT_024,NOT_READY,2024-09-03 09:25:00,2024-09-03 09:35:30,2024-09-03 09:30:00,2024-09-03 09:45:00,330.0
9,1002,1,EVT_025,READY,2024-09-03 09:35:30,2024-09-03 18:00:00,2024-09-03 09:45:00,2024-09-03 10:00:00,900.0


In [80]:
complete_rollup_procedure = """

CREATE OR REPLACE FUNCTION process_agent_events_to_intervals()
RETURNS TABLE(processed_records INTEGER, execution_time_ms INTEGER) AS $$
DECLARE
    start_time TIMESTAMP;
    end_time TIMESTAMP;
    record_count INTEGER;
BEGIN
    start_time := clock_timestamp();
    
    -- Clear existing data 
    DELETE FROM agent_state_interval;
    
    -- This SQL snippet uses a recursive query to split each agent event into 15-minute intervals. 
    -- First, it takes the event and calculates how long it lasted in the first 15-minute interval, 
    -- considering whether the event ended within that interval or is still ongoing. 
    -- Then, if the event lasts more than 15 minutes, it keeps creating new 15-minute intervals until the event ends, 
    -- always calculating how long the agent stayed in each interval.

    WITH RECURSIVE event_intervals AS (
        SELECT 
            agent_id,
            domain_id,
            event_id,
            state,
            state_start_datetime,
            state_end_datetime,
            get_15min_interval(state_start_datetime) as interval_start,
            get_15min_interval(state_start_datetime) + INTERVAL '15 minutes' as interval_end,

            -- This SQL CASE expression calculates the number of seconds an event occupies within its first 15-minute interval.
            CASE 

                -- Ongoing Event: If state_end_datetime is NULL, the event hasn't ended.
                WHEN state_end_datetime IS NULL THEN 
                    EXTRACT(EPOCH FROM (
                        get_15min_interval(state_start_datetime) + INTERVAL '15 minutes' - state_start_datetime
                    ))

                -- Event ends within the first interval: If the event ends before or exactly at the end of the 15-minute interval
                WHEN state_end_datetime <= get_15min_interval(state_start_datetime) + INTERVAL '15 minutes' THEN
                    EXTRACT(EPOCH FROM (state_end_datetime - state_start_datetime))

                ELSE 
                    -- Event spans multiple intervals:: If the event continues past the interval's end
                    EXTRACT(EPOCH FROM (
                        get_15min_interval(state_start_datetime) + INTERVAL '15 minutes' - state_start_datetime
                    ))
            END as time_in_interval_seconds
            
        FROM agent_event
        WHERE state_end_datetime IS NOT NULL
        
        UNION ALL
        
        SELECT 
            ei.agent_id,
            ei.domain_id,
            ei.event_id,
            ei.state,
            ei.state_start_datetime,
            ei.state_end_datetime,
            ei.interval_end as interval_start,
            ei.interval_end + INTERVAL '15 minutes' as interval_end,
            CASE 
                WHEN ei.state_end_datetime <= ei.interval_end + INTERVAL '15 minutes' THEN
                    EXTRACT(EPOCH FROM (ei.state_end_datetime - ei.interval_end))
                ELSE 
                    900 -- 15 minutes in seconds
            END as time_in_interval_seconds
        FROM event_intervals ei
        WHERE ei.interval_end < ei.state_end_datetime
    ),


    aggregated_intervals AS (
        SELECT 
            agent_id,
            domain_id,
            interval_start as interval,
            state,
            SUM(time_in_interval_seconds)::BIGINT as agent_state_time
        FROM event_intervals
        WHERE time_in_interval_seconds > 0
        GROUP BY agent_id, domain_id, interval_start, state
    )

    INSERT INTO agent_state_interval (
        agent_id, 
        domain_id, 
        interval, 
        state, 
        agent_state_time, 
        db_created_datetime, 
        db_updated_datetime
    )
    SELECT 
        agent_id,
        domain_id,
        interval,
        state,
        agent_state_time,
        NOW(),
        NOW()
    FROM aggregated_intervals
    ON CONFLICT (agent_id, domain_id, interval, state)
    DO UPDATE SET
        agent_state_time = EXCLUDED.agent_state_time,
        db_updated_datetime = NOW();
    
    GET DIAGNOSTICS record_count = ROW_COUNT;
    end_time := clock_timestamp();
    
    RETURN QUERY SELECT record_count, EXTRACT(MILLISECONDS FROM (end_time - start_time))::INTEGER;
END;
$$ LANGUAGE plpgsql;
"""

result = execute_query(complete_rollup_procedure)
if result:
    print("✅ Rollup procedure created successfully")

✅ Query executed successfully
✅ Rollup procedure created successfully


In [85]:
execute_rollup_query = "SELECT * FROM process_agent_events_to_intervals();"
rollup_result_df = execute_query(execute_rollup_query, fetch=True)
print("📊 Resultado da execução do rollup:")
print(rollup_result_df)

📊 Resultado da execução do rollup:
   processed_records  execution_time_ms
0                140                 11


In [None]:
validation_query01 = """
    -- WE HAVE FOR EACH AGENT_ID, ALL CONDITIONS BY SECONDS AND INTERVAL
    SELECT 
        *
    FROM agent_state_interval
    where agent_id = '1001' -- Example agent_id filter
    order by interval;
"""

validation_df = execute_query(validation_query01, fetch=True)
print("🔍 Validação - Comparação de totais:")
# Exibe todos os dados sem truncar colunas ou linhas
with pd.option_context('display.max_rows', None, 'display.max_columns', None, 'display.width', None):
    display(validation_df)

🔍 Validação - Comparação de totais:


Unnamed: 0,agent_id,domain_id,interval,state,agent_state_time,db_updated_datetime,db_created_datetime
0,1001,1,2024-09-03 08:00:00,LOGGED_IN,30,2025-09-05 00:17:07.347393,2025-09-05 00:17:07.347393
1,1001,1,2024-09-03 08:00:00,ON_CALL,150,2025-09-05 00:17:07.347393,2025-09-05 00:17:07.347393
2,1001,1,2024-09-03 08:00:00,NOT_READY,270,2025-09-05 00:17:07.347393,2025-09-05 00:17:07.347393
3,1001,1,2024-09-03 08:00:00,READY,450,2025-09-05 00:17:07.347393,2025-09-05 00:17:07.347393
4,1001,1,2024-09-03 08:15:00,ON_CALL,510,2025-09-05 00:17:07.347393,2025-09-05 00:17:07.347393
5,1001,1,2024-09-03 08:15:00,ACW,75,2025-09-05 00:17:07.347393,2025-09-05 00:17:07.347393
6,1001,1,2024-09-03 08:15:00,READY,315,2025-09-05 00:17:07.347393,2025-09-05 00:17:07.347393
7,1001,1,2024-09-03 08:30:00,ON_CALL,645,2025-09-05 00:17:07.347393,2025-09-05 00:17:07.347393
8,1001,1,2024-09-03 08:30:00,ON_HOLD,90,2025-09-05 00:17:07.347393,2025-09-05 00:17:07.347393
9,1001,1,2024-09-03 08:30:00,ACW,165,2025-09-05 00:17:07.347393,2025-09-05 00:17:07.347393


In [None]:
validation_query = """
-- This SQL query is designed to validate the accuracy of the interval rollup process 
-- by comparing the total time spent in each state for each agent, as recorded in the 
-- original event data versus the aggregated interval data.

WITH original_totals AS (

    -- calculates the total number of seconds each agent spent in each state by summing 
    -- the duration of all events (using EXTRACT(EPOCH FROM ...)) from the agent_event table. 
    -- Only events with a non-null end time are included, and the results are grouped by agent_id 
    -- and state.

    SELECT 
        agent_id,
        state,
        SUM(EXTRACT(EPOCH FROM (state_end_datetime - state_start_datetime))) as original_seconds
    FROM agent_event 
    WHERE state_end_datetime IS NOT NULL
    GROUP BY agent_id, state
),

-- computes the total time per agent and state from the agent_state_interval table, 
-- which contains the results of splitting events into 15-minute intervals and summing 
-- the time spent in each interval.

rollup_totals AS (
    SELECT 
        agent_id,
        state,
        SUM(agent_state_time) as rollup_seconds
    FROM agent_state_interval
    GROUP BY agent_id, state
)
SELECT 
    COALESCE(o.agent_id, r.agent_id) as agent_id,
    COALESCE(o.state, r.state) as state,
    COALESCE(o.original_seconds, 0) as original_seconds,
    COALESCE(r.rollup_seconds, 0) as rollup_seconds,
    ABS(COALESCE(o.original_seconds, 0) - COALESCE(r.rollup_seconds, 0)) as difference_seconds
FROM original_totals o
FULL OUTER JOIN rollup_totals r ON o.agent_id = r.agent_id AND o.state = r.state
ORDER BY agent_id, state;
"""

In [86]:
validation_df = execute_query(validation_query, fetch=True)
print("🔍 Validação - Comparação de totais:")
display(validation_df)

🔍 Validação - Comparação de totais:


Unnamed: 0,agent_id,state,original_seconds,rollup_seconds,difference_seconds
0,1001,ACW,240.0,240,0.0
1,1001,LOGGED_IN,45.0,45,0.0
2,1001,NOT_READY,3870.0,3870,0.0
3,1001,ON_CALL,2055.0,2055,0.0
4,1001,ON_HOLD,90.0,90,0.0
5,1001,ON_PARK,195.0,195,0.0
6,1001,READY,40305.0,40305,0.0
7,1002,ACW,165.0,165,0.0
8,1002,LOGGED_IN,10.0,10,0.0
9,1002,NOT_READY,630.0,630,0.0


In [None]:
best_practices_info = """
1. RECOMMENDED INDEXES:
    - CREATE INDEX idx_agent_state_interval_interval_range ON agent_state_interval(interval);
    - CREATE INDEX idx_agent_state_interval_agent_interval ON agent_state_interval(agent_id, interval);

2. SARGABLE FILTERS:
    - Always use >= and < for date ranges (not BETWEEN)
    - Avoid functions on indexed columns (interval)
    - Use conditions that allow index range scan

3. DAYLIGHT SAVING TIME HANDLING:
    - PostgreSQL automatically handles DST with 'AT TIME ZONE'
    - Test DST transitions (March/October in Europe)
    - Consider that some days may have 23 or 25 hours

4. CACHE AND PERFORMANCE:
    - Cache UTC boundaries for the current day
    - Use prepared statements for frequent queries
    - Consider materializing data per timezone for very frequent queries

5. TIMEZONE VALIDATION:
    - Validate if the provided timezone is valid
    - Have a fallback to UTC in case of invalid timezone
    - Document supported timezones

6. DESIGN IMPROVEMENTS:
    - Implement partitioning by date on the agent_state_interval table
    - Use UPSERT (ON CONFLICT) to handle late arriving data
    - Add surrogate keys for better performance
    - Implement versioning and auditing
    - Configure more aggressive autovacuum for high-churn tables
"""