# Global data-intensive project, part 02: basic OLTP data model

In [2]:
!pip install psycopg2



# Importing neccesary modules

In [3]:
from sqlalchemy import create_engine, Column, Integer, String, Text, Float, ForeignKey, DECIMAL, TIMESTAMP, text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import declarative_base, relationship, sessionmaker,Session
from datetime import datetime
import time
from dotenv import load_dotenv
import os
from sqlalchemy import MetaData
from concurrent.futures import ThreadPoolExecutor
from psycopg2 import Error as Psycopg2Error
import psycopg2
from tqdm import tqdm
from sqlalchemy import insert

# Loading the Database url from .env file

In [4]:
load_dotenv()
database_url = os.getenv("DATABASE_URL")

# Initializing Database

In [5]:
engine = create_engine(
    database_url,
    echo=False,
)

Base = declarative_base()

metadata = MetaData()
with engine.connect() as connection:
    connection.execute(text("CREATE SCHEMA IF NOT EXISTS optimisedurbanpulse"))
    connection.commit()

# Citizen Table

In [6]:
class Citizen(Base):
    __tablename__ = "citizen"
    __table_args__ = {"schema": "optimisedurbanpulse"}
    citizen_id = Column(Integer, primary_key=True, autoincrement=True)
    first_name = Column(String(50))
    last_name = Column(String(50))
    full_name = Column(String(101))
    sex = Column(String(10))
    email = Column(String(255), unique=True)
    contact_number = Column(String(20))
    password = Column(String(255))
    address = Column(Text)
    created_at = Column(TIMESTAMP, default=datetime.utcnow)

    issues = relationship("Issue", back_populates="citizen")
    votes = relationship("Vote", back_populates="citizen")


# Department Table

In [7]:
class Department(Base):
    __tablename__ = "department"
    __table_args__ = {"schema": "optimisedurbanpulse"}
    department_id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(100), unique=True)
    budget_points = Column(DECIMAL(12, 2))
    created_at = Column(TIMESTAMP, default=datetime.utcnow)

# Official Table

In [8]:
class Official(Base):
    __tablename__ = "official"
    __table_args__ = {"schema": "optimisedurbanpulse"}
    official_id = Column(Integer, primary_key=True, autoincrement=True)
    first_name = Column(String(50))
    last_name = Column(String(50))
    full_name = Column(String(101))
    sex = Column(String(10))
    email = Column(String(255), unique=True)
    contact_number = Column(String(20))
    password = Column(String(255))
    address = Column(Text)
    # Fully qualify the referenced table name in the ForeignKey string:
    department_id = Column(Integer, ForeignKey("optimisedurbanpulse.department.department_id"))
    created_at = Column(TIMESTAMP, default=datetime.utcnow)

# Issue Table

In [9]:
class Issue(Base):
    __tablename__ = "issue"
    __table_args__ = {"schema": "optimisedurbanpulse"}
    issue_id = Column(Integer, primary_key=True, autoincrement=True)
    citizen_id = Column(Integer, ForeignKey("optimisedurbanpulse.citizen.citizen_id"))
    description = Column(Text)
    category = Column(String(50))
    priority_level = Column(Integer)
    latitude = Column(Float)
    longitude = Column(Float)
    status = Column(String(20))
    created_at = Column(TIMESTAMP, default=datetime.utcnow)

    citizen = relationship("Citizen", back_populates="issues")
    votes = relationship("Vote", back_populates="issue")

# Photo Table

In [10]:
class Photo(Base):
    __tablename__ = "photo"
    __table_args__ = {"schema": "optimisedurbanpulse"}
    photo_id = Column(Integer, primary_key=True, autoincrement=True)
    issue_id = Column(Integer, ForeignKey("optimisedurbanpulse.issue.issue_id"))
    photo_url = Column(String(512))
    created_at = Column(TIMESTAMP, default=datetime.utcnow)

# Vote Table

In [11]:
class Vote(Base):
    __tablename__ = "vote"
    __table_args__ = {"schema": "optimisedurbanpulse"}
    vote_id = Column(Integer, primary_key=True, autoincrement=True)
    citizen_id = Column(Integer, ForeignKey("optimisedurbanpulse.citizen.citizen_id"))
    issue_id = Column(Integer, ForeignKey("optimisedurbanpulse.issue.issue_id"))
    priority_vote = Column(Integer)
    created_at = Column(TIMESTAMP, default=datetime.utcnow)

    citizen = relationship("Citizen", back_populates="votes")
    issue = relationship("Issue", back_populates="votes")

