# Filter rows by image filename prefixes (batch streamed)

This notebook loads a CSV / Excel / JSON table, converts Excel input to a temporary CSV so that it can be streamed, and then streams the rows in batches (using the same chunking pattern demonstrated in Key_Driven_TIF_Validation.ipynb). A row is kept when its COLUMN_NAME_TO_COMPARE value matches any image filename prefix (filename[:N]) collected from the specified image folder. Matching rows are written to OUTPUT_FOLDER as CSV or XLSX, depending on the extension of OUTPUT_FILENAME.

In [1]:
from __future__ import annotations

import csv
import json
import logging
import os
from pathlib import Path
from typing import Any, Dict, Generator, Iterable, Iterator, List, Optional, Sequence, Tuple
import tempfile

# Notebook-wide logger; the level is reset to INFO every time this cell runs.
LOGGER = logging.getLogger('filter_key_val_data')
LOGGER.setLevel(logging.INFO)
if not LOGGER.handlers:
    # Attach the console handler only once so that re-running the cell does
    # not duplicate every log line.
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter('%(asctime)s | %(levelname)s | %(name)s | %(message)s')
    )
    LOGGER.addHandler(handler)

def _ensure_iterable_chunks(iterable: Iterable[Any], chunk_size: int) -> Iterator[List[Any]]:
    """Yield successive lists (chunks) from an iterable, size up to chunk_size.
    Mirrors the chunking logic approach used in the referenced notebook.
    """
    chunk: List[Any] = []
    cs = max(1, int(chunk_size))
    for item in iterable:
        chunk.append(item)
        if len(chunk) >= cs:
            yield chunk
            chunk = []
    if chunk:
        yield chunk


## Configuration

In [6]:
# Input table (CSV / Excel / JSON)
INPUT_TABLE_PATH = Path("D:/mock_api_TEST/filtered_rows.xlsx")  # change to your file path
INPUT_FORMAT = 'auto'  # 'auto' | 'csv' | 'excel' | 'json'; 'auto' picks the format from the file extension
COLUMN_NAME_TO_COMPARE = '파일명'  # column in the input table to match against image filename prefixes

# Excel conversion and streaming
# If False and the input is Excel, the input-preparation cell raises, because
# direct Excel streaming is not implemented there.
CONVERT_EXCEL_TO_CSV = True  # if True and input is Excel, convert to CSV for streaming
# NOTE(review): the input-preparation cell converts every Excel input when
# CONVERT_EXCEL_TO_CSV is True, so this threshold does not change the outcome.
EXCEL_ROWS_THRESHOLD = 15000  # if Excel rows exceed this, convert to CSV for streaming
CSV_DELIMITER = ','  # delimiter for CSV reading/writing
CSV_ENCODING = 'utf-8-sig'  # encoding used to read CSV input and to write the temporary CSV made from Excel
CSV_OUTPUT_ENCODING = 'utf-8-sig'  # used for output CSV to preserve non-ASCII (e.g., Korean) in Excel
BATCH_SIZE = 8000  # streaming batch size (mirrors usage in the reference notebook)

# Image folder and matching
IMAGE_PATH = Path('D:/real_data_key/images_test_key')  # folder containing images (tif, tiff, jpeg, jpg, png, etc.); walked recursively
ALLOWED_IMAGE_EXTENSIONS = {'.tif', '.tiff', '.jpeg', '.jpg', '.png'}  # compared case-insensitively against each file's extension
MATCH_PREFIX_LEN = 17  # N in filename[:N]; also applied to the row value being compared
STRIP_WHITESPACE = True  # strip leading/trailing whitespace from the row value before comparing
CASE_INSENSITIVE = True  # lowercase both the row value and the filename prefix before comparing

# Output
OUTPUT_FOLDER = Path('D:/real_data_key')
OUTPUT_FILENAME = 'filtered_rows.xlsx'  # a '.xlsx' suffix selects XLSX output; any other suffix is written as CSV

# JSON nuances (only used if INPUT_FORMAT == 'json' or auto-detected)
JSON_RECORDS_IS_LINES = False  # True if NDJSON (one JSON object per line)
JSON_ARRAY_FIELD = None        # If the JSON file is an object with an array field, set the field name

# Logging verbosity override (optional)
DEBUG_LOG = False
if DEBUG_LOG:
    LOGGER.setLevel(logging.DEBUG)

# Create the output folder up front so later cells can write into it.
OUTPUT_FOLDER.mkdir(parents=True, exist_ok=True)


## Helpers: image prefix collection and input streaming

