#### Import libraries & Ensure tables are created

In [None]:
import json
import os
import random
import time
from collections import Counter
from copy import deepcopy
from datetime import datetime, timedelta

import pandas as pd
import psycopg2
from confluent_kafka import Consumer, Producer
from pyspark.sql import Row
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL


# Database connection details.
# Every value can be overridden with an environment variable of the same name
# (DB_NAME, DB_USER, DB_PASS, DB_HOST, DB_PORT), so switching machines or
# hotspots no longer requires editing (or committing) credentials here.
# Known setups:
#   muhid:   DB_NAME=redset, DB_HOST=localhost
#   goutham: DB_NAME=de_project_main (the defaults below)
# Known hosts: 192.168.66.138 (muhid hotspot),
#              192.168.127.138 / 192.168.7.138 (sagnik hotspot)
DB_NAME = os.environ.get("DB_NAME", "de_project_main")
DB_USER = os.environ.get("DB_USER", "postgres")
DB_PASS = os.environ.get("DB_PASS", "postgres16")
DB_HOST = os.environ.get("DB_HOST", "192.168.7.138")
DB_PORT = os.environ.get("DB_PORT", "5432")

# Column name -> expected per-value type label used by the batch validator.
# Insertion order matters: type anomalies are reported in this column order.
expected_schema = dict(
    user_id="int",
    query_id="int",
    arrival_timestamp="datetime64[ns]",
    compile_duration_ms="float",
    execution_duration_ms="int",
    was_cached="int",
    query_type="str",
    read_table_ids="str",
    write_table_ids="str",
    num_joins="int",
    num_scans="int",
)

# Accepted query_type values; anything else is reported as an anomaly.
valid_query_types = set(
    "select insert delete other analyze unload update copy ctas vacuum".split()
)

# Placeholder table IDs made only of zeros (2 to 4 characters) count as invalid.
invalid_table_ids = {"0" * width for width in range(2, 5)}

# Maximum values for an optimization alert (currently disabled):
# MAX_MBYTES_SCANNED = 100000

# Create PostgreSQL connection.
# URL.create escapes the credentials, so a password containing '@', '/' or ':'
# no longer corrupts the connection string (the f-string form did not escape).
# pool_pre_ping tests each pooled connection before use and transparently
# replaces ones the server has already closed.
engine = create_engine(
    URL.create(
        "postgresql",
        username=DB_USER,
        password=DB_PASS,
        host=DB_HOST,
        port=int(DB_PORT),
        database=DB_NAME,
    ),
    pool_pre_ping=True,
)

# Create the master `redset_main` table (range-partitioned) when missing.
def ensure_master_table():
    """Create the ``redset_main`` table if it does not already exist.

    The table is declared ``PARTITION BY RANGE (arrival_timestamp)``; no
    partitions are created here. The DDL runs in its own transaction, which is
    committed when the ``with`` block exits without error.
    """
    ddl = """
    -- DROP TABLE IF EXISTS public."redset_main" CASCADE;
    CREATE TABLE IF NOT EXISTS "redset_main" (
        user_id INT,
        query_id INT,
        arrival_timestamp TIMESTAMP,
        compile_duration_ms FLOAT,
        execution_duration_ms FLOAT,
        was_cached INT,
        query_type TEXT,
        read_table_ids TEXT,
        write_table_ids TEXT,
        num_joins INT,
        num_scans INT
    ) PARTITION BY RANGE (arrival_timestamp);
    """
    with engine.begin() as connection:  # commit on success, rollback on error
        connection.execute(text(ddl))
    print("✅ Ensured 'redset_main' table exists.")