# Transaction Table

In [12]:
class Transaction(Base):
    __tablename__ = "transaction"
    __table_args__ = {"schema": "optimisedurbanpulse"}
    transaction_id = Column(Integer, primary_key=True, autoincrement=True)
    issue_id = Column(Integer, ForeignKey("optimisedurbanpulse.issue.issue_id"))
    official_id = Column(Integer, ForeignKey("optimisedurbanpulse.official.official_id"))
    budget_spent = Column(DECIMAL(10, 2))
    status = Column(String(20))
    created_at = Column(TIMESTAMP, default=datetime.utcnow)

# Database setup

In [13]:
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()

#  Sample data

In [14]:
# Departments
dept1 = Department(name="Infrastructure", budget_points=50000)
dept2 = Department(name="Health", budget_points=75000)

# Officials
official1 = Official(first_name="John", last_name="Doe", full_name="John Doe",
                     sex="Male", email="johndoe@email.com", contact_number="1234567890",
                     password="hashedpassword", address="City Hall, NY", department_id=1)

official2 = Official(first_name="Jane", last_name="Smith", full_name="Jane Smith",
                     sex="Female", email="janesmith@email.com", contact_number="0987654321",
                     password="hashedpassword", address="Health Office, NY", department_id=2)

# Citizens
citizen1 = Citizen(first_name="Alice", last_name="Brown", full_name="Alice Brown",
                   sex="Female", email="alice@email.com", contact_number="1112223333",
                   password="hashedpassword", address="123 Main St, NY")

citizen2 = Citizen(first_name="Bob", last_name="Miller", full_name="Bob Miller",
                   sex="Male", email="bob@email.com", contact_number="4445556666",
                   password="hashedpassword", address="456 Elm St, NY")

# Issues
issue1 = Issue(citizen_id=1, description="Broken streetlights",
               category="Infrastructure", priority_level=2, latitude=40.7128, longitude=-74.0060,
               status="Pending")

issue2 = Issue(citizen_id=2, description="Public park needs maintenance",
               category="Environment", priority_level=3, latitude=40.7138, longitude=-74.0070,
               status="Pending")

# Add parent objects first
session.add_all([
    dept1, dept2,
    official1, official2,
    citizen1, citizen2,
    issue1, issue2
])
session.flush()

# Photos
photo1 = Photo(issue_id=issue1.issue_id, photo_url="https://example.com/photo1.jpg")
photo2 = Photo(issue_id=issue2.issue_id, photo_url="https://example.com/photo2.jpg")

# Votes
vote1 = Vote(citizen_id=1, issue_id=issue1.issue_id, priority_vote=5)
vote2 = Vote(citizen_id=2, issue_id=issue2.issue_id, priority_vote=4)

# Transactions
transaction1 = Transaction(issue_id=issue1.issue_id, official_id=1, budget_spent=1000, status="Approved")
transaction2 = Transaction(issue_id=issue2.issue_id, official_id=2, budget_spent=2000, status="Pending")

# Add dependent objects
session.add_all([
    photo1, photo2,
    vote1, vote2,
    transaction1, transaction2
])
session.commit()

# Creating new citizen

In [15]:
def create_new_citizen(first_name, last_name, sex, email, contact_number, password, address):
    session = Session()
    citizen = Citizen(
        first_name=first_name,
        last_name=last_name,
        full_name=f"{first_name} {last_name}",
        sex=sex,
        email=email,
        contact_number=contact_number,
        password=password,
        address=address
    )
    session.add(citizen)
    session.commit()
    session.close()
    print(f"Created citizen: {citizen.full_name}")
    return citizen

# Report issue

In [16]:
def report_issue(citizen_id, description, category, priority_level, latitude, longitude, status="Pending"):
    session = Session()
    try:
        issue = Issue(
            citizen_id=citizen_id,
            description=description,
            category=category,
            priority_level=priority_level,
            latitude=latitude,
            longitude=longitude,
            status=status
        )
        session.add(issue)
        session.commit()
        issue_id = issue.issue_id  # Extract ID while session is active
        # print(f"Issue reported: {description}")
        return issue_id  # Return the ID instead of the Issue object
    finally:
        session.close()

