
This notebook processes and uploads temperature measurement data from text files to the TimescaleDB database.

## Purpose
- Scan directories for temperature data files (m7004_ID_{identifier}.txt files)
- Parse the data into structured format
- Upload the data to the TimescaleDB database
- Avoid duplicate data entries by skipping files whose time range already contains data for the same sensor

## Prerequisites
- Running TimescaleDB instance (configured in docker-compose.yml)
- Access to directory containing temperature data files
- Environment variables configured in .env file (for database connection)


Import required libraries and install any missing dependencies.

In [1]:
import os
import re
import uuid
import pandas as pd
import numpy as np
from datetime import datetime, timezone
from pathlib import Path

# Database libraries
from sqlalchemy import create_engine, text

# Progress tracking
from tqdm.notebook import tqdm

# Environment variables
from dotenv import load_dotenv

# Logging
import logging
logging.basicConfig(level=logging.INFO,
                   format='%(asctime)s - %(levelname)s - %(message)s')

In [2]:
!pip install psycopg2-binary sqlalchemy pandas tqdm pathlib python-dotenv
import psycopg2




Load configuration from environment variables and set up constants.

In [3]:
# Read settings from the .env file located two directories above the working directory
dotenv_path = Path("../../.env")
load_dotenv(dotenv_path)

# Connection parameters: each value is taken from the environment, with a fallback default
DB_CONFIG = dict(
    host=os.getenv('POSTGRES_HOST', 'timescaledb'),  # default is the container service name
    port=int(os.getenv('POSTGRES_PORT', 5432)),
    database=os.getenv('POSTGRES_DB', 'perocube'),
    user=os.getenv('POSTGRES_USER', 'postgres'),
    password=os.getenv('POSTGRES_PASSWORD', 'postgres'),
)

# Report the connection target; the password is deliberately left out of the output
print(f"Database connection: {DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']} as {DB_CONFIG['user']}")

# Directory that is scanned for data files, resolved to an absolute path
ROOT_DIRECTORY = str(Path("../../sample_data/datasets/PeroCube-sample-data").resolve())

# Regex for temperature data file names; group 1 captures the hexadecimal sensor ID
TEMPERATURE_FILE_PATTERN = r"m7004_ID_([A-F0-9]+)\.txt$"

# Number of rows sent to the database per batch
BATCH_SIZE = 5000

# Namespace used to derive deterministic sensor UUIDs
# (kept identical to the one used for irradiance sensors, for consistency)
SENSOR_UUID_NAMESPACE = uuid.UUID(hex='12345678-1234-5678-1234-567812345678')

# Switches controlling data validation
VALIDATION_CONFIG = dict(
    enabled=True,     # master switch for validation
    remove_nan=True,  # drop rows whose timestamp is NaN
)

def print_validation_config():
    """Show the active validation settings so the user knows what will be applied."""
    settings = VALIDATION_CONFIG
    print("\nData Validation Configuration:")
    print(f"- Validation enabled: {settings['enabled']}")
    print(f"- Remove NaN values from timestamp: {settings['remove_nan']}")

Database connection: timescaledb:5432/perocube as postgres



Define helper functions for database connection, data validation, and sensor management.

In [4]:
def create_db_connection(config=DB_CONFIG):
    """
    Create a SQLAlchemy database engine from configuration and verify it works.

    The connection URL is assembled with ``URL.create`` rather than string
    interpolation, so credentials containing characters such as ``@``, ``/``,
    ``:`` or ``#`` are escaped correctly.

    Args:
        config: Dictionary containing database connection parameters
            ('user', 'password', 'host', 'port', 'database')

    Returns:
        SQLAlchemy engine instance

    Raises:
        Exception: Any error raised while building the engine or running the
            test query is logged and re-raised unchanged.
    """
    # Local import keeps this function self-contained
    from sqlalchemy.engine import URL

    engine = None
    try:
        connection_url = URL.create(
            drivername="postgresql",
            username=config['user'],
            password=config['password'],
            host=config['host'],
            port=config['port'],
            database=config['database'],
        )
        engine = create_engine(connection_url)

        # Test the connection with a trivial query
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
        logging.info(f"Database connection successful: {config['host']}:{config['port']}/{config['database']}")
        return engine
    except Exception as e:
        logging.error(f"Database connection failed: {str(e)}")
        # Release any pooled connections held by a half-initialised engine
        if engine is not None:
            engine.dispose()
        raise