# ✅ Create the `anomalies` table when missing.
def ensure_anomalies_table():
    """Create the ``anomalies`` table if it does not already exist.

    Columns: arrival_timestamp and query_id (both NOT NULL) identify the
    offending record; anomaly_description holds the human-readable issues.
    """
    ddl = """
    -- DROP TABLE IF EXISTS public."anomalies" CASCADE;
    CREATE TABLE IF NOT EXISTS anomalies (
        arrival_timestamp TIMESTAMP NOT NULL,
        query_id INT NOT NULL,  -- Added query_id column
        anomaly_description TEXT NOT NULL
    );
    """
    with engine.begin() as connection:  # commit on success, rollback on error
        connection.execute(text(ddl))
    print("✅ Ensured 'anomalies' table exists.")

# ✅ Create the `stream_summary` table when missing.
def stream_summary_table():
    """Create the ``stream_summary`` table if it does not already exist.

    One row per ``table_id`` (primary key) with a BIGINT ``count`` that
    defaults to 1.
    """
    ddl = """
    -- DROP TABLE IF EXISTS public."stream_summary" CASCADE;
    CREATE TABLE IF NOT EXISTS stream_summary (
        table_id INT PRIMARY KEY,
        count BIGINT DEFAULT 1
    );
    """
    with engine.begin() as connection:  # commit on success, rollback on error
        connection.execute(text(ddl))
    print("✅ Ensured 'stream_summary' table exists.")

# ✅ Function to ensure `global_counts` table exists
def global_counts_table():
    """Create the ``global_counts`` key/value table and seed its counter row.

    The table maps a TEXT ``key`` (primary key) to a BIGINT
    ``total_elements_seen``. The row ``('total_tables', 0)`` is inserted only
    if that key is not present yet (``ON CONFLICT DO NOTHING``), so re-running
    never resets an existing count.
    """
    create_table_query = """
    -- DROP TABLE IF EXISTS public."global_counts" CASCADE;
    CREATE TABLE IF NOT EXISTS global_counts (
        key TEXT PRIMARY KEY,
        total_elements_seen BIGINT DEFAULT 0
    );
    -- Initialize total_elements_seen if not exists
    INSERT INTO global_counts (key, total_elements_seen)
    VALUES ('total_tables', 0)
    ON CONFLICT (key) DO NOTHING;
    """
    with engine.connect() as conn:
        conn.execute(text(create_table_query))
        conn.commit()
    # Previously reported 'stream_summary' here by copy-paste mistake.
    print("✅ Ensured 'global_counts' table exists.")