# Cast vote

In [17]:
def cast_vote(citizen_id, issue_id, priority_vote):
    session = Session()
    vote = Vote(
        citizen_id=citizen_id,
        issue_id=issue_id,
        priority_vote=priority_vote
    )
    session.add(vote)
    session.commit()
    session.close()
    return vote

# Performance Test Functions

### Measures the time taken to report a large number of issues

In [18]:
def performance_test_report_issue(num_iterations):
    start = time.time()
    session = Session()
    try:
        # Get or create a citizen once
        citizen = session.query(Citizen).first()
        if not citizen:
            citizen = create_new_citizen(
                "Test", "User", "Other", "testuser@example.com",
                "0000000000", "password", "Test Address"
            )

        # Prepare data for bulk insertion
        issue_data = [
            {
                "citizen_id": citizen.citizen_id,
                "description": f"Issue {i}",
                "category": "Infrastructure",
                "priority_level": 3,
                "latitude": 12.34,
                "longitude": 56.78,
                "status": "Pending",
                "created_at": datetime.utcnow()
            } for i in tqdm(range(num_iterations))
        ]

        # Perform bulk insertion using SQLAlchemy Core
        session.execute(
            insert(Issue.__table__),
            issue_data
        )
        session.commit()  # Single commit for all issues
    finally:
        session.close()
    end = time.time()
    print(f"Performance Test - Reporting {num_iterations} issues took {end - start:.2f} seconds.")

### Measures the time taken to cast a large number of votes

In [19]:
def performance_test_cast_vote(num_iterations):
    start = time.time()
    session = Session()
    try:
        # Get or create a citizen and issue once
        citizen = session.query(Citizen).first()
        issue = session.query(Issue).first()
        if not citizen:
            citizen = create_new_citizen(
                "Test", "User2", "Other", "testuser2@example.com",
                "0000000000", "password", "Test Address 2"
            )
        if not issue:
            issue_id = report_issue(
                citizen.citizen_id, "Test issue for votes", "Infrastructure",
                3, 12.34, 56.78
            )
            session.commit()  # Commit the issue to get issue_id
        else:
            issue_id = issue.issue_id

        # Prepare data for bulk insertion
        vote_data = [
            {
                "citizen_id": citizen.citizen_id,
                "issue_id": issue_id,
                "priority_vote": 3,
                "created_at": datetime.utcnow()
            } for _ in tqdm(range(num_iterations))
        ]

        # Perform bulk insertion using SQLAlchemy Core
        session.execute(
            insert(Vote.__table__),
            vote_data
        )
        session.commit()  # Single commit for all votes
    finally:
        session.close()
    end = time.time()
    print(f"Performance Test - Casting {num_iterations} votes took {end - start:.2f} seconds.")

# Transaction Isolation Levels Demonstration
`READ COMMITTED`, `REPEATABLE READ`, and `SERIALIZABLE`

In [20]:
def process_issue_transaction(issue_id, official_id, budget_spent, isolation_level):
    """
    Process a transaction without explicit locking to demonstrate isolation level effects.
    """
    session = Session()
    try:
        # Set the isolation level for this transaction
        session.connection().connection.set_isolation_level(isolation_level)
        
        official = session.query(Official).filter(Official.official_id == official_id).one()
        # No with_for_update() - deliberately allowing race conditions
        department = session.query(Department)\
            .filter(Department.department_id == official.department_id)\
            .one()
        
        print(f"Transaction starting - Budget before: {department.budget_points}")
        if department.budget_points < budget_spent:
            raise Exception("Not enough budget in the department.")
        
        # Simulate some work to increase chance of race conditions
        time.sleep(0.1)
        
        department.budget_points -= budget_spent
        transaction_record = Transaction(
            issue_id=issue_id,
            official_id=official_id,
            budget_spent=budget_spent,
            status='Approved'
        )
        session.add(transaction_record)
        session.commit()
        print(f"Transaction completed - Budget after: {department.budget_points}")
        return transaction_record
    except Exception as e:
        session.rollback()
        print(f"Transaction failed: {e}")
        raise  # Re-raise to catch in isolation test
    finally:
        session.close()
    return None