def generate_sensor_id(sensor_identifier):
    """
    Derive a stable UUID for a temperature sensor from its identifier.

    The same identifier always yields the same UUID, because the value is a
    version-5 UUID computed inside SENSOR_UUID_NAMESPACE.

    Args:
        sensor_identifier (str): Complete sensor identifier (e.g., 'm7004_ID_37F6F9511A64FF28')

    Returns:
        UUID: Deterministic UUID5 for the sensor
    """
    namespace = SENSOR_UUID_NAMESPACE
    sensor_uuid = uuid.uuid5(namespace, sensor_identifier)
    return sensor_uuid

def get_or_create_sensor(engine, sensor_identifier):
    """
    Get existing sensor or create a new one if it doesn't exist.

    The sensor is looked up by its identifier. When a row is found, the ID
    stored in the database is returned (not the locally generated one), so the
    returned value always refers to a row that exists. When no row is found, a
    new sensor is inserted with a deterministic UUID and that UUID is returned.

    Args:
        engine: SQLAlchemy engine instance
        sensor_identifier (str): Complete sensor identifier (e.g., 'm7004_ID_37F6F9511A64FF28')

    Returns:
        UUID: sensor_id of the existing or newly created sensor

    Raises:
        Exception: Any database error is logged and re-raised unchanged.
    """
    sensor_id = generate_sensor_id(sensor_identifier)

    try:
        with engine.connect() as conn:
            # Check if sensor exists
            existing_row = conn.execute(
                text("""
                SELECT temperature_sensor_id 
                FROM temperature_sensor 
                WHERE sensor_identifier = :identifier
                """),
                {"identifier": sensor_identifier}
            ).fetchone()

            if existing_row is not None:
                logging.info(f"Found existing sensor: {sensor_identifier}")
                stored_id = existing_row[0]
                # The driver may hand back the ID as a string; normalise to UUID
                if not isinstance(stored_id, uuid.UUID):
                    stored_id = uuid.UUID(str(stored_id))
                if stored_id != sensor_id:
                    logging.warning(
                        f"Sensor {sensor_identifier} is stored with ID {stored_id}, "
                        f"which differs from the generated ID {sensor_id}; using the stored ID"
                    )
                return stored_id

            # Create new sensor if it doesn't exist
            conn.execute(
                text("""
                INSERT INTO temperature_sensor 
                (temperature_sensor_id, sensor_identifier, date_installed) 
                VALUES (:id, :identifier, NULL)
                """),
                {
                    "id": sensor_id,
                    "identifier": sensor_identifier
                }
            )
            conn.commit()
            logging.info(f"Created new sensor: {sensor_identifier}")

        return sensor_id
    except Exception as e:
        logging.error(f"Error in get_or_create_sensor: {str(e)}")
        raise

def validate_temperature_data(df):
    """
    Validate temperature measurement data according to VALIDATION_CONFIG.

    Timestamps are converted to timezone-aware UTC datetimes. When validation is
    enabled and 'remove_nan' is set, values that cannot be parsed as timestamps
    are turned into NaT and the affected rows are dropped; otherwise an
    unparseable value raises, as the conversion is strict. The input frame is
    never modified; a new frame is returned.

    Args:
        df: DataFrame containing temperature measurements

    Returns:
        Tuple of (cleaned DataFrame, statistics dict). The statistics dict has
        the keys 'initial_count', 'final_count' and 'removed' (with the number
        of dropped rows under 'nan_values').

    Raises:
        KeyError: If rows must be dropped but the frame has no 'timestamp' column.
    """
    if df.empty:
        return df, {'initial_count': 0, 'final_count': 0, 'removed': {}}

    # 'enabled' is the master switch; removal only happens when both flags are set
    drop_invalid = bool(VALIDATION_CONFIG.get('enabled', True)
                        and VALIDATION_CONFIG.get('remove_nan', True))

    stats = {
        'initial_count': len(df),
        'final_count': None,
        'removed': {
            'nan_values': 0
        }
    }

    # Always ensure timestamp is in UTC. Unparseable values become NaT only when
    # they are going to be removed afterwards; otherwise they raise.
    if 'timestamp' in df.columns:
        parsed_timestamps = pd.to_datetime(
            df['timestamp'],
            utc=True,
            errors='coerce' if drop_invalid else 'raise',
        )
        # assign() returns a new frame, leaving the caller's data untouched
        df = df.assign(timestamp=parsed_timestamps)

    # Remove rows without a usable timestamp, as required by TimescaleDB
    if drop_invalid:
        stats['removed']['nan_values'] = int(df['timestamp'].isna().sum())
        df = df.dropna(subset=['timestamp'])

    stats['final_count'] = len(df)

    # Log validation results
    logging.info("Validation statistics:")
    logging.info(f"Initial records: {stats['initial_count']}")
    if drop_invalid:
        logging.info(f"Removed timestamp NaN values: {stats['removed']['nan_values']}")
    logging.info(f"Final records: {stats['final_count']}")

    return df, stats