def top_k_tables_per_day_materialized_view():
    """Create the ``top_k_tables_per_day`` materialized view and its unique index.

    Each comma-separated ID in ``read_table_ids`` and ``write_table_ids`` of
    ``redset_main`` counts as one table reference. The view aggregates them by
    (day, table_id, query_type, user_id) and reports:
      * ``count`` - number of references in that group;
      * ``percentage`` - share (0-100) of all references on that day;
      * ``overall_percentage`` - share (0-100) of all references overall.

    The unique index is what allows ``REFRESH MATERIALIZED VIEW CONCURRENTLY``.
    Both statements are idempotent: previously the index had no
    ``IF NOT EXISTS``, so re-running this setup raised "relation already exists"
    even though the view creation itself was skipped.
    """
    create_view_query = """
    -- View: public.top_k_tables_per_day

    -- DROP MATERIALIZED VIEW IF EXISTS public.top_k_tables_per_day CASCADE;

    CREATE MATERIALIZED VIEW IF NOT EXISTS public.top_k_tables_per_day
    TABLESPACE pg_default
    AS
    WITH table_usage AS (
            SELECT "redset_main".arrival_timestamp,
                date_trunc('day'::text, "redset_main".arrival_timestamp) AS day,
                unnest(string_to_array("redset_main".read_table_ids, ','::text)) AS table_id,
                "redset_main".query_type,
                "redset_main".user_id
            FROM "redset_main"
            UNION ALL
            SELECT "redset_main".arrival_timestamp,
                date_trunc('day'::text, "redset_main".arrival_timestamp) AS day,
                unnest(string_to_array("redset_main".write_table_ids, ','::text)) AS table_id,
                "redset_main".query_type,
                "redset_main".user_id
            FROM "redset_main"
            ), table_count AS (
            SELECT table_usage_1.day,
                table_usage_1.table_id,
                table_usage_1.query_type,
                table_usage_1.user_id,
                count(*) AS count
            FROM table_usage table_usage_1
            GROUP BY table_usage_1.day, table_usage_1.table_id, table_usage_1.query_type, table_usage_1.user_id
            ), total_count AS (
            SELECT table_count.day,
                sum(table_count.count) AS total
            FROM table_count
            GROUP BY table_count.day
            ), overall_total AS (
            SELECT sum(total_count.total) AS overall_total
            FROM total_count
            ), table_percentage AS (
            SELECT table_count.day,
                table_count.table_id,
                table_count.query_type,
                table_count.user_id,
                table_count.count,
                table_count.count::double precision / (( SELECT total_count.total
                    FROM total_count
                    WHERE total_count.day = table_count.day))::double precision * 100::double precision AS percentage,
                table_count.count::double precision / (( SELECT overall_total.overall_total
                    FROM overall_total))::double precision * 100::double precision AS overall_percentage
            FROM table_count
            )
    SELECT day,
        table_id,
        query_type,
        user_id,
        count,
        percentage,
        overall_percentage
    FROM table_percentage
    WITH DATA;

    ALTER TABLE IF EXISTS public.top_k_tables_per_day
        OWNER TO postgres;


    CREATE UNIQUE INDEX IF NOT EXISTS idx_top_k_tables_per_day
        ON public.top_k_tables_per_day USING btree
        (day, table_id COLLATE pg_catalog."default", query_type COLLATE pg_catalog."default", user_id)
        TABLESPACE pg_default;
    """
    with engine.connect() as conn:
        conn.execute(text(create_view_query))
        conn.commit()
    print("✅ Ensured 'top_k' materialized view exists.")

def hit_rate_per_day_materialized_view():
    """Create the ``hit_rate_per_day`` materialized view and its unique index.

    Rows are grouped by (day, query_type, user_id). ``was_cached_count`` counts
    rows with ``was_cached = 1``, and ``total_queries`` is the number of queries
    on that whole day. ``hit_rate_per_day`` is therefore
    ``was_cached_count / total_queries * 100``: the group's cached queries as a
    share of *all* queries that day, not of the group's own queries.

    The unique index is created with ``IF NOT EXISTS``. Without it, re-running
    this setup failed once the view already existed.
    """
    create_view_query = """
    -- Drop the materialized view if it exists
    -- DROP MATERIALIZED VIEW IF EXISTS public.hit_rate_per_day CASCADE;

    -- Create the materialized view
    CREATE MATERIALIZED VIEW IF NOT EXISTS public.hit_rate_per_day
    TABLESPACE pg_default
    AS
    WITH daily_stats AS (
        SELECT 
            date_trunc('day', "redset_main".arrival_timestamp) AS day,
            "redset_main".query_type,
            "redset_main".user_id,
            COUNT(*) FILTER (WHERE "redset_main".was_cached = 1) AS was_cached_count,
            COUNT(*) AS total_count
        FROM 
            "redset_main"
        GROUP BY 
            date_trunc('day', "redset_main".arrival_timestamp),
            "redset_main".query_type,
            "redset_main".user_id
    ),
    daily_totals AS (
        SELECT 
            day,
            SUM(total_count) AS total_queries
        FROM 
            daily_stats
        GROUP BY 
            day
    )
    SELECT 
        ds.day,
        ds.query_type,
        ds.user_id,
        ds.was_cached_count,
        dt.total_queries,
        (ds.was_cached_count::double precision / dt.total_queries::double precision) * 100 AS hit_rate_per_day
    FROM 
        daily_stats ds
    JOIN 
        daily_totals dt
    ON 
        ds.day = dt.day
    WITH DATA;

    -- Create a unique index on the materialized view (required for REFRESH ... CONCURRENTLY)
    CREATE UNIQUE INDEX IF NOT EXISTS idx_hit_rate_per_day ON public.hit_rate_per_day (day, query_type, user_id);
    """
    with engine.connect() as conn:
        conn.execute(text(create_view_query))
        conn.commit()
    print("✅ Ensured 'hit_rate' materialized view exists.")