In [7]:
def list_image_prefixes(image_dir: Path, allowed_exts: Sequence[str], n: int, case_insensitive: bool) -> List[str]:
    """Collect the first ``n`` characters of every image file stem under ``image_dir``.

    The directory is walked recursively. A file counts as an image when its
    extension, lowercased, is one of ``allowed_exts`` (also lowercased).

    Args:
        image_dir: Root folder to scan.
        allowed_exts: Accepted extensions including the leading dot.
        n: Prefix length; negative values are treated as 0.
        case_insensitive: Lowercase each prefix when True.

    Returns:
        One prefix per matching file, duplicates included, in walk order.
        An empty list (after a warning log) when ``image_dir`` does not exist.
    """
    image_dir = Path(image_dir)
    if not image_dir.exists():
        LOGGER.warning(f'IMAGE_PATH does not exist: {image_dir!s}')
        return []
    wanted_suffixes = {suffix.lower() for suffix in allowed_exts}
    collected: List[str] = []
    for _root, _subdirs, filenames in os.walk(image_dir):
        for filename in filenames:
            stem, suffix = os.path.splitext(filename)
            if suffix.lower() not in wanted_suffixes:
                continue
            prefix = stem[: max(0, int(n))]
            collected.append(prefix.lower() if case_insensitive else prefix)
    return collected

def _try_import_openpyxl():
    """Return the ``openpyxl`` module, or ``None`` when importing it fails.

    A failed import is reported with a warning log rather than an exception,
    so each caller decides how to proceed without Excel support.
    """
    try:
        import openpyxl  # type: ignore
    except Exception as e:
        LOGGER.warning('openpyxl not available; Excel conversion/streaming may be limited. %s', e)
        module = None
    else:
        module = openpyxl
    return module

def count_excel_rows(xlsx_path: Path) -> int:
    """Return the row count of the active worksheet, header row included.

    Args:
        xlsx_path: Path of the workbook to inspect.

    Returns:
        The worksheet's ``max_row`` (0 when it is not reported), or ``-1``
        when openpyxl is not available.
    """
    op = _try_import_openpyxl()
    if not op:
        return -1
    wb = op.load_workbook(filename=str(xlsx_path), read_only=True, data_only=True)
    try:
        ws = wb.active
        # max_row includes header
        return int(ws.max_row or 0)
    finally:
        # Read-only workbooks keep the source file open until closed
        # explicitly; release it so the file is not left locked.
        wb.close()

def excel_to_csv_stream(xlsx_path: Path, csv_out_path: Path, encoding: str = 'utf-8', delimiter: str = ',') -> Path:
    """Write the active worksheet of an Excel workbook to a CSV file, row by row.

    The workbook is opened in read-only mode and rows are written as they are
    read, so the whole sheet is never held in memory at once.

    Args:
        xlsx_path: Source workbook.
        csv_out_path: Destination CSV; overwritten if it already exists.
        encoding: Text encoding of the CSV file.
        delimiter: Field delimiter of the CSV file.

    Returns:
        ``csv_out_path``.

    Raises:
        RuntimeError: If openpyxl is not available.
    """
    op = _try_import_openpyxl()
    if not op:
        raise RuntimeError('openpyxl is required for Excel to CSV conversion')
    LOGGER.info(
        "Converting Excel to CSV for streaming: '%s' -> '%s'", xlsx_path, csv_out_path
    )
    wb = op.load_workbook(filename=str(xlsx_path), read_only=True, data_only=True)
    try:
        ws = wb.active
        with open(csv_out_path, 'w', newline='', encoding=encoding) as f:
            writer = csv.writer(f, delimiter=delimiter)
            for row in ws.iter_rows(values_only=True):
                # Convert None to '' for CSV
                writer.writerow(['' if cell is None else cell for cell in row])
    finally:
        # Read-only workbooks keep the source file open until closed
        # explicitly; release it even when the conversion fails.
        wb.close()
    return csv_out_path

def read_csv_header(csv_path: Path, delimiter: str = ',', encoding: Optional[str] = None) -> List[str]:
    """Return the first row of a CSV file as a list of column names.

    Args:
        csv_path: CSV file to read.
        delimiter: Field delimiter.
        encoding: Text encoding of the file; the module-level ``CSV_ENCODING``
            is used when this is None.

    Returns:
        The header cells, or an empty list when the file has no rows.
    """
    file_encoding = CSV_ENCODING if encoding is None else encoding
    with open(csv_path, 'r', newline='', encoding=file_encoding) as f:
        first_row = next(csv.reader(f, delimiter=delimiter), None)
    return first_row or []