def check_existing_data(engine, sensor_identifier, timestamps):
    """
    Check if data already exists in the database for given parameters.

    Only the earliest and latest timestamps are used: the function reports
    whether the sensor has at least one measurement anywhere inside that range,
    not whether every individual timestamp is present.

    Args:
        engine: SQLAlchemy engine
        sensor_identifier: Temperature sensor identifier
        timestamps: Iterable of timestamps to check (list, tuple, pandas
            Series, ...); None or an empty iterable means "nothing to check"

    Returns:
        Boolean indicating if data exists
    """
    # Materialise once so any iterable (including a pandas Series, whose truth
    # value is ambiguous) can be tested for emptiness and scanned twice
    timestamps = [] if timestamps is None else list(timestamps)
    if not timestamps:
        return False

    # For efficiency, just check the min and max timestamps
    min_timestamp = min(timestamps)
    max_timestamp = max(timestamps)

    sensor_id = generate_sensor_id(sensor_identifier)

    # EXISTS lets the database stop at the first matching row instead of
    # counting every row in the range
    query = text("""
        SELECT EXISTS (
            SELECT 1
            FROM temperature_measurement
            WHERE timestamp BETWEEN :min_timestamp AND :max_timestamp
              AND temperature_sensor_id = :sensor_id
        )
    """)

    # Execute the query
    with engine.connect() as conn:
        data_exists = conn.execute(query, {
            "min_timestamp": min_timestamp,
            "max_timestamp": max_timestamp,
            "sensor_id": sensor_id
        }).scalar()

    return bool(data_exists)


Main function to process and upload temperature data files.

In [5]:
def process_temperature_files(root_dir, engine, pattern=TEMPERATURE_FILE_PATTERN, batch_size=BATCH_SIZE):
    """
    Process temperature data files and upload measurements to the database.

    For every matching file the function reads the tab-separated
    (timestamp, temperature) rows, validates them, skips the file if the
    database already holds data for that sensor in the file's time range, and
    otherwise inserts all rows. Each file is inserted inside a single
    transaction, so a failure part-way through leaves no partial data behind
    that would make a later run skip the file as "already uploaded".

    Args:
        root_dir: Root directory to search for files
        engine: SQLAlchemy engine instance
        pattern: Regex pattern to match files (group 1 is the sensor ID)
        batch_size: Number of records to send to the database per batch

    Returns:
        Dictionary with processing statistics: 'files_processed',
        'files_skipped', 'files_error', 'rows_inserted', 'total_files' and
        'start_time'; 'end_time' and 'duration_seconds' are added unless the
        root directory does not exist.
    """
    # Statistics to track progress
    stats = {
        'files_processed': 0,
        'files_skipped': 0,
        'files_error': 0,
        'rows_inserted': 0,
        'start_time': datetime.now(timezone.utc),
        'total_files': 0
    }

    root_path = Path(root_dir)
    if not root_path.exists():
        logging.error(f"Root directory does not exist: {root_dir}")
        return stats

    matching_files = _find_temperature_files(root_path, re.compile(pattern))

    stats['total_files'] = len(matching_files)
    logging.info(f"Found {len(matching_files)} temperature data files to process")

    # Iterating through tqdm advances the progress bar once per file on every
    # path (processed, skipped or failed)
    for filepath, sensor_identifier in tqdm(matching_files, desc="Processing Files"):
        try:
            logging.info(f"Processing file: {filepath}")
            logging.info(f"Sensor: {sensor_identifier}")

            # Read the data file; a zero-byte file raises EmptyDataError
            # instead of yielding an empty frame, so map it to one
            try:
                df = pd.read_csv(filepath, sep='\t',
                                 names=['timestamp', 'temperature'])
            except pd.errors.EmptyDataError:
                df = pd.DataFrame(columns=['timestamp', 'temperature'])

            if df.empty:
                logging.warning(f"Empty file: {filepath}")
                stats['files_skipped'] += 1
                continue

            # Validate data
            df, validation_stats = validate_temperature_data(df)
            if df.empty:
                logging.warning(f"No valid data after validation: {filepath}")
                stats['files_skipped'] += 1
                continue

            # Check for existing data
            if check_existing_data(engine, sensor_identifier, df['timestamp'].tolist()):
                logging.info(f"Data already exists for {filepath}. Skipping file.")
                stats['files_skipped'] += 1
                continue

            # Get or create sensor, then tag every row with its ID
            sensor_id = get_or_create_sensor(engine, sensor_identifier)
            df = df.assign(temperature_sensor_id=sensor_id)

            # Upload in batches inside ONE transaction: either the whole file
            # is committed or, on any error, nothing is
            total_rows = len(df)
            with engine.begin() as conn:
                df.to_sql('temperature_measurement', conn,
                          if_exists='append', index=False,
                          chunksize=batch_size)

            stats['rows_inserted'] += total_rows
            stats['files_processed'] += 1
            logging.info(f"Successfully uploaded {total_rows} rows from {filepath}")

        except Exception as e:
            # Best effort: record the failure and carry on with the next file
            logging.error(f"Error processing {filepath}: {str(e)}")
            stats['files_error'] += 1

    # Calculate duration
    stats['end_time'] = datetime.now(timezone.utc)
    stats['duration_seconds'] = (stats['end_time'] - stats['start_time']).total_seconds()

    logging.info(f"Processing complete. "
                 f"Processed {stats['files_processed']} files, "
                 f"skipped {stats['files_skipped']} files, "
                 f"errors in {stats['files_error']} files. "
                 f"Inserted {stats['rows_inserted']} data points "
                 f"in {stats['duration_seconds']:.2f} seconds.")

    return stats


