# Module: SQLite3 Assignments

## Lesson: SQLite3


### Assignment 1: Creating and Connecting to a Database

1. Write a Python function to create a new SQLite3 database named `test.db`.
2. Write a Python function to create a table named `employees` with columns `id` (integer), `name` (text), `age` (integer), and `department` (text) in the `test.db` database.


In [12]:
import sqlite3

def create_database():
    conn = sqlite3.connect('test.db')
    conn.close()
    print("Database Created Successfully")

create_database()

Database Created Successfully


In [13]:
def create_table():
    conn = sqlite3.connect('test.db')
    cursor = conn.cursor()

    cursor.execute("""
        CREATE TABLE IF NOT EXISTS employees(
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            age INTEGER,
            department TEXT
        )
    """)

    conn.commit()
    conn.close()
    print("Table Created Successfully")

create_table()

Table Created Successfully


### Assignment 2: Inserting Data

1. Write a Python function to insert a new employee into the `employees` table.
2. Insert at least 5 different employees into the `employees` table.

In [14]:
def insert_employee(id, name, age, department):
    conn = sqlite3.connect('test.db')
    cursor = conn.cursor()

    cursor.execute("""
        INSERT INTO employees(id, name, age, department)
        VALUES(?,?,?,?)
    """, (id, name, age, department))

    conn.commit()
    conn.close()
    print(f"Employee {name} Inserted Successfully")

In [15]:
insert_employee(1, "Alice Johnson",  30, "Engineering")
insert_employee(2, "Bob Smith",      25, "Marketing")
insert_employee(3, "Carol Williams", 35, "HR")
insert_employee(4, "David Brown",    28, "Finance")
insert_employee(5, "Eva Davis",      40, "Engineering")

Employee Alice Johnson Inserted Successfully
Employee Bob Smith Inserted Successfully
Employee Carol Williams Inserted Successfully
Employee David Brown Inserted Successfully
Employee Eva Davis Inserted Successfully


#### Bonus: Insert Many at Once

In [16]:
def insert_many_employees(employees):
    conn = sqlite3.connect("test.db")
    cursor = conn.cursor()

    cursor.executemany("""
        INSERT INTO employees (id, name, age, department)
        VALUES (?, ?, ?, ?)
    """, employees)

    conn.commit()
    conn.close()
    print(f"{cursor.rowcount} employees inserted successfully.")

employees = [
    (10, "Johnson",  30, "Engineering"),
    (20, "Smith",      25, "Marketing"),
    (30, "Williams", 35, "HR"),
    (40, "Brown",    28, "Finance"),
    (50, "Davis",      40, "Engineering"),
]

insert_many_employees(employees)


5 employees inserted successfully.


### Assignment 3: Querying Data

1. Write a Python function to fetch and display all records from the `employees` table.
2. Write a Python function to fetch and display all employees from a specific department.

In [17]:
def fetch_all_employees():
    conn = sqlite3.connect("test.db")
    cursor = conn.cursor()

    cursor.execute("SELECT * FROM employees")
    rows = cursor.fetchall()

    conn.close()

    print(f"{'ID':<5} {'Name':<20} {'Age':<5} {'Department'}")
    print("-" * 45)
    for row in rows:
        print(f"{row[0]:<5} {row[1]:<20} {row[2]:<5} {row[3]}")

fetch_all_employees()


ID    Name                 Age   Department
---------------------------------------------
1     Alice Johnson        30    Engineering
2     Bob Smith            25    Marketing
3     Carol Williams       35    HR
4     David Brown          28    Finance
5     Eva Davis            40    Engineering
10    Johnson              30    Engineering
20    Smith                25    Marketing
30    Williams             35    HR
40    Brown                28    Finance
50    Davis                40    Engineering


In [20]:
def fetch_by_department(department):
    conn = sqlite3.connect("test.db")
    cursor = conn.cursor()

    cursor.execute("""
        SELECT * FROM employees
        WHERE department = ?
    """, (department,))         # Note the trailing comma — this makes it a tuple
    rows = cursor.fetchall()

    conn.close()

    if not rows:
        print(f"No employees found in '{department}' department.")
        return

    print(f"\nEmployees in '{department}' department:")
    print(f"{'ID':<5} {'Name':<20} {'Age':<5} {'Department'}")
    print("-" * 45)
    for row in rows:
        print(f"{row[0]:<5} {row[1]:<20} {row[2]:<5} {row[3]}")

