# 03 — File Handling for Data Engineering

Master native file interaction and efficient processing of large datasets **without heavy libraries**.

### What you'll learn
| # | Topic | Key Takeaway |
|---|-------|--------------|
| 1 | **Pathlib Mastery** | Object-oriented filesystem paths, globbing, file info |
| 2 | **CSV Module** | Precise control over delimiters and quoting, plus manual type casting |
| 3 | **JSON & JSONL** | Structured data and line-delimited streaming format |
| 4 | **Context Managers** | Custom resource handling with `with` statements |
| 5 | **Generators** | Memory-efficient large file processing with `yield` |
| 6 | **Binary & Buffering** | Reading files in raw byte chunks for performance |
| 7 | **Mini Pipeline** | Putting it all together on a real dataset |

---
## 1. Pathlib Mastery

`pathlib.Path` is the modern, object-oriented way to work with filesystem paths.  
It replaces older `os.path` string manipulation with readable, chainable methods.

In [2]:
from pathlib import Path

# --- Path Construction ---
# Path() creates a path object from a string.
# The `/` operator joins path segments (like os.path.join but cleaner).
data_dir = Path("data")
csv_path = data_dir / "bookings.csv"

print("Relative path :", csv_path)            # data/bookings.csv
print("Absolute path :", csv_path.resolve())   # resolve() returns the full absolute path

Relative path : data/bookings.csv
Absolute path : /app/notebooks/data/bookings.csv


In [4]:
# --- Path Components ---
# Every path has parts you can inspect without string splitting.
sample = Path("data/exports/hotel_booking.csv")

print("name  :", sample.name)      # 'hotel_booking.csv'  — filename with extension
print("stem  :", sample.stem)      # 'hotel_booking'      — filename WITHOUT extension
print("suffix:", sample.suffix)    # '.csv'               — file extension (with dot)
print("parent:", sample.parent)    # 'data/exports'       — immediate parent directory
print("parts :", sample.parts)     # ('data', 'exports', 'hotel_booking.csv') — all segments

name  : hotel_booking.csv
stem  : hotel_booking
suffix: .csv
parent: data/exports
parts : ('data', 'exports', 'hotel_booking.csv')
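
One edge case the components above do not show: `suffix` returns only the last extension. The sketch below uses a made-up archive name (an assumption, not a file from this dataset) and `PurePosixPath`, so it behaves the same on any operating system.

```python
from pathlib import PurePosixPath

# Hypothetical file name, chosen only to show a double extension.
archive = PurePosixPath("data/exports/hotel_booking.csv.gz")

last_suffix = archive.suffix      # only the final extension
all_suffixes = archive.suffixes   # every extension, in order
stem = archive.stem               # name minus the final extension only

print(last_suffix)    # .gz
print(all_suffixes)   # ['.csv', '.gz']
print(stem)           # hotel_booking.csv
```

When a pipeline needs the base name without any extension, stripping `suffixes` one by one is usually safer than relying on `stem` alone.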


In [None]:
# --- Creating Directories ---
# mkdir() creates a directory.
#   parents=True  → create intermediate dirs if needed (like mkdir -p)
#   exist_ok=True → don't error if it already exists
output_dir = Path("data") / "output"
output_dir.mkdir(parents=True, exist_ok=True)

print("Created:", output_dir.resolve())
print("Exists :", output_dir.exists())   # exists() checks if the path exists on disk
print("Is dir :", output_dir.is_dir())   # is_dir() checks if it's a directory

In [5]:
# --- Globbing (Pattern Matching) ---
# glob() finds files matching a pattern inside a directory.
# '*' matches any characters within one path segment; '**' matches directories recursively.

data_dir = Path("data")

# Find all CSV files directly in data/
csv_files = sorted(data_dir.glob("*.csv"))
print("CSV files in data/:")
for f in csv_files:
    print(f"  {f.name}")

# Find all JSON files recursively (any depth)
# rglob() is shorthand for glob('**/<pattern>')
json_files = sorted(data_dir.rglob("*.json"))
print("\nJSON files (recursive):")
for f in json_files:
    print(f"  {f}")

CSV files in data/:
  bookings.csv
  hotel_booking.csv

JSON files (recursive):
  data/booking_summary.json
  data/bookings.json
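
To see the difference between `glob()` and `rglob()` without depending on the contents of `data/`, here is a small sketch that builds a throwaway directory tree. The file names are placeholders invented for the illustration.

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "nested").mkdir()
    (root / "top.json").write_text("{}")
    (root / "nested" / "deep.json").write_text("{}")

    # glob() looks only at the directory itself
    direct = sorted(p.name for p in root.glob("*.json"))
    # rglob() also descends into subdirectories
    recursive = sorted(p.name for p in root.rglob("*.json"))

print(direct)      # ['top.json']
print(recursive)   # ['deep.json', 'top.json']
```

Both methods return lazy iterators in arbitrary order, which is why the notebook wraps them in `sorted()`.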


In [6]:
# --- File Info & Manipulation ---
hotel_csv = Path("data/hotel_booking.csv")

if hotel_csv.exists():
    # stat() returns file metadata (size, timestamps, permissions)
    info = hotel_csv.stat()
    size_mb = info.st_size / (1024 * 1024)   # st_size is in bytes
    print(f"File   : {hotel_csv.name}")
    print(f"Size   : {size_mb:.2f} MB")
    print(f"Is file: {hotel_csv.is_file()}")  # is_file() checks if it's a regular file

# with_suffix() returns a NEW path with a different extension (original unchanged)
parquet_path = hotel_csv.with_suffix(".parquet")
print(f"\nConverted path: {parquet_path}")

# with_name() returns a NEW path with a different filename
renamed_path = hotel_csv.with_name("bookings_clean.csv")
print(f"Renamed path  : {renamed_path}")

File   : hotel_booking.csv
Size   : 23.95 MB
Is file: True

Converted path: data/hotel_booking.parquet
Renamed path  : data/bookings_clean.csv


---
## 2. CSV Module — Deep Dive

The built-in `csv` module gives precise control over reading/writing CSV files.  
Unlike a default `pandas.read_csv()` call, which loads the whole file into memory, it streams row by row, which keeps **memory usage** low on huge files.

In [7]:
import csv
from pathlib import Path

# ============================================================
# 2a. Writing CSV with DictWriter
# ============================================================
# DictWriter maps dict keys → CSV columns automatically.
#   fieldnames : list of column names (controls column order)
#   writeheader(): writes the header row
#   writerows() : writes all data rows at once

csv_path = Path("data") / "bookings.csv"

rows = [
    {"booking_id": "B-1001", "guest_name": "Alya",  "revenue": 120.0, "room_type": "Suite"},
    {"booking_id": "B-1002", "guest_name": "Rafi",  "revenue": 180.0, "room_type": "Deluxe"},
    {"booking_id": "B-1003", "guest_name": "Nina",  "revenue": 90.0,  "room_type": "Standard"},
    {"booking_id": "B-1004", "guest_name": "Budi",  "revenue": 250.0, "room_type": "Suite"},
    {"booking_id": "B-1005", "guest_name": "Sari",  "revenue": 110.0, "room_type": "Deluxe"},
]

# Open files with newline='' when using the csv module (recommended by the docs).
# It lets csv handle line endings itself and avoids blank lines between rows on Windows.
with csv_path.open("w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=list(rows[0]))  # list of keys fixes the column order
    writer.writeheader()   # writes: booking_id,guest_name,revenue,room_type
    writer.writerows(rows) # writes all dicts as CSV rows

print(f"Wrote {len(rows)} rows → {csv_path}")

Wrote 5 rows → data/bookings.csv


In [None]:
# ============================================================
# 2b. Reading CSV with DictReader
# ============================================================
# DictReader reads each row as a dict keyed by column name
# (a plain dict on Python 3.8+, an OrderedDict on 3.6 and 3.7).
# IMPORTANT: All values come back as STRINGS — you must cast manually.

with csv_path.open("r", newline="") as f:
    # With no fieldnames argument, DictReader uses the first row as the header
    reader = csv.DictReader(f)

    print(f"Detected columns: {reader.fieldnames}\n")

    for row in reader:
        # row['revenue'] is a STRING like '120.0', so we cast to float
        revenue = float(row["revenue"])
        print(f"  {row['booking_id']} | {row['guest_name']:>6} | ${revenue:>7.2f} | {row['room_type']}")
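
Casting column by column gets repetitive once a file has many fields. One possible approach, sketched below, is a small schema dict that maps each column to a casting function. `SCHEMA` and `cast_row` are hypothetical names introduced for this illustration, and the sample data lives in memory so the snippet runs on its own.

```python
import csv
import io

# Hypothetical schema: column name -> casting function.
SCHEMA = {"booking_id": str, "guest_name": str, "revenue": float}


def cast_row(row, schema):
    """Return a new dict with each value converted by its schema function."""
    return {col: cast(row[col]) for col, cast in schema.items()}


sample = io.StringIO(
    "booking_id,guest_name,revenue\n"
    "B-1001,Alya,120.0\n"
    "B-1002,Rafi,180.0\n"
)

typed_rows = [cast_row(r, SCHEMA) for r in csv.DictReader(sample)]
total = sum(r["revenue"] for r in typed_rows)

print(typed_rows[0])
print(total)   # 300.0
```

A bad value raises `ValueError` at the cast, which surfaces dirty data early instead of letting strings leak into later arithmetic.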

In [None]:
# ============================================================
# 2c. Custom Delimiters & Quoting
# ============================================================
# Real-world data isn't always comma-separated.
# The csv module supports custom delimiters (tab, pipe, etc.)
# and quoting strategies for fields that contain special characters.

tsv_path = Path("data") / "output" / "bookings.tsv"

with tsv_path.open("w", newline="") as f:
    writer = csv.DictWriter(
        f,
        fieldnames=rows[0].keys(),
        delimiter="\t",                  # use TAB instead of comma
        quoting=csv.QUOTE_NONNUMERIC,    # quote all non-numeric fields
        # Quoting options:
        #   csv.QUOTE_MINIMAL    → only quote fields containing special chars: delimiter, quote char, line break (default)
        #   csv.QUOTE_ALL        → quote every field
        #   csv.QUOTE_NONNUMERIC → quote strings, leave numbers unquoted
        #   csv.QUOTE_NONE       → never quote (use escapechar for special chars)
    )
    writer.writeheader()
    writer.writerows(rows)

# Read back the TSV and display
print("--- TSV Content ---")
print(tsv_path.read_text()[:400])  # read_text() reads entire file as a string
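
`QUOTE_NONNUMERIC` also affects reading: a reader created with that option converts every unquoted field to `float`. The round trip below is a minimal in-memory sketch of that behaviour with two made-up rows.

```python
import csv
import io

buffer = io.StringIO()
writer = csv.writer(buffer, delimiter="\t", quoting=csv.QUOTE_NONNUMERIC)
writer.writerow(["booking_id", "revenue"])   # strings get quoted
writer.writerow(["B-1001", 120.0])           # the float stays unquoted
raw_text = buffer.getvalue()

buffer.seek(0)
reader = csv.reader(buffer, delimiter="\t", quoting=csv.QUOTE_NONNUMERIC)
parsed = list(reader)   # unquoted fields come back as float

print(repr(raw_text))
print(parsed)   # [['booking_id', 'revenue'], ['B-1001', 120.0]]
```

This only works when the file was written with the same quoting convention. An unquoted non-numeric field would raise `ValueError` on read.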

In [None]:
# ============================================================
# 2d. Reading with csv.reader (list-based, no header mapping)
# ============================================================
# csv.reader returns each row as a plain list of strings.
# Useful when you want positional access or the file has no header.

with csv_path.open("r", newline="") as f:
    reader = csv.reader(f)

    # next() advances the iterator by one step — here it skips the header row
    header = next(reader)
    print("Header:", header)

    # enumerate() gives (index, value) pairs — handy for row numbering
    for i, row in enumerate(reader, start=1):
        print(f"  Row {i}: {row}")

---
## 3. JSON & JSONL (JSON Lines)

- **JSON**: Great for nested/hierarchical data (configs, API responses).  
- **JSONL** (JSON Lines): One JSON object per line — perfect for **streaming** large datasets.  
  Each line is independently parseable, so you can process files line-by-line.

In [None]:
import json
from pathlib import Path

# ============================================================
# 3a. Standard JSON — read / write
# ============================================================

json_path = Path("data") / "bookings.json"

# Build a nested payload (typical API / config structure)
payload = {
    "property_id": "HTL-01",
    "property_name": "Grand Hotel Jakarta",
    "bookings": rows,   # our list of dicts from section 2
}

# --- Writing ---
# json.dumps() converts a Python object → JSON string
#   indent=2       → pretty-print with 2-space indentation
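#   ensure_ascii=False → keep non-ASCII chars as-is (e.g. accented names) instead of \uXXXX escapes
json_text = json.dumps(payload, indent=2, ensure_ascii=False)
# Pass the encoding explicitly: the platform default is not always UTF-8 (e.g. on Windows)
json_path.write_text(json_text, encoding="utf-8")   # Path.write_text() writes a string to a file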

print(f"Wrote JSON → {json_path}")
print(json_text[:300])

In [None]:
# --- Reading ---
# json.loads() converts a JSON string → Python object (dict/list)
loaded = json.loads(json_path.read_text(encoding="utf-8"))

print(f"Property : {loaded['property_name']}")
print(f"# bookings: {len(loaded['bookings'])}")
print(f"First booking: {loaded['bookings'][0]}")
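
The string-based `dumps()`/`loads()` pair has a file-based counterpart, `json.dump()` and `json.load()`, which skips the intermediate string. This is a minimal sketch with a small stand-in payload, written to a temporary directory so it does not touch `data/`.

```python
import json
import tempfile
from pathlib import Path

# Small stand-in payload, not the notebook's full booking list.
payload = {
    "property_id": "HTL-01",
    "bookings": [{"booking_id": "B-1001", "revenue": 120.0}],
}

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "bookings.json"

    # json.dump() serialises straight into an open text file
    with path.open("w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2, ensure_ascii=False)

    # json.load() parses straight from an open text file
    with path.open("r", encoding="utf-8") as f:
        restored = json.load(f)

print(restored == payload)   # True
```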

In [None]:
# ============================================================
# 3b. JSONL (JSON Lines) — write
# ============================================================
# JSONL = one JSON object per line, NO wrapping array.
# Format:
#   {"booking_id": "B-1001", "guest_name": "Alya", ...}
#   {"booking_id": "B-1002", "guest_name": "Rafi", ...}
#
# Why JSONL?
#   - Appendable: just add a new line (no need to rewrite the whole file)
#   - Streamable: process one record at a time (low memory)
#   - Used by: BigQuery, Spark, many ETL pipelines

jsonl_path = Path("data") / "output" / "bookings.jsonl"

with jsonl_path.open("w", encoding="utf-8") as f:   # explicit encoding, since ensure_ascii=False may emit non-ASCII
    for record in rows:
        # json.dumps() with NO indent → single-line JSON (required for JSONL)
        line = json.dumps(record, ensure_ascii=False)
        f.write(line + "\n")  # each record on its own line

print(f"Wrote {len(rows)} records → {jsonl_path}")
print("\n--- JSONL Content ---")
print(jsonl_path.read_text())
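
The "appendable" property can be shown in a few lines. In this sketch, `append_record` is a hypothetical helper (not part of the notebook) that opens the file in `"a"` mode, so existing lines are never rewritten.

```python
import json
import tempfile
from pathlib import Path


def append_record(path, record):
    """Add one record as a new line; earlier lines are left untouched."""
    with path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(record, ensure_ascii=False) + "\n")


with tempfile.TemporaryDirectory() as tmp:
    log = Path(tmp) / "bookings.jsonl"
    append_record(log, {"booking_id": "B-1001"})
    append_record(log, {"booking_id": "B-1002"})
    lines = log.read_text(encoding="utf-8").splitlines()

print(len(lines))   # 2
print(lines[1])
```

Doing the same with a regular JSON array would mean parsing the whole file, appending, and writing everything back.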

In [None]:
# ============================================================
# 3c. JSONL — read (streaming, line-by-line)
# ============================================================
# Reading JSONL is simple: iterate lines, parse each independently.
# Only one line is held in memory at a time, so memory use stays flat regardless of file size.

print("--- Reading JSONL records ---")
with jsonl_path.open("r", encoding="utf-8") as f:
    for line_number, line in enumerate(f, start=1):
        # strip() removes leading/trailing whitespace and the newline
        line = line.strip()
        if not line:          # skip empty lines (defensive)
            continue
        record = json.loads(line)   # parse ONE JSON object
        print(f"  Record {line_number}: {record['booking_id']} — {record['guest_name']}")
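
Because each line is independent, one malformed line does not have to stop the whole read. The generator below is a sketch of one way to handle that, using a hypothetical `read_jsonl` helper and an in-memory sample that contains a blank line and a broken line.

```python
import io
import json


def read_jsonl(lines):
    """Yield (line_number, record); skip blank lines and report malformed ones."""
    for line_number, line in enumerate(lines, start=1):
        line = line.strip()
        if not line:
            continue
        try:
            yield line_number, json.loads(line)
        except json.JSONDecodeError as exc:
            print(f"Skipping line {line_number}: {exc.msg}")


sample = io.StringIO(
    '{"booking_id": "B-1001"}\n'
    "\n"
    "{broken\n"
    '{"booking_id": "B-1002"}\n'
)

records = list(read_jsonl(sample))
print(records)   # [(1, {'booking_id': 'B-1001'}), (4, {'booking_id': 'B-1002'})]
```

Whether to skip, log, or abort on a bad line is a pipeline decision. Skipping silently is rarely what you want in production.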

---
## 4. Advanced Context Managers

The `with` statement ensures resources (files, connections, locks) are **always cleaned up**,  
even if an exception occurs. You can build your own context managers two ways:

1. **Class-based** — implement `__enter__` and `__exit__`  
2. **Generator-based** — use `@contextmanager` from `contextlib`

In [None]:
# ============================================================
# 4a. Class-Based Context Manager
# ============================================================
# __enter__() is called when entering the `with` block → returns the resource
# __exit__()  is called when LEAVING the `with` block (even on exception)
#   exc_type, exc_val, exc_tb → exception info (None if no error)
#   return True to SUPPRESS the exception, False/None to let it propagate

import csv
from pathlib import Path


class CSVBatchWriter:
    """
    Context manager that collects rows in a buffer and writes them
    to CSV in one batch when the `with` block exits.

    Usage:
        with CSVBatchWriter('out.csv', ['col_a', 'col_b']) as writer:
            writer.add({'col_a': 1, 'col_b': 2})
    """

    def __init__(self, path: str, fieldnames: list[str]):
        self.path = Path(path)
        self.fieldnames = fieldnames
        self.buffer: list[dict] = []     # in-memory row buffer

    def __enter__(self):
        """Called on entering `with`. Returns self so we can call .add()."""
        print(f"[CSVBatchWriter] Opened — buffering rows for {self.path.name}")
        return self

    def add(self, row: dict):
        """Append a row to the in-memory buffer (not written to disk yet)."""
        self.buffer.append(row)

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Called on leaving `with`. Flushes all buffered rows to disk.
        Runs even if an exception occurred in the `with` block, so rows
        buffered before the error are still written.
        """
        with self.path.open("w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=self.fieldnames)
            writer.writeheader()
            writer.writerows(self.buffer)
        print(f"[CSVBatchWriter] Flushed {len(self.buffer)} rows → {self.path}")
        return False   # don't suppress exceptions


# --- Demo ---
batch_csv = Path("data") / "output" / "batch_bookings.csv"

with CSVBatchWriter(str(batch_csv), ["booking_id", "guest_name", "revenue"]) as writer:
    writer.add({"booking_id": "B-2001", "guest_name": "Dewi",  "revenue": 200.0})
    writer.add({"booking_id": "B-2002", "guest_name": "Agus",  "revenue": 150.0})
    writer.add({"booking_id": "B-2003", "guest_name": "Rina",  "revenue": 320.0})

# Verify the file was written
print("\n--- Batch CSV Content ---")
print(batch_csv.read_text())
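
The `exc_type` argument lets `__exit__` behave differently on success and on failure. As a sketch of that idea, the hypothetical `SafeBuffer` class below (not part of the notebook) commits its items only when the block exits cleanly.

```python
class SafeBuffer:
    """Hypothetical buffer that commits only when the block exits cleanly."""

    def __init__(self):
        self.pending = []
        self.committed = []

    def __enter__(self):
        return self

    def add(self, item):
        self.pending.append(item)

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:          # no exception: keep the work
            self.committed.extend(self.pending)
        self.pending.clear()          # always reset the buffer
        return False                  # never suppress the exception


ok = SafeBuffer()
with ok as buf:
    buf.add("B-2001")

failed = SafeBuffer()
try:
    with failed as buf:
        buf.add("B-2002")
        raise ValueError("bad row")
except ValueError:
    pass

print(ok.committed)       # ['B-2001']
print(failed.committed)   # []
```

Which behaviour is right depends on the job: an all-or-nothing export wants this pattern, while a best-effort log writer may prefer to flush whatever it has.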

In [None]:
# ============================================================
# 4b. Generator-Based Context Manager (@contextmanager)
# ============================================================
# contextlib.contextmanager turns a generator function into a context manager.
# Everything BEFORE `yield` = setup (__enter__)
# The yielded value          = what `as` receives
# Everything AFTER  `yield`  = teardown (__exit__); wrap `yield` in try/finally so it also runs on errors

from contextlib import contextmanager
import time


@contextmanager
def timer(label: str):
    """
    A simple timer context manager.
    Measures elapsed wall-clock time for the code inside the `with` block.
    """
    # --- SETUP (runs before the `with` body) ---
    start = time.perf_counter()  # perf_counter() returns high-resolution time in seconds
    print(f"[Timer] {label}: started")

    try:
        yield  # <-- control goes to the `with` block here
    finally:
        # --- TEARDOWN (runs after the `with` body, even if it raised) ---
        elapsed = time.perf_counter() - start
        print(f"[Timer] {label}: finished in {elapsed:.4f}s")


# --- Demo ---
with timer("Read hotel CSV headers"):
    hotel_csv = Path("data/hotel_booking.csv")
    with hotel_csv.open("r", newline="") as f:
        reader = csv.reader(f)
        header = next(reader)   # read only the first line
    print(f"  Found {len(header)} columns")
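
With `@contextmanager`, code after `yield` is skipped when the `with` body raises, unless `yield` sits inside `try`/`finally`. The two toy managers below (names invented for the comparison) record their steps in a list to make the difference visible.

```python
from contextlib import contextmanager

events = []


@contextmanager
def without_finally():
    events.append("setup A")
    yield
    events.append("teardown A")   # skipped if the body raises


@contextmanager
def with_finally():
    events.append("setup B")
    try:
        yield
    finally:
        events.append("teardown B")   # runs on success and on error


for manager in (without_finally, with_finally):
    try:
        with manager():
            raise RuntimeError("boom")
    except RuntimeError:
        pass

print(events)   # ['setup A', 'setup B', 'teardown B']
```

The exception is re-raised inside the generator at the `yield` line, which is why only the `finally` version gets a chance to clean up.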

In [None]:
# ============================================================
# 4c. Practical Example — Temporary Working File
# ============================================================
# A context manager that creates a temp file for processing,
# then cleans it up automatically when done.

@contextmanager
def temp_working_file(path: str):
    """
    Creates a temporary file at `path`. Yields the Path object.
    Automatically deletes the file when the `with` block exits.
    """
    p = Path(path)
    p.parent.mkdir(parents=True, exist_ok=True)
    try:
        yield p             # hand the path to the caller
    finally:
        # finally block runs even if an exception occurs
        if p.exists():
            p.unlink()      # unlink() deletes the file
            print(f"[Cleanup] Deleted temp file: {p}")


# --- Demo ---
with temp_working_file("data/output/temp_staging.csv") as tmp:
    tmp.write_text("id,value\n1,100\n2,200\n")
    print(f"Temp file exists: {tmp.exists()}")
    print(f"Content: {tmp.read_text()}")

# After the `with` block, the file is gone
print(f"Temp file exists after block: {Path('data/output/temp_staging.csv').exists()}")
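
For throwaway files, the standard library already ships a ready-made context manager. The sketch below uses `tempfile.TemporaryDirectory`, which picks a unique directory and removes it with its contents on exit. The file name is reused from the demo above purely for illustration.

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp_dir:
    staging = Path(tmp_dir) / "temp_staging.csv"
    staging.write_text("id,value\n1,100\n2,200\n")
    existed_inside = staging.exists()

# The directory and everything in it are removed when the block exits
exists_after = staging.exists()

print(existed_inside)   # True
print(exists_after)     # False
```

A hand-written manager is still useful when the file must live at a specific path. `tempfile` is the simpler choice when any location will do.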

---
## 5. Generators — Memory-Efficient Large File Processing

A **generator function** uses `yield` instead of `return`; calling it returns a generator object.  
That object produces values **one at a time** (lazy evaluation), so it never loads the entire dataset into RAM.

This is critical for Data Engineering where files can be **gigabytes** in size.

In [None]:
# ============================================================
# 5a. Generator Basics — yield vs return
# ============================================================

def count_up_to(n: int):
    """
    A generator that yields numbers 1 through n.

    Unlike a regular function that builds a list in memory,
    this produces ONE value at a time and pauses between yields.
    """
    i = 1
    while i <= n:
        yield i    # pause here, return `i` to caller, resume on next iteration
        i += 1


# The generator is LAZY — nothing runs until you iterate
gen = count_up_to(5)
print("Type:", type(gen))     # <class 'generator'>
print("next():", next(gen))   # 1  — next() advances the generator by one step
print("next():", next(gen))   # 2

# list() consumes the rest
print("Remaining:", list(gen))  # [3, 4, 5]
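
The memory difference is easy to observe with a generator expression, the inline form of a generator. This sketch compares the container sizes reported by `sys.getsizeof`. Exact byte counts vary by Python version, so none are shown.

```python
import sys

squares_list = [n * n for n in range(100_000)]   # all values built up front
squares_gen = (n * n for n in range(100_000))    # values produced on demand

list_bytes = sys.getsizeof(squares_list)   # size of the list object itself
gen_bytes = sys.getsizeof(squares_gen)     # size of the small generator object

total = sum(squares_gen)   # consumes the generator one value at a time

print(f"list     : {list_bytes:,} bytes")
print(f"generator: {gen_bytes:,} bytes")
print(total == sum(squares_list))   # True
```

The generator's size stays the same whether the range has a hundred items or a billion. The trade-off is that it can be iterated only once.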

In [None]:
# ============================================================
# 5b. Generator for Large CSV — stream rows without loading all into RAM
# ============================================================
import csv
from pathlib import Path


def stream_csv(filepath: str | Path, encoding: str = "utf-8"):
    """
    Generator that yields one dict per row from a CSV file.
    Memory usage is O(1) regardless of file size — only one row in memory at a time.

    Args:
        filepath: Path to the CSV file
        encoding: Text encoding (default: utf-8)

    Yields:
        dict — one row as {column_name: value}
    """
    with open(filepath, "r", encoding=encoding, newline="") as f:
        reader = csv.DictReader(f)
        for row in reader:
            yield row   # yield ONE row, then pause until next() is called


# --- Demo: process 119K-row hotel_booking.csv without loading it all ---
hotel_csv = Path("data/hotel_booking.csv")

# Count rows and sum ADR while streaming, in constant memory
total_rows = 0
adr_rows = 0        # rows with a usable ADR value
total_adr = 0.0     # ADR = Average Daily Rate

for row in stream_csv(hotel_csv):
    total_rows += 1
    # adr column might be empty or non-numeric; leave those rows out of the average
    try:
        total_adr += float(row["adr"])
        adr_rows += 1
    except (ValueError, KeyError):
        pass

print(f"Total rows    : {total_rows:,}")
print(f"Sum of ADR    : ${total_adr:,.2f}")
if adr_rows:   # avoid dividing by zero when no row has a valid ADR
    print(f"Average ADR   : ${total_adr / adr_rows:,.2f}")

In [None]:
# ============================================================
# 5c. Generator Pipeline — chain generators for complex processing
# ============================================================
# Generators can be COMPOSED: output of one feeds into the next.
# Each step processes one record at a time → entire pipeline is O(1) memory.


def filter_rows(rows, column: str, value: str):
    """
    Generator that yields only rows where row[column] == value.
    Like SQL: WHERE column = value
    """
    for row in rows:
        if row[column] == value:
            yield row


def select_columns(rows, columns: list[str]):
    """
    Generator that yields dicts with only the specified columns.
    Like SQL: SELECT col1, col2 FROM ...
    """
    for row in rows:
        # dict comprehension: build a new dict with only desired keys
        yield {col: row[col] for col in columns}


def add_computed_column(rows, new_col: str, func):
    """
    Generator that adds a new computed column to each row.
    Like SQL: SELECT *, func(...) AS new_col FROM ...
    """
    for row in rows:
        row[new_col] = func(row)   # mutate the dict in-place
        yield row


# --- Build a pipeline: stream → filter → select → compute → collect ---
pipeline = stream_csv(hotel_csv)                                       # source
pipeline = filter_rows(pipeline, "hotel", "Resort Hotel")              # WHERE
pipeline = filter_rows(pipeline, "is_canceled", "0")                   # AND
pipeline = select_columns(pipeline, ["hotel", "adr", "country"])       # SELECT
pipeline = add_computed_column(                                        # computed col
    pipeline,
    "adr_category",
    lambda r: "Premium" if float(r["adr"]) > 150 else "Standard",
)

# Consume only the first 5 results; rows after the fifth match are never parsed
from itertools import islice    # islice() takes a slice of any iterable (like list[:5])

print("--- Generator Pipeline: Resort Hotel, not canceled, first 5 ---")
for record in islice(pipeline, 5):
    print(f"  {record}")
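
Another stage that often sits at the end of such a pipeline is batching, for example to write rows to a database in groups. The generator below is a sketch of that idea. `batched_rows` is a hypothetical helper, and the integers stand in for row dicts.

```python
from itertools import islice


def batched_rows(rows, batch_size):
    """Yield lists of up to `batch_size` items from any iterable."""
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:      # source exhausted
            return
        yield batch


# Integers stand in for rows coming out of a pipeline
batches = list(batched_rows(range(1, 8), 3))
print(batches)   # [[1, 2, 3], [4, 5, 6], [7]]
```

Memory use is bounded by `batch_size` rather than by the size of the source, so the stage composes with the streaming generators above.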

In [None]:
# ============================================================
# 5d. Generator to JSONL — stream large CSV to JSONL format
# ============================================================
# A common DE task: convert CSV → JSONL for downstream systems.
# Using generators, this works on files of ANY size.

import json

def csv_to_jsonl(csv_path: str | Path, jsonl_path: str | Path, limit: int | None = None):
    """
    Streams a CSV file and writes it as JSONL.
    Memory usage: O(1) — one row at a time.

    Args:
        csv_path  : source CSV file
        jsonl_path: destination JSONL file
        limit     : max rows to convert (None = all)
    """
    count = 0
    with open(jsonl_path, "w") as out:
        for row in stream_csv(csv_path):
            out.write(json.dumps(row) + "\n")
            count += 1
            if limit and count >= limit:
                break
    return count


# Convert first 1000 rows of hotel_booking.csv → JSONL
output_jsonl = Path("data/output/hotel_sample.jsonl")
n = csv_to_jsonl(hotel_csv, output_jsonl, limit=1000)
print(f"Converted {n} rows → {output_jsonl}")
print(f"File size: {output_jsonl.stat().st_size / 1024:.1f} KB")

---
## 6. Binary & Buffered Reading

Sometimes you need to read files as **raw bytes** instead of text:  
- Calculating checksums or file hashes  
- Inspecting file headers (magic bytes)  
- Processing binary formats (images, protobuf)  
- Counting bytes for progress bars on large files

In [None]:
# ============================================================
# 6a. Reading in Byte Chunks
# ============================================================
# open() with mode='rb' opens a file in BINARY mode.
# f.read(chunk_size) reads up to `chunk_size` bytes at a time.
# Returns b'' (empty bytes) when the file is fully read.

from pathlib import Path

hotel_csv = Path("data/hotel_booking.csv")
chunk_size = 64 * 1024   # 64 KB chunks — a common buffer size

total_bytes = 0
chunk_count = 0

with hotel_csv.open("rb") as f:        # 'rb' = read binary
    while True:
        chunk = f.read(chunk_size)      # read up to 64KB
        if not chunk:                   # empty bytes = end of file
            break
        total_bytes += len(chunk)       # len() on bytes = byte count
        chunk_count += 1

file_size = hotel_csv.stat().st_size
print(f"File size (stat) : {file_size:,} bytes")
print(f"Total bytes read : {total_bytes:,} bytes")
print(f"Chunks read      : {chunk_count}")
print(f"Match            : {total_bytes == file_size}")
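
The `while True` / `break` loop can also be written with the two-argument form of `iter()`, which keeps calling a function until it returns a sentinel value. This is a sketch of that idiom: `iter_chunks` is a hypothetical helper, and an in-memory `BytesIO` stands in for a real file.

```python
import io
from functools import partial


def iter_chunks(binary_file, chunk_size=64 * 1024):
    """Yield successive chunks until read() returns b'' (end of file)."""
    return iter(partial(binary_file.read, chunk_size), b"")


payload = io.BytesIO(b"x" * 150_000)   # stand-in for a file opened with 'rb'
sizes = [len(chunk) for chunk in iter_chunks(payload)]

print(sizes)   # [65536, 65536, 18928]
```

Both forms do the same work. The iterator version is convenient when the chunks feed a `for` loop or another generator.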

In [None]:
# ============================================================
# 6b. File Hashing (Checksum) — verify data integrity
# ============================================================
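# hashlib provides hash functions (SHA-256, MD5, etc.). MD5 is fine for spotting
# accidental corruption but is not collision-resistant, so prefer SHA-256 when tampering matters.
# Hashing a file in chunks avoids loading it entirely into memory.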
#
# Use case: verify a downloaded dataset hasn't been corrupted.

import hashlib


def file_hash(filepath: str | Path, algorithm: str = "sha256", chunk_size: int = 65536) -> str:
    """
    Compute the hash of a file by reading it in chunks.
    Memory: O(chunk_size), NOT O(file_size).

    Args:
        filepath  : path to the file
        algorithm : hash algorithm name (e.g. 'md5', 'sha256')
        chunk_size: bytes per read (default 64KB)

    Returns:
        Hex digest string of the hash
    """
    # hashlib.new() creates a hash object for the given algorithm
    h = hashlib.new(algorithm)

    with open(filepath, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            h.update(chunk)  # update() feeds bytes into the hash incrementally

    return h.hexdigest()     # hexdigest() returns the final hash as a hex string


# --- Demo ---
sha = file_hash(hotel_csv, "sha256")
md5 = file_hash(hotel_csv, "md5")

print(f"SHA-256 : {sha}")
print(f"MD5     : {md5}")
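
The verification use case can be sketched end to end on a tiny temporary file. Here `expected` stands in for a checksum published alongside a dataset, and `sha256_of` is a hypothetical helper written for this example. Both are assumptions, not part of the notebook.

```python
import hashlib
import hmac
import tempfile
from pathlib import Path


def sha256_of(path, chunk_size=65536):
    """Hash a file in chunks and return the hex digest."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "sample.csv"
    sample.write_bytes(b"id,value\n1,100\n")

    # Stand-in for a checksum published by the data provider
    expected = hashlib.sha256(b"id,value\n1,100\n").hexdigest()
    matches = hmac.compare_digest(sha256_of(sample), expected)

    sample.write_bytes(b"id,value\n1,999\n")   # simulate corruption
    still_matches = hmac.compare_digest(sha256_of(sample), expected)

print(matches)         # True
print(still_matches)   # False
```

`hmac.compare_digest` is used for the comparison out of habit. For plain corruption checks, `==` gives the same answer.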

In [None]:
# ============================================================
# 6c. Counting Lines Efficiently with Binary Read
# ============================================================
# Counting lines by reading text (for line in f) is fine, but
# counting newline BYTES in binary chunks is often faster for huge files.


def count_lines_binary(filepath: str | Path, chunk_size: int = 65536) -> int:
    """
    Count newline bytes (b'\\n') in a file using binary chunk reading.
    Usually faster than text-mode iteration for very large files.

    Caveats: a final line without a trailing newline is not counted, and
    newlines inside quoted CSV fields are counted as extra lines.
    """
    count = 0
    with open(filepath, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            count += chunk.count(b"\n")  # bytes.count() counts occurrences of a byte pattern
    return count


# --- Demo: count lines in hotel_booking.csv ---
with timer("Binary line count"):
    n_lines = count_lines_binary(hotel_csv)
    print(f"  Lines (including header): {n_lines:,}")
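
Counting newline bytes undercounts by one when the last line has no trailing newline. The variant below is one way to handle that case. `count_lines` is a hypothetical name, and it takes an open binary file so it can be tried on in-memory data.

```python
import io


def count_lines(binary_file, chunk_size=65536):
    """Count lines, including a final line that lacks a trailing newline."""
    count = 0
    last_byte = b""
    while True:
        chunk = binary_file.read(chunk_size)
        if not chunk:
            break
        count += chunk.count(b"\n")
        last_byte = chunk[-1:]        # remember how the data ended
    if last_byte and last_byte != b"\n":
        count += 1                    # unterminated final line
    return count


with_newline = count_lines(io.BytesIO(b"a\nb\nc\n"))
without_newline = count_lines(io.BytesIO(b"a\nb\nc"))
empty = count_lines(io.BytesIO(b""))

print(with_newline, without_newline, empty)   # 3 3 0
```

This still counts physical lines, not CSV records. A quoted field containing a newline spans several lines but is one record.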

---
## 7. Mini Pipeline — Putting It All Together

Let's combine everything: **Pathlib + CSV streaming + generators + JSONL output**  
to build a mini ETL pipeline on the `hotel_booking.csv` dataset.

In [None]:
# ============================================================
# 7. Full Pipeline: hotel_booking.csv → filtered JSONL summary
# ============================================================
# Task:
#   1. Stream the 119K-row hotel CSV
#   2. Filter to non-canceled bookings at "City Hotel"
#   3. Extract & transform selected columns
#   4. Write results as JSONL
#   5. Print summary stats

import csv
import json
from pathlib import Path
from contextlib import contextmanager
import time


@contextmanager
def timer(label: str):
    """Reusable timer context manager."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Report the elapsed time even if the `with` body raised
        elapsed = time.perf_counter() - start
        print(f"[{label}] completed in {elapsed:.4f}s")


# --- Generator pipeline functions ---

def stream_csv(filepath):
    """Yield one dict per CSV row (O(1) memory)."""
    with open(filepath, "r", newline="") as f:
        for row in csv.DictReader(f):
            yield row


def transform_booking(row: dict) -> dict:
    """
    Transform a raw hotel booking row into a clean summary record.
    Handles type casting and derives new fields.
    """
    adr = float(row.get("adr", 0) or 0)  # Average Daily Rate; handle empty strings
    weekend = int(row.get("stays_in_weekend_nights", 0) or 0)
    weekday = int(row.get("stays_in_week_nights", 0) or 0)
    total_nights = weekend + weekday

    return {
        "country": row.get("country", "UNKNOWN"),
        "room_type": row.get("reserved_room_type", "?"),
        "adr": adr,
        "total_nights": total_nights,
        "total_revenue": round(adr * total_nights, 2),  # round() avoids floating-point noise
        "arrival_year": row.get("arrival_date_year", ""),
        "arrival_month": row.get("arrival_date_month", ""),
    }


# --- Execute the pipeline ---

source = Path("data/hotel_booking.csv")
output = Path("data/output/city_hotel_bookings.jsonl")
output.parent.mkdir(parents=True, exist_ok=True)

with timer("Full pipeline"):
    written = 0
    total_revenue = 0.0
    country_counts: dict[str, int] = {}  # manual counter (no collections import needed)

    with output.open("w") as out_file:
        for raw_row in stream_csv(source):
            # --- Filter: City Hotel + not canceled ---
            if raw_row["hotel"] != "City Hotel":
                continue
            if raw_row["is_canceled"] != "0":
                continue

            # --- Transform ---
            record = transform_booking(raw_row)

            # --- Accumulate stats ---
            total_revenue += record["total_revenue"]
            country = record["country"]
            country_counts[country] = country_counts.get(country, 0) + 1

            # --- Write JSONL ---
            out_file.write(json.dumps(record) + "\n")
            written += 1

# --- Summary ---
print(f"\nRecords written : {written:,}")
print(f"Total revenue   : ${total_revenue:,.2f}")
print(f"Output file     : {output} ({output.stat().st_size / 1024:.1f} KB)")
print(f"Output SHA-256  : {file_hash(output)}")   # file_hash() is defined in section 6b

# Top 5 countries by booking count
# sorted() with key=lambda sorts by value descending
top_countries = sorted(country_counts.items(), key=lambda x: x[1], reverse=True)[:5]
print("\nTop 5 countries:")
for country, count in top_countries:
    print(f"  {country:>5} : {count:,} bookings")

In [None]:
# Quick peek at the first 3 output records
from itertools import islice

print("--- First 3 JSONL records ---")
with output.open("r") as f:
    for line in islice(f, 3):
        # json.loads → parse,  json.dumps(indent=2) → pretty-print
        record = json.loads(line)
        print(json.dumps(record, indent=2))
        print()

---
## Key Takeaways

| Concept | Why it matters for DE |
|---------|----------------------|
| **Pathlib** | Clean, cross-platform file paths — no more `os.path.join()` chains |
| **csv module** | Stream CSV row-by-row with full control over delimiters and quoting |
| **JSON / JSONL** | JSONL is the go-to format for streaming structured data in pipelines |
| **Context Managers** | Guarantee cleanup of files, connections, and temp resources |
| **Generators** | Process files of ANY size in constant memory with `yield` |
| **Binary reading** | Hash files, count lines fast, handle non-text formats |

---

## Next Steps

Continue with **`03b_standard_libs.ipynb`** — Python's built-in libraries for datetime, logging, collections, and more.