def top_k_queries_per_day_materialized_view():
    """Create and refresh the ``top_k_queries_per_day`` materialized view.

    Queries from ``redset_main`` are counted per (day, query_type, user_id).
    ``daily_percentage`` is the group's share (0-100) of that day's queries, and
    ``overall_percentage`` is its share of all queries.

    The unique index (created with ``IF NOT EXISTS``, so re-running no longer
    fails) enables the ``REFRESH ... CONCURRENTLY``. That refresh brings an
    already-existing view up to date on every run. A trailing
    ``SELECT ... LIMIT 10`` whose result was discarded has been removed.
    """
    create_view_query = """
    -- Drop the materialized view if it exists
    -- DROP MATERIALIZED VIEW IF EXISTS public.top_k_queries_per_day CASCADE;

    -- Create the materialized view
    CREATE MATERIALIZED VIEW IF NOT EXISTS public.top_k_queries_per_day
    TABLESPACE pg_default
    AS
    WITH query_usage AS (
        SELECT 
            public.redset_main.arrival_timestamp,
            date_trunc('day', public.redset_main.arrival_timestamp) AS day,
            unnest(string_to_array(public.redset_main.query_type, ',')) AS query_type,
            public.redset_main.user_id
        FROM public.redset_main
    ),
    query_count AS (
        SELECT 
            query_usage.day,
            query_usage.query_type,
            query_usage.user_id,
            COUNT(*) AS count
        FROM query_usage
        GROUP BY query_usage.day, query_usage.query_type, query_usage.user_id
    ),
    total_queries_per_day AS (
        SELECT 
            day,
            SUM(count) AS total_queries
        FROM query_count
        GROUP BY day
    ),
    overall_total_queries AS (
        SELECT SUM(total_queries) AS overall_total
        FROM total_queries_per_day
    ),
    query_percentage AS (
        SELECT 
            query_count.day,
            query_count.query_type,
            query_count.user_id,
            query_count.count,
            (query_count.count::double precision / 
                (SELECT total_queries_per_day.total_queries 
                FROM total_queries_per_day 
                WHERE total_queries_per_day.day = query_count.day)) * 100 AS daily_percentage,
            (query_count.count::double precision / 
                (SELECT overall_total_queries.overall_total FROM overall_total_queries)) * 100 AS overall_percentage
        FROM query_count
    )
    SELECT 
        query_percentage.day,
        query_percentage.query_type,
        query_percentage.user_id,
        query_percentage.count,
        query_percentage.daily_percentage,
        query_percentage.overall_percentage
    FROM query_percentage
    ORDER BY query_percentage.day DESC, query_percentage.count DESC
    WITH DATA;

    -- Create a unique index on the materialized view (required for REFRESH ... CONCURRENTLY)
    CREATE UNIQUE INDEX IF NOT EXISTS idx_top_k_queries_per_day 
    ON public.top_k_queries_per_day (day, query_type, user_id);

    -- Refresh the materialized view concurrently
    REFRESH MATERIALIZED VIEW CONCURRENTLY public.top_k_queries_per_day;
    """
    with engine.connect() as conn:
        conn.execute(text(create_view_query))
        conn.commit()
    print("✅ Ensured 'top_k_queries_per_day' materialized view exists.")