fetch_by_department("Engineering")


Employees in 'Engineering' department:
ID    Name                 Age   Department
---------------------------------------------
1     Alice Johnson        30    Engineering
5     Eva Davis            40    Engineering
10    Johnson              30    Engineering
50    Davis                40    Engineering


```cursor.fetchall()```   # Returns ALL rows as a list of tuples → use for multiple rows

```cursor.fetchone()```   # Returns only the FIRST row as a tuple → use for single lookups

```cursor.fetchmany(n)``` # Returns next N rows as a list      → use for large datasets

### Assignment 4: Updating Data

1. Write a Python function to update the department of an employee based on their `id`.
2. Update the department of at least 2 employees and display the updated records.

In [22]:
def update_department(employee_id, new_department):
    conn = sqlite3.connect("test.db")
    cursor = conn.cursor()

    cursor.execute("""
        UPDATE employees
        SET department = ?
        WHERE id = ?
    """, (new_department, employee_id))

    conn.commit()

    if cursor.rowcount == 0:
        print(f"No employee found with ID {employee_id}. Nothing updated.")
    else:
        print(f"Employee ID {employee_id} moved to '{new_department}' department.")

    conn.close()

update_department(1, "HR")


Employee ID 1 moved to 'HR' department.


In [24]:
print("BEFORE UPDATE:")
fetch_all_employees()


update_department(2, "Sales")           # Bob: Marketing → Sales
update_department(4, "Engineering")     # David: Finance → Engineering

print("\nAFTER UPDATE:")
fetch_all_employees()


BEFORE UPDATE:
ID    Name                 Age   Department
---------------------------------------------
1     Alice Johnson        30    HR
2     Bob Smith            25    Marketing
3     Carol Williams       35    HR
4     David Brown          28    Finance
5     Eva Davis            40    Engineering
10    Johnson              30    Engineering
20    Smith                25    Marketing
30    Williams             35    HR
40    Brown                28    Finance
50    Davis                40    Engineering
Employee ID 2 moved to 'Sales' department.
Employee ID 4 moved to 'Engineering' department.

AFTER UPDATE:
ID    Name                 Age   Department
---------------------------------------------
1     Alice Johnson        30    HR
2     Bob Smith            25    Sales
3     Carol Williams       35    HR
4     David Brown          28    Engineering
5     Eva Davis            40    Engineering
10    Johnson              30    Engineering
20    Smith                25    Marketin

##### Key concepts explained
```cursor.rowcount``` — after any UPDATE, DELETE, or INSERT, SQLite sets rowcount to the number of rows actually affected. This lets you detect whether the operation did anything useful:

```pythoncursor.rowcount == 0```   # No rows matched the WHERE clause → bad ID

```cursor.rowcount == 1```   # Exactly one row updated → expected for ID lookups

```cursor.rowcount > 1```    # Multiple rows updated → expected for bulk updates


### Assignment 5: Deleting Data

1. Write a Python function to delete an employee from the `employees` table based on their `id`.
2. Delete at least 1 employee and display the remaining records.

In [25]:
import sqlite3

def delete_employee(employee_id):
    conn = sqlite3.connect("test.db")
    cursor = conn.cursor()

    cursor.execute("""
        DELETE FROM employees
        WHERE id = ?
    """, (employee_id,))

    conn.commit()

    if cursor.rowcount == 0:
        print(f"No employee found with ID {employee_id}. Nothing deleted.")
    else:
        print(f"Employee ID {employee_id} deleted successfully.")

    conn.close()

In [26]:
print("BEFORE DELETE:")
fetch_all_employees()

delete_employee(3)
delete_employee(99)    # ID that doesn't exist

print("\nAFTER DELETE:")
fetch_all_employees()


