In [1]:
pip install pandas

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.0 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
import pandas as pd
from datetime import datetime, timedelta

# Load CSV
df = pd.read_csv("AirQualityData-TH.csv")

# Function to fix timestamps
def fix_timestamp(row):
    date_part, time_part = row.split(", ")
    if time_part.startswith("24:"):
        new_date = (datetime.strptime(date_part, "%m/%d/%Y") + timedelta(days=1)).strftime("%m/%d/%Y")
        new_time = time_part.replace("24:", "00:")
        return f"{new_date}, {new_time}"
    return row

# Apply fix to ReceivedTime column
df["ReceivedTime"] = df["ReceivedTime"].apply(fix_timestamp)

# Save to new CSV
df.to_csv("AirQualityData-TH-fixed.csv", index=False)


In [1]:
pip install psycopg2

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.0 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [6]:
import psycopg2
from psycopg2 import sql
from urllib.parse import urlparse
import pandas as pd
from datetime import datetime
import os

# Database connection from URL
DATABASE_URL = "postgresql://postgres:root@localhost:5432/postgres?schema=public"

# Path to your CSV file
CSV_FILE_PATH = "AirQualityData-TH-fixed.csv"

def parse_db_url(db_url):
    """Extract connection parameters from DATABASE_URL"""
    result = urlparse(db_url)
    return {
        'host': result.hostname,
        'database': result.path[1:],  # Remove leading '/'
        'user': result.username,
        'password': result.password,
        'port': result.port,
    }

def parse_datetime(dt_str):
    """Handle multiple timestamp formats"""
    formats = [
        '%Y-%m-%dT%H:%M:%S.%fZ',  # With microseconds and Z
        '%Y-%m-%dT%H:%M:%SZ',     # Without microseconds, with Z
        '%Y-%m-%dT%H:%M:%S.%f',   # With microseconds, no Z
        '%Y-%m-%dT%H:%M:%S'       # Without microseconds, no Z
    ]
    
    for fmt in formats:
        try:
            return datetime.strptime(dt_str, fmt)
        except ValueError:
            continue
    raise ValueError(f"Time data '{dt_str}' doesn't match any known format")

def get_column_case_insensitive(df, possible_names):
    """Get column name with case-insensitive matching"""
    for name in possible_names:
        if name in df.columns:
            return name
        if name.lower() in [col.lower() for col in df.columns]:
            return [col for col in df.columns if col.lower() == name.lower()][0]
    raise KeyError(f"None of {possible_names} found in DataFrame columns")

def insert_data_from_csv():
    try:
        # Parse connection parameters
        db_params = parse_db_url(DATABASE_URL)
        
        # Read CSV file
        df = pd.read_csv(CSV_FILE_PATH)
        
        # Find correct column names (handling variations)
        pm25_col = get_column_case_insensitive(df, ['PM2.5', 'PM2_5', 'pm25'])
        pm10_col = get_column_case_insensitive(df, ['PM10', 'pm10'])
        reported_col = get_column_case_insensitive(df, ['ReportedTime-UTC', 'ReportedTime_UTC'])
        
        # Connect to PostgreSQL
        conn = psycopg2.connect(**db_params)
        cursor = conn.cursor()
        
        # Prepare the insert statement
        insert_query = sql.SQL("""
            INSERT INTO public.air_quality(
                sitename, humidity, temperature, noise, pm2_5, pm10, 
                receivedtime, reportedtime_utc)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        """)
        
        # Counters for success/error
        success_count = 0
        error_count = 0
        
        # Iterate through DataFrame rows and insert data
        for index, row in df.iterrows():
            try:
                # Convert date strings to datetime objects
                received_time = datetime.strptime(row['ReceivedTime'], '%m/%d/%Y, %H:%M:%S')
                reported_time_utc = parse_datetime(row[reported_col])
                
                # Handle possible NaN/empty values
                data = {
                    'sitename': row['SiteName'],
                    'humidity': float(row['Humidity']) if pd.notna(row['Humidity']) else None,
                    'temperature': float(row['Temperature']) if pd.notna(row['Temperature']) else None,
                    'noise': int(row['Noise']) if pd.notna(row['Noise']) else None,
                    'pm2_5': float(row[pm25_col]) if pd.notna(row[pm25_col]) else None,
                    'pm10': float(row[pm10_col]) if pd.notna(row[pm10_col]) else None,
                    'received_time': received_time,
                    'reported_time_utc': reported_time_utc
                }
                
                # Execute the insert
                cursor.execute(insert_query, (
                    data['sitename'],
                    data['humidity'],
                    data['temperature'],
                    data['noise'],
                    data['pm2_5'],
                    data['pm10'],
                    data['received_time'],
                    data['reported_time_utc']
                ))
                success_count += 1
                
            except Exception as row_error:
                error_count += 1
                print(f"\nError inserting row {index + 1}: {str(row_error)}")
                print("Problematic row data:")
                print(row.to_dict())
                print("Current data being processed:")
                print(data if 'data' in locals() else "Data not processed")
                continue
        
        # Commit the transaction
        conn.commit()
        print(f"\nData insertion complete:")
        print(f"- Successfully inserted: {success_count} rows")
        print(f"- Failed to insert: {error_count} rows")
        
    except Exception as e:
        print(f"\nMajor error: {str(e)}")
        if 'conn' in locals():
            conn.rollback()
    finally:
        if 'cursor' in locals():
            cursor.close()
        if 'conn' in locals():
            conn.close()

