### Install

In [4]:
!pip install psycopg2-binary
!pip install kafka-python

Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.11-cp312-cp312-win_amd64.whl.metadata (5.1 kB)
Downloading psycopg2_binary-2.9.11-cp312-cp312-win_amd64.whl (2.7 MB)
   ---------------------------------------- 0.0/2.7 MB ? eta -:--:--
   --- ------------------------------------ 0.3/2.7 MB ? eta -:--:--
   ----------- ---------------------------- 0.8/2.7 MB 3.0 MB/s eta 0:00:01
   ------------------- -------------------- 1.3/2.7 MB 2.8 MB/s eta 0:00:01
   ------------------------------ --------- 2.1/2.7 MB 3.2 MB/s eta 0:00:01
   ---------------------------------------- 2.7/2.7 MB 3.3 MB/s eta 0:00:00
Installing collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.11
Collecting kafka-python
  Downloading kafka_python-2.3.0-py2.py3-none-any.whl.metadata (10.0 kB)
Downloading kafka_python-2.3.0-py2.py3-none-any.whl (326 kB)
Installing collected packages: kafka-python
Successfully installed kafka-python-2.3.0


### Import

In [26]:
from kafka import KafkaConsumer
import psycopg2
import json
from datetime import datetime, date
from psycopg2.extras import RealDictCursor

### Connection Settings

In [35]:
import os

# Kafka
# Connection details can be overridden through environment variables; the
# defaults reproduce the original local development setup.
bootstrap_server = os.environ.get('KAFKA_BOOTSTRAP_SERVER', 'localhost:9092')
topic = os.environ.get('KAFKA_TOPIC', 'raw_sensor_data_ingestion')