BEFORE DELETE:
ID    Name                 Age   Department
---------------------------------------------
1     Alice Johnson        30    HR
2     Bob Smith            25    Sales
3     Carol Williams       35    HR
4     David Brown          28    Engineering
5     Eva Davis            40    Engineering
10    Johnson              30    Engineering
20    Smith                25    Marketing
30    Williams             35    HR
40    Brown                28    Finance
50    Davis                40    Engineering
Employee ID 3 deleted successfully.
No employee found with ID 99. Nothing deleted.

AFTER DELETE:
ID    Name                 Age   Department
---------------------------------------------
1     Alice Johnson        30    HR
2     Bob Smith            25    Sales
4     David Brown          28    Engineering
5     Eva Davis            40    Engineering
10    Johnson              30    Engineering
20    Smith                25    Marketing
30    Williams             35    HR
40    B

### Assignment 6: Advanced Queries

1. Write a Python function to fetch and display employees older than a certain age.
2. Write a Python function to fetch and display employees whose names start with a specific letter.


In [27]:
def fetch_employees_older_than(min_age):
    conn = sqlite3.connect("test.db")
    cursor = conn.cursor()

    cursor.execute("""
        SELECT * FROM employees
        WHERE age > ?
        ORDER BY age ASC
    """, (min_age,))

    rows = cursor.fetchall()
    conn.close()

    if not rows:
        print(f"No employees found older than {min_age}.")
        return

    print(f"\nEmployees older than {min_age}:")
    print(f"{'ID':<5} {'Name':<20} {'Age':<5} {'Department'}")
    print("-" * 45)
    for row in rows:
        print(f"{row[0]:<5} {row[1]:<20} {row[2]:<5} {row[3]}")

fetch_employees_older_than(28)
fetch_employees_older_than(50)   # No results case



Employees older than 28:
ID    Name                 Age   Department
---------------------------------------------
1     Alice Johnson        30    HR
10    Johnson              30    Engineering
30    Williams             35    HR
5     Eva Davis            40    Engineering
50    Davis                40    Engineering
No employees found older than 50.


In [28]:
def fetch_employees_by_letter(letter):
    conn = sqlite3.connect("test.db")
    cursor = conn.cursor()

    cursor.execute("""
        SELECT * FROM employees
        WHERE name LIKE ?
        ORDER BY name ASC
    """, (letter.upper() + "%",))     # e.g. "A" becomes "A%"

    rows = cursor.fetchall()
    conn.close()

    if not rows:
        print(f"No employees found whose name starts with '{letter.upper()}'.")
        return

    print(f"\nEmployees whose name starts with '{letter.upper()}':")
    print(f"{'ID':<5} {'Name':<20} {'Age':<5} {'Department'}")
    print("-" * 45)
    for row in rows:
        print(f"{row[0]:<5} {row[1]:<20} {row[2]:<5} {row[3]}")

fetch_employees_by_letter("a")    # lowercase input works fine
fetch_employees_by_letter("E")
fetch_employees_by_letter("Z")    # No results case



Employees whose name starts with 'A':
ID    Name                 Age   Department
---------------------------------------------
1     Alice Johnson        30    HR

Employees whose name starts with 'E':
ID    Name                 Age   Department
---------------------------------------------
5     Eva Davis            40    Engineering
No employees found whose name starts with 'Z'.


### Assignment 7: Handling Transactions

1. Write a Python function to insert multiple employees into the `employees` table in a single transaction. Ensure that if any insertion fails, none of the insertions are committed.
2. Write a Python function to update the age of multiple employees in a single transaction. Ensure that if any update fails, none of the updates are committed.

In [29]:
def insert_employees_transaction(employees):
    conn = sqlite3.connect("test.db")
    cursor = conn.cursor()

    try:
        cursor.executemany("""
            INSERT INTO employees (id, name, age, department) VALUES (?, ?, ?, ?)
        """, employees)

        conn.commit()
        print(f"Success! {cursor.rowcount} employees inserted.")

    except sqlite3.IntegrityError as e:
        conn.rollback()
        print(f"IntegrityError: {e}")
        print("Transaction rolled back. No employees were inserted.")

    except sqlite3.Error as e:
        conn.rollback()
        print(f"Database error: {e}")
        print("Transaction rolled back. No employees were inserted.")

    finally:
        conn.close()