def iter_csv_rows(csv_path: Path, delimiter: str = ',', encoding: Optional[str] = None) -> Iterator[Dict[str, Any]]:
    """Lazily yield each data row of a CSV file as a dict keyed by the header row.

    The file stays open only while the generator is being consumed.

    Args:
        csv_path: CSV file to read.
        delimiter: Field delimiter.
        encoding: Text encoding of the file; the module-level ``CSV_ENCODING``
            is used when this is None.
    """
    file_encoding = CSV_ENCODING if encoding is None else encoding
    with open(csv_path, 'r', newline='', encoding=file_encoding) as f:
        yield from csv.DictReader(f, delimiter=delimiter)

def iter_json_rows(json_path: Path, records_is_lines: bool = False, array_field: Optional[str] = None) -> Iterator[Dict[str, Any]]:
    """Yield the dict records contained in a JSON or NDJSON file.

    Non-dict records are skipped in every mode, so consumers can rely on
    receiving mappings only.

    Args:
        json_path: File to read.
        records_is_lines: True for NDJSON (one JSON value per line, blank
            lines ignored); False for a single JSON document.
        array_field: For a single document that is an object, the key whose
            value is the list of records.

    Yields:
        Each dict record, in file order. Nothing is yielded (after a warning
        log) when a single document is neither a list nor an object holding
        a list under ``array_field``.
    """
    # 'utf-8-sig' reads plain UTF-8 and also drops a leading byte-order mark,
    # which the json parser would otherwise reject.
    if records_is_lines:
        with open(json_path, 'r', encoding='utf-8-sig') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                record = json.loads(line)
                if isinstance(record, dict):
                    yield record
        return

    with open(json_path, 'r', encoding='utf-8-sig') as f:
        data = json.load(f)
    if isinstance(data, list):
        records = data
    elif isinstance(data, dict) and array_field and isinstance(data.get(array_field), list):
        records = data[array_field]
    else:
        LOGGER.warning('JSON structure not recognized for streaming; expect list or object with array_field')
        return
    for obj in records:
        if isinstance(obj, dict):
            yield obj

def detect_format(path: Path, configured: str = 'auto') -> str:
    """Decide which reader to use for the input table.

    Args:
        path: Input file; only its extension is inspected.
        configured: Explicit format name. Anything other than ``'auto'``
            (case-insensitive) or an empty value is returned lowercased
            without looking at ``path``.

    Returns:
        ``'csv'``, ``'excel'`` or ``'json'`` when auto-detecting; unknown
        extensions fall back to ``'csv'``.
    """
    if configured and configured.lower() != 'auto':
        return configured.lower()
    # '.xlsm' and '.jsonl' are included so they are not misread as CSV.
    # NOTE(review): '.xls' is kept for backward compatibility, but openpyxl
    # is not expected to read legacy .xls workbooks - confirm before relying on it.
    format_by_suffix = {
        '.csv': 'csv',
        '.xlsx': 'excel',
        '.xlsm': 'excel',
        '.xls': 'excel',
        '.json': 'json',
        '.ndjson': 'json',
        '.jsonl': 'json',
    }
    return format_by_suffix.get(path.suffix.lower(), 'csv')  # default

def normalize_value(val: Any, strip_ws: bool, lower: bool) -> str:
    """Convert ``val`` to a comparable string.

    ``None`` becomes ``''``; any other value goes through ``str()``.
    Surrounding whitespace is removed when ``strip_ws`` is set, and the text
    is lowercased when ``lower`` is set, in that order.
    """
    text = str(val) if val is not None else ''
    text = text.strip() if strip_ws else text
    return text.lower() if lower else text


## Build image prefixes set

In [8]:
# Collect the prefixes once; the set gives O(1) membership tests while rows are streamed.
image_prefixes = list_image_prefixes(IMAGE_PATH, ALLOWED_IMAGE_EXTENSIONS, MATCH_PREFIX_LEN, CASE_INSENSITIVE)
image_prefix_set = set(image_prefixes)
LOGGER.info('Collected %d image filename prefixes from %s', len(image_prefix_set), IMAGE_PATH)
if not image_prefix_set:
    # With nothing to compare against, the filter cannot keep any row; say so
    # here instead of letting the run end with an unexplained zero-match summary.
    LOGGER.warning('No image filename prefixes were collected from %s; no input row can match', IMAGE_PATH)
# Optional preview
preview = image_prefixes[:10]
preview


2025-09-02 15:17:09,671 | INFO | filter_key_val_data | Collected 0 image filename prefixes from D:\real_data_key\images_test_key


[]

## Prepare input streaming (CSV / Excel / JSON)