def _find_temperature_files(root_path, pattern_compiled):
    """Return (filepath, sensor_identifier) pairs for every matching file under root_path."""
    matching_files = []
    for dirpath, _dirnames, filenames in os.walk(root_path):
        # NOTE(review): this tests every component of the full path, including
        # the parents of root_path, so a root located under e.g. "datasets/"
        # passes for all of its subdirectories. Kept as-is to avoid changing
        # which files are picked up; confirm whether only directories below
        # root_path were meant to be checked.
        if not any(part.startswith("data") for part in Path(dirpath).parts):
            continue
        for filename in filenames:
            match = pattern_compiled.search(filename)
            if match:
                sensor_identifier = f"m7004_ID_{match.group(1)}"
                matching_files.append((Path(dirpath) / filename, sensor_identifier))
    return matching_files

## Execute the Data Upload Process

In [6]:
# Open the database engine used by the remaining cells; a failure is logged
# and re-raised so the notebook stops here instead of running without a database
try:
    engine = create_db_connection()
except Exception as exc:
    logging.error(f"Failed to connect to database: {str(exc)}")
    raise
else:
    logging.info("Database connection established successfully")

2025-05-22 09:09:22,097 - INFO - Database connection successful: timescaledb:5432/perocube
2025-05-22 09:09:22,099 - INFO - Database connection established successfully


In [7]:
# Show the validation settings that will be applied to each file during the upload
print_validation_config()


Data Validation Configuration:
- Validation enabled: True
- Remove NaN values from timestamp: True


In [8]:
# Run the upload over every matching file under ROOT_DIRECTORY.
# The returned statistics dictionary is kept in `stats` so it can be reported afterwards.
print(f"Starting temperature data processing from directory: {ROOT_DIRECTORY}")
stats = process_temperature_files(ROOT_DIRECTORY, engine)

2025-05-22 09:09:32,321 - INFO - Found 43 temperature data files to process


Starting temperature data processing from directory: /home/jovyan/sample_data/datasets/PeroCube-sample-data


Processing Files:   0%|          | 0/43 [00:00<?, ?it/s]