def compile_time_vs_num_joins_materialized_view():
    """Create ``compile_time_vs_num_joins`` and its unique index (idempotently).

    For ``select`` queries with a known ``num_joins``, the view holds
    ``x = num_joins`` and ``y = AVG(compile_duration_ms)``, one row per join
    count. The unique index on (x, y) is what allows
    ``REFRESH MATERIALIZED VIEW CONCURRENTLY`` on this view.

    Both statements use ``IF NOT EXISTS``, so re-running the setup no longer
    reports errors. Failures of either SQL statement are printed rather than
    raised (best effort), as before.
    """
    create_view_query = """
    -- DROP MATERIALIZED VIEW IF EXISTS compile_time_vs_num_joins CASCADE;

    CREATE MATERIALIZED VIEW IF NOT EXISTS public.compile_time_vs_num_joins AS
    SELECT 
        num_joins AS x,
        AVG(compile_duration_ms) AS y
    FROM public.redset_main
    WHERE query_type = 'select' 
    AND num_joins IS NOT NULL
    GROUP BY num_joins
    ORDER BY num_joins WITH DATA;
    """

    with engine.connect() as conn:
        try:
            # conn.begin() commits on success and rolls back on error. This
            # replaces hand-issued BEGIN/COMMIT/ROLLBACK statements, which
            # conflicted with SQLAlchemy's own implicit transaction.
            with conn.begin():
                conn.execute(text(create_view_query))
            print("✅ Ensured materialized view 'compile_time_vs_num_joins' exists.")
        except Exception as e:
            print(f"❌ Error creating materialized view: {e}")

    # CREATE INDEX CONCURRENTLY cannot run inside a transaction block, so it is
    # issued on a separate connection switched to AUTOCOMMIT.
    create_index_query = """
    CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS idx_compile_time_vs_num_joins 
    ON public.compile_time_vs_num_joins (x, y);
    """

    with engine.connect() as conn:
        try:
            conn.execution_options(isolation_level="AUTOCOMMIT").execute(text(create_index_query))
            print("✅ Ensured unique index 'idx_compile_time_vs_num_joins' exists.")
        except Exception as e:
            print(f"❌ Error creating index: {e}")



# Base tables first, then the materialized views that read from redset_main.
for _setup_step in (
    ensure_master_table,
    ensure_anomalies_table,
    top_k_tables_per_day_materialized_view,
    top_k_queries_per_day_materialized_view,
    hit_rate_per_day_materialized_view,
    compile_time_vs_num_joins_materialized_view,
):
    _setup_step()
# Currently disabled setup steps:
# stream_summary_table()
# global_counts_table()

✅ Ensured 'redset_main' table exists.
✅ Ensured 'anomalies' table exists.
✅ Ensured 'top_k' materialized view exists.
✅ Ensured 'top_k_queries_per_day' materialized view exists.
✅ Ensured 'hit_rate' materialized view exists.
✅ Materialized view 'compile_time_vs_num_joins' created successfully.
✅ Unique index 'idx_compile_time_vs_num_joins' created successfully.


#### Function to validate a batch and insert the detected anomalies into the anomalies table

