In [1]:
!pip install python-dotenv

Collecting python-dotenv
  Downloading python_dotenv-1.2.1-py3-none-any.whl.metadata (25 kB)
Downloading python_dotenv-1.2.1-py3-none-any.whl (21 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-1.2.1


In [2]:
!pip install psycopg2-binary

Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.11-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (4.9 kB)
Downloading psycopg2_binary-2.9.11-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.whl (4.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.2/4.2 MB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.11


In [3]:
import pandas as pd
from sqlalchemy import create_engine, text
import time
import os
from dotenv import load_dotenv
import re

# Pull settings from a .env file (if one exists) into the process environment.
load_dotenv()
db_uri = os.getenv('DATABASE_URL')
if not db_uri:
    # create_engine(None) fails with an opaque argument error; name the real cause instead.
    raise RuntimeError(
        "DATABASE_URL is not set; define it in the environment or in a .env file."
    )
db = create_engine(db_uri)
CSV_PATH = "/home/jovyan/data/enron_mails.csv"

# Empty the target table (and reset its identity counter) so that re-running
# this cell does not append duplicate rows. Engine.begin() commits when the
# block exits normally and rolls back if the statement raises.
with db.begin() as connection:
    connection.execute(text("TRUNCATE TABLE emails RESTART IDENTITY;"))

# Reference point for the elapsed-time figures printed while importing.
start_time = time.time()

# Number of CSV rows read (and written to the database) per batch.
chunksize = 20_000
# Running count of rows written so far.
total_rows = 0

# CSV header -> column name used in the `emails` table.
# Only the headers listed here are read from the file.
use_cols = dict([
    ('Message-ID', 'message_id'),
    ('Date', 'date_raw'),
    ('From', 'sender'),
    ('To', 'recipients'),
    ('Subject', 'subject'),
    ('X-Folder', 'folder'),
    ('body', 'body_text'),
    ('body_length', 'body_length'),
    ('directory', 'file_path'),
])

# Column order written to the `emails` table. It never changes between chunks,
# so it is built once instead of on every loop iteration.
db_column_order = [
    'message_id', 'sent_at_utc', 'sent_at_local', 'sender', 'recipients',
    'subject', 'body_text', 'folder', 'file_path', 'body_length', 'date_raw'
]


def _parse_sent_timestamps(date_raw: pd.Series) -> tuple[pd.Series, pd.Series]:
    """Derive UTC and wall-clock send times from raw ``Date`` header strings.

    Parameters
    ----------
    date_raw
        Raw date strings, e.g. ``"Mon, 14 May 2001 16:39:00 -0700 (PDT)"``.

    Returns
    -------
    (sent_at_utc, sent_at_local)
        ``sent_at_utc`` is parsed including the numeric offset and converted
        to UTC. ``sent_at_local`` is parsed from the same string with the
        offset removed, i.e. the time as written in the header.
        Values that do not match the expected format become ``NaT``.
    """
    # Drop a trailing parenthesised zone abbreviation such as " (PST)";
    # the numeric offset that precedes it is what gets parsed.
    cleaned = date_raw.astype(str).str.replace(r'\s\([A-Z]{3,4}\)$', '', regex=True)

    # Dates carrying year 0001 or 0002 are blanked so they end up as NaT.
    bad_year = cleaned.str.contains(r'\s000[12]\s', regex=True)
    cleaned.loc[bad_year] = None

    sent_at_utc = pd.to_datetime(
        cleaned,
        format="%a, %d %b %Y %H:%M:%S %z",
        utc=True,
        errors='coerce'
    )

    # Strip the trailing "+hhmm"/"-hhmm" offset to keep the time as written.
    wallclock = cleaned.str.replace(r'\s[-+]\d{4}$', '', regex=True)
    sent_at_local = pd.to_datetime(
        wallclock,
        format="%a, %d %b %Y %H:%M:%S",
        errors='coerce'
    )
    return sent_at_utc, sent_at_local


# Stream the CSV in batches of `chunksize` rows and append each batch to the table.
for chunk in pd.read_csv(CSV_PATH, usecols=list(use_cols), chunksize=chunksize, low_memory=False):
    # Reassign instead of mutating in place.
    chunk = chunk.rename(columns=use_cols)

    chunk['sent_at_utc'], chunk['sent_at_local'] = _parse_sent_timestamps(chunk['date_raw'])

    chunk = chunk.reindex(columns=db_column_order)
    chunk.to_sql('emails', db, if_exists='append', index=False)

    total_rows += len(chunk)
    print(f"{total_rows} Emails imported... (Duration: {time.time() - start_time:.1f}s)")

print(f"DONE. Import completed in {time.time() - start_time:.2f}s")

20000 Emails imported... (Duration: 4.3s)
40000 Emails imported... (Duration: 8.3s)
60000 Emails imported... (Duration: 13.5s)
80000 Emails imported... (Duration: 21.5s)
100000 Emails imported... (Duration: 26.5s)
120000 Emails imported... (Duration: 30.3s)
140000 Emails imported... (Duration: 34.3s)
160000 Emails imported... (Duration: 39.3s)
180000 Emails imported... (Duration: 43.7s)
200000 Emails imported... (Duration: 47.7s)
220000 Emails imported... (Duration: 51.8s)
240000 Emails imported... (Duration: 57.4s)
260000 Emails imported... (Duration: 63.7s)
280000 Emails imported... (Duration: 68.2s)
300000 Emails imported... (Duration: 72.6s)
320000 Emails imported... (Duration: 76.3s)
340000 Emails imported... (Duration: 80.8s)
360000 Emails imported... (Duration: 84.6s)
380000 Emails imported... (Duration: 88.4s)
400000 Emails imported... (Duration: 92.5s)
420000 Emails imported... (Duration: 96.8s)
440000 Emails imported... (Duration: 101.8s)
460000 Emails imported... (Duration: 