2025-05-22 09:09:32,331 - INFO - Processing file: /home/jovyan/sample_data/datasets/PeroCube-sample-data/data_20240319/data/m7004_ID_B700000D569E0C28.txt
2025-05-22 09:09:32,332 - INFO - Sensor: m7004_ID_B700000D569E0C28
2025-05-22 09:09:32,346 - INFO - Validation statistics:
2025-05-22 09:09:32,347 - INFO - Initial records: 10716
2025-05-22 09:09:32,348 - INFO - Removed timestamp NaN values: 0
2025-05-22 09:09:32,348 - INFO - Final records: 10716
2025-05-22 09:09:32,419 - INFO - Data already exists for /home/jovyan/sample_data/datasets/PeroCube-sample-data/data_20240319/data/m7004_ID_B700000D569E0C28.txt. Skipping file.
2025-05-22 09:09:32,420 - INFO - Processing file: /home/jovyan/sample_data/datasets/PeroCube-sample-data/data_20240319/data/m7004_ID_3F00000A108FDE28.txt
2025-05-22 09:09:32,420 - INFO - Sensor: m7004_ID_3F00000A108FDE28
2025-05-22 09:09:32,432 - INFO - Validation statistics:
2025-05-22 09:09:32,432 - INFO - Initial records: 10716
2025-05-22 09:09:32,433 - INFO - Remov

In [9]:
def format_duration(seconds):
    """
    Format a duration in a human-readable format.

    The value is rounded to one decimal place *before* it is split into
    hours, minutes and seconds, so the seconds part never shows as "60.0"
    (e.g. 59.96 is rendered as "1m 0.0s", not "60.0s").

    Args:
        seconds: Duration in seconds (int or float)

    Returns:
        String such as "4.8s", "2m 5.5s" or "1h 2m 5.0s"; leading units that
        are zero are omitted.
    """
    # Round first so a carry (59.96 -> 60.0) propagates into minutes/hours
    seconds = round(seconds, 1)
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = seconds % 60
    if hours > 0:
        return f"{hours}h {minutes}m {secs:.1f}s"
    elif minutes > 0:
        return f"{minutes}m {secs:.1f}s"
    else:
        return f"{secs:.1f}s"

def format_number(n):
    """Return n rendered with a comma between each group of three digits."""
    return format(n, ",")

# Display processing statistics
if 'stats' in locals():
    print("📊 File Processing")
    print("━━━━━━━━━━━━━━━")
    print(f"📁 Total files found:           {format_number(stats.get('total_files', 0)):>10}")
    print(f"✅ Successfully processed:      {format_number(stats.get('files_processed', 0)):>10}")
    print(f"⏭️  Skipped (existing/empty):    {format_number(stats.get('files_skipped', 0)):>10}")
    print(f"❌ Errors during processing:    {format_number(stats.get('files_error', 0)):>10}")
    
    print("\n📈 Data Statistics")
    print("━━━━━━━━━━━━━━━")
    print(f"📝 Data points inserted:        {format_number(stats.get('rows_inserted', 0)):>10}")
    
    if 'duration_seconds' in stats:
        duration = format_duration(stats['duration_seconds'])
        print("\n⚡ Performance Metrics")
        print("━━━━━━━━━━━━━━━━━━")
        print(f"⏱️  Total processing time:      {duration:>10}")
        
        if stats.get('rows_inserted', 0) > 0 and stats.get('duration_seconds', 0) > 0:
            throughput = stats['rows_inserted'] / stats['duration_seconds']
            print(f"🚀 Processing speed:           {format_number(int(throughput)):>10} rows/sec")

    # Database verification
    try:
        with engine.connect() as conn:
            result = conn.execute(text("SELECT COUNT(*) FROM temperature_measurement"))
            total_count = result.scalar()
            
            print("\n🗄️  Database Status")
            print("━━━━━━━━━━━━━━━")
            print(f"💾 Total records in database:  {format_number(total_count):>10}")
            
    except Exception as e:
        print("\n⚠️  Could not verify database status:")
        print(f"   {str(e)}")
else:
    print("❌ No statistics available - processing may have failed")
    print("   Please check the logs above for errors.")

📊 File Processing
━━━━━━━━━━━━━━━
📁 Total files found:                   43
✅ Successfully processed:               0
⏭️  Skipped (existing/empty):            43
❌ Errors during processing:             0

📈 Data Statistics
━━━━━━━━━━━━━━━
📝 Data points inserted:                 0

⚡ Performance Metrics
━━━━━━━━━━━━━━━━━━
⏱️  Total processing time:            4.8s

🗄️  Database Status
━━━━━━━━━━━━━━━
💾 Total records in database:   2,878,659
