# User DB CRUD walkthrough

Prerequisites:
- Postgres running via `backend/src/user_db/docker-compose.yml` with a matching `DATABASE_URL`.
- Tables created with `python -m src.user_db.init_db` after the database is up.

These examples use the SQLAlchemy models and session helper from `src.user_db`. Adjust the sample data to suit your environment.


In [8]:
import os
from pathlib import Path
import sys

from dotenv import load_dotenv

# Load DATABASE_URL from backend/src/user_db/.env (preferred)
candidates = [
    Path.cwd() / "src" / "user_db" / ".env",
    Path.cwd().parent / "src" / "user_db" / ".env",
]

env_path = next((p for p in candidates if p.exists()), None)
if not env_path:
    raise FileNotFoundError("Could not find src/user_db/.env; adjust the path if you are running from elsewhere.")

load_dotenv(env_path)
DATABASE_URL = os.environ.get("DATABASE_URL")
if not DATABASE_URL:
    raise RuntimeError("DATABASE_URL is required; set it in src/user_db/.env")

os.environ["DATABASE_URL"] = DATABASE_URL
print(f"Loaded DATABASE_URL from {env_path}")


# Ensure project root (where `src` lives) is on sys.path
candidates = [Path.cwd(), Path.cwd().parent, Path.cwd().parent.parent]
project_root = next((p for p in candidates if (p / "src").exists()), None)
if not project_root:
    raise FileNotFoundError("Could not locate project root containing src/; adjust path logic.")

if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

print(f"Using project root for imports: {project_root}")


Loaded DATABASE_URL from /Users/samgreenwood/code/personal_development/room-booking-bot/dish-booking-agent/backend/src/user_db/.env
Using project root for imports: /Users/samgreenwood/code/personal_development/room-booking-bot/dish-booking-agent/backend


In [9]:
from typing import Iterable

from sqlalchemy import select

from src.user_db.user_db import session_scope
from src.user_db import models


def format_user(user: models.User) -> str:
    return f"{user.id} | {user.email} | active={user.is_active}"


def list_users() -> list[models.User]:
    with session_scope() as session:
        rows = session.scalars(select(models.User).order_by(models.User.created_at)).all()
    if rows:
        for row in rows:
            print(format_user(row))
    else:
        print("No users yet.")
    return rows


## Create a user

Use a strong, hashed password here; this example uses a placeholder for clarity.


In [10]:
import bcrypt


def hash_password(password: str, rounds: int = 12) -> str:
    """Hash a password with bcrypt (adjust rounds for cost)."""
    return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt(rounds)).decode("utf-8")


def verify_password(password: str, hashed: str) -> bool:
    try:
        return bcrypt.checkpw(password.encode("utf-8"), hashed.encode("utf-8"))
    except ValueError:
        return False


new_email = "alice@example.com"
password_hash = hash_password("demo-password")

with session_scope() as session:
    user = models.User(email=new_email, password_hash=password_hash)
    session.add(user)
    session.flush()  # ensures we get the generated ID before commit
    created_id = user.id

print(f"Created user {new_email} -> {created_id}")


Created user alice@example.com -> fda7a85a-5e7e-4a98-bb8c-8c53b1d1f10e


## List users


In [11]:
users = list_users()
created_id = created_id if 'created_id' in globals() else (users[0].id if users else None)
created_id


fda7a85a-5e7e-4a98-bb8c-8c53b1d1f10e | alice@example.com | active=True


UUID('fda7a85a-5e7e-4a98-bb8c-8c53b1d1f10e')

## Read one user


In [12]:
target_email = new_email

with session_scope() as session:
    user = session.scalar(select(models.User).where(models.User.email == target_email))

print(user)
format_user(user) if user else "Not found"


<src.user_db.models.User object at 0x114f0e630>


'fda7a85a-5e7e-4a98-bb8c-8c53b1d1f10e | alice@example.com | active=True'

## Update a user


In [17]:
new_password_hash = hash_password("new-demo-hash")

with session_scope() as session:
    user = session.get(models.User, created_id)
    if not user:
        raise ValueError("User not found; ensure created_id is set")
    user.password_hash = new_password_hash
    user.is_active = True

list_users()


fda7a85a-5e7e-4a98-bb8c-8c53b1d1f10e | alice@example.com | active=True


[<src.user_db.models.User at 0x114f0f080>]

## Authenticate a user with bcrypt


In [20]:
login_email = new_email
login_password = "new-demo-hash"

with session_scope() as session:
    user = session.scalar(select(models.User).where(models.User.email == login_email))
    if user and user.is_active and verify_password(login_password, user.password_hash):
        print(f"Authenticated: {user.email}")
    else:
        print("Invalid credentials or inactive user")



Authenticated: alice@example.com


## Delete a user (cascade deletes secrets)

After trying auth, you can clean up the test user.

In [7]:
with session_scope() as session:
    user = session.get(models.User, created_id)
    if not user:
        print("User not found; nothing to delete")
    else:
        session.delete(user)
        print(f"Deleted user {user.email}")

list_users()


Deleted user alice@example.com
No users yet.


[]