In [1]:
import json
import os
import re  # For removing commas from database timestamp
import subprocess
import sys
from datetime import datetime, timezone  # Added timezone

import pandas as pd
import psycopg2

# PostgreSQL Location Query

In [2]:
# Connection settings are read from the standard libpq environment variables
# (PGHOST, PGPORT, PGDATABASE, PGUSER, PGPASSWORD). Non-secret values fall back
# to the local-development defaults this notebook used before. Secrets are
# deliberately NOT hard-coded: a password or key committed to a notebook leaks
# with every copy of the file.
# NOTE(review): the credentials previously hard-coded here should be treated as
# compromised and rotated.
DB_HOST = os.environ.get("PGHOST", "localhost")
DB_PORT = os.environ.get("PGPORT", "5432")
DB_NAME = os.environ.get("PGDATABASE", "SafetyTracker")
DB_USER = os.environ.get("PGUSER", "kiet")
# None when unset; presumably psycopg2 then omits the parameter and libpq falls
# back to ~/.pgpass — confirm for the deployed psycopg2 version.
DB_PASSWORD = os.environ.get("PGPASSWORD")

DECRYPT_SCRIPT_PATH = os.environ.get("DECRYPT_SCRIPT_PATH", "./decrypt.py")
# Key material forwarded verbatim to the decryption script; must be supplied
# through the environment.
PRIVATE_KEY = os.environ.get("DECRYPT_PRIVATE_KEY", "")
if not PRIVATE_KEY:
    print("Warning: DECRYPT_PRIVATE_KEY is not set; payload decryption will fail.")

In [3]:
# Helper functions
def connect_db():
    """Open a PostgreSQL connection from the module-level DB_* settings.

    Returns:
        An open psycopg2 connection, or None when psycopg2 reports an error
        (the error is printed instead of raised).
    """
    connection_params = {
        "host": DB_HOST,
        "port": DB_PORT,
        "dbname": DB_NAME,
        "user": DB_USER,
        "password": DB_PASSWORD,
    }
    try:
        return psycopg2.connect(**connection_params)
    except psycopg2.Error as exc:
        print(f"Error connecting to PostgreSQL database: {exc}")
    return None


def fetch_raw_movement_data(conn):
    """Load every DeviceLocation row into a DataFrame.

    The payload, description, status code and publish date columns are
    aliased with a DB/Encrypted prefix so they stay distinguishable from the
    values later extracted from the decrypted payload.

    Args:
        conn: An open database connection, or None.

    Returns:
        DataFrame ordered by LocationID with the aliased column names, or None
        when ``conn`` is falsy or the query raises ``psycopg2.Error``.
    """
    if not conn:
        return None

    query = 'SELECT "LocationID", "DeviceID", "DatePublished" AS "DBDatePublished", "Payload" AS "EncryptedPayloadDB", "Description" AS "DBDescription", "StatusCode" AS "DBStatusCode" FROM "DeviceLocation" ORDER BY "LocationID";'
    try:
        with conn.cursor() as cursor:
            cursor.execute(query)
            column_names = [column[0] for column in cursor.description]
            records = cursor.fetchall()
    except psycopg2.Error as exc:
        print(f"Error fetching data: {exc}")
        return None
    return pd.DataFrame(records, columns=column_names)


def decrypt_payload_data(payload_str, private_key_val_or_path, timeout=None):
    """Decrypt one stored payload by running the external decryption script.

    The script is invoked as ``<python> DECRYPT_SCRIPT_PATH <key> <payload>``
    and its stdout is parsed as JSON: either an object, or a list whose first
    element is the object of interest.

    Args:
        payload_str: Encrypted payload text. None/NaN/NA, empty values and the
            literal placeholder "[NULL]" mean "no payload".
        private_key_val_or_path: Key material (or a path to it), converted to
            str and passed to the script as its first argument.
        timeout: Optional number of seconds to wait for the script. None (the
            default) waits indefinitely.

    Returns:
        The decrypted JSON object as a dict, or None on any failure. The
        reason is printed rather than raised, so one bad row does not stop a
        batch.
    """
    # pd.isna must run first: `not pd.NA` raises TypeError.
    if pd.isna(payload_str) or not payload_str or payload_str.strip() == "[NULL]":
        return None

    # Use the interpreter running this code (the notebook kernel) so the
    # script sees the same installed packages; a bare "python3" resolves via
    # PATH and may be a different environment. sys.executable can be empty in
    # embedded interpreters, hence the fallback.
    # NOTE(review): the key is passed on the command line, where other local
    # users may see it in the process list; prefer stdin/env if the script's
    # interface can change.
    python_exe = sys.executable or "python3"
    command = [
        python_exe,
        DECRYPT_SCRIPT_PATH,
        str(private_key_val_or_path),
        payload_str,
    ]
    try:
        result = subprocess.run(
            command, capture_output=True, text=True, check=True, timeout=timeout
        )
        decrypted_output_str = result.stdout.strip()
        try:
            parsed_json = json.loads(decrypted_output_str)
        except json.JSONDecodeError as e:
            print(
                f"Error decoding JSON from decrypted payload '{decrypted_output_str}': {e}"
            )
            return None

        # The script may wrap the object in a list; only the first element is used.
        if isinstance(parsed_json, list) and parsed_json:
            parsed_json = parsed_json[0]
        # Callers read the result with dict.get(), so anything that is not a
        # JSON object (e.g. a list of numbers) is rejected here rather than
        # crashing the caller.
        if isinstance(parsed_json, dict):
            return parsed_json
        print(
            f"Warning: Decrypted payload is not a recognized JSON object or list: {decrypted_output_str}"
        )
        return None
    except subprocess.CalledProcessError as e:
        print(
            f"Error during decryption script execution for payload '{payload_str[:30]}...': {e}"
        )
        print(f"Stderr: {e.stderr}")
        return None
    except subprocess.TimeoutExpired:
        print(
            f"Error: Decryption script timed out after {timeout}s for payload '{payload_str[:30]}...'"
        )
        return None
    except FileNotFoundError:
        # Raised when the interpreter itself cannot be launched. A missing
        # script makes Python exit non-zero, which surfaces as
        # CalledProcessError above.
        print(
            f"Error: Could not launch '{python_exe}' to run the decryption script at {DECRYPT_SCRIPT_PATH}"
        )
        return None
    except Exception as e:
        # Best-effort per row: an unexpected failure must not abort the batch.
        print(f"An unexpected error occurred during decryption: {e}")
        return None