# Kafka consumer configuration
# NOTE: with group_id=None kafka-python disables offset commits, so together with
# auto_offset_reset='earliest' every restart replays the whole topic and the
# aggregate tables get incremented again. Set KAFKA_GROUP_ID to resume from the
# last committed offset instead.
consumer = KafkaConsumer(
    topic,
    bootstrap_servers=[bootstrap_server],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id=os.environ.get('KAFKA_GROUP_ID'),
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

# PostgreSQL connection config
# Credentials come from the standard libpq environment variables so they need
# not be committed with the notebook; the fallbacks match the local dev setup.
conn = psycopg2.connect(
    dbname=os.environ.get('PGDATABASE', 'iot_data_analytic'),
    user=os.environ.get('PGUSER', 'admin'),
    password=os.environ.get('PGPASSWORD', 'admin'),
    host=os.environ.get('PGHOST', 'localhost'),
    port=os.environ.get('PGPORT', '5432'),
)
# RealDictCursor: every fetched row is a dict keyed by column name.
cursor = conn.cursor(cursor_factory=RealDictCursor)

### DB vehical_hourly_summary table CRUD operations

In [36]:
def create_vehical_hourly_summary(data):
    """
    Insert one row into public.vehical_hourly_summary.

    data: dict with keys:
      - sensor_id (required)
      - avg_vehical_count_hourly (required; numeric or numeric string)
      - vehical_count_per_hour (numeric, optional, default 0)
      - record_count (numeric, optional, default 0)
      - hour (optional, default -1)
      - sensor_date (optional, default date.today())
    Returns: the inserted row id, or None on error (the transaction is rolled back).
    """
    try:
        insert_query = """
            INSERT INTO public.vehical_hourly_summary
                (sensor_id,
                 avg_vehical_count_hourly,
                 vehical_count_per_hour,
                 record_count, 
                 hour,
                 sensor_date)
            VALUES
                (%(sensor_id)s,
                 %(avg_vehical_count_hourly)s,
                 %(vehical_count_per_hour)s,
                 %(record_count)s, 
                 %(hour)s,
                 %(sensor_date)s)
            RETURNING id;
        """

        mapping = {
            "sensor_id": data["sensor_id"],
            "avg_vehical_count_hourly": data["avg_vehical_count_hourly"],
            "vehical_count_per_hour": data.get("vehical_count_per_hour", 0),
            "record_count": data.get("record_count", 0),
            "hour": data.get("hour", -1),
            "sensor_date": data.get("sensor_date", date.today())
        }

        cursor.execute(insert_query, mapping)
        inserted = cursor.fetchone()
        # The shared cursor is a RealDictCursor, so rows are dicts keyed by column
        # name and positional access (row[0]) raises KeyError. Fall back to
        # positional access only for plain tuple cursors.
        new_id = inserted["id"] if isinstance(inserted, dict) else inserted[0]
        conn.commit()
        print(f"Inserted vehical_hourly_summary id={new_id}")
        return new_id
    except Exception as e:
        conn.rollback()
        print(f"Error inserting vehical_hourly_summary: {e}")
        return None
    

def get_vehical_hourly_summary_by_id(record_id):
    """
    Fetch one public.vehical_hourly_summary row by primary key.

    Returns a dict mapping column name -> value for the given id, or None if no
    row matches or the query fails (the transaction is rolled back on failure).
    """
    try:
        query = """
            SELECT
                id,
                sensor_id,
                avg_vehical_count_hourly,
                created_date,
                updated_at,
                vehical_count_per_hour,
                record_count,
                hour,
                sensor_date
            FROM public.vehical_hourly_summary
            WHERE id = %s;
        """

        cursor.execute(query, (record_id,))
        row = cursor.fetchone()

        if not row:
            print(f"No vehical_hourly_summary found with id={record_id}")
            return None

        # The shared cursor is a RealDictCursor: rows are already dicts, and
        # zipping column names with a dict would pair them with its *keys*
        # (giving {"id": "id", ...}). Only tuple rows need the zip.
        if isinstance(row, dict):
            return dict(row)
        col_names = [desc[0] for desc in cursor.description]
        return dict(zip(col_names, row))

    except Exception as e:
        # A failed statement aborts the transaction; roll back so the shared
        # connection stays usable for subsequent queries.
        conn.rollback()
        print(f"Error retrieving vehical_hourly_summary id={record_id}: {e}")
        return None



def get_vehical_hourly_summaries(sensor_id=None):
    """
    List public.vehical_hourly_summary rows, newest created_date first.

    If sensor_id is given, only records for that sensor are returned.
    Returns a list of dicts (column name -> value); an empty list on error
    (the transaction is rolled back).
    """
    try:
        base_query = """
            SELECT
                id,
                sensor_id,
                avg_vehical_count_hourly,
                created_date,
                updated_at,
                vehical_count_per_hour,
                record_count,
                hour,
                sensor_date
            FROM public.vehical_hourly_summary
        """

        if sensor_id is not None:
            query = base_query + " WHERE sensor_id = %s ORDER BY created_date DESC;"
            params = (sensor_id,)
        else:
            query = base_query + " ORDER BY created_date DESC;"
            params = None

        cursor.execute(query, params)
        rows = cursor.fetchall()

        # RealDictCursor rows are already dicts; zipping column names with a
        # dict would pair them with its keys instead of its values. Tuple rows
        # (plain cursors) still get the column-name zip.
        col_names = [desc[0] for desc in cursor.description]
        results = [
            dict(row) if isinstance(row, dict) else dict(zip(col_names, row))
            for row in rows
        ]

        print(f"Retrieved {len(results)} vehical_hourly_summary record(s)")
        return results

    except Exception as e:
        # Keep the shared connection usable after a failed statement.
        conn.rollback()
        print(f"Error retrieving vehical_hourly_summary records: {e}")
        return []


def update_vehical_hourly_summary(record_id, data):
    """
    Partially update the public.vehical_hourly_summary row with id ``record_id``.

    Only keys of ``data`` that name an updatable column are written:
    sensor_id, avg_vehical_count_hourly, vehical_count_per_hour, record_count,
    hour. Other keys are ignored. updated_at is always refreshed.

    Returns True when a row was updated; False when there was nothing to
    update, no row matched, or the statement failed (after a rollback).
    """
    updatable_columns = (
        "sensor_id",
        "avg_vehical_count_hourly",
        "vehical_count_per_hour",
        "record_count",
        "hour",
    )
    try:
        present = [column for column in updatable_columns if column in data]
        if not present:
            print("No fields to update.")
            return False

        # Column names come from the fixed whitelist above; values are bound.
        bind_values = {column: data[column] for column in present}
        assignments = [f"{column} = %({column})s" for column in present]
        assignments.append("updated_at = CURRENT_TIMESTAMP")

        update_query = f"""
            UPDATE public.vehical_hourly_summary
            SET {', '.join(assignments)}
            WHERE id = %(id)s;
        """

        bind_values["id"] = record_id

        cursor.execute(update_query, bind_values)
        conn.commit()

        if cursor.rowcount == 0:
            print(f"No vehical_hourly_summary updated, id={record_id} not found.")
            return False

        print(f"Updated vehical_hourly_summary id={record_id}")
        return True

    except Exception as e:
        conn.rollback()
        print(f"Error updating vehical_hourly_summary id={record_id}: {e}")
        return False


def delete_vehical_hourly_summary(record_id):
    """
    Remove the public.vehical_hourly_summary row whose id is ``record_id``.

    Returns True if a row was deleted; False if no row matched or the DELETE
    failed (in which case the transaction is rolled back).
    """
    try:
        delete_query = """
            DELETE FROM public.vehical_hourly_summary
            WHERE id = %s;
        """

        cursor.execute(delete_query, (record_id,))
        conn.commit()

        removed = cursor.rowcount != 0
        if removed:
            print(f"Deleted vehical_hourly_summary id={record_id}")
        else:
            print(f"No vehical_hourly_summary deleted, id={record_id} not found.")
        return removed

    except Exception as e:
        conn.rollback()
        print(f"Error deleting vehical_hourly_summary id={record_id}: {e}")
        return False


# ##### Usage details
# Demonstrates the CRUD helpers above. NOTE: this writes to (and then deletes
# from) the live database every time the cell runs.

# Create
new_id = create_vehical_hourly_summary({
    "sensor_id": "CAM-00123",
    "avg_vehical_count_hourly": "145.3",
    "vehical_count_per_hour": 160,
    "record_count": 12,
})

row = None
# create_* returns None on failure; skip the follow-up calls instead of running
# them against id=None.
if new_id is not None:
    # Read one
    row = get_vehical_hourly_summary_by_id(new_id)

    # Update
    update_vehical_hourly_summary(new_id, {
        "avg_vehical_count_hourly": "150.0",
        "vehical_count_per_hour": 170,
    })

    # Delete
    delete_vehical_hourly_summary(new_id)

### DB daily_peak_summary table CRUD operations

In [37]:
from dataclasses import dataclass

@dataclass
class DailyPeakSummary:
    """In-memory representation of one public.daily_peak_summary row."""

    id: int | None  # primary key; None for an instance not yet inserted
    sensor_id: int  # numeric sensor id (parsed from the message's atd_device_id)
    peak_vehicle_count: int  # largest per-message volume recorded for this sensor and day
    sensor_date: date  # calendar day the readings belong to

# ---------- CRUD FUNCTIONS ----------

def create_daily_peak_summary(sensor_id: int, peak_vehicle_count: int, sensor_date: date) -> DailyPeakSummary | bool:
    """
    Insert a new row into public.daily_peak_summary and return it.

    Assumes ``id`` is generated by the database. Returns the inserted row as a
    DailyPeakSummary, or False if the INSERT fails (the transaction is rolled
    back), e.g. on a (sensor_id, sensor_date) conflict if that pair is unique.
    """
    try:
        sql = """
            INSERT INTO public.daily_peak_summary (sensor_id, peak_vehicle_count, sensor_date)
            VALUES (%s, %s, %s)
            RETURNING id, sensor_id, peak_vehicle_count, sensor_date;
        """

        cursor.execute(sql, (sensor_id, peak_vehicle_count, sensor_date))
        # Read the RETURNING row before committing, like the other create helpers.
        row = cursor.fetchone()
        conn.commit()

        print(f'Created daily_peak_summary id={row["id"]} successfully.')

        return DailyPeakSummary(
            id=row["id"],
            sensor_id=row["sensor_id"],
            peak_vehicle_count=row["peak_vehicle_count"],
            sensor_date=row["sensor_date"],
        )
    except Exception as e:
        conn.rollback()
        print(f"Error creating daily_peak_summary: {e}")
        return False


def get_daily_peak_summary_by_sensor_and_date(sensor_id: int, sensor_date: date) -> DailyPeakSummary | None | bool:
    """
    Read the public.daily_peak_summary row for one sensor on one day.

    The lookup relies on (sensor_id, sensor_date) identifying at most one row
    (the original notes a UNIQUE constraint on that pair).
    Returns a DailyPeakSummary when found, None when no row exists, and False
    when the query fails (the transaction is rolled back). Callers should not
    treat False as "no row".
    """
    try:
        sql = """
            SELECT id, sensor_id, peak_vehicle_count, sensor_date
            FROM public.daily_peak_summary
            WHERE sensor_id = %s AND sensor_date = %s;
        """

        cursor.execute(sql, (sensor_id, sensor_date))
        row = cursor.fetchone()

        if not row:
            return None

        return DailyPeakSummary(
            id=row["id"],
            sensor_id=row["sensor_id"],
            peak_vehicle_count=row["peak_vehicle_count"],
            sensor_date=row["sensor_date"],
        )

    except Exception as e:
        conn.rollback()
        print(f"Error retrieving daily_peak_summary: {e}")
        return False

def update_daily_peak_summary(
    id_: int,
    peak_vehicle_count: int | None = None
) -> DailyPeakSummary | None | bool:
    """
    Update fields of the public.daily_peak_summary row with the given id.

    Only non-None arguments are written. Returns the updated row as a
    DailyPeakSummary, None if no row has that id, or False when there is
    nothing to update or the statement fails (the transaction is rolled back).
    """
    # Build dynamic SET clause from the provided values.
    fields = []
    params = []

    if peak_vehicle_count is not None:
        fields.append("peak_vehicle_count = %s")
        params.append(peak_vehicle_count)

    # With no fields the statement would read "SET  WHERE ...", a syntax error.
    if not fields:
        print(f"No fields to update for daily_peak_summary id={id_}.")
        return False

    params.append(id_)

    try:
        sql = f"""
            UPDATE public.daily_peak_summary
            SET {", ".join(fields)}
            WHERE id = %s
            RETURNING id, sensor_id, peak_vehicle_count, sensor_date;
        """

        cursor.execute(sql, tuple(params))
        row = cursor.fetchone()
        conn.commit()

        if not row:
            return None

        print(f'Updated daily_peak_summary id={row["id"]} peak_vehicle_count={row["peak_vehicle_count"]} successfully.')

        return DailyPeakSummary(
            id=row["id"],
            sensor_id=row["sensor_id"],
            peak_vehicle_count=row["peak_vehicle_count"],
            sensor_date=row["sensor_date"],
        )

    except Exception as e:
        conn.rollback()
        print(f"Error updating daily_peak_summary: {e}")
        return False


"""
#  Usage example:

 # CREATE
    created = create_daily_peak_summary(sensor_id=1, peak_vehicle_count=150, sensor_date=date(2025, 11, 22))
    print("Created:", created)

    # READ by id
    loaded = get_daily_peak_summary_by_id(created.id)
    print("Loaded by id:", loaded)

    # UPDATE
    updated = update_daily_peak_summary(created.id, peak_vehicle_count=200)
    print("Updated:", updated)
"""

'\n#  Usage example:\n\n # CREATE\n    created = create_daily_peak_summary(sensor_id=1, peak_vehicle_count=150, sensor_date=date(2025, 11, 22))\n    print("Created:", created)\n\n    # READ by id\n    loaded = get_daily_peak_summary_by_id(created.id)\n    print("Loaded by id:", loaded)\n\n    # UPDATE\n    updated = update_daily_peak_summary(created.id, peak_vehicle_count=200)\n    print("Updated:", updated)\n'

### DB daily_sensor_availability table CRUD operations

In [38]:
from dataclasses import dataclass

@dataclass
class DailySensorAvailability:
    """In-memory representation of one public.daily_sensor_availability row."""

    id: int | None  # primary key; None for an instance not yet inserted
    sensor_id: int  # numeric sensor id (parsed from the message's atd_device_id)
    sensor_date: date  # calendar day the signals were received for
    signal_received_count: int  # number of messages counted for this sensor and day
    sensor_availability_percentage: float  # or Decimal — presumably a NUMERIC column; received vs. expected daily signals, in percent


# ---------------- CRUD FUNCTIONS ----------------

def create_daily_sensor_availability(
    sensor_id: int,
    sensor_date: date,
    signal_received_count: int = 1,
    sensor_availability_percentage: float = 0.0,
) -> DailySensorAvailability | bool:
    """
    Insert one public.daily_sensor_availability row and return it.

    Returns the inserted row as a DailySensorAvailability, or False when the
    INSERT fails (the transaction is rolled back first).
    """
    insert_values = (sensor_id, sensor_date, signal_received_count, sensor_availability_percentage)

    try:
        sql = """
            INSERT INTO public.daily_sensor_availability (sensor_id, sensor_date, signal_received_count, sensor_availability_percentage)
            VALUES (%s, %s, %s, %s)
            RETURNING id, sensor_id, sensor_date, signal_received_count, sensor_availability_percentage;
        """

        cursor.execute(sql, insert_values)
        inserted = cursor.fetchone()
        conn.commit()

        new_id = inserted["id"]
        print(f'Created daily_sensor_availability id={new_id} successfully.')

        return DailySensorAvailability(
            id=new_id,
            sensor_id=inserted["sensor_id"],
            sensor_date=inserted["sensor_date"],
            signal_received_count=inserted["signal_received_count"],
            sensor_availability_percentage=inserted.get("sensor_availability_percentage", 0.0),
        )
    except Exception as e:
        if conn:
            conn.rollback()
        print(f"Error creating daily_sensor_availability: {e}")
        return False
    
def get_daily_sensor_availability_by_sensor_and_date(
    sensor_id: int,
    sensor_date: date,
) -> DailySensorAvailability | None | bool:
    """
    Look up the availability row for one sensor on one day.

    Returns a DailySensorAvailability when a row exists, None when it does not,
    and False if the query raised (after rolling back the transaction).
    """
    lookup_args = (sensor_id, sensor_date)

    try:
        sql = """
            SELECT id, sensor_id, sensor_date, signal_received_count, sensor_availability_percentage
            FROM public.daily_sensor_availability
            WHERE sensor_id = %s AND sensor_date = %s;
        """

        cursor.execute(sql, lookup_args)
        found = cursor.fetchone()

        if found:
            return DailySensorAvailability(
                id=found["id"],
                sensor_id=found["sensor_id"],
                sensor_date=found["sensor_date"],
                signal_received_count=found["signal_received_count"],
                sensor_availability_percentage=found.get("sensor_availability_percentage", 0.0),
            )
        return None
    except Exception as e:
        if conn:
            conn.rollback()
        print(f"Error reading daily_sensor_availability for sensor_id={sensor_id}, date={sensor_date}: {e}")
        return False

def update_daily_sensor_availability(
    id_: int,
    signal_received_count: int | None = None,
    sensor_availability: float | None = None,
) -> DailySensorAvailability | None | bool:
    """
    Update the public.daily_sensor_availability row with the given id.

    Only non-None arguments are written (sensor_availability is stored in the
    sensor_availability_percentage column). Returns the updated row, None if no
    row has that id, or False when there is nothing to update or the statement
    fails (the transaction is rolled back).
    """
    fields = []
    params: list = []

    if signal_received_count is not None:
        fields.append("signal_received_count = %s")
        params.append(signal_received_count)

    if sensor_availability is not None:
        fields.append("sensor_availability_percentage = %s")
        params.append(sensor_availability)

    # With no fields the statement would read "SET  WHERE ...", a syntax error.
    if not fields:
        print(f"No fields to update for daily_sensor_availability id={id_}.")
        return False

    params.append(id_)

    try:
        sql = f"""
            UPDATE public.daily_sensor_availability
            SET {", ".join(fields)}
            WHERE id = %s
            RETURNING id, sensor_id, sensor_date, signal_received_count, sensor_availability_percentage;
        """

        cursor.execute(sql, tuple(params))
        row = cursor.fetchone()
        conn.commit()

        if not row:
            print(f"No daily_sensor_availability found with id={id_}.")
            return None

        print(f'Updated daily_sensor_availability id={row["id"]} successfully.')

        return DailySensorAvailability(
            id=row["id"],
            sensor_id=row["sensor_id"],
            sensor_date=row["sensor_date"],
            signal_received_count=row["signal_received_count"],
            sensor_availability_percentage=row["sensor_availability_percentage"],
        )
    except Exception as e:
        if conn:
            conn.rollback()
        print(f"Error updating daily_sensor_availability id={id_}: {e}")
        return False




### Logic calculations

In [39]:
def calculate_hourly_details(data, existing_records, mapping):
    """
    Fold one message into an existing hourly summary row.

    data: incoming message; its "volume" (default 0, matching the mapping built
        by the caller) is added to the running total.
    existing_records: the current summary row (reads "record_count" and
        "vehical_count_per_hour").
    mapping: dict that is updated in place with the new record_count,
        vehical_count_per_hour and avg_vehical_count_hourly.
    Returns the same ``mapping`` object.
    """
    # One more message contributes to this hour.
    record_count = existing_records["record_count"] + 1
    mapping["record_count"] = record_count

    # Running total of vehicles for the hour.
    total_vehical_count = data.get("volume", 0) + existing_records["vehical_count_per_hour"]
    mapping["vehical_count_per_hour"] = total_vehical_count

    # Average volume per message within the hour.
    mapping["avg_vehical_count_hourly"] = total_vehical_count / record_count

    return mapping

def get_date_from_datetime(datetime_str):
    """Parse a 'YYYY-MM-DD HH:MM:SS' string and return only its calendar date.

    Raises ValueError when the string does not match that format.
    """
    return datetime.strptime(datetime_str, "%Y-%m-%d %H:%M:%S").date()

def convert_string_to_int(value, field_name="value"):
    """
    Convert an int, float or numeric string to an int.

    Thousands separators are stripped (e.g. "6,353" -> 6353, "2,020" -> 2020).
    Whole-number floats such as 12.0 or "12.0" are accepted; fractional or
    non-numeric input yields None instead of being silently truncated.

    value: the raw value (None is passed through as None).
    field_name: label used in the warning printed for invalid input; the
        function is used for several message fields (atd_device_id, volume).
    Returns the int, or None.
    """
    if value is None:
        return None

    # Normalise to a string first so ints, floats and strings share one path.
    cleaned = str(value).replace(",", "").strip()

    try:
        return int(cleaned)
    except ValueError as e:
        int_error = e

    try:
        as_float = float(cleaned)
    except ValueError:
        print(f"Invalid {field_name}: {value} -> {int_error}")
        return None

    if as_float.is_integer():
        return int(as_float)

    print(f"Invalid {field_name}: {value} -> not a whole number")
    return None


def update_vehicle_hourly_summary_details(data):
    """
    Fold one sensor message into public.vehical_hourly_summary.

    If the sensor already has a summary row for the message's date and hour,
    that row's record_count / vehical_count_per_hour / avg_vehical_count_hourly
    are updated. Otherwise a new row is inserted with record_count = 1.

    data: decoded Kafka message. Uses "atd_device_id", and optionally "volume"
        (default 0), "hour" (default -1) and "datetime" in
        "YYYY-MM-DD HH:MM:SS" format (default: today).
    Returns None.
    """
    def _to_date(value):
        # DATE columns come back as date, TIMESTAMP columns as datetime.
        return value.date() if isinstance(value, datetime) else value

    if "datetime" in data:
        incoming_date = datetime.strptime(data["datetime"], "%Y-%m-%d %H:%M:%S").date()
    else:
        incoming_date = date.today()
    incoming_hour = data.get("hour", -1)

    mapping = {
        "sensor_id": data["atd_device_id"],
        "avg_vehical_count_hourly": 0,  # calculated below
        "vehical_count_per_hour": data.get("volume", 0),
        "record_count": 0,  # calculated below
        "hour": incoming_hour,
        "sensor_date": incoming_date,
    }

    existing_records = get_vehical_hourly_summaries(sensor_id=mapping["sensor_id"])

    # Match on the day the readings belong to (sensor_date), not on when the row
    # was inserted (created_date): the latter never matches replayed historical
    # data. Search all rows rather than only the newest so out-of-order
    # messages still land in the right hour.
    matching = next(
        (
            record for record in existing_records
            if _to_date(record.get("sensor_date")) == incoming_date
            and record.get("hour") == incoming_hour
        ),
        None,
    )

    if matching is not None:
        mapping = calculate_hourly_details(data, matching, mapping)
        update_vehical_hourly_summary(matching["id"], mapping)
    else:
        # First message for this sensor/date/hour: the average is the volume itself.
        mapping["record_count"] = 1
        mapping["avg_vehical_count_hourly"] = float(mapping["vehical_count_per_hour"])
        create_vehical_hourly_summary(mapping)

def update_vehicle_daily_peak_summary_details(data):
    """
    Track the per-sensor, per-day peak of the message "volume" field in
    public.daily_peak_summary.

    Inserts a row the first time a (sensor, date) pair is seen. Otherwise it
    raises peak_vehicle_count only when this message's volume is larger.
    Messages whose atd_device_id or volume cannot be parsed as integers are
    skipped, as are messages whose lookup query fails.

    data: decoded Kafka message with "atd_device_id", "volume" and "datetime"
        ("YYYY-MM-DD HH:MM:SS").
    Returns None.
    """
    sensor_id = convert_string_to_int(data["atd_device_id"])
    volume = convert_string_to_int(data["volume"])
    sensor_date = get_date_from_datetime(data["datetime"])

    # convert_string_to_int returns None for unparseable input; comparing None
    # with an int below would raise TypeError and stop the consumer loop.
    if sensor_id is None or volume is None:
        print(f"Skipping daily peak update, invalid atd_device_id={data.get('atd_device_id')!r} or volume={data.get('volume')!r}")
        return

    existing_record = get_daily_peak_summary_by_sensor_and_date(
        sensor_id=sensor_id,
        sensor_date=sensor_date
    )

    # The getter returns False (not None) when the query itself failed; don't
    # mistake that for "no row yet" and attempt a duplicate insert.
    if existing_record is False:
        return

    if existing_record:
        # Existing row: only raise the peak, never lower it.
        print(f'Existing record found: {existing_record}')
        if volume > existing_record.peak_vehicle_count:
            update_daily_peak_summary(
                id_=existing_record.id,
                peak_vehicle_count=volume
            )
    else:
        create_daily_peak_summary(
            sensor_id=sensor_id,
            peak_vehicle_count=volume,
            sensor_date=sensor_date
        )

def update_vehicle_daily_sensor_availibility_details(data, daily_expected_signals=96):
    """
    Count one received message toward its sensor's daily availability in
    public.daily_sensor_availability.

    The availability percentage is the number of signals received that day
    divided by ``daily_expected_signals``, times 100, capped at 100. The same
    formula is applied to the first signal (previously stored as 0%). The
    default of 96 equals 24 * 4, i.e. one signal per 15-minute bin (the sample
    message has bin_duration_(seconds) = 900). A sensor may emit several rows
    per bin (e.g. per direction/movement), which is why the percentage is capped.

    data: decoded Kafka message with "atd_device_id" and "datetime"
        ("YYYY-MM-DD HH:MM:SS").
    daily_expected_signals: expected number of signals per sensor per day (> 0).
    Returns None. Messages with an unparseable atd_device_id, or whose lookup
    query fails, are skipped.
    """
    if daily_expected_signals <= 0:
        raise ValueError("daily_expected_signals must be positive")

    def _availability(count):
        return min(count / daily_expected_signals * 100, 100.0)

    sensor_id = convert_string_to_int(data["atd_device_id"])
    sensor_date = get_date_from_datetime(data["datetime"])

    if sensor_id is None:
        print(f"Skipping availability update, invalid atd_device_id={data.get('atd_device_id')!r}")
        return

    existing_record = get_daily_sensor_availability_by_sensor_and_date(
        sensor_id=sensor_id,
        sensor_date=sensor_date
    )

    # False means the lookup query failed; don't treat it as "no row yet".
    if existing_record is False:
        return

    if existing_record:
        new_count = existing_record.signal_received_count + 1
        update_daily_sensor_availability(
            id_=existing_record.id,
            signal_received_count=new_count,
            sensor_availability=_availability(new_count)
        )
    else:
        create_daily_sensor_availability(
            sensor_id=sensor_id,
            sensor_date=sensor_date,
            signal_received_count=1,
            sensor_availability_percentage=_availability(1)
        )


In [40]:
def test():
    """
    Smoke-test the daily peak summary pipeline with one recorded sensor message.

    NOTE: this runs against the database connection configured above; it reads
    public.daily_peak_summary and may insert or update a row there.
    """
    sample_message = {
        "record_id": "f92cb55d3a2e3f750aed090c78a86033",
        "atd_device_id": "6,353",
        "read_date": "2020 Mar 06 12:45:00 PM",
        "intersection_name": "ENFIELD RD / EXPOSITION BLVD",
        "direction": "SOUTHBOUND",
        "movement": "THRU",
        "heavy_vehicle": 1,
        "volume": 1,
        "speed_average_(miles_per_hour)": 9,
        "speed_stddev": 0,
        "seconds_in_zone_average": 0.9,
        "seconds_in_zone_stddev": 0,
        "month": 3,
        "day": 6,
        "year": "2,020",
        "hour": 12,
        "minute": 45,
        "day_of_week": 5,
        "bin_duration_(seconds)": 900,
        "datetime": "2020-03-06 12:45:00",
        "datetime_utc": "2020-03-06 18:45:00+00:00",
        "timestamp_ms": 1583520300000,
    }

    update_vehicle_daily_peak_summary_details(sample_message)


test()

Existing record found: DailyPeakSummary(id=17957, sensor_id=6353, peak_vehicle_count=1, sensor_date=datetime.date(2020, 3, 6))


### Consume messages and insert into database

In [None]:
# The hourly summary pipeline was disabled (commented out); keep it off by
# default and flip this flag to also maintain public.vehical_hourly_summary.
ENABLE_HOURLY_SUMMARY = False

try:
    for message in consumer:
        data = message.value  # This is a dict after JSON deserialization

        # Isolate per-message data errors (missing key, bad datetime format,
        # wrong type): report and skip the message instead of stopping the
        # consumer. DB errors are already handled inside the CRUD helpers.
        try:
            if ENABLE_HOURLY_SUMMARY:
                # Update hourly vehical summary details
                update_vehicle_hourly_summary_details(data)

            # Update daily peak summary details
            update_vehicle_daily_peak_summary_details(data)

            # Update daily sensor availability details
            update_vehicle_daily_sensor_availibility_details(data)
        except (KeyError, ValueError, TypeError) as e:
            print(f"Skipping message at partition={message.partition} offset={message.offset}: {e!r}")

except KeyboardInterrupt:
    print('Stopped consuming.')

finally:
    cursor.close()
    conn.close()
    consumer.close()