valid_employees = [
    (6, "Frank Miller",   32, "Marketing"),
    (7, "Grace Lee",      27, "Engineering"),
    (8, "Henry Wilson",   45, "Finance"),
]

invalid_employees = [
    (9,  "Isla Thomas",  29, "HR"),
    (6,  "Jack White",   33, "Marketing"),   # ← duplicate ID, triggers rollback
    (10, "Karen Black",  38, "Finance"),
]

insert_employees_transaction(valid_employees)
insert_employees_transaction(invalid_employees)


Success! 3 employees inserted.
IntegrityError: UNIQUE constraint failed: employees.id
Transaction rolled back. No employees were inserted.


In [30]:
def update_ages_transaction(updates):
    """
    updates: list of (new_age, employee_id) tuples
    """
    conn = sqlite3.connect("test.db")
    cursor = conn.cursor()

    try:
        for new_age, employee_id in updates:

            if not isinstance(new_age, int) or new_age <= 0:
                raise ValueError(f"Invalid age '{new_age}' for employee ID {employee_id}.")

            cursor.execute("""
                UPDATE employees
                SET age = ?
                WHERE id = ?
            """, (new_age, employee_id))

            if cursor.rowcount == 0:
                raise LookupError(f"Employee ID {employee_id} not found.")

        conn.commit()
        print(f"Success! {len(updates)} employee ages updated.")

    except ValueError as e:
        conn.rollback()
        print(f"Validation error: {e}")
        print("Transaction rolled back. No ages were updated.")

    except LookupError as e:
        conn.rollback()
        print(f"Lookup error: {e}")
        print("Transaction rolled back. No ages were updated.")

    except sqlite3.Error as e:
        conn.rollback()
        print(f"Database error: {e}")
        print("Transaction rolled back. No ages were updated.")

    finally:
        conn.close()

valid_updates = [
    (31, 1),    # Alice  → 31
    (26, 2),    # Bob    → 26
    (36, 4),    # David  → 36
]

invalid_age_updates = [
    (33, 1),    # Alice  → 33  (would succeed)
    (-5, 2),    # Bob    → -5  (invalid age, triggers rollback)
    (41, 5),    # Eva    → 41  (never reached)
]

bad_id_updates = [
    (33, 1),    # Alice  → 33  (would succeed)
    (28, 99),   # ID 99 doesn't exist (triggers rollback)
    (41, 5),    # Eva    → 41  (never reached)
]

update_ages_transaction(valid_updates)
update_ages_transaction(invalid_age_updates)
update_ages_transaction(bad_id_updates)


Success! 3 employee ages updated.
Validation error: Invalid age '-5' for employee ID 2.
Transaction rolled back. No ages were updated.
Lookup error: Employee ID 99 not found.
Transaction rolled back. No ages were updated.


### Bonus: Using a context manager (cleanest pattern)


In [None]:
def insert_employees_context_manager(employees):
    try:
        with sqlite3.connect("test.db") as conn:   # auto-commits on success
            cursor = conn.cursor()                  # auto-rollbacks on exception
            cursor.executemany("""
                INSERT INTO employees (id, name, age, department)
                VALUES (?, ?, ?, ?)
            """, employees)
        print(f"Success! Employees inserted.")

    except sqlite3.IntegrityError as e:
        print(f"IntegrityError: {e}")
        print("Transaction rolled back automatically.")

    except sqlite3.Error as e:
        print(f"Database error: {e}")
        print("Transaction rolled back automatically.")


### Assignment 8: Creating Relationships

1. Create a new table named `departments` with columns `id` (integer) and `name` (text).
2. Modify the `employees` table to include a foreign key referencing the `id` column in the `departments` table.
3. Write a Python function to insert data into both the `departments` and `employees` tables, ensuring referential integrity.


In [32]:
"""
Since SQLite doesn't support adding foreign keys to existing tables via ALTER TABLE, we need to recreate the employees table with the foreign key built in.
"""