In [4]:
# Column order of the frame returned by process_movement_data. It is also used
# for empty results, so downstream column selection never hits a KeyError.
PROCESSED_COLUMNS = [
    "DBLocationID",
    "DeviceID",
    "ActualTimestampUTC",
    "Latitude",
    "Longitude",
    "Confidence",
    "DecryptedDescription",
    "DecryptedStatusCode",
    "DecryptedDatePublishedUTC",
    "DBDatePublishedRaw",
    "EncryptedPayloadDB",
]


def _epoch_to_utc(value, divisor, field_name, db_location_id):
    """Convert an epoch number from decrypted JSON to an aware UTC datetime.

    Args:
        value: Raw epoch value; None means the field was absent.
        divisor: Factor that converts ``value`` to seconds (1000.0 for ms, 1 for s).
        field_name: Field label used in the warning message.
        db_location_id: Row identifier used in the warning message.

    Returns:
        The UTC datetime, or None when ``value`` is None or cannot be converted.
    """
    if value is None:
        return None
    # fromtimestamp raises OverflowError/OSError (not only ValueError) for
    # values outside the platform's supported range.
    try:
        return datetime.fromtimestamp(value / divisor, tz=timezone.utc)
    except (ValueError, TypeError, OverflowError, OSError) as e:
        print(f"Warning: Could not parse decrypted {field_name} '{value}' for DB LocationID {db_location_id}: {e}")
        return None


def process_movement_data(raw_df):
    """Decrypt each DeviceLocation row and flatten it into one output record.

    For every row, the encrypted payload goes through decrypt_payload_data and
    these fields are extracted:

    * top level: ``id``, ``description``, ``statusCode``, ``datePublished`` (ms)
    * nested ``payload``: ``latitude``, ``longitude``, ``confidence``,
      ``timestamp`` (assumed seconds)

    Description and status code default to the database values when the
    decrypted JSON lacks them. DeviceID falls back to the database DeviceID
    when the decrypted ``id`` is missing or falsy. Problems are printed and
    leave the affected fields as None, so every input row yields one output row.

    Args:
        raw_df: DataFrame with LocationID, DeviceID, DBDatePublished,
            EncryptedPayloadDB, DBDescription and DBStatusCode columns, or None.

    Returns:
        DataFrame with the PROCESSED_COLUMNS columns; empty (but with those
        columns) when ``raw_df`` is None or has no rows.
    """
    if raw_df is None:
        return pd.DataFrame(columns=PROCESSED_COLUMNS)

    processed_data_list = []
    for _, row in raw_df.iterrows():
        db_location_id = row["LocationID"]
        db_device_id = row["DeviceID"]
        encrypted_payload_db = row["EncryptedPayloadDB"]

        # Defaults kept when the payload is absent or cannot be decoded.
        actual_timestamp_utc = None
        latitude = None
        longitude = None
        confidence = None
        dec_device_id = None  # id from the decrypted payload
        dec_description = row["DBDescription"]
        dec_status_code = row["DBStatusCode"]
        dec_date_published_utc = None

        # Check for missing values before any string method. A NaN/NA payload
        # previously fell through to `.strip()` in the "[NULL]" branch and crashed.
        if pd.isna(encrypted_payload_db) or not encrypted_payload_db:
            pass  # Nothing stored for this row; keep the defaults silently.
        elif encrypted_payload_db.strip() == "[NULL]":
            print(f"Info: EncryptedPayloadDB is '[NULL]' string for DB LocationID {db_location_id}. No decryption attempted.")
        else:
            decrypted_json = decrypt_payload_data(encrypted_payload_db, PRIVATE_KEY)
            # Only a non-empty dict can be read with .get(); anything else is a failure.
            if not isinstance(decrypted_json, dict) or not decrypted_json:
                print(f"Warning: Failed to decrypt payload for DB LocationID {db_location_id}")
            else:
                dec_device_id = decrypted_json.get("id")
                dec_description = decrypted_json.get("description", dec_description)
                dec_status_code = decrypted_json.get("statusCode", dec_status_code)
                # Top-level datePublished is in milliseconds.
                dec_date_published_utc = _epoch_to_utc(
                    decrypted_json.get("datePublished"), 1000.0, "datePublished", db_location_id
                )

                payload_data = decrypted_json.get("payload")
                if isinstance(payload_data, dict) and payload_data:
                    latitude = payload_data.get("latitude")
                    longitude = payload_data.get("longitude")
                    confidence = payload_data.get("confidence")
                    payload_timestamp_s = payload_data.get("timestamp")  # assumed seconds

                    # `is None` rather than truthiness, so epoch 0 is not reported as missing.
                    if payload_timestamp_s is None:
                        print(f"Warning: 'timestamp' not found in decrypted payload.payload for DB LocationID {db_location_id}")
                    else:
                        actual_timestamp_utc = _epoch_to_utc(
                            payload_timestamp_s, 1, "payload.timestamp", db_location_id
                        )

                    if latitude is None or longitude is None:
                        print(f"Warning: 'latitude' or 'longitude' not found in decrypted payload.payload for DB LocationID {db_location_id}")
                else:
                    print(f"Warning: 'payload' object not found or not a dict in decrypted data for DB LocationID {db_location_id}")

        processed_data_list.append({
            "DBLocationID": db_location_id,
            # Prefer the decrypted id; fall back to the DB column.
            "DeviceID": dec_device_id if dec_device_id else db_device_id,
            "ActualTimestampUTC": actual_timestamp_utc,  # from payload.timestamp
            "Latitude": latitude,
            "Longitude": longitude,
            "Confidence": confidence,
            "DecryptedDescription": dec_description,
            "DecryptedStatusCode": dec_status_code,
            "DecryptedDatePublishedUTC": dec_date_published_utc,  # top-level datePublished
            "DBDatePublishedRaw": row["DBDatePublished"],  # original value kept for reference
            "EncryptedPayloadDB": encrypted_payload_db,
        })

    return pd.DataFrame(processed_data_list, columns=PROCESSED_COLUMNS)

