# Trusted Zone — Images and Documents Filtering

This notebook implements the transformation and validation step for the **Trusted Zone** of our data pipeline.  
Its goal is to ensure that every recipe document kept in the Trusted Zone has at least one associated image, and that no orphan image (an image without a corresponding recipe entry) is stored there.  

By doing so, we maintain the integrity of the multimodal dataset and make the data ready for subsequent analytical and modeling stages.
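
The two guarantees above can be expressed as set differences between recipe IDs and image IDs. The following is a minimal sketch under stated assumptions, not part of the pipeline: `check_trusted_invariants` and the sample IDs are hypothetical names introduced only for illustration.

```python
from typing import Dict, Iterable, List, Set


def check_trusted_invariants(
    doc_ids: Iterable[str], image_ids: Iterable[str]
) -> Dict[str, List[str]]:
    """Return the IDs that violate either Trusted Zone guarantee (hypothetical helper)."""
    docs: Set[str] = set(doc_ids)
    images: Set[str] = set(image_ids)
    return {
        # Recipes that would be kept without any image.
        "docs_without_images": sorted(docs - images),
        # Images that would be kept without a recipe entry (orphans).
        "orphan_images": sorted(images - docs),
    }


# Made-up sample IDs, for illustration only.
violations = check_trusted_invariants(
    doc_ids=["r1", "r2", "r3"],
    image_ids=["r2", "r3", "r4"],
)
print(violations)
```

An empty list under both keys would mean that the two guarantees hold for the given IDs.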


## 1. Setup and Config

In [1]:
import os, io, json, re
from pathlib import PurePosixPath
from datetime import datetime, timezone
from typing import Dict, List, Set, Iterable

import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
from dotenv import load_dotenv

load_dotenv()

# S3 / MinIO
MINIO_USER     = os.getenv("MINIO_USER")
MINIO_PASSWORD = os.getenv("MINIO_PASSWORD")
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT")

session = boto3.session.Session(
    aws_access_key_id=MINIO_USER,
    aws_secret_access_key=MINIO_PASSWORD,
    region_name="us-east-1"
)
s3 = session.client(
    "s3",
    endpoint_url=MINIO_ENDPOINT,
    config=Config(signature_version="s3v4", s3={"addressing_style": "path"})
)

# Paths
FORM_BUCKET         = "formatted-zone"
FORM_IMAGES_PREFIX  = "images"
FORM_DOCS_KEY       = "documents/recipes.jsonl"

TRUST_BUCKET        = "trusted-zone"
TRUST_IMAGES_PREFIX = "images"
TRUST_DOCS_KEY      = "documents/recipes.jsonl"
TRUST_REPORT_PREFIX = "reports"

# Behavior
DRY_RUN   = False
OVERWRITE = True

def utc_ts():
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%SZ")


## 2. S3 Helpers

The setup cell above loads the credentials and creates the S3 client (using the MinIO-compatible interface).  
The Trusted Zone receives its data from the **Formatted Zone**, which contains cleaned and standardized files.  
The helpers below list, inspect, copy, and stream objects so that data can be read from and written to the two buckets.
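
Because the client is built from environment variables, a missing value only surfaces later as an authentication or connection error. One possible safeguard is to fail fast before creating the client. This is a sketch, assuming a hypothetical helper `require_env`; the sample values, including the endpoint URL, are placeholders and not the project's real configuration.

```python
import os
from typing import Dict, Iterable, Mapping


def require_env(names: Iterable[str], env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Return the requested variables, raising if any is missing or empty (hypothetical helper)."""
    names = list(names)
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: env[name] for name in names}


# Placeholder values for illustration; a real run would rely on os.environ.
sample_env = {
    "MINIO_USER": "user",
    "MINIO_PASSWORD": "secret",
    "MINIO_ENDPOINT": "http://localhost:9000",
}
settings = require_env(["MINIO_USER", "MINIO_PASSWORD", "MINIO_ENDPOINT"], env=sample_env)
print(sorted(settings))
```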

In [2]:
def s3_list_keys(bucket: str, prefix: str) -> Iterable[str]:
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []) or []:
            key = obj["Key"]
            if not key.endswith("/"):
                yield key

def s3_head(bucket: str, key: str):
    try:
        return s3.head_object(Bucket=bucket, Key=key)
    except ClientError as e:
        if e.response.get("Error", {}).get("Code") in ("404", "NoSuchKey", "NotFound"):
            return None
        raise

def s3_copy_object(src_bucket: str, src_key: str, dst_bucket: str, dst_key: str, overwrite: bool = True):
    if not overwrite and s3_head(dst_bucket, dst_key) is not None:
        return "skip-exists"
    return s3.copy_object(
        Bucket=dst_bucket,
        Key=dst_key,
        CopySource={"Bucket": src_bucket, "Key": src_key},
        MetadataDirective="COPY"
    )

def read_jsonl_lines(bucket: str, key: str):
    obj = s3.get_object(Bucket=bucket, Key=key)
    for raw in obj["Body"].iter_lines():
        if raw:  # skip empty
            yield raw


## 3. Extract recipeId from image filenames

Each image stored in the Formatted Zone follows a structured naming convention that encodes metadata, including the recipe identifier.  
The general pattern is:

`fileType$dataSource$ingestionTimestamp$hash__recipeId_positionOnImagesUrlArrayFromLayer2.extension`

From these filenames, we extract the `recipeId` using a regular expression.  
This allows us to associate every image with its corresponding recipe entry, even when multiple images exist for the same recipe.  
The result of this step is two structures:

- `img_ids`: a set of **unique recipe IDs** that have at least one image.
- `id_to_imgkeys`: a dictionary mapping each `recipeId` to **all its image keys** (to preserve one-to-many relationships).
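
To make the two structures concrete, here is a small offline sketch that applies the same regular expression to a few sample keys, with no S3 access. The names `SAMPLE_ID_REGEX`, `extract_recipe_id`, and `grouped`, as well as the `images/readme.txt` key, are illustrative assumptions.

```python
import re
from pathlib import PurePosixPath
from typing import Dict, List, Optional

# Same pattern as the notebook: "__<recipeId>_<position>.<image extension>".
SAMPLE_ID_REGEX = re.compile(
    r"__([A-Za-z0-9_\-]+)_(\d+)\.(?:jpe?g|png|webp|gif|bmp|tiff)$",
    re.IGNORECASE,
)


def extract_recipe_id(key: str) -> Optional[str]:
    """Return the recipeId encoded in an image key, or None if the name does not match."""
    match = SAMPLE_ID_REGEX.search(PurePosixPath(key).name)
    return match.group(1) if match else None


sample_keys = [
    "images/type$src$ts$hash__000018c8a5_0.jpg",
    "images/type$src$ts$hash__000018c8a5_1.jpg",
    "images/type$src$ts$hash__abcd_ef-12_3.JPEG",
    "images/readme.txt",  # not an image name, so it is ignored
]

grouped: Dict[str, List[str]] = {}
for key in sample_keys:
    rid = extract_recipe_id(key)
    if rid is not None:
        grouped.setdefault(rid, []).append(key)

print(sorted(grouped))             # unique recipe IDs
print(len(grouped["000018c8a5"]))  # number of images for one recipe
```

The greedy ID group backtracks so that the final `_<digits>` segment is always read as the image position, which is why an ID such as `abcd_ef-12` can itself contain underscores.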


In [7]:
# Recognize names like:
#   images/type$src$ts$hash__000018c8a5_0.jpg
#   images/type$src$ts$hash__abcd_ef-12_3.JPEG
# ID part: letters, digits, underscore, dash
ID_REGEX = re.compile(
    r"__([A-Za-z0-9_\-]+)_(\d+)\.(?:jpe?g|png|webp|gif|bmp|tiff)$",
    re.IGNORECASE
)

def recipe_id_from_image_key(key: str) -> str | None:
    name = PurePosixPath(key).name
    m = ID_REGEX.search(name)
    return m.group(1) if m else None

img_ids: Set[str] = set()                    # unique IDs (filter purpose only)
id_to_imgkeys: Dict[str, List[str]] = {}     # ALL images per ID

count_keys = 0
for key in s3_list_keys(FORM_BUCKET, FORM_IMAGES_PREFIX + "/"):
    count_keys += 1
    rid = recipe_id_from_image_key(key)
    if not rid:
        continue
    img_ids.add(rid)
    id_to_imgkeys.setdefault(rid, []).append(key)

# Make copies deterministic (optional)
for rid in id_to_imgkeys:
    id_to_imgkeys[rid].sort()

total_images = sum(len(v) for v in id_to_imgkeys.values())
print(f"[INFO] scanned image keys: {count_keys}")
print(f"[INFO] unique recipeIds with images: {len(img_ids)}")
print(f"[INFO] total image files matched to recipeIds: {total_images}")


[INFO] scanned image keys: 11
[INFO] unique recipeIds with images: 7
[INFO] total image files matched to recipeIds: 11


## 4. Stream-filter recipes.jsonl by img_ids and multipart-upload to Trusted

The recipes dataset (`recipes.jsonl`) can be extremely large.  
Instead of checking every image individually, we use the set of `img_ids` extracted earlier to filter only the recipes that have at least one corresponding image.

This step ensures that:
- Every recipe in the Trusted Zone can be paired with one or more valid images.
- Entries without visual data (recipes with missing images) are excluded, as they cannot contribute to multimodal analyses.

The filtering is performed in a **streaming** manner using multipart uploads, which prevents memory overload even for very large JSONL files.
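
The buffering idea behind the streaming upload can be sketched without any S3 calls: lines accumulate in a buffer, and a part is emitted as soon as the buffer reaches the minimum part size, so memory use stays bounded by roughly one part. The helper name `batch_lines_into_parts` and the tiny `min_part_size` are assumptions chosen for illustration; S3 itself requires every part except the last to be at least 5 MiB.

```python
from typing import Iterable, Iterator


def batch_lines_into_parts(lines: Iterable[bytes], min_part_size: int) -> Iterator[bytes]:
    """Yield newline-terminated chunks of at least min_part_size bytes (the last may be smaller)."""
    buf = bytearray()
    for line in lines:
        buf += line + b"\n"
        if len(buf) >= min_part_size:
            yield bytes(buf)  # copy the buffer before clearing it
            buf.clear()
    if buf:
        yield bytes(buf)  # leftover data becomes the final, possibly smaller, part


sample_lines = [b'{"id": "a"}', b'{"id": "b"}', b'{"id": "c"}']
parts = list(batch_lines_into_parts(sample_lines, min_part_size=20))
print([len(part) for part in parts])
```

Concatenating the parts in order reproduces the filtered JSONL content, which is what completing a multipart upload does on the server side.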

In [8]:
MIN_PART_SIZE = 8 * 1024 * 1024  # 8 MiB (S3 requires >= 5 MiB for every part except the last)

class MultipartJSONLWriter:
    def __init__(self, bucket: str, key: str, content_type="application/x-ndjson", metadata=None):
        self.bucket = bucket
        self.key = key
        self.buf = io.BytesIO()
        self.parts = []
        self.part_num = 1
        self.open = True
        extra = {
            "Bucket": bucket,
            "Key": key,
            "ContentType": content_type,
            "Metadata": metadata or {},
        }
        resp = s3.create_multipart_upload(**extra)
        self.upload_id = resp["UploadId"]

    def _flush_part(self):
        self.buf.seek(0)
        body = self.buf.read()
        if not body:
            self.buf.seek(0)
            self.buf.truncate(0)
            return
        resp = s3.upload_part(
            Bucket=self.bucket, Key=self.key,
            UploadId=self.upload_id, PartNumber=self.part_num, Body=body
        )
        self.parts.append({"ETag": resp["ETag"], "PartNumber": self.part_num})
        self.part_num += 1
        self.buf.seek(0); self.buf.truncate(0)

    def write_line(self, raw_line_bytes: bytes):
        # raw_line_bytes is already one JSON object line (no trailing \n required)
        self.buf.write(raw_line_bytes)
        self.buf.write(b"\n")
        if self.buf.tell() >= MIN_PART_SIZE:
            self._flush_part()

    def close(self):
        if not self.open:
            return
        try:
            # If there’s leftover data, flush as a last part
            self._flush_part()
            if not self.parts:
                # No data kept: abort multipart, optionally create empty object
                s3.abort_multipart_upload(
                    Bucket=self.bucket, Key=self.key, UploadId=self.upload_id
                )
                # Optional: write a 0-byte file so the path exists
                s3.put_object(
                    Bucket=self.bucket, Key=self.key, Body=b"",
                    ContentType="application/x-ndjson",
                    Metadata={"note": "empty after filtering", "ts": utc_ts()},
                )
            else:
                s3.complete_multipart_upload(
                    Bucket=self.bucket, Key=self.key, UploadId=self.upload_id,
                    MultipartUpload={"Parts": self.parts}
                )
        finally:
            self.open = False

def filter_docs_to_trusted_by_ids():
    total = kept = 0
    if DRY_RUN:
        for raw in read_jsonl_lines(FORM_BUCKET, FORM_DOCS_KEY):
            total += 1
            try:
                rid = json.loads(raw).get("id")
            except Exception:
                continue
            if rid in img_ids:
                kept += 1
        print(f"[DRY_RUN] total={total} kept={kept}")
        return total, kept

    writer = MultipartJSONLWriter(
        TRUST_BUCKET, TRUST_DOCS_KEY,
        content_type="application/x-ndjson",
        metadata={"note": "filtered to ids that have images", "ts": utc_ts()}
    )
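    try:
        for raw in read_jsonl_lines(FORM_BUCKET, FORM_DOCS_KEY):
            total += 1
            try:
                rec = json.loads(raw)
            except ValueError:
                continue  # skip malformed lines
            if not isinstance(rec, dict):
                continue  # skip lines that are not JSON objects
            rid = rec.get("id")
            if rid and rid in img_ids:
                kept += 1
                writer.write_line(raw)
    except Exception:
        # Abort instead of completing, so a failed run never publishes a truncated file
        s3.abort_multipart_upload(
            Bucket=writer.bucket, Key=writer.key, UploadId=writer.upload_id
        )
        writer.open = False
        raise
    else:
        # close() handles the zero-kept case gracefully
        writer.close()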

    print(f"[OK] wrote filtered docs to s3://{TRUST_BUCKET}/{TRUST_DOCS_KEY}")
    return total, kept

total_docs, kept_docs = filter_docs_to_trusted_by_ids()
print(f"[STATS] docs total={total_docs} kept={kept_docs} dropped={total_docs-kept_docs}")


[OK] wrote filtered docs to s3://trusted-zone/documents/recipes.jsonl
[STATS] docs total=1029720 kept=7 dropped=1029713


## 5. Copy only the kept images to Trusted, preserving filenames

Once the valid recipe entries have been identified, the corresponding images are copied from the **Formatted Zone** to the **Trusted Zone**.

For each recipe ID, all associated images are transferred, preserving the original filenames.  
This allows the dataset to maintain the full range of visual variants (different angles, styles, or preparation steps) linked to a single recipe.

At this stage, only images that have a valid recipe entry are copied — any **orphan images** remain in the Formatted Zone.
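
The copy step can be thought of as building a plan of `(source key, destination key)` pairs restricted to the recipe IDs that were kept. The sketch below assumes a hypothetical helper `build_copy_plan` and made-up sample keys; it performs no S3 calls.

```python
from pathlib import PurePosixPath
from typing import Dict, List, Set, Tuple


def build_copy_plan(
    id_to_keys: Dict[str, List[str]], kept_ids: Set[str], dst_prefix: str
) -> List[Tuple[str, str]]:
    """Return (src_key, dst_key) pairs for kept recipe IDs only, preserving filenames."""
    plan: List[Tuple[str, str]] = []
    for rid in sorted(id_to_keys):
        if rid not in kept_ids:
            continue  # orphan images are left out of the plan
        for src_key in sorted(id_to_keys[rid]):
            plan.append((src_key, f"{dst_prefix}/{PurePosixPath(src_key).name}"))
    return plan


# Made-up sample data: recipe "b" has an image but no recipe entry.
sample_mapping = {
    "a": ["images/t$s$ts$h__a_1.jpg", "images/t$s$ts$h__a_0.jpg"],
    "b": ["images/t$s$ts$h__b_0.png"],
}
plan = build_copy_plan(sample_mapping, kept_ids={"a"}, dst_prefix="images")
for src, dst in plan:
    print(src, "->", dst)
```

Separating the plan from the copy makes it easy to inspect what would be transferred before touching the destination bucket.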

In [9]:
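copied = skipped = 0

if DRY_RUN:
    print("[DRY_RUN] Skipping image copies.")
else:
    # An image ID may have no matching recipe, so read back the IDs that were
    # actually written to the trusted docs instead of assuming all img_ids survived.
    kept_ids = set()
    for raw in read_jsonl_lines(TRUST_BUCKET, TRUST_DOCS_KEY):
        kept_ids.add(json.loads(raw).get("id"))

    for rid, keys in id_to_imgkeys.items():
        if rid not in kept_ids:
            continue  # orphan images stay in the Formatted Zone
        # keys are full 'images/...' keys in formatted-zone
        for src_key in keys:
            dst_key = f"{TRUST_IMAGES_PREFIX}/{PurePosixPath(src_key).name}"
            try:
                result = s3_copy_object(FORM_BUCKET, src_key, TRUST_BUCKET, dst_key, overwrite=OVERWRITE)
            except ClientError as e:
                print(f"[WARN] copy failed {src_key} -> {dst_key}: {e}")
                skipped += 1
                continue
            # s3_copy_object returns "skip-exists" when OVERWRITE is False and the key exists
            if result == "skip-exists":
                skipped += 1
            else:
                copied += 1

print(f"[STATS] images copied={copied} skipped={skipped}")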


[STATS] images copied=11 skipped=0


## 6. Report

In [None]:
report = {
    "ts": utc_ts(),
    "source_docs": f"s3://{FORM_BUCKET}/{FORM_DOCS_KEY}",
    "source_images_prefix": f"s3://{FORM_BUCKET}/{FORM_IMAGES_PREFIX}/",
    "kept_doc_count": kept_docs,
    "total_doc_count": total_docs,
    "unique_recipe_ids_with_images": len(img_ids),
    "images_copied": copied,
    "images_skipped": skipped
}

if not DRY_RUN:
    # Reuse the report timestamp so the object key matches the "ts" field
    report_key = f"{TRUST_REPORT_PREFIX}/images_filter_{report['ts']}.json"
    s3.put_object(
        Bucket=TRUST_BUCKET,
        Key=report_key,
        Body=json.dumps(report, ensure_ascii=False, indent=2).encode("utf-8"),
        ContentType="application/json"
    )
    print(f"[OK] wrote report -> s3://{TRUST_BUCKET}/{report_key}")
else:
    print("[DRY_RUN] report:", json.dumps(report, indent=2))