def setup_tables():
    conn = sqlite3.connect("test.db")
    cursor = conn.cursor()

    # Enable foreign key enforcement (OFF by default in SQLite)
    cursor.execute("PRAGMA foreign_keys = ON")

    # Step 1: Create departments table first (parent)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS departments (
            id   INTEGER PRIMARY KEY,
            name TEXT    NOT NULL UNIQUE
        )
    """)

    # Step 2: Rename old employees table to a backup
    cursor.execute("ALTER TABLE employees RENAME TO employees_backup")

    # Step 3: Recreate employees with foreign key (child)
    cursor.execute("""
        CREATE TABLE employees (
            id            INTEGER PRIMARY KEY,
            name          TEXT    NOT NULL,
            age           INTEGER NOT NULL,
            department_id INTEGER NOT NULL,
            FOREIGN KEY (department_id) REFERENCES departments(id)
                ON UPDATE CASCADE
                ON DELETE RESTRICT
        )
    """)

    print("Tables created successfully.")

    conn.commit()
    conn.close()

setup_tables()


Tables created successfully.


In [33]:
def insert_department(dept_id, dept_name):
    conn = sqlite3.connect("test.db")
    cursor = conn.cursor()
    cursor.execute("PRAGMA foreign_keys = ON")

    try:
        cursor.execute("""
            INSERT INTO departments (id, name)
            VALUES (?, ?)
        """, (dept_id, dept_name))

        conn.commit()
        print(f"Department '{dept_name}' inserted successfully.")

    except sqlite3.IntegrityError as e:
        conn.rollback()
        print(f"IntegrityError: {e}")

    finally:
        conn.close()

def insert_employee(emp_id, name, age, department_id):
    conn = sqlite3.connect("test.db")
    cursor = conn.cursor()
    cursor.execute("PRAGMA foreign_keys = ON")

    try:
        # Verify department exists before inserting
        cursor.execute("SELECT id, name FROM departments WHERE id = ?", (department_id,))
        dept = cursor.fetchone()

        if not dept:
            raise LookupError(f"Department ID {department_id} does not exist.")

        cursor.execute("""
            INSERT INTO employees (id, name, age, department_id)
            VALUES (?, ?, ?, ?)
        """, (emp_id, name, age, department_id))

        conn.commit()
        print(f"Employee '{name}' inserted into '{dept[1]}' department.")

    except LookupError as e:
        conn.rollback()
        print(f"Lookup error: {e}")
        print("No employee was inserted.")

    except sqlite3.IntegrityError as e:
        conn.rollback()
        print(f"IntegrityError: {e}")
        print("No employee was inserted.")

    finally:
        conn.close()

insert_department(1, "Engineering")
insert_department(2, "Marketing")
insert_department(3, "HR")
insert_department(4, "Finance")
insert_department(5, "Sales")

insert_employee(1, "Alice Johnson",  30, 1)   # Engineering
insert_employee(2, "Bob Smith",      25, 2)   # Marketing
insert_employee(3, "Carol Williams", 35, 3)   # HR
insert_employee(4, "David Brown",    28, 1)   # Engineering
insert_employee(5, "Eva Davis",      40, 5)   # Sales


Department 'Engineering' inserted successfully.
Department 'Marketing' inserted successfully.
Department 'HR' inserted successfully.
Department 'Finance' inserted successfully.
Department 'Sales' inserted successfully.
Employee 'Alice Johnson' inserted into 'Engineering' department.
Employee 'Bob Smith' inserted into 'Marketing' department.
Employee 'Carol Williams' inserted into 'HR' department.
Employee 'David Brown' inserted into 'Engineering' department.
Employee 'Eva Davis' inserted into 'Sales' department.


### Assignment 9: Indexing and Optimization

1. Create an index on the `name` column of the `employees` table.
2. Write a Python function to fetch and display all employees whose names start with a specific letter. Compare the performance with and without the index.


In [34]:
def create_index():
    conn = sqlite3.connect("test.db")
    cursor = conn.cursor()

    cursor.execute("""
        CREATE INDEX IF NOT EXISTS idx_employees_name
        ON employees (name)
    """)

    conn.commit()
    print("Index 'idx_employees_name' created on employees.name")
    conn.close()

create_index()

Index 'idx_employees_name' created on employees.name


In [36]:
import random
import string
import time

def generate_employees(n):
    """Generate n random employees for benchmarking."""
    first_names = ["Alice", "Bob", "Carol", "David", "Eva", "Frank",
                   "Grace", "Henry", "Isla", "Jack", "Karen", "Leo",
                   "Mia", "Nathan", "Olivia", "Paul", "Quinn", "Rachel",
                   "Steve", "Tina", "Uma", "Victor", "Wendy", "Xander"]
    departments = [1, 2, 3, 4, 5]
    employees = []

    for i in range(100, 100 + n):       # start from ID 100 to avoid conflicts
        first = random.choice(first_names)
        last  = "".join(random.choices(string.ascii_uppercase, k=6))
        name  = f"{first} {last}"
        age   = random.randint(22, 60)
        dept  = random.choice(departments)
        employees.append((i, name, age, dept))

    return employees


def bulk_insert(employees):
    conn = sqlite3.connect("test.db")
    cursor = conn.cursor()
    cursor.execute("PRAGMA foreign_keys = ON")

    try:
        cursor.executemany("""
            INSERT OR IGNORE INTO employees (id, name, age, department_id)
            VALUES (?, ?, ?, ?)
        """, employees)
        conn.commit()
        print(f"Inserted {cursor.rowcount} employees for benchmarking.")
    finally:
        conn.close()

bulk_insert(generate_employees(50_000))


Inserted 50000 employees for benchmarking.


In [42]:
import time

def drop_index():
    conn = sqlite3.connect("test.db")
    cursor = conn.cursor()
    cursor.execute("DROP INDEX IF EXISTS idx_employees_name")
    conn.commit()
    conn.close()
    print("Index dropped.")

def create_index():
    conn = sqlite3.connect("test.db")
    cursor = conn.cursor()
    cursor.execute("""
        CREATE INDEX IF NOT EXISTS idx_employees_name
        ON employees (name)
    """)
    conn.commit()
    conn.close()
    print("Index created.")

def fetch_by_letter_benchmarked(letter, label):
    conn = sqlite3.connect("test.db")
    cursor = conn.cursor()

    start = time.perf_counter()         # start timer

    cursor.execute("""
        SELECT * FROM employees
        WHERE name LIKE ?
        ORDER BY name ASC
    """, (letter.upper() + "%",))

    rows = cursor.fetchall()

    end = time.perf_counter()           # stop timer
    conn.close()

    elapsed = (end - start) * 1000      # convert to milliseconds

    print(f"\n[{label}]")
    print(f"  Letter      : '{letter.upper()}'")
    print(f"  Rows found  : {len(rows)}")
    print(f"  Time taken  : {elapsed:.4f} ms")

    return elapsed

def explain_query(letter):
    """Show SQLite's internal query plan — tells you if index is used."""
    conn = sqlite3.connect("test.db")
    cursor = conn.cursor()

    cursor.execute("""
        EXPLAIN QUERY PLAN
        SELECT * FROM employees
        WHERE name LIKE ?
        ORDER BY name ASC
    """, (letter.upper() + "%",))

    plan = cursor.fetchall()
    conn.close()

    print("\n  Query Plan:")
    for row in plan:
        print(f"    {row}")