if __name__ == "__main__":
    if not os.path.exists(CSV_FILE_PATH):
        print(f"Error: CSV file not found at {CSV_FILE_PATH}")
    else:
        print("Starting data import...")
        insert_data_from_csv()

Starting data import...

Data insertion complete:
- Successfully inserted: 2000 rows
- Failed to insert: 0 rows


In [14]:
import psycopg2
from psycopg2 import sql
from psycopg2.extras import Json
from urllib.parse import urlparse
import pandas as pd
from datetime import datetime
import os
import json
import csv
from typing import Optional, Dict, Any, List

# Configuration
DATABASE_URL = "postgresql://postgres:root@localhost:5432/postgres?schema=public"
CSV_FILE_PATH = "ysi_data.csv"
BATCH_SIZE = 100

# Define NULL handling for all float fields according to Prisma schema
FLOAT_FIELDS = {
    'cond': None,
    'depth': None,
    'nlf': None,
    'odo_sat': None,
    'odo_cb': None,
    'odo': None,
    'pressure': None,
    'sal': None,
    'spcond': None,
    'tds': None,
    'turbidity': None,
    'tss': None,
    'wiper_position': None,
    'temp': None,
    'vertical_position': None,
}

def parse_db_url(db_url: str) -> Dict[str, Any]:
    """Extract connection parameters from DATABASE_URL"""
    result = urlparse(db_url)
    return {
        'host': result.hostname,
        'database': result.path[1:],
        'user': result.username,
        'password': result.password,
        'port': result.port,
    }

def clean_float_value(value: Any, field_name: str) -> Optional[float]:
    """Safely convert any value to float for database insertion"""
    if pd.isna(value) or value in ['', 'NaN', 'nan', 'NAN', None]:
        return FLOAT_FIELDS.get(field_name)
    
    try:
        if isinstance(value, str):
            value = value.strip().replace(',', '.')
            if value.lower() in ['inf', '+inf', '-inf', 'infinity', '+infinity', '-infinity']:
                return None
            value = ''.join(c for c in value if c.isdigit() or c in '.-+eE')
        
        float_value = float(value)
        if pd.isna(float_value) or float_value in [float('inf'), float('-inf')]:
            return None
        return float_value
        
    except (ValueError, TypeError) as e:
        print(f"Warning: Could not convert {field_name} value '{value}' to float: {e}")
        return FLOAT_FIELDS.get(field_name)