In [2]:
def validate_batch(data):
    """
    Validate a batch of query-log records and store any anomalies in PostgreSQL.

    Parameters:
        data (list of dicts): Decoded JSON payload of one Kafka message, one
            dict per record, keyed like ``expected_schema``. An empty/None
            batch is a no-op.

    All problems found for one record are joined with " | " and written as a
    single row to the ``anomalies`` table (arrival_timestamp, query_id,
    anomaly_description). Checks, in the order their messages appear:
      * value types against ``expected_schema``. This includes values that
        ``pd.to_numeric`` / ``pd.to_datetime`` could not coerce; previously they
        became NaN/NaT and were silently skipped;
      * ``query_type`` membership in ``valid_query_types``;
      * missing read/write table IDs for query types that need them;
      * placeholder IDs listed in ``invalid_table_ids``;
      * negative execution time, and zero execution time without a cache hit.

    Both arrival_timestamp and query_id are NOT NULL in ``anomalies``. Records
    missing either one are printed instead of being inserted, so one bad row
    cannot make ``to_sql`` fail for the whole batch.
    """
    if not data:
        print("No data to validate.")
        return

    def is_missing(value):
        """True for scalar None/NaN/NaT/NA; False for any other value (incl. containers)."""
        if isinstance(value, (list, tuple, dict, set)):
            return False
        try:
            return bool(pd.isna(value))
        except (TypeError, ValueError):
            # pd.isna on array-likes returns an array with an ambiguous truth value.
            return False

    # Row index -> list of anomaly messages, in discovery order.
    anomalies_per_row = {}

    def add_anomaly(index, message):
        """Accumulate ``message`` for row ``index``."""
        anomalies_per_row.setdefault(index, []).append(message)

    df = pd.DataFrame(data)
    # Un-coerced snapshot: lets the type check report values that coercion
    # turned into NaN/NaT instead of silently ignoring them.
    raw = df.copy()

    # Record genuinely absent table IDs *before* any str conversion. astype(str)
    # turns None/NaN into the strings "None"/"nan", after which .isna() is always
    # False, so the missing-ID checks below could never fire.
    missing_read_ids = df["read_table_ids"].isna()
    missing_write_ids = df["write_table_ids"].isna()

    # Convert data to the expected schema.
    df["arrival_timestamp"] = pd.to_datetime(df["arrival_timestamp"], errors="coerce")
    for column in ("compile_duration_ms", "execution_duration_ms", "was_cached", "num_joins"):
        df[column] = pd.to_numeric(df[column], errors="coerce")
    df["query_type"] = df["query_type"].astype(str)
    df["read_table_ids"] = df["read_table_ids"].astype(str).where(~missing_read_ids, "")
    df["write_table_ids"] = df["write_table_ids"].astype(str).where(~missing_write_ids, "")

    # Accepted Python types per expected dtype label (int is fine for float fields).
    type_accepts = {
        "int": lambda v: isinstance(v, int),
        "float": lambda v: isinstance(v, (float, int)),
        "bool": lambda v: isinstance(v, bool),
        "datetime64[ns]": lambda v: isinstance(v, pd.Timestamp),
        "str": lambda v: isinstance(v, str),
    }

    # Type validation (ignore trivial differences).
    for column, expected_dtype in expected_schema.items():
        if column not in df.columns:
            continue
        accepts = type_accepts.get(expected_dtype)
        for idx, value in df[column].items():
            if is_missing(value):
                raw_value = raw.at[idx, column]
                # Present in the input but lost during coercion -> wrong type.
                if not is_missing(raw_value):
                    add_anomaly(idx, f"{column} expected {expected_dtype}, found {type(raw_value).__name__}")
                continue
            if accepts is not None and accepts(value):
                continue
            add_anomaly(idx, f"{column} expected {expected_dtype}, found {type(value).__name__}")

    # Validate `query_type`.
    for idx in df.index[~df["query_type"].isin(valid_query_types)]:
        add_anomaly(idx, f"Invalid query_type `{df.at[idx, 'query_type']}`")

    # Check missing table IDs for query types that must read/write tables.
    needs_read = df["query_type"].isin({"select", "copy"})
    for idx in df.index[needs_read & missing_read_ids]:
        add_anomaly(idx, f"Missing read_table_ids for `{df.at[idx, 'query_type']}` query")

    needs_write = df["query_type"].isin({"insert", "delete", "copy"})
    for idx in df.index[needs_write & missing_write_ids]:
        add_anomaly(idx, f"Missing write_table_ids for `{df.at[idx, 'query_type']}` query")

    # Check placeholder (invalid) table IDs in the comma-separated lists.
    for column in ("read_table_ids", "write_table_ids"):
        bad_ids = df[column].str.split(",").apply(lambda ids: set(ids) & invalid_table_ids)
        for idx, invalid_vals in bad_ids.items():
            if invalid_vals:
                add_anomaly(idx, f"{column} contains invalid values {invalid_vals}")

    # Execution duration sanity checks.
    for idx in df.index[df["execution_duration_ms"] < 0]:
        add_anomaly(idx, f"execution_duration_ms is negative ({df.at[idx, 'execution_duration_ms']})")

    for idx in df.index[(df["execution_duration_ms"] == 0) & (df["was_cached"] != 1)]:
        add_anomaly(idx, "Query executed instantly but was not cached")

    if not anomalies_per_row:
        return

    # Insert anomalies into PostgreSQL (one row per offending input record).
    rows = list(anomalies_per_row)
    anomalies_df = pd.DataFrame({
        "arrival_timestamp": df.loc[rows, "arrival_timestamp"].values,
        "query_id": df.loc[rows, "query_id"].values,
        "anomaly_description": [" | ".join(issues) for issues in anomalies_per_row.values()],
    })

    storable = anomalies_df["arrival_timestamp"].notna() & anomalies_df["query_id"].notna()
    if not storable.all():
        skipped = anomalies_df.loc[~storable, "anomaly_description"].tolist()
        print(f"⚠️ {len(skipped)} anomalies not stored (missing arrival_timestamp or query_id): {skipped}")
        anomalies_df = anomalies_df[storable]

    if anomalies_df.empty:
        return
    anomalies_df.to_sql("anomalies", engine, if_exists="append", index=False)
    print(f"✅ {len(anomalies_df)} anomalies inserted into PostgreSQL.")

