
# 📘 From Python to Production
## Handling Data Efficiently: CSV, JSON, APIs & File System Automation  
By **Prerna Joshi** | #25DaysOfDataTech


## What You'll Learn (at a glance)
- CSV essentials **and** production patterns: encodings, dialects, chunking, memory safety
- JSON and JSON Lines (NDJSON) + flattening nested structures to rows
- APIs beyond basics: headers, params, pagination, **retries** and **rate limits**
- File system with `pathlib` + safe moves, temp dirs, unique file names
- Automation mini-pipelines with **logging** and simple configuration
- Practice tasks with solutions


## 0) Imports and Logging Setup

In [1]:

import os, csv, json, time, math, shutil, tempfile, requests
from pathlib import Path
from datetime import datetime
from typing import Dict, Any, Iterable, List, Tuple, Optional

# Minimal, production-leaning logging
import logging
LOG_DIR = Path("logs"); LOG_DIR.mkdir(exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(LOG_DIR / "day4.log", encoding="utf-8")
    ],
)
logger = logging.getLogger(__name__)
logger.info("Notebook started")


2025-12-07 21:26:39,995 | INFO | Notebook started



## 1) CSV: Bread & Butter of Data Work
Key challenges: **encodings, delimiters, quoting, memory**.
We'll create sample files so the notebook is self-contained.


In [2]:

# Create a sample CSV
rows = [
    ["name", "age", "score"],
    ["Prerna", "24", "95"],
    ["Amit", "21", "88"],
    ["Riya", "22", "76"],
    ["Arjun", "23", "82"],
]
with open("students.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)
    writer.writerows(rows)
print("Created students.csv")


Created students.csv


### 1.1 Reading with `csv.reader` (baseline)

In [3]:

with open("students.csv", "r", encoding="utf-8") as f:
    reader = csv.reader(f)
    for row in reader:
        print(row)


['name', 'age', 'score']
['Prerna', '24', '95']
['Amit', '21', '88']
['Riya', '22', '76']
['Arjun', '23', '82']


### 1.2 Safer: `DictReader` / `DictWriter`

In [4]:

with open("students.csv", "r", encoding="utf-8") as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(f"{row['name']} -> age={row['age']}, score={row['score']}")


Prerna -> age=24, score=95
Amit -> age=21, score=88
Riya -> age=22, score=76
Arjun -> age=23, score=82


### 1.3 Encodings and CSV Dialects

In [5]:

# Detect dialect if file source is unknown (heuristic)
with open("students.csv", "r", encoding="utf-8") as f:
    sample = f.read(1024)
    f.seek(0)
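    sniffer = csv.Sniffer()
    try:
        dialect = sniffer.sniff(sample)
        has_header = sniffer.has_header(sample)  # runs sniff() internally, so it can raise too
    except csv.Error:
        dialect = csv.excel  # fallback: standard comma-separated dialect
        has_header = None    # unknown when sniffing fails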

print("Detected header:", has_header)
print("Dialect delimiter:", getattr(dialect, "delimiter", ","))

# Use detected dialect to read
with open("students.csv", "r", encoding="utf-8") as f:
    reader = csv.reader(f, dialect)
    for row in reader:
        print(row)


Detected header: True
Dialect delimiter: ,
['name', 'age', 'score']
['Prerna', '24', '95']
['Amit', '21', '88']
['Riya', '22', '76']
['Arjun', '23', '82']
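
The cell above handles dialects; encodings are the other half of this heading. A minimal sketch, assuming files arrive either as UTF-8 (possibly with a BOM, as some spreadsheet exports have) or as Windows-1252. The helper name `read_csv_rows` and the candidate list are hypothetical, and the function loads all rows, so it suits small files only.

```python
import csv
import tempfile
from pathlib import Path
from typing import Dict, List, Sequence

def read_csv_rows(path: Path,
                  encodings: Sequence[str] = ("utf-8-sig", "cp1252")) -> List[Dict[str, str]]:
    """Return all rows, decoded with the first candidate encoding that works."""
    last_error = None
    for enc in encodings:
        try:
            with open(path, "r", encoding=enc, newline="") as f:
                return list(csv.DictReader(f))
        except UnicodeDecodeError as e:
            last_error = e  # wrong guess, try the next candidate
    raise ValueError(f"Could not decode {path} with any of {list(encodings)}") from last_error

# Demo: a file saved as Windows-1252 is not valid UTF-8, so the second candidate is used
with tempfile.TemporaryDirectory() as tmpdir:
    legacy = Path(tmpdir) / "legacy.csv"
    legacy.write_bytes("name,city\nZoë,Málaga\n".encode("cp1252"))
    print(read_csv_rows(legacy))
```

Order matters: `cp1252` accepts almost any byte sequence, so a wrong guess there yields garbled text instead of an error. Keep the strictest encoding first.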


### 1.4 Large Files: Streaming & Chunking

In [6]:

def iter_csv(path: str) -> Iterable[Dict[str, str]]:
    with open(path, "r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        for row in reader:
            yield row

def avg_score(path: str) -> float:
    total, n = 0, 0
    for row in iter_csv(path):
        try:
            total += int(row["score"])
            n += 1
        except (KeyError, TypeError, ValueError):  # TypeError: short rows give None for missing fields
            continue
    return total / n if n else float("nan")

print("Average score:", avg_score("students.csv"))


Average score: 85.25
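
`iter_csv` streams one row at a time; chunking groups those rows into fixed-size batches, which helps with bulk inserts or batched API calls. A minimal sketch using `itertools.islice`; the name `iter_csv_chunks` and the default batch size are assumptions, not part of the notebook.

```python
import csv
import tempfile
from itertools import islice
from pathlib import Path
from typing import Dict, Iterator, List

def iter_csv_chunks(path, chunk_size: int = 1000) -> Iterator[List[Dict[str, str]]]:
    """Yield lists of up to chunk_size rows, so only one chunk is in memory at a time."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    with open(path, "r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        while True:
            chunk = list(islice(reader, chunk_size))  # pull the next batch from the reader
            if not chunk:
                return
            yield chunk

# Demo on a throwaway file: 3 rows in batches of 2 gives one full and one partial batch
with tempfile.TemporaryDirectory() as tmpdir:
    path = Path(tmpdir) / "scores.csv"
    path.write_text("name,score\nA,90\nB,80\nC,70\n", encoding="utf-8")
    for chunk in iter_csv_chunks(path, chunk_size=2):
        print(len(chunk), [r["name"] for r in chunk])
```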



> **Tip:** When using pandas later, use `dtype`, `usecols`, `chunksize`, and `parse_dates` for speed and memory control.


## 2) JSON & JSON Lines (NDJSON)

### 2.1 Basic JSON read/write

In [7]:

config = {
    "model": "xgboost",
    "learning_rate": 0.05,
    "features": ["age", "amount", "balance"],
    "threshold": 0.82
}
with open("config.json", "w", encoding="utf-8") as f:
    json.dump(config, f, indent=2, ensure_ascii=False)

with open("config.json", "r", encoding="utf-8") as f:
    loaded = json.load(f)

print("Loaded config:", loaded)


Loaded config: {'model': 'xgboost', 'learning_rate': 0.05, 'features': ['age', 'amount', 'balance'], 'threshold': 0.82}


### 2.2 Nested JSON access + flattening

In [8]:

from typing import Any, Dict

sample = {
    "id": 101,
    "user": {"name": "Prerna", "country": "USA"},
    "transactions": [
        {"ts": "2025-01-01", "amount": 120.0},
        {"ts": "2025-01-03", "amount": 199.5},
    ],
}

def flatten(d: Dict[str, Any], parent_key: str = "", sep: str = ".") -> Dict[str, Any]:
    items = []
    for k, v in d.items():
        new_key = f"{parent_key}{sep}{k}" if parent_key else k
        if isinstance(v, dict):
            items.extend(flatten(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)

print(flatten(sample))


{'id': 101, 'user.name': 'Prerna', 'user.country': 'USA', 'transactions': [{'ts': '2025-01-01', 'amount': 120.0}, {'ts': '2025-01-03', 'amount': 199.5}]}
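
Note that `transactions` comes through as a list, because `flatten` only descends into dicts. One possible extension, sketched here under the assumption that list positions are acceptable as key segments, is to index list items. `flatten_deep` is a hypothetical name.

```python
from typing import Any, Dict

def flatten_deep(obj: Any, parent_key: str = "", sep: str = ".") -> Dict[str, Any]:
    """Flatten nested dicts and lists; list items get their index as a key segment."""
    if isinstance(obj, dict):
        pairs = obj.items()
    elif isinstance(obj, list):
        pairs = enumerate(obj)
    else:
        return {parent_key: obj}  # leaf value
    out: Dict[str, Any] = {}
    for k, v in pairs:
        new_key = f"{parent_key}{sep}{k}" if parent_key else str(k)
        out.update(flatten_deep(v, new_key, sep))
    return out

record = {
    "id": 101,
    "user": {"name": "Prerna", "country": "USA"},
    "transactions": [{"ts": "2025-01-01", "amount": 120.0}],
}
print(flatten_deep(record))
```

This produces keys such as `transactions.0.ts`. Empty dicts and lists contribute no keys, and wide, variable-length lists produce ragged columns, so exploding lists into rows is often the better fit for CSV.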


### 2.3 JSON Lines / NDJSON

In [9]:

ndjson_path = "events.ndjson"
events = [
    {"event": "login", "user": "alice", "ok": True},
    {"event": "update", "user": "bob", "ok": False, "reason": "denied"},
    {"event": "logout", "user": "alice", "ok": True},
]
with open(ndjson_path, "w", encoding="utf-8") as f:
    for e in events:
        f.write(json.dumps(e, ensure_ascii=False) + "\n")

parsed = []
with open(ndjson_path, "r", encoding="utf-8") as f:
    for line in f:
        parsed.append(json.loads(line))
parsed


[{'event': 'login', 'user': 'alice', 'ok': True},
 {'event': 'update', 'user': 'bob', 'ok': False, 'reason': 'denied'},
 {'event': 'logout', 'user': 'alice', 'ok': True}]
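
The cell above collects every event into a list. For large logs, NDJSON's main advantage is that it can be appended to and read back one line at a time. A minimal sketch; `append_ndjson` and `iter_ndjson` are hypothetical helper names, and skipping blank lines is an assumption about how tolerant the reader should be.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, Iterator

def append_ndjson(path, record: Dict[str, Any]) -> None:
    """Append one record as a single JSON line."""
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record, ensure_ascii=False) + "\n")

def iter_ndjson(path) -> Iterator[Dict[str, Any]]:
    """Yield one parsed record per non-blank line, without loading the whole file."""
    with open(path, "r", encoding="utf-8") as f:
        for lineno, line in enumerate(f, start=1):
            if not line.strip():
                continue  # tolerate blank lines
            try:
                yield json.loads(line)
            except json.JSONDecodeError as e:
                raise ValueError(f"{path}, line {lineno}: invalid JSON ({e.msg})") from e

with tempfile.TemporaryDirectory() as tmpdir:
    log_path = Path(tmpdir) / "events.ndjson"
    append_ndjson(log_path, {"event": "login", "user": "alice", "ok": True})
    append_ndjson(log_path, {"event": "update", "user": "bob", "ok": False})
    failed = sum(1 for e in iter_ndjson(log_path) if not e["ok"])
    print("failed events:", failed)
```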

## 3) APIs: From Basic to Production

### 3.1 Safe GET with retry & backoff

In [10]:

from typing import Optional

def safe_get(url: str, params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None,
             max_tries: int = 3, timeout: int = 10) -> Optional[Any]:
    """GET a URL and return the parsed JSON body (dict or list), or None if every try fails."""
    for attempt in range(max_tries):
        try:
            resp = requests.get(url, params=params, headers=headers, timeout=timeout)
            if resp.status_code == 429:
                # Retry-After may be seconds or an HTTP date; fall back to 1s if it is not an integer
                try:
                    wait = int(resp.headers.get("Retry-After", "1"))
                except ValueError:
                    wait = 1
            else:
                resp.raise_for_status()
                return resp.json()
        except requests.RequestException as e:
            wait = 2 ** attempt  # exponential backoff: 1s, 2s, 4s, ...
            logger.warning("Request failed (%s)", e)
        if attempt < max_tries - 1:  # no point sleeping after the final attempt
            logger.info("Retrying in %ss...", wait)
            time.sleep(wait)
    logger.error("Failed after %d tries: %s", max_tries, url)
    return None

user = safe_get("https://api.github.com/users/prernajoshipj")
print("User keys (sample):", list(user.keys())[:6] if user else None)


User keys (sample): ['login', 'id', 'node_id', 'avatar_url', 'gravatar_id', 'url']
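
On rate limits: a `Retry-After` header may carry either a number of seconds or an HTTP date, and `int()` only handles the first form. A small sketch of a parser that accepts both, using only the standard library. `retry_after_seconds` is a hypothetical helper, and the 1-second default is an assumption.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional

def retry_after_seconds(value: Optional[str], default: float = 1.0,
                        now: Optional[datetime] = None) -> float:
    """Convert a Retry-After value (delta-seconds or HTTP date) into seconds to wait."""
    if not value:
        return default
    value = value.strip()
    try:
        return max(0.0, float(int(value)))  # form 1: plain seconds
    except ValueError:
        pass
    try:
        when = parsedate_to_datetime(value)  # form 2: HTTP date
    except (TypeError, ValueError):  # the exception type differs across Python versions
        return default
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (when - now).total_seconds())

fixed_now = datetime(2015, 10, 21, 7, 27, 0, tzinfo=timezone.utc)
print(retry_after_seconds("120"))
print(retry_after_seconds("Wed, 21 Oct 2015 07:28:00 GMT", now=fixed_now))
```

In a retry loop it is usually worth capping the result, since a server can ask for a wait far longer than a notebook run should tolerate.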


### 3.2 Pagination pattern (GitHub public repos, first N)

In [11]:

def fetch_github_repos(user: str, limit: int = 50) -> List[Dict[str, Any]]:
    all_repos = []
    per_page = 30
    page = 1
    while len(all_repos) < limit:
        data = safe_get(f"https://api.github.com/users/{user}/repos", params={"per_page": per_page, "page": page})
        if not data:
            break
        all_repos.extend(data)
        if len(data) < per_page:
            break
        page += 1
    return all_repos[:limit]

repos = fetch_github_repos("pallets", limit=40)
summary = [{"name": r.get("name"), "stars": r.get("stargazers_count")} for r in repos[:5]]
summary


[{'name': '.github', 'stars': 14},
 {'name': 'actions', 'stars': 0},
 {'name': 'click', 'stars': 17047},
 {'name': 'flask', 'stars': 70888},
 {'name': 'flask-docs', 'stars': 34}]

### 3.3 Save API data (compressed)

In [12]:

import gzip

def save_json_gz(obj: Any, path: Path) -> None:
    with gzip.open(path, "wt", encoding="utf-8") as gz:
        json.dump(obj, gz, ensure_ascii=False)
    print("Saved gzip JSON ->", path)

if repos:
    save_json_gz(repos, Path("repos.json.gz"))


Saved gzip JSON -> repos.json.gz
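
Reading the file back mirrors the write: open in text mode (`"rt"`) and hand the stream to `json.load`. A minimal round-trip sketch; `load_json_gz` is a hypothetical counterpart to `save_json_gz`, and the payload is made-up sample data.

```python
import gzip
import json
import tempfile
from pathlib import Path
from typing import Any

def load_json_gz(path: Path) -> Any:
    """Read a gzip-compressed JSON file back into Python objects."""
    with gzip.open(path, "rt", encoding="utf-8") as gz:
        return json.load(gz)

with tempfile.TemporaryDirectory() as tmpdir:
    path = Path(tmpdir) / "sample.json.gz"
    payload = [{"name": "repo-a", "stars": 1}, {"name": "repo-b", "stars": 2}]  # made-up records
    with gzip.open(path, "wt", encoding="utf-8") as gz:
        json.dump(payload, gz, ensure_ascii=False)
    restored = load_json_gz(path)
    print("round trip ok:", restored == payload)
```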


## 4) File System: Underrated Production Skill

### 4.1 Paths, globs, and safe moves

In [13]:

RAW = Path("data/raw"); PROC = Path("data/processed")
RAW.mkdir(parents=True, exist_ok=True); PROC.mkdir(parents=True, exist_ok=True)

# Put some sample files
(RAW / "students_backup.csv").write_text(Path("students.csv").read_text(encoding="utf-8"), encoding="utf-8")
(RAW / "notes.txt").write_text("hello", encoding="utf-8")

def move_with_unique(src: Path, dst_dir: Path) -> Path:
    dst_dir.mkdir(parents=True, exist_ok=True)
    stem, suf = src.stem, src.suffix  # take the name parts once, from the source
    target = dst_dir / src.name
    i = 1
    while target.exists():
        # Yields name_1.csv, name_2.csv, ... rather than name_1_2.csv
        target = dst_dir / f"{stem}_{i}{suf}"
        i += 1
    src.replace(target)
    return target

moved = []
for p in sorted(RAW.glob("*.csv")):  # materialize the listing before moving files out of RAW
    moved.append(move_with_unique(p, PROC))

print("Moved to processed:", [m.name for m in moved])


Moved to processed: ['students_backup.csv']


### 4.2 Temp directories for safe pipelines

In [14]:

final = Path("artifacts"); final.mkdir(exist_ok=True)
final_out = final / "status.json"
# Create the temp dir inside the target folder so both paths share a filesystem:
# os.replace is atomic only within one filesystem, and a cross-device move is copy + delete.
with tempfile.TemporaryDirectory(dir=final) as tmpdir:
    temp_out = Path(tmpdir) / "intermediate.json"
    with open(temp_out, "w", encoding="utf-8") as f:  # close the file before moving it
        json.dump({"ok": True, "ts": datetime.now().isoformat()}, f)
    os.replace(temp_out, final_out)
    print("Wrote atomically ->", final_out)


Wrote atomically -> artifacts\status.json


## 5) Automation Mini-Pipelines

### 5.1 Daily fetch (weather) with versioned filename

In [15]:

def fetch_daily_weather(lat=40.7, lon=-74.0) -> Optional[Dict[str, Any]]:
    return safe_get("https://api.open-meteo.com/v1/forecast",
                    params={"latitude": lat, "longitude": lon, "hourly": "temperature_2m"})

def save_versioned_json(data: Dict[str, Any], prefix: str, out_dir: Path = Path("data/weather")) -> Path:
    out_dir.mkdir(parents=True, exist_ok=True)
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    path = out_dir / f"{prefix}_{ts}.json"
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
    print("Saved", path)
    return path

wx = fetch_daily_weather()
if wx:
    save_versioned_json(wx, "weather")
else:
    print("No weather data (offline or API issue)")


Saved data\weather\weather_20251207_212643.json


### 5.2 Folder cleanup (rotate old files, keep N newest)

In [16]:

def rotate_files(folder: Path, pattern: str = "*.json", keep: int = 5) -> None:
    files = sorted(folder.glob(pattern), key=lambda p: p.stat().st_mtime, reverse=True)
    for old in files[keep:]:
        old.unlink(missing_ok=True)
        print("Deleted old:", old)

rotate_files(Path("data/weather"), keep=3)
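
Since `rotate_files` deletes as it goes, it can help to inspect the candidates first. A minimal sketch of a dry-run variant that only returns what would be removed; `plan_rotation` is a hypothetical name, and the demo forces file times with `os.utime` so the ordering is deterministic.

```python
import os
import tempfile
from pathlib import Path
from typing import List

def plan_rotation(folder: Path, pattern: str = "*.json", keep: int = 5) -> List[Path]:
    """Return the files rotation would delete: everything except the `keep` newest by mtime."""
    if keep < 0:
        raise ValueError("keep must be >= 0")  # a negative slice would select the wrong files
    files = sorted(folder.glob(pattern), key=lambda p: p.stat().st_mtime, reverse=True)
    return files[keep:]

with tempfile.TemporaryDirectory() as tmpdir:
    folder = Path(tmpdir)
    for i, name in enumerate(["a.json", "b.json", "c.json", "d.json"]):
        f = folder / name
        f.write_text("{}", encoding="utf-8")
        os.utime(f, (1_000_000 + i, 1_000_000 + i))  # distinct, increasing mtimes
    doomed = plan_rotation(folder, keep=2)
    print("would delete:", [p.name for p in doomed])
    for p in doomed:
        p.unlink()
    print("remaining:", sorted(p.name for p in folder.glob("*.json")))
```

Modification time changes when files are copied or restored, so for timestamped filenames like the ones above, sorting by name may be the more stable choice.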


## 6) From Nested JSON to Rows

In [17]:

from typing import Any, Dict, List

def explode_transactions(record: Dict[str, Any]) -> List[Dict[str, Any]]:
    base = {k: v for k, v in record.items() if k != "transactions"}
    out = []
    for t in record.get("transactions", []):
        row = base.copy()
        row.update({f"tx_{k}": v for k, v in t.items()})
        out.append(row)
    return out

rows = explode_transactions({
    "user": {"name": "Prerna", "country": "USA"},
    "transactions": [{"ts": "2025-01-01", "amount": 120.0},
                     {"ts": "2025-01-03", "amount": 199.5}]
})
rows


[{'user': {'name': 'Prerna', 'country': 'USA'},
  'tx_ts': '2025-01-01',
  'tx_amount': 120.0},
 {'user': {'name': 'Prerna', 'country': 'USA'},
  'tx_ts': '2025-01-03',
  'tx_amount': 199.5}]
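
The rows above still carry `user` as a nested dict, which `csv.DictWriter` would write as the dict's string form in a single cell. One way to get scalar columns, sketched here, is to flatten the non-list part of the record before exploding. `explode_flat` and `flatten_dict` are hypothetical names; `flatten_dict` repeats the notebook's `flatten` logic so the block runs on its own.

```python
from typing import Any, Dict, List

def flatten_dict(d: Dict[str, Any], parent_key: str = "", sep: str = ".") -> Dict[str, Any]:
    """Flatten nested dicts into dotted keys (lists are left untouched)."""
    out: Dict[str, Any] = {}
    for k, v in d.items():
        key = f"{parent_key}{sep}{k}" if parent_key else k
        if isinstance(v, dict):
            out.update(flatten_dict(v, key, sep))
        else:
            out[key] = v
    return out

def explode_flat(record: Dict[str, Any], list_key: str = "transactions",
                 prefix: str = "tx_") -> List[Dict[str, Any]]:
    """One output row per item in record[list_key], each carrying the flattened parent fields."""
    base = flatten_dict({k: v for k, v in record.items() if k != list_key})
    return [
        {**base, **{f"{prefix}{k}": v for k, v in item.items()}}
        for item in record.get(list_key, [])
    ]

flat_rows = explode_flat({
    "user": {"name": "Prerna", "country": "USA"},
    "transactions": [{"ts": "2025-01-01", "amount": 120.0},
                     {"ts": "2025-01-03", "amount": 199.5}],
})
print(flat_rows[0])
```

Each row then has the keys `user.name`, `user.country`, `tx_ts`, and `tx_amount`, all scalar. A record with no transactions yields no rows, which may or may not be what a given pipeline wants.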

### Save flattened rows to CSV

In [18]:

def write_dicts_to_csv(path: Path, rows: List[Dict[str, Any]]) -> None:
    if not rows:
        return
    keys = sorted({k for r in rows for k in r.keys()})
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=keys)
        writer.writeheader()
        for r in rows:
            writer.writerow(r)

write_dicts_to_csv(Path("flattened.csv"), rows)
print("Wrote flattened.csv")


Wrote flattened.csv



## 7) Practice: Apply & Adapt
1. Read a large CSV and compute summary stats without loading the whole file into memory.  
2. Convert NDJSON logs into a single CSV with selected fields.  
3. Fetch a paginated API (your choice), collect the first 200 records, and save them compressed.  
4. Build a small **daily job**: fetch → save versioned → rotate old → append to a master CSV.


### ✅ Reference Solutions

In [19]:

def csv_summary(path: str) -> Dict[str, Any]:
    cnt = 0
    total = 0
    for row in iter_csv(path):
        try:
            total += int(row["score"])
            cnt += 1
        except (KeyError, TypeError, ValueError):
            continue  # skip rows with a missing or non-numeric score
    return {"rows": cnt, "avg_score": total / cnt if cnt else None}

csv_summary("students.csv")


{'rows': 4, 'avg_score': 85.25}

In [20]:

def ndjson_to_csv(ndjson_file: Path, csv_file: Path, fields: List[str]) -> None:
    with open(ndjson_file, "r", encoding="utf-8") as fin, open(csv_file, "w", newline="", encoding="utf-8") as fout:
        writer = csv.DictWriter(fout, fieldnames=fields)
        writer.writeheader()
        for line in fin:
            if not line.strip():
                continue  # tolerate blank lines, which json.loads would reject
            obj = json.loads(line)
            row = {k: obj.get(k) for k in fields}  # missing fields become empty cells
            writer.writerow(row)

ndjson_to_csv(Path("events.ndjson"), Path("events.csv"), ["event", "user", "ok"])
print("Wrote events.csv")


Wrote events.csv


In [21]:

data = fetch_github_repos("pallets", limit=120)
save_json_gz(data, Path("pallets_repos.json.gz"))
len(data)


Saved gzip JSON -> pallets_repos.json.gz


17

In [22]:

def daily_job():
    data = fetch_daily_weather()
    if not data:
        print("daily_job: no data")
        return
    path = save_versioned_json(data, "weather")
    rotate_files(path.parent, keep=5)

daily_job()
print("Daily job executed")


Saved data\weather\weather_20251207_212645.json
Daily job executed
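
Practice task 4 ends with an append to a master CSV, which `daily_job` above leaves out. A minimal sketch of that last step: open in append mode and write the header only when the file is new or empty. `append_to_master_csv` and the `run_at` / `file` columns are hypothetical and should be adapted to whatever summary each run needs to record.

```python
import csv
import tempfile
from pathlib import Path
from typing import Any, Dict, Sequence

def append_to_master_csv(master: Path, row: Dict[str, Any], fieldnames: Sequence[str]) -> None:
    """Append one row to a running CSV, creating it (with a header) on first use."""
    master.parent.mkdir(parents=True, exist_ok=True)
    write_header = not master.exists() or master.stat().st_size == 0
    with open(master, "a", newline="", encoding="utf-8") as f:
        # extrasaction="ignore" drops keys that are not listed in fieldnames
        writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
        if write_header:
            writer.writeheader()
        writer.writerow(row)

with tempfile.TemporaryDirectory() as tmpdir:
    master = Path(tmpdir) / "master.csv"
    cols = ["run_at", "file"]
    append_to_master_csv(master, {"run_at": "2025-12-07T21:26:43", "file": "weather_1.json"}, cols)
    append_to_master_csv(master, {"run_at": "2025-12-08T21:26:43", "file": "weather_2.json"}, cols)
    print(master.read_text(encoding="utf-8"))
```

Inside `daily_job`, the call would go after `rotate_files`, using the path returned by `save_versioned_json`. The column list has to stay stable across runs, since the header is written only once.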



---
### Production Pointers Recap
- Prefer **`pathlib`** over raw string paths; write to a **temp location on the same filesystem** as the target, then swap it in with `os.replace` so readers never see a partial file.  
- Replace `print` with **`logging`** in real scripts; keep logs in a dedicated folder.  
- For APIs, always set **timeouts**, add **retries**, and handle **rate limits**.  
- Use **NDJSON** for streaming logs and large append-only data.  
- Version outputs, rotate old files, consider **compression** for large JSON.

*Expanded notebook generated on 2025-12-08 02:18:36.*