# ── WITHOUT index ──────────────────────────────────────────
drop_index()
explain_query("A")
time_without = fetch_by_letter_benchmarked("A", "WITHOUT Index")

# ── WITH index ─────────────────────────────────────────────
create_index()
explain_query("A")
time_with = fetch_by_letter_benchmarked("A", "WITH Index")

# ── Summary ────────────────────────────────────────────────
print("\n" + "=" * 45)
print("PERFORMANCE SUMMARY")
print("=" * 45)
print(f"  Without index : {time_without:.4f} ms")
print(f"  With index    : {time_with:.4f} ms")
print(f"  Speedup       : {time_without / time_with:.2f}x faster")



Index dropped.

  Query Plan:
    (3, 0, 216, 'SCAN employees')
    (15, 0, 0, 'USE TEMP B-TREE FOR ORDER BY')

[WITHOUT Index]
  Letter      : 'A'
  Rows found  : 2111
  Time taken  : 9.1701 ms
Index created.

  Query Plan:
    (4, 0, 224, 'SCAN employees USING INDEX idx_employees_name')

[WITH Index]
  Letter      : 'A'
  Rows found  : 2111
  Time taken  : 6.7911 ms

PERFORMANCE SUMMARY
  Without index : 9.1701 ms
  With index    : 6.7911 ms
  Speedup       : 1.35x faster