In [9]:
# Resolve the input format, then expose the table as `header` (column names,
# when known up front) and `row_iter` (a lazy iterator of row dicts).
fmt = detect_format(INPUT_TABLE_PATH, INPUT_FORMAT)
LOGGER.info('Detected/selected input format: %s', fmt)

tmp_csv_path: Optional[Path] = None
header: Optional[List[str]] = None
row_iter: Optional[Iterator[Dict[str, Any]]] = None

if fmt == 'csv':
    header = read_csv_header(INPUT_TABLE_PATH, delimiter=CSV_DELIMITER)
    row_iter = iter_csv_rows(INPUT_TABLE_PATH, delimiter=CSV_DELIMITER)
elif fmt == 'excel':
    if not CONVERT_EXCEL_TO_CSV:
        # No direct Excel reader exists in this notebook.
        raise RuntimeError('Excel direct streaming not configured. Enable CONVERT_EXCEL_TO_CSV.')
    # Excel is always converted, whatever its size, so every input goes
    # through the same CSV pipeline. Counting rows first would open the
    # workbook a second time without changing this decision.
    tmp_csv_path = Path(tempfile.gettempdir()) / f"{INPUT_TABLE_PATH.stem}_key_stream.csv"
    excel_to_csv_stream(INPUT_TABLE_PATH, tmp_csv_path, encoding=CSV_ENCODING, delimiter=CSV_DELIMITER)
    header = read_csv_header(tmp_csv_path, delimiter=CSV_DELIMITER)
    row_iter = iter_csv_rows(tmp_csv_path, delimiter=CSV_DELIMITER)
elif fmt == 'json':
    row_iter = iter_json_rows(INPUT_TABLE_PATH, records_is_lines=JSON_RECORDS_IS_LINES, array_field=JSON_ARRAY_FIELD)
    header = None  # will infer from the first matching row
else:
    raise ValueError(f'Unsupported format: {fmt}')

row_iter is not None


2025-09-02 15:17:10,494 | INFO | filter_key_val_data | Detected/selected input format: excel
2025-09-02 15:17:10,508 | INFO | filter_key_val_data | Converting Excel to CSV for streaming: 'D:\mock_api_TEST\filtered_rows.xlsx' -> 'C:\Users\jeeb\AppData\Local\Temp\filtered_rows_key_stream.csv'


True

## Stream rows in batches, filter by image prefixes, and write output CSV/XLSX

In [10]:
# Destination file; its lowercased suffix decides between XLSX and CSV output.
output_path = OUTPUT_FOLDER / OUTPUT_FILENAME
output_ext = output_path.suffix.lower()

# Progress flags and counters for the streaming pass.
written_header = False
batches_processed = scanned_count = matched_count = 0

# Output handles stay unset until the first match is seen, because the
# header may have to be inferred from that row.
out_fh = writer = wb = ws = None

def ensure_outputs(fieldnames: List[str]):
    """Open the output target on first use and write its header row.

    Does nothing once either the CSV writer or the workbook exists. A
    ``.xlsx`` output path creates an openpyxl workbook with a sheet titled
    ``filtered``; any other extension opens a CSV file for writing.

    Raises:
        RuntimeError: If XLSX output is requested but openpyxl is unavailable.
    """
    global writer, out_fh, wb, ws, written_header
    already_open = not ((writer is None) and (wb is None))
    if already_open:
        return

    if output_ext != '.xlsx':
        out_fh = open(output_path, 'w', newline='', encoding=CSV_OUTPUT_ENCODING)
        writer = csv.DictWriter(out_fh, fieldnames=fieldnames, delimiter=CSV_DELIMITER)
        writer.writeheader()
        return

    op = _try_import_openpyxl()
    if not op:
        raise RuntimeError('openpyxl is required to write XLSX output')
    wb = op.Workbook()
    ws = wb.active
    ws.title = 'filtered'
    ws.append(fieldnames)

def write_rows(rows: List[Dict[str, Any]], fieldnames: List[str]):
    """Append ``rows`` to whichever output target is currently open.

    Both targets receive exactly the columns named in ``fieldnames``, in
    that order. Keys outside ``fieldnames`` are ignored; missing keys become
    an empty cell (``None`` for XLSX, ``''`` for CSV).

    Args:
        rows: Row dicts to write.
        fieldnames: Output column order.
    """
    if ws is not None:
        for row in rows:
            ws.append([row.get(col) for col in fieldnames])
        return
    for row in rows:
        # Project onto fieldnames first: DictWriter raises ValueError on
        # unexpected keys, which rows with extra fields would trigger.
        writer.writerow({col: row.get(col, '') for col in fieldnames})