def parse_timestamp(ts_str: str) -> Optional[datetime]:
    """Parse timestamps with automatic format detection"""
    if pd.isna(ts_str) or ts_str == '' or ts_str is None:
        return None
    
    try:
        if isinstance(ts_str, str):
            ts_str = ts_str.replace('-', '/').replace('\\', '/')
            
            if ' ' in ts_str:
                date_part, time_part = ts_str.split(' ', 1)
            else:
                date_part = ts_str
                time_part = '00:00:00'
            
            date_formats = ['%Y/%m/%d', '%m/%d/%Y', '%d/%m/%Y']
            parsed_date = None
            for fmt in date_formats:
                try:
                    parsed_date = datetime.strptime(date_part, fmt)
                    break
                except ValueError:
                    continue
            
            if not parsed_date:
                return None
                
            time_parts = time_part.split(':')
            hour = int(time_parts[0])
            minute = int(time_parts[1]) if len(time_parts) > 1 else 0
            second = 0
            microsecond = 0
            
            if len(time_parts) > 2:
                seconds_part = time_parts[2]
                if '.' in seconds_part:
                    second = int(seconds_part.split('.')[0])
                    microsecond = int(float(f"0.{seconds_part.split('.')[1]}") * 1e6)
                else:
                    second = int(seconds_part)
            
            return datetime(
                parsed_date.year, parsed_date.month, parsed_date.day,
                hour, minute, second, microsecond
            )
    except Exception as e:
        print(f"Error parsing timestamp {ts_str}: {e}")
    return None

def transform_payload(payload_str: str) -> Optional[Dict[str, Any]]:
    """Transform payload into JSON object, returns None for non-JSON data"""
    if pd.isna(payload_str) or payload_str == '' or payload_str is None:
        return None
    
    payload_str = payload_str.strip().strip('"').strip("'")
    
    try:
        # Only return proper JSON objects
        payload = json.loads(payload_str)
        if isinstance(payload, dict):  # Only accept dictionary-style JSON
            return payload
        return None
    except json.JSONDecodeError:
        return None

def process_csv_file(file_path: str) -> List[Dict[str, Any]]:
    """Read and process CSV file with robust error handling"""
    try:
        with open(file_path, 'rb') as f:
            raw_data = f.read(1024)
            detected_encoding = 'utf-8'
        
        for encoding in ['utf-8', 'latin-1', 'windows-1252']:
            try:
                raw_data.decode(encoding)
                detected_encoding = encoding
                break
            except UnicodeDecodeError:
                continue
                
        with open(file_path, 'r', encoding=detected_encoding) as f:
            try:
                dialect = csv.Sniffer().sniff(f.read(1024))
                f.seek(0)
            except:
                dialect = csv.excel
                f.seek(0)
                
            reader = csv.DictReader(f, dialect=dialect)
            data = list(reader)
            
            if data:
                fieldnames = [name.strip().replace(' ', '_').replace('-', '_').lower() 
                             for name in reader.fieldnames]
                data = [{fieldnames[i]: v for i, (k, v) in enumerate(row.items())} 
                        for row in data]
            
            return data
            
    except Exception as e:
        print(f"Error reading CSV file: {e}")
        raise

def validate_row(row: Dict[str, Any]) -> bool:
    """Validate a row of data before insertion"""
    if 'id' not in row or pd.isna(row['id']):
        print("Missing required field: id")
        return False
        
    try:
        int(row['id'])
    except (ValueError, TypeError):
        print(f"Invalid ID value: {row.get('id')}")
        return False
        
    return True

def prepare_database_values(row: Dict[str, Any]) -> tuple:
    """Prepare all values for database insertion with proper NULL handling"""
    values = {
        'id': int(row['id']),
        'timestamp': parse_timestamp(row.get('timestamp'))
    }
    
    for field in FLOAT_FIELDS:
        values[field] = clean_float_value(row.get(field), field)
    
    values.update({
        'battery': str(row['battery']) if pd.notna(row.get('battery')) else None,
        'cable_pwr': str(row['cable_pwr']) if pd.notna(row.get('cable_pwr')) else None,
        'payload': Json(transform_payload(row.get('payload'))) if transform_payload(row.get('payload')) else None,
        'device_id': str(row.get('device_id', '')),
        'ip': str(row.get('ip', ''))
    })
    
    return tuple(values.values())