## Main Execution

In [None]:
if __name__ == "__main__":
    # Stop before touching the database if the helper script is missing: every
    # row would otherwise fail to decrypt. Raising SystemExit ends a plain
    # script run. In IPython it is reported for this cell only, instead of
    # shutting down the kernel as exit() can.
    if not os.path.exists(DECRYPT_SCRIPT_PATH):
        raise SystemExit(f"CRITICAL ERROR: Decryption script not found at {DECRYPT_SCRIPT_PATH}")

    db_connection = connect_db()
    if db_connection:
        try:
            raw_df_from_db = fetch_raw_movement_data(db_connection)
        finally:
            # Release the connection even if fetching raises something other
            # than the psycopg2.Error that fetch_raw_movement_data handles.
            db_connection.close()

        if raw_df_from_db is not None and not raw_df_from_db.empty:
            print(f"Fetched {len(raw_df_from_db)} raw movement records from database.")
            print("Processing and decrypting payloads (this may take some time)...")

            # process_movement_data only reads its input, so no defensive copy is needed.
            processed_df = process_movement_data(raw_df_from_db)

            print("\n--- Processed Movement Data (First 5 Records) ---")
            print(processed_df.head())
            print("\n--- Processed Movement Data (Last 5 Records) ---")
            print(processed_df.tail())
            print("\n--- Processed Movement Data Info ---")
            processed_df.info()

            # Further cleaning based on project plan (Section VI - Movement history data):
            # rows without the payload timestamp or coordinates are dropped.
            essential_columns = ["ActualTimestampUTC", "Latitude", "Longitude"]
            cleaned_df = processed_df.dropna(subset=essential_columns)
            print(f"\nRemoved {len(processed_df) - len(cleaned_df)} rows with missing essential data (ActualTimestampUTC, Latitude, Longitude) after processing.")
            print(f"Final dataset size for map-matching: {len(cleaned_df)} records.")

            if not cleaned_df.empty:
                output_filename_parquet = "danang_movement_processed_decrypted.parquet"
                try:
                    cleaned_df.to_parquet(output_filename_parquet, index=False)
                    print(f"\nProcessed and decrypted movement data saved to '{output_filename_parquet}'.")
                except Exception as e:
                    print(f"\nError saving processed data to Parquet: {e}")

                output_filename_csv = "danang_movement_processed_decrypted.csv"
                try:
                    cleaned_df.to_csv(output_filename_csv, index=False)
                    print(f"Processed and decrypted movement data saved to '{output_filename_csv}'.")
                except Exception as e:
                    print(f"\nError saving processed data to CSV: {e}")
            else:
                print("\nNo data left after cleaning. Output files not saved.")
        else:
            print("No raw movement data fetched from database or DataFrame is empty.")
    else:
        print("Could not connect to the database. Exiting.")

    print("\nProcessing finished.")