# Build a streaming of the normalized compare values paired with full row (dict)
def normalized_rows(iter_rows: Iterator[Dict[str, Any]]) -> Iterator[Tuple[str, Dict[str, Any]]]:
    """Pair every input row with its normalized comparison key.

    The key is the row's ``COLUMN_NAME_TO_COMPARE`` value passed through
    ``normalize_value`` (using ``STRIP_WHITESPACE`` and ``CASE_INSENSITIVE``)
    and then cut to the first ``MATCH_PREFIX_LEN`` characters.

    Args:
        iter_rows: Row dicts to process.

    Yields:
        ``(key, row)`` tuples in input order.
    """
    # The prefix length is the same for every row, so resolve it once.
    prefix_len = None
    if MATCH_PREFIX_LEN is not None:
        try:
            prefix_len = max(0, int(MATCH_PREFIX_LEN))
        except (TypeError, ValueError, OverflowError):
            # Keep the best-effort behaviour (compare untruncated values)
            # but report it instead of hiding the misconfiguration.
            LOGGER.warning('MATCH_PREFIX_LEN=%r is not a valid integer; comparing untruncated values', MATCH_PREFIX_LEN)
    for r in iter_rows:
        key_val = normalize_value(r.get(COLUMN_NAME_TO_COMPARE), STRIP_WHITESPACE, CASE_INSENSITIVE)
        if prefix_len is not None:
            key_val = key_val[:prefix_len]
        yield key_val, r

# A missing compare column makes every row compare as an empty value, which
# normally means zero matches; report it before scanning.
if header is not None and COLUMN_NAME_TO_COMPARE not in header:
    LOGGER.warning('Column %r was not found in the input header; every row will be compared as an empty value', COLUMN_NAME_TO_COMPARE)

try:
    for i, batch in enumerate(_ensure_iterable_chunks(normalized_rows(row_iter), BATCH_SIZE), 1):
        batches_processed += 1
        scanned_count += len(batch)
        # Filter matches
        matches: List[Dict[str, Any]] = [row for key_val, row in batch if key_val in image_prefix_set]
        if not matches:
            continue
        matched_count += len(matches)
        # Initialize outputs if needed
        if header is None:
            # infer from first match keys
            header = list(matches[0].keys())
        ensure_outputs(header)
        # Write matches
        write_rows(matches, header)
        LOGGER.info('Batch %d: scanned=%d, matched=%d (cumulative matched=%d)', i, len(batch), len(matches), matched_count)

    # Finalize XLSX output only after a complete pass.
    if wb is not None:
        wb.save(str(output_path))
finally:
    # Release the CSV output handle even when streaming fails part-way.
    if out_fh is not None:
        out_fh.close()
    # Close the input generator so its file handle is released, then remove
    # the temporary CSV created from an Excel input.
    close_input = getattr(row_iter, 'close', None)
    if callable(close_input):
        close_input()
    if tmp_csv_path is not None:
        try:
            tmp_csv_path.unlink()
        except OSError as exc:
            LOGGER.debug('Could not remove temporary CSV %s: %s', tmp_csv_path, exc)

if matched_count == 0:
    # Outputs are created lazily on the first match, so nothing was written.
    LOGGER.warning('No rows matched; no output file was written to %s', output_path)

summary = {
    'batches_processed': batches_processed,
    'rows_scanned': scanned_count,
    'rows_matched': matched_count,
    'output_csv': str(output_path),
    'input_path': str(INPUT_TABLE_PATH),
    'image_path': str(IMAGE_PATH),
    'prefix_len': MATCH_PREFIX_LEN,
}
summary


{'batches_processed': 1,
 'rows_scanned': 26,
 'rows_matched': 0,
 'output_csv': 'D:\\real_data_key\\filtered_rows.xlsx',
 'input_path': 'D:\\mock_api_TEST\\filtered_rows.xlsx',
 'image_path': 'D:\\real_data_key\\images_test_key',
 'prefix_len': 17}

## Notes
- If your input is Excel and very large, ensure `openpyxl` is installed for efficient streaming conversion. The notebook will convert Excel to a temporary CSV to stream rows in batches.
- JSON inputs: set `JSON_RECORDS_IS_LINES=True` for NDJSON (one JSON object per line) or `JSON_ARRAY_FIELD` when the JSON file is an object with an embedded array.
- Matching is exact on the normalized values: row[COLUMN_NAME_TO_COMPARE] (optionally stripped and lowercased, then truncated to its first MATCH_PREFIX_LEN characters) must equal the image filename prefix (filename stem[:MATCH_PREFIX_LEN]), which is lowercased in the same way when CASE_INSENSITIVE is set.
- Output header for CSV/XLSX is derived from the input CSV header when applicable, otherwise inferred from the first matched row for JSON.