def insert_batch(cursor, batch: List[Dict[str, Any]]) -> int:
    """Insert a batch of rows with comprehensive error handling"""
    insert_query = sql.SQL("""
        INSERT INTO water_quality(
            id, timestamp, cond, depth, nlf, odo_sat, odo_cb, odo, 
            pressure, sal, spcond, tds, turbidity, tss, 
            wiper_position, temp, vertical_position, battery, 
            cable_pwr, payload, device_id, ip)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 
                %s, %s, %s, %s, %s, %s::jsonb, %s, %s)
        ON CONFLICT (id) DO NOTHING
    """)
    
    success_count = 0
    for row in batch:
        if not validate_row(row):
            continue
            
        try:
            data = prepare_database_values(row)
            cursor.execute(insert_query, data)
            success_count += 1
        except Exception as e:
            print(f"Error inserting row {row.get('id')}: {e}")
            print(f"Problematic values - Cond: {row.get('cond')}, Wiper_Position: {row.get('wiper_position')}")
            
    return success_count

def main():
    if not os.path.exists(CSV_FILE_PATH):
        print(f"Error: CSV file not found at {CSV_FILE_PATH}")
        return

    print("Starting water quality data import...")
    start_time = datetime.now()
    
    try:
        print("Reading and processing CSV file...")
        csv_data = process_csv_file(CSV_FILE_PATH)
        print(f"Found {len(csv_data)} rows in CSV file")
        
        if not csv_data:
            print("No data found in CSV file")
            return
            
        print("Connecting to database...")
        db_params = parse_db_url(DATABASE_URL)
        conn = psycopg2.connect(**db_params)
        cursor = conn.cursor()
        
        total_success = 0
        total_batches = (len(csv_data) // BATCH_SIZE) + 1
        
        print(f"\nStarting batch processing ({total_batches} batches)...")
        for i in range(0, len(csv_data), BATCH_SIZE):
            batch = csv_data[i:i+BATCH_SIZE]
            batch_num = (i // BATCH_SIZE) + 1
            
            print(f"\nProcessing batch {batch_num}/{total_batches} (rows {i+1}-{min(i+BATCH_SIZE, len(csv_data))})")
            
            success = insert_batch(cursor, batch)
            conn.commit()
            total_success += success
            
            print(f"Inserted {success}/{len(batch)} rows successfully")
            print(f"Overall progress: {min(100, (i + BATCH_SIZE) / len(csv_data) * 100):.1f}%")
        
        elapsed = datetime.now() - start_time
        print(f"\nImport complete in {elapsed.total_seconds():.2f} seconds")
        print(f"Total rows processed: {len(csv_data)}")
        print(f"Successfully inserted: {total_success}")
        print(f"Failed rows: {len(csv_data) - total_success}")
        
    except Exception as e:
        print(f"\nFatal error during import: {e}")
        if 'conn' in locals():
            conn.rollback()
    finally:
        if 'cursor' in locals():
            cursor.close()
        if 'conn' in locals():
            conn.close()

if __name__ == "__main__":
    main()

Starting water quality data import...
Reading and processing CSV file...
Found 2000 rows in CSV file
Connecting to database...

Starting batch processing (21 batches)...

Processing batch 1/21 (rows 1-100)
Inserted 100/100 rows successfully
Overall progress: 5.0%

Processing batch 2/21 (rows 101-200)
Inserted 100/100 rows successfully
Overall progress: 10.0%

Processing batch 3/21 (rows 201-300)
Inserted 100/100 rows successfully
Overall progress: 15.0%

Processing batch 4/21 (rows 301-400)
Inserted 100/100 rows successfully
Overall progress: 20.0%

Processing batch 5/21 (rows 401-500)
Inserted 100/100 rows successfully
Overall progress: 25.0%

Processing batch 6/21 (rows 501-600)
Inserted 100/100 rows successfully
Overall progress: 30.0%

Processing batch 7/21 (rows 601-700)
Inserted 100/100 rows successfully
Overall progress: 35.0%

Processing batch 8/21 (rows 701-800)
Inserted 100/100 rows successfully
Overall progress: 40.0%

Processing batch 9/21 (rows 801-900)
Inserted 100/100 ro