def isolation_test_process_issue_transaction():
    """
    Demonstrate errors/behaviors for different isolation levels.
    """
    # Setup initial data
    session = Session()
    try:
        # Create or get department
        department = session.query(Department).filter_by(name='Isolation Test Dept').first()
        if not department:
            department = Department(name='Isolation Test Dept', budget_points=1000.00)
            session.add(department)
            session.commit()

        # Create or get official
        official = session.query(Official).filter_by(email='isol@official.com').first()
        if not official:
            official = Official(
                first_name='Iso', last_name='Test', full_name='Iso Test', sex='Other',
                email='isol@official.com', contact_number='111111', password='password',
                address='Test Address', department_id=department.department_id
            )
            session.add(official)
            session.commit()

        # Create or get citizen and issue
        citizen = session.query(Citizen).first()
        if not citizen:
            citizen = create_new_citizen("Iso", "Citizen", "Other", "iso@citizen.com", "2222222222", "password", "Test Address")
        issue_id = report_issue(citizen.citizen_id, "Isolation test issue", "Infrastructure", 3, 12.34, 56.78)
        official_id = official.official_id

    finally:
        session.close()

    # Define wrapper with specific isolation level
    def process_transaction_wrapper(budget_spent, isolation_level):
        return process_issue_transaction(issue_id, official_id, budget_spent, isolation_level)

    # Test each isolation level
    isolation_levels = [
        ("READ_COMMITTED", psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED),
        ("REPEATABLE_READ", psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ),
        ("SERIALIZABLE", psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
    ]

    for level_name, level_value in isolation_levels:
        print(f"\n=== Testing {level_name} ===")
        
        # Reset budget to 1000 before each test
        session = Session()
        try:
            dept = session.query(Department).filter_by(name='Isolation Test Dept').first()
            dept.budget_points = 1000.00
            session.commit()
        finally:
            session.close()

        # Run concurrent transactions
        try:
            with ThreadPoolExecutor(max_workers=2) as executor:
                future1 = executor.submit(process_transaction_wrapper, 700, level_value)
                future2 = executor.submit(process_transaction_wrapper, 700, level_value)
                result1 = future1.result()
                result2 = future2.result()

            # Check final budget
            session = Session()
            final_budget = session.query(Department).filter_by(name='Isolation Test Dept').first().budget_points
            session.close()
            print(f"Final budget after {level_name}: {final_budget}")

        except Psycopg2Error as e:
            print(f"Database error during {level_name}: {str(e)}")
        except Exception as e:
            print(f"General error during {level_name}: {str(e)}")

    print("\n=== Demonstration Complete ===")

# Main block to run tests

In [21]:
if __name__ == '__main__':
    # Optional: Reset sequence or clear tables
    with engine.connect() as connection:
        connection.execute(text("SELECT setval('optimisedurbanpulse.issue_issue_id_seq', (SELECT COALESCE(MAX(issue_id), 0) + 1 FROM optimisedurbanpulse.issue))"))
        connection.commit()

    # Run performance tests
    performance_test_report_issue(num_iterations=1000)
    performance_test_cast_vote(num_iterations=1000)

    # Run isolation test
    isolation_test_process_issue_transaction()

  "created_at": datetime.utcnow()
100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1000/1000 [00:00<00:00, 356506.93it/s]


Performance Test - Reporting 1000 issues took 0.58 seconds.


  "created_at": datetime.utcnow()
100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1000/1000 [00:00<00:00, 332327.39it/s]


Performance Test - Casting 1000 votes took 0.45 seconds.

=== Testing READ_COMMITTED ===
Transaction starting - Budget before: 1000.00
Transaction starting - Budget before: 1000.00
Transaction completed - Budget after: 300.00
Transaction completed - Budget after: 300.00
Final budget after READ_COMMITTED: 300.00

=== Testing REPEATABLE_READ ===
Transaction starting - Budget before: 1000.00
Transaction starting - Budget before: 1000.00
Transaction failed: (psycopg2.errors.SerializationFailure) could not serialize access due to concurrent update

[SQL: UPDATE optimisedurbanpulse.department SET budget_points=%(budget_points)s WHERE optimisedurbanpulse.department.department_id = %(optimisedurbanpulse_department_department_id)s]
[parameters: {'budget_points': Decimal('300.00'), 'optimisedurbanpulse_department_department_id': 3}]
(Background on this error at: https://sqlalche.me/e/20/e3q8)
Transaction completed - Budget after: 300.00
General error during REPEATABLE_READ: (psycopg2.errors.Seri