In [0]:
from datetime import datetime
import random

# -------- CONFIGURACIÓN --------
SOURCE_PATH = "/databricks-datasets/airlines/"  # origen
TARGET_BASE = "/tmp/airlines_sampled"           # base de destino
# Control de la muestra:
SAMPLE_COUNT = 100       # número de archivos a muestrear (prioriza si > 0)
SAMPLE_FRACTION = 0.02   # fracción de archivos a muestrear si SAMPLE_COUNT <= 0
OVERWRITE = True         # si el destino ya existe, lo limpia antes

# -------- helpers --------
def list_all_files_recursive(path):
    """Lista recursivamente todos los archivos (no directorios) bajo un path en DBFS."""
    result = []
    try:
        items = dbutils.fs.ls(path)
    except Exception as e:
        print(f"Error listando {path}: {e}")
        return result
    for item in items:
        if item.isDir():
            result.extend(list_all_files_recursive(item.path))
        else:
            result.append(item)
    return result

def ensure_clean_target(target):
    if OVERWRITE:
        try:
            dbutils.fs.rm(target, recurse=True)
        except Exception:
            pass
    dbutils.fs.mkdirs(target)

# -------- ejecución --------
all_files = list_all_files_recursive(SOURCE_PATH)
total_files = len(all_files)
if total_files == 0:
    raise RuntimeError(f"No se encontraron archivos en {SOURCE_PATH}")

# decidir tamaño de muestra
if SAMPLE_COUNT and SAMPLE_COUNT > 0:
    sample_n = min(SAMPLE_COUNT, total_files)
else:
    sample_n = max(1, int(total_files * SAMPLE_FRACTION))

sampled = random.sample(all_files, k=sample_n)

# carpeta destino con timestamp
timestamp = datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
target_dir = f"{TARGET_BASE}_airlines/"
ensure_clean_target(target_dir)

# copiar archivos seleccionados
print(f"Total de archivos disponibles: {total_files}")
print(f"Tomando muestra de {sample_n} archivos y copiando a {target_dir}")
copied = []
failed = []
for f in sampled:
    dest_path = target_dir + f.name
    try:
        dbutils.fs.cp(f.path, dest_path)
        copied.append(f.name)
    except Exception as e:
        failed.append((f.name, str(e)))

# resumen
print("\n--- Resumen ---")
print(f"Fuente: {SOURCE_PATH}")
print(f"Destino de la muestra: {target_dir}")
print(f"Archivos muestreados: {len(copied)}")
for name in copied:
    print(f"  ✔ {name}")
if failed:
    print(f"\nErrores al copiar {len(failed)} archivos:")
    for name, err in failed:
        print(f"  ✖ {name}: {err}")

# mostrar contenido del destino
print("\nContenido del directorio de muestra:")
display(dbutils.fs.ls(target_dir))

print(f"\nRuta final de muestra para usar en pipelines: {target_dir}")