#### Function to insert data into redset_main table

In [3]:
# Date (datetime.date) of the partition most recently written by
# insert_batch_into_master. A change of date triggers partition cleanup and a
# refresh of the materialized views.
prev_date = None

# Materialized views refreshed whenever a batch starts a new date.
_VIEWS_TO_REFRESH = (
    "top_k_tables_per_day",
    "top_k_queries_per_day",
    "hit_rate_per_day",
    "compile_time_vs_num_joins",
)


def insert_batch_into_master(data, retention_days=4):
    """
    Insert a batch of JSON-formatted rows into the partitioned ``redset_main``
    table, one daily partition (``redset_YYYY_MM_DD``) per arrival date.

    Parameters:
        data (list of dicts): Rows as dictionaries; ``arrival_timestamp`` must
            be parseable by ``pd.to_datetime``.
        retention_days (int): Whenever a batch contains a date different from
            the previously written one, the partition exactly this many days
            older than that date is dropped. Only that single partition is
            dropped, not every older one. Defaults to 4.

    On a date change, the listed materialized views are also refreshed
    concurrently, *before* the new partition's rows are inserted. Any error is
    printed and the rest of the batch is abandoned (best effort). ``prev_date``
    is only advanced after a partition's rows were written.
    """
    global prev_date

    if not data:
        print("No data to insert.")
        return

    try:
        df = pd.DataFrame(data)
        df['arrival_timestamp'] = pd.to_datetime(df['arrival_timestamp'])
        df['date_partition'] = df['arrival_timestamp'].dt.date

        # groupby drops rows whose key is missing; report that instead of
        # losing those rows silently.
        missing_ts = df['arrival_timestamp'].isna()
        if missing_ts.any():
            print(f"⚠️ Skipping {int(missing_ts.sum())} rows without arrival_timestamp.")

        # Replace NaN with None for PostgreSQL compatibility.
        df = df.where(pd.notna(df), None)

        for partition_date, partition_df in df.groupby('date_partition'):
            partition_name = f'redset_{partition_date.strftime("%Y_%m_%d")}'  # e.g. redset_2024_03_01
            next_day = partition_date + timedelta(days=1)

            with engine.connect() as conn:
                if partition_date != prev_date:
                    stale_partition = f'redset_{(partition_date - timedelta(days=retention_days)).strftime("%Y_%m_%d")}'
                    conn.execute(text(f'DROP TABLE IF EXISTS public."{stale_partition}";'))
                    # One statement per view. The old list had a missing comma,
                    # which silently glued two REFRESH statements into one string.
                    for view in _VIEWS_TO_REFRESH:
                        conn.execute(text(f"REFRESH MATERIALIZED VIEW CONCURRENTLY public.{view};"))

                conn.execute(text(f"""
                CREATE TABLE IF NOT EXISTS public."{partition_name}"
                PARTITION OF public."redset_main"
                FOR VALUES FROM ('{partition_date}') TO ('{next_day}');
                """))
                conn.commit()

            # drop() returns a new frame; dropping inplace on a groupby slice
            # risks pandas' SettingWithCopy behaviour.
            rows = partition_df.drop(columns=['date_partition'])
            rows.to_sql(partition_name, engine, if_exists='append', index=False)

            prev_date = partition_date

    except Exception as e:
        print(f"❌ Error inserting data: {e}")