### Assignment 10: Backing Up and Restoring Data

1. Write a Python function to back up the `test.db` database to a file named `backup.db`.
2. Write a Python function to restore the `test.db` database from the `backup.db` file.

In [43]:
import os
from datetime import datetime

def backup_database(source="test.db", backup="backup.db"):
    if not os.path.exists(source):
        print(f"Error: Source database '{source}' not found.")
        return False

    try:
        source_conn = sqlite3.connect(source)
        backup_conn = sqlite3.connect(backup)

        source_conn.backup(backup_conn)     # built-in SQLite backup API

        source_size = os.path.getsize(source)
        backup_size = os.path.getsize(backup)
        timestamp   = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        print(f"Backup successful!")
        print(f"  Source  : {source}  ({source_size:,} bytes)")
        print(f"  Backup  : {backup}  ({backup_size:,} bytes)")
        print(f"  Time    : {timestamp}")
        return True

    except sqlite3.Error as e:
        print(f"Backup failed: {e}")
        return False

    finally:
        source_conn.close()
        backup_conn.close()

backup_database()


Backup successful!
  Source  : test.db  (2,347,008 bytes)
  Backup  : backup.db  (2,347,008 bytes)
  Time    : 2026-02-20 17:48:29


True

In [44]:
import os
from datetime import datetime

def restore_database(backup="backup.db", target="test.db"):
    if not os.path.exists(backup):
        print(f"Error: Backup file '{backup}' not found.")
        return False

    # Verify backup is a valid SQLite database before restoring
    try:
        test_conn = sqlite3.connect(backup)
        test_conn.execute("SELECT name FROM sqlite_master LIMIT 1")
        test_conn.close()
    except sqlite3.Error:
        print(f"Error: '{backup}' is not a valid SQLite database.")
        return False

    try:
        backup_conn = sqlite3.connect(backup)
        target_conn = sqlite3.connect(target)

        backup_conn.backup(target_conn)     # restore is just backup in reverse

        backup_size = os.path.getsize(backup)
        target_size = os.path.getsize(target)
        timestamp   = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        print(f"Restore successful!")
        print(f"  Backup  : {backup}  ({backup_size:,} bytes)")
        print(f"  Target  : {target}  ({target_size:,} bytes)")
        print(f"  Time    : {timestamp}")
        return True

    except sqlite3.Error as e:
        print(f"Restore failed: {e}")
        return False

    finally:
        backup_conn.close()
        target_conn.close()


restore_database()

Restore successful!
  Backup  : backup.db  (2,347,008 bytes)
  Target  : test.db  (2,347,008 bytes)
  Time    : 2026-02-20 17:50:01


True

In [45]:
def count_records(db):
    conn = sqlite3.connect(db)
    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM employees")
    count = cursor.fetchone()[0]
    conn.close()
    return count

def delete_all_employees():
    conn = sqlite3.connect("test.db")
    cursor = conn.cursor()
    cursor.execute("DELETE FROM employees")
    conn.commit()
    conn.close()

# Step 1: Check current state
print(f"[Before backup]  test.db has {count_records('test.db')} employees")

# Step 2: Back up
backup_database()

# Step 3: Simulate data loss
delete_all_employees()
print(f"\n[After deletion] test.db has {count_records('test.db')} employees")

# Step 4: Restore
restore_database()
print(f"\n[After restore]  test.db has {count_records('test.db')} employees")


[Before backup]  test.db has 50005 employees
Backup successful!
  Source  : test.db  (2,347,008 bytes)
  Backup  : backup.db  (2,347,008 bytes)
  Time    : 2026-02-20 17:50:58

[After deletion] test.db has 0 employees
Restore successful!
  Backup  : backup.db  (2,347,008 bytes)
  Target  : test.db  (2,347,008 bytes)
  Time    : 2026-02-20 17:50:59

[After restore]  test.db has 50005 employees