#### Main consumer loop

In [None]:
# Kafka host and topic to match the producer
kafka_host = "localhost:9092"
kafka_topic = "muhid"  # Topic where messages are produced

# Initialize the consumer with the correct configuration
c = Consumer({
    'bootstrap.servers': kafka_host,
    'group.id': 'redset_stream',  # Static group ID
    'auto.offset.reset': 'earliest',  # Start reading from the earliest available message
    'debug': 'all',  # Enable all Kafka debugging logs (very verbose)
    'fetch.message.max.bytes': 104857600,  # 100MB
    'max.partition.fetch.bytes': 104857600,
    'session.timeout.ms': 60000,
})

c.subscribe([kafka_topic])

# Module-level state declared by this cell. Only prev_data is updated by the
# loop below; the others are not read here (kept in case other cells use them).
tmp = 20
v = 0
prev_data = 0

msg_printed = 0
file_writer = 0

print("Consumer running...")
try:
    # Runs until the cell is interrupted. The finally clause below guarantees
    # the consumer is closed (leaving its group cleanly) when that happens;
    # previously c.close() was unreachable after `while True`.
    while True:
        msg = c.poll(3.0)
        time.sleep(2.0)  # NOTE(review): fixed pause after every poll throttles intake — confirm intended
        if msg is None:
            print('No message received during poll. Retrying...')
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue

        try:
            # Each message is a JSON list of row dicts.
            message_data = json.loads(msg.value().decode('utf-8'))
            prev_data = deepcopy(message_data)  # last processed batch, for inspection

            # Record anomalies first, then load the batch into redset_main.
            validate_batch(message_data)
            insert_batch_into_master(message_data)
        except Exception as e:
            # NOTE(review): offsets are auto-committed by default, so a failed
            # batch is not re-delivered.
            print(f"Error processing message: {e}")
except KeyboardInterrupt:
    pass
finally:
    print("Consumer stopped.")
    # Clean up
    c.close()

Consumer running...
No message received during poll. Retrying...
No message received during poll. Retrying...
No message received during poll. Retrying...
No message received during poll. Retrying...
No message received during poll. Retrying...
No message received during poll. Retrying...
No message received during poll. Retrying...
✅ 1 anomalies inserted into PostgreSQL.
✅ 1 anomalies inserted into PostgreSQL.
✅ 2 anomalies inserted into PostgreSQL.
✅ 1 anomalies inserted into PostgreSQL.
✅ 1 anomalies inserted into PostgreSQL.
✅ 1 anomalies inserted into PostgreSQL.
❌ Error inserting data: (psycopg2.OperationalError) server closed the connection unexpectedly
	This probably means the server terminated abnormally
	before or while processing the request.

[SQL: 
                CREATE TABLE IF NOT EXISTS public."redset_2024_03_01"
                PARTITION OF public."redset_main"
                FOR VALUES FROM ('2024-03-01') TO ('2024-03-02');
                ]
(Background on this erro

------------------------------------------------------