In [None]:
from pathlib import Path

# --- GCP ---
PROJECT_ID   = "fifth-medley-468408-a2"
REGION       = "southamerica-east1"
ZONE         = "southamerica-east1-b"
BUCKET       = "bucket-mba-ricardo"
FOLDER       = "t4"
BASE_IN = f"gs://{BUCKET}/{FOLDER}"
BASE_OUT = f"gs://{BUCKET}/{FOLDER}/out"

# --- Local inputs ---
# Tuples rather than sets: iteration order drives the upload order and the
# order of the returned input URIs (and therefore of the benchmark runs).
# A set of str iterates in a hash-randomized order that changes per process.
LOCAL_FOLDER = 'dados'
DESNORM_LOCAL_FILES = (
    "base_desnormalizada_5mb.csv",
    "base_desnormalizada_50mb.csv",
    "base_desnormalizada_500mb.csv",
    "base_desnormalizada_1gb.csv",
)
# Trailing "/" marks directories; every file inside them is uploaded.
NORM_LOCAL_FILES = (
    "base_normalizada_5mb/",
    "base_normalizada_50mb/",
    "base_normalizada_500mb/",
    "base_normalizada_1gb/",
)

# --- Job execution ---
RUNS_CSV = Path(f"runs_{FOLDER}.csv")  # local ledger, one row appended per run
REPS = 3                                # repetitions per dataset
HAS_HEADER = True                       # forwarded to the pipeline as --has_header
MAX_WALLTIME_SECONDS = 3600             # Dataflow max_workflow_runtime_walltime_seconds

In [None]:
import os, re

def slugify(s: str) -> str:
    """Turn ``s`` into a lowercase ``[a-z0-9-]`` slug of at most 63 characters.

    Underscores and any run of other disallowed characters become a single
    hyphen, repeated hyphens are collapsed and leading/trailing hyphens are
    removed. Trailing hyphens are stripped again *after* truncation, so a cut
    that lands on a separator never leaves a dangling ``-``. The result is
    also used as a Dataflow job name, which must end with a letter or digit.
    """
    s = s.strip().lower().replace("_", "-")
    s = re.sub(r"[^a-z0-9-]+", "-", s)
    s = re.sub(r"-{2,}", "-", s).strip("-")
    return s[:63].rstrip("-")

def dataset_tag_from_uri(uri: str) -> str:
    """Derive a slug tag from the last path component of ``uri``.

    A trailing slash is ignored (so directory prefixes work) and only the
    final extension is dropped, e.g. ``.../base_x_5mb.csv`` -> ``base-x-5mb``.
    """
    last_component = os.path.basename(uri.rstrip("/"))
    stem, _extension = os.path.splitext(last_component)
    return slugify(stem)

def build_job_name(experiment: str, variant: str, dataset_tag: str, run: int, ts) -> str:
    """Compose ``<experiment>-<variant>-<dataset_tag>-<ts>-<run>`` and slugify it."""
    parts = (experiment, variant, dataset_tag, ts, run)
    return slugify("-".join(str(part) for part in parts))

def build_output_prefix(output_subdir: str, dataset_tag: str, job_name: str) -> str:
    """Return the per-run GCS output prefix ``BASE_OUT/<subdir>/<tag>/<job>/part``."""
    segments = [BASE_OUT, output_subdir, dataset_tag, job_name, "part"]
    return "/".join(map(str, segments))

In [None]:
from google.cloud import storage

def _upload_if_missing(client, bucket, bucket_name: str, local_file: Path, blob_path: str, force_upload: bool) -> str:
    """Upload ``local_file`` to ``blob_path`` unless it already exists (or ``force_upload``); return its gs:// URI."""
    input_uri = f"gs://{bucket_name}/{blob_path}"
    blob = bucket.blob(blob_path)
    if blob.exists(client=client) and not force_upload:
        print(f"Já existe no GCS: {input_uri}")
    else:
        ctype = "application/gzip" if local_file.name.endswith(".gz") else "text/csv"
        blob.upload_from_filename(local_file.as_posix(), content_type=ctype)
        print(f"Enviado: {input_uri}")
    return input_uri


def prepare_gcs_inputs(
    project_id: str,
    bucket_name: str,
    folder: str,
    local_folder: str,
    files,
    force_upload: bool = False,
    create_etl_folders: bool = True
):
    """Upload local benchmark inputs to ``gs://<bucket_name>/<folder>/`` and return their URIs.

    Each entry of ``files`` is resolved relative to ``local_folder``:
      * a file is uploaded to ``<folder>/<entry>``;
      * a directory is walked recursively and every file inside it is uploaded
        to ``<folder>/<path relative to local_folder>``, in sorted order.
    Objects already present in the bucket are skipped unless ``force_upload``.

    Args:
        project_id: GCP project used for the storage client.
        bucket_name: target bucket; it must already exist.
        folder: object prefix inside the bucket.
        local_folder: local base directory containing ``files``.
        files: iterable of file or directory names relative to ``local_folder``.
        force_upload: re-upload even when the object already exists.
        create_etl_folders: write empty ``.keep`` markers under
            ``<folder>/temp/``, ``<folder>/staging/`` and ``<folder>/out/``.

    Returns:
        ``(ctx, input_uris)``. ``ctx`` holds the input URIs, the temp, staging
        and out prefixes, and the lowercased bucket location.

    Raises:
        RuntimeError: the bucket does not exist.
        FileNotFoundError: an entry of ``files`` is missing locally.
    """
    client = storage.Client(project=project_id)
    # lookup_bucket fetches the bucket metadata (None if absent). A bare
    # client.bucket() + exists() never loads properties, so .location stayed None.
    bucket = client.lookup_bucket(bucket_name)
    if bucket is None:
        raise RuntimeError(f"Bucket não encontrado: {bucket_name}")

    loc = (bucket.location or "").lower()

    if create_etl_folders:
        for etl_folder in (f"{folder}/temp/", f"{folder}/staging/", f"{folder}/out/"):
            bucket.blob(etl_folder + ".keep").upload_from_string("", content_type="text/plain")

    base_dir = Path(local_folder)
    input_uris = []

    for item in files:
        local_path = base_dir / item
        if not local_path.exists():
            raise FileNotFoundError(f"Caminho local não encontrado: {local_path}")

        if local_path.is_dir():
            # rglob order is filesystem-dependent; sort for a stable URI list.
            for subfile in sorted(p for p in local_path.rglob("*") if p.is_file()):
                blob_path = f"{folder}/{subfile.relative_to(base_dir).as_posix()}"
                input_uris.append(
                    _upload_if_missing(client, bucket, bucket_name, subfile, blob_path, force_upload)
                )
        else:
            blob_path = f"{folder}/{item}"
            input_uris.append(
                _upload_if_missing(client, bucket, bucket_name, local_path, blob_path, force_upload)
            )

    ctx = {
        "INPUT_URI": input_uris,
        "TEMP_GCS":  f"gs://{bucket_name}/{folder}/temp",
        "STAGING_GCS": f"gs://{bucket_name}/{folder}/staging",
        "OUT_BASE":  f"gs://{bucket_name}/{folder}/out",
        "bucket_location": loc,
    }
    print("Prefixos prontos em:", ctx["TEMP_GCS"], ctx["STAGING_GCS"], ctx["OUT_BASE"])
    return ctx, input_uris


# Upload both dataset families to GCS: the normalized inputs are local
# directories, the denormalized inputs are single CSV files.
norm_ctx, NORM_INPUT_URIS = prepare_gcs_inputs(
    project_id=PROJECT_ID,
    bucket_name=BUCKET,
    folder=FOLDER,
    local_folder=LOCAL_FOLDER,
    files=NORM_LOCAL_FILES,
)
desnorm_ctx, DESNORM_INPUT_URIS = prepare_gcs_inputs(
    project_id=PROJECT_ID,
    bucket_name=BUCKET,
    folder=FOLDER,
    local_folder=LOCAL_FOLDER,
    files=DESNORM_LOCAL_FILES,
)
print(NORM_INPUT_URIS)
print(DESNORM_INPUT_URIS)

In [None]:
import pandas as pd, subprocess, datetime as dt

def sh(cmd):
    """Run ``cmd`` through the shell, echoing it first.

    Returns the ``subprocess.CompletedProcess`` with stdout/stderr captured as
    text. On a non-zero exit code the captured stdout and stderr are printed
    and ``RuntimeError`` is raised.
    """
    print(">>", cmd)
    completed = subprocess.run(cmd, shell=True, text=True, capture_output=True)
    if completed.returncode == 0:
        return completed
    for stream in (completed.stdout, completed.stderr):
        print(stream)
    raise RuntimeError(f"Falhou: {cmd}")

def append_run(experiment, variant, job_name, start_ts, end_ts, extra=None, runs_csv=None):
    """Append one benchmark run as a row of the runs CSV ledger.

    Args:
        experiment, variant, job_name: identifiers stored verbatim.
        start_ts, end_ts: ``datetime`` bounds of the run, or ``None``. They are
            stored as ISO-8601 (seconds precision) and used for ``duration_s``.
        extra: optional dict of additional columns. Its keys override the
            standard columns above, e.g. a caller-supplied ``duration_s``.
        runs_csv: ledger path; defaults to the module-level ``RUNS_CSV``.

    The whole ledger is re-read and rewritten, so rows with different column
    sets (e.g. runs whose metrics are missing) share one header.
    """
    ledger = Path(RUNS_CSV if runs_csv is None else runs_csv)
    # datetime.utcnow() is deprecated since Python 3.12; this yields the same
    # naive-UTC ISO string.
    now_utc = dt.datetime.now(dt.timezone.utc).replace(tzinfo=None)
    have_both = start_ts is not None and end_ts is not None
    row = {
        "timestamp_utc": now_utc.isoformat(timespec="seconds"),
        "experiment": experiment,
        "variant": variant,
        "job_name": job_name,
        "start_ts": start_ts.isoformat(timespec="seconds") if start_ts is not None else None,
        "end_ts":   end_ts.isoformat(timespec="seconds")   if end_ts is not None else None,
        "duration_s": (end_ts - start_ts).total_seconds() if have_both else None,
        **(extra or {})
    }
    df = pd.DataFrame([row])
    # A zero-byte file (e.g. an interrupted first write) would make read_csv raise.
    if ledger.exists() and ledger.stat().st_size > 0:
        df = pd.concat([pd.read_csv(ledger), df], ignore_index=True)
    df.to_csv(ledger, index=False)
    print("Job Registrado:", job_name)


In [None]:
import json, time

def convert_iso_do_dt(s):
    """Parse an RFC 3339 timestamp string into a timezone-aware ``datetime``.

    Accepts a trailing ``Z`` and fractional seconds of any length. The fraction
    is truncated or padded to microseconds, because ``fromisoformat`` before
    Python 3.11 only accepts exactly 3 or 6 fractional digits and rejects the
    nanosecond precision some APIs emit. Returns ``None`` for empty or ``None``
    input.
    """
    if not s:
        return None
    s = s.replace('Z', '+00:00')
    # Normalize the first ".<digits>" (the seconds fraction) to exactly 6 digits.
    s = re.sub(r"\.(\d+)", lambda m: "." + m.group(1)[:6].ljust(6, "0"), s, count=1)
    return dt.datetime.fromisoformat(s)

def get_dataflow_job_id_by_name(job_name: str) -> str | None:
    """Return the id of the Dataflow job named ``job_name`` in REGION, or ``None``.

    Queries PROJECT_ID explicitly, the same project the pipeline is submitted
    to, instead of relying on gcloud's default project. If the filter returns
    several jobs, one whose ``name`` equals ``job_name`` exactly is preferred;
    otherwise the first result is used.
    """
    command = (
        f"gcloud dataflow jobs list --region={REGION} --project={PROJECT_ID} "
        f"--format=json --filter=name={job_name}"
    )
    jobs = json.loads(sh(command).stdout or "[]")
    exact_matches = [job for job in jobs if job.get("name") == job_name]
    candidates = exact_matches or jobs
    return candidates[0].get("id") if candidates else None

def wait_dataflow_job(job_id: str, poll_s: int = 20, timeout_s: float | None = None):
    """Poll a Dataflow job until it reaches a terminal state.

    Returns ``(state, start_dt, end_dt)``. ``start_dt`` is the job's
    ``createTime`` and ``end_dt`` its ``currentStateTime``, both parsed with
    ``convert_iso_do_dt``.

    Args:
        job_id: Dataflow job id.
        poll_s: seconds to sleep between ``gcloud dataflow jobs describe`` calls.
        timeout_s: optional cap on total waiting time; ``None`` waits indefinitely.

    Raises:
        TimeoutError: ``timeout_s`` elapsed before a terminal state was seen.
        RuntimeError: a terminal state was reported without timestamps. The
            state will not change, so polling again could never succeed.
    """
    # JOB_STATE_UPDATED is also terminal: the job was replaced by an updated one.
    terminal = {
        "JOB_STATE_DONE", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED",
        "JOB_STATE_DRAINED", "JOB_STATE_UPDATED",
    }
    # --project: the job was submitted to PROJECT_ID, which may differ from gcloud's default.
    describe_cmd = f"gcloud dataflow jobs describe {job_id} --region={REGION} --project={PROJECT_ID} --format=json"
    deadline = None if timeout_s is None else time.monotonic() + timeout_s
    while True:
        info = json.loads(sh(describe_cmd).stdout or "{}")
        state = info.get("currentState")
        if state in terminal:
            create_time = convert_iso_do_dt(info.get("createTime"))
            end_time = convert_iso_do_dt(info.get("currentStateTime"))
            if create_time and end_time:
                return state, create_time, end_time
            raise RuntimeError(f"Job {job_id} em estado terminal {state} sem createTime/currentStateTime")
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"Job {job_id} não terminou em {timeout_s}s (último estado: {state})")
        time.sleep(poll_s)


In [None]:
from urllib.parse import urlparse
import gzip, re

def convert_local_to_gs(uri: str):
    """Split a ``gs://bucket/object/path`` URI into ``(bucket, object_path)``.

    Despite the name, no local path is involved: ``uri`` must already be a
    ``gs://`` URI. The returned object path has no leading slash. It may be
    empty (bucket root) or end with ``/`` (a prefix).

    Raises:
        ValueError: ``uri`` does not start with ``gs://``.
    """
    scheme = "gs://"
    if not uri.startswith(scheme):
        raise ValueError(f"URI deve começar com gs://: {uri!r}")
    # Split by hand instead of urlparse: GCS object names may legally contain
    # '#' or '?', which urlparse would strip off as fragment/query.
    bucket, _, path = uri[len(scheme):].partition("/")
    return bucket, path

def _count_lines(stream) -> int:
    """Count lines in a binary stream, including a final line without a trailing newline."""
    lines = 0
    last_byte = b""
    while True:
        chunk = stream.read(1024 * 1024)
        if not chunk:
            break
        lines += chunk.count(b"\n")
        last_byte = chunk[-1:]
    if last_byte and last_byte != b"\n":
        lines += 1
    return lines


def count_records_any(uri: str, has_header=True) -> int:
    """Count data records in a GCS object, a wildcard set of objects, or a prefix.

    ``uri`` may be:
      * a single object, e.g. ``gs://b/t4/x.csv``;
      * a wildcard pattern, e.g. ``gs://b/t4/part-*.csv``. ``*`` matches any run
        of characters, including ``/``; everything else matches literally;
      * a prefix ending in ``/`` (or a bare bucket), which counts every object
        under it except ``/``-terminated placeholder objects.
    Objects whose name ends in ``.gz`` are gunzipped while streaming. When
    ``has_header`` is true, one header line is subtracted from each non-empty
    object. This assumes every object carries its own header.
    """
    bkt, path = convert_local_to_gs(uri)
    client = storage.Client(project=PROJECT_ID)
    bucket = client.bucket(bkt)

    if "*" in path:
        prefix = path.split("*", 1)[0]
        # Escape every literal segment so '.', '+', '(' etc. are not regex syntax.
        pattern = re.compile(".*".join(re.escape(part) for part in path.split("*")))
        blobs = [b for b in bucket.list_blobs(prefix=prefix) if pattern.fullmatch(b.name)]
    elif path == "" or path.endswith("/"):
        blobs = [b for b in bucket.list_blobs(prefix=path) if not b.name.endswith("/")]
    else:
        blobs = [bucket.blob(path)]

    total = 0
    for blob in blobs:
        with blob.open("rb") as raw:
            if blob.name.endswith(".gz"):
                with gzip.GzipFile(fileobj=raw) as reader:
                    lines = _count_lines(reader)
            else:
                lines = _count_lines(raw)
        # Per-object header subtraction: an empty object must not consume a
        # line that belongs to another object's count.
        if has_header and lines > 0:
            lines -= 1
        total += lines
    return total

In [None]:
# Dataflow prices for southamerica-east1 (snapshot from 2025-09), in USD.

# Standard batch workers
PRICE_BATCH_VCPU_HOUR      = 0.084      # per vCPU-hour
PRICE_BATCH_MEMORY_GB_HOUR = 0.0053355  # per GB-hour of worker memory
PRICE_BATCH_SHUFFLE_GB     = 0.01747    # per GB of shuffle data

# FlexRS (flexible resource scheduling) workers
PRICE_FLEXRS_VCPU_HOUR      = 0.0534     # per vCPU-hour
PRICE_FLEXRS_MEMORY_GB_HOUR = 0.0033889  # per GB-hour of worker memory
PRICE_FLEXRS_SHUFFLE_GB     = 0.01747    # per GB of shuffle data

def get_type_job(job_info: dict) -> str:
    """Classify a Dataflow job description (``jobs describe`` JSON) for pricing.

    Returns ``"streaming"``, ``"flexrs"``, ``"serverless"`` (a job whose
    ``environment.version.job_type`` is ``FNAPI_BATCH``) or ``"batch"``.

    The FlexRS goal is checked *before* the ``FNAPI_BATCH`` marker. Runner v2
    (FnAPI) batch jobs presumably all report ``FNAPI_BATCH``, so checking it
    first would hide FlexRS jobs and price them at standard batch rates. The
    enum's unset value ``FLEXRS_UNSPECIFIED`` does not count as FlexRS.
    ``environment`` is only present in the full job view (``describe --full``).
    """
    if job_info.get("type") == "JOB_TYPE_STREAMING":
        return "streaming"
    env = job_info.get("environment") or {}
    flexrs_goal = env.get("flexResourceSchedulingGoal")
    if flexrs_goal and flexrs_goal != "FLEXRS_UNSPECIFIED":
        return "flexrs"
    if (env.get("version") or {}).get("job_type") == "FNAPI_BATCH":
        return "serverless"
    return "batch"

def _prices_for(type_job: str):
    """Return ``(vcpu_hour, memory_gb_hour, shuffle_gb)`` USD prices for a job type."""
    if type_job == "flexrs":
        return PRICE_FLEXRS_VCPU_HOUR, PRICE_FLEXRS_MEMORY_GB_HOUR, PRICE_FLEXRS_SHUFFLE_GB
    return PRICE_BATCH_VCPU_HOUR, PRICE_BATCH_MEMORY_GB_HOUR, PRICE_BATCH_SHUFFLE_GB


def _aggregate_job_metrics(metrics_info) -> dict:
    """Fold ``gcloud dataflow metrics list`` JSON entries into resource-usage totals.

    For the resource metrics the last entry with a given name wins. For
    ``ElementCount``, ``rows_in`` and ``rows_out`` the maximum value is kept
    as ``records_processed``. Entries whose ``scalar`` is not numeric are
    skipped instead of aborting the whole estimate.
    """
    usage = {
        "vcpu_hours": 0.0,
        "memory_gb_hours": 0.0,
        "pd_gb_hours": 0.0,
        "ssd_gb_hours": 0.0,
        "shuffle_gb": 0.0,
        "total_shuffle_gb": 0.0,
        "current_vcpus": None,
        "current_memory_gb": None,
        "records_processed": 0,
    }
    for metric in metrics_info:
        name = (metric.get("name") or {}).get("name")
        try:
            val = float(metric.get("scalar", 0) or 0)
        except (TypeError, ValueError):
            continue

        if name == "TotalVcpuTime":                  # vCPU-seconds -> vCPU-hours
            usage["vcpu_hours"] = val / 3600.0
        elif name == "TotalMemoryUsage":             # presumably MB-seconds -> GB-hours
            usage["memory_gb_hours"] = (val / 1024.0) / 3600.0
        elif name == "TotalPdUsage":                 # GB-seconds -> GB-hours
            usage["pd_gb_hours"] = val / 3600.0
        elif name == "TotalSsdUsage":                # GB-seconds -> GB-hours
            usage["ssd_gb_hours"] = val / 3600.0
        elif name == "BillableShuffleDataProcessed":
            # NOTE(review): used directly as GB — confirm the unit the API reports.
            usage["shuffle_gb"] = val
        elif name == "TotalShuffleDataProcessed":
            usage["total_shuffle_gb"] = val
        elif name == "CurrentVcpuCount":
            usage["current_vcpus"] = int(val)
        elif name == "CurrentMemoryUsage":
            usage["current_memory_gb"] = val / 1024.0
        elif name in ("ElementCount", "rows_in", "rows_out"):
            usage["records_processed"] = max(usage["records_processed"], int(val))
    return usage


def calcular_custos(project_id, region, job_id):
    """Estimate duration, throughput and USD cost of a finished Dataflow job.

    Reads the job description (``--full``, needed for FlexRS detection in
    ``get_type_job``) and the job metrics via gcloud. It then prices vCPU,
    memory and billable shuffle with the FlexRS or batch price constants.
    Persistent-disk and SSD usage are reported, but there are no price
    constants for them, so ``total_cost_usd`` excludes disk cost.

    Returns:
        dict with job id/type, duration, resource usage, records processed,
        throughput (records/s), per-component and total cost, and cost per
        million records (``None`` when no records were counted).

    Raises:
        RuntimeError: a gcloud command failed (raised by ``sh``).
        KeyError: the job description lacks ``createTime``/``currentStateTime``.
    """
    # --full: the default summary view omits ``environment`` (FlexRS goal, version).
    describe_cmd = (
        f"gcloud dataflow jobs describe {job_id} --region={region} "
        f"--project={project_id} --full --format=json"
    )
    job_info = json.loads(sh(describe_cmd).stdout)

    start = convert_iso_do_dt(job_info["createTime"])
    end = convert_iso_do_dt(job_info["currentStateTime"])
    duration_s = (end - start).total_seconds()

    type_job = get_type_job(job_info)
    price_vcpu_hour, price_memory_gb_hour, price_shuffle_gb = _prices_for(type_job)

    metrics_cmd = f"gcloud beta dataflow metrics list {job_id} --region={region} --format=json --project={project_id}"
    usage = _aggregate_job_metrics(json.loads(sh(metrics_cmd).stdout or "[]"))

    records_processed = usage["records_processed"]
    throughput = (records_processed / duration_s) if (duration_s and records_processed) else None
    cost_vcpu = usage["vcpu_hours"] * price_vcpu_hour
    cost_memory = usage["memory_gb_hours"] * price_memory_gb_hour
    cost_shuffle = usage["shuffle_gb"] * price_shuffle_gb
    total_cost_usd = cost_vcpu + cost_memory + cost_shuffle
    unit_cost_usd_per_mm = (total_cost_usd / (records_processed / 1e6)) if records_processed else None

    return {
        "job_id": job_id,
        "type_job": type_job,
        "duration_s": duration_s,
        "vcpu_hours": usage["vcpu_hours"],
        "memory_gb_hours": usage["memory_gb_hours"],
        "pd_gb_hours": usage["pd_gb_hours"],
        "ssd_gb_hours": usage["ssd_gb_hours"],
        "shuffle_gb": usage["shuffle_gb"],
        "total_shuffle_gb": usage["total_shuffle_gb"],
        "current_vcpus": usage["current_vcpus"],
        "current_memory_gb": usage["current_memory_gb"],
        "records_processed": records_processed,
        "throughput_rps": throughput,
        "cost_vcpu_usd": cost_vcpu,
        "cost_memory_usd": cost_memory,
        "cost_shuffle_usd": cost_shuffle,
        "total_cost_usd": total_cost_usd,
        "unit_cost_usd_per_mm": unit_cost_usd_per_mm,
    }


In [None]:
# Run Dataflow

def apply_limit(s: str) -> str:
    """Sanitize ``s`` for use as a GCP label value.

    Lowercases the text, replaces every character outside ``[a-z0-9_-]`` with
    ``-`` one-for-one (runs are not collapsed), and keeps at most 63 characters.
    """
    return re.sub(r'[^a-z0-9_-]', '-', s.lower())[:63]

def run_dataflow(
    experiment: str,
    variant: str,
    run_id: int,
    input_uri: str,
    output_subdir: str,
    aditional_flags: list[str] | None = None,
    autoscaling_algorithm: str | None = "THROUGHPUT_BASED",
    num_workers: int | None = 1,
    max_num_workers: int | None = 4,
    worker_machine_type: str | None = "n2-standard-2",
    iterations: int | None = None,
    flexrs_goal: str | None = None, # "SPEED_OPTIMIZED"|"COST_OPTIMIZED"
):
    """Submit ``mbapipeline.py`` to Dataflow, wait for completion and log the run.

    Steps: build a job name and output prefix, then submit the pipeline with
    worker, label and limit flags. Next, look up the job id by name and poll
    until the job is terminal. Then estimate cost with ``calcular_custos``;
    this is best effort, and on failure the error is printed and the run is
    still logged. Finally, append a row to the runs CSV.

    Args:
        experiment, variant, run_id: identify the run (job name and labels).
        input_uri: gs:// input passed as ``--input``.
        output_subdir: subfolder of BASE_OUT for this experiment's outputs.
        aditional_flags: extra command-line flags appended verbatim.
        autoscaling_algorithm, num_workers, max_num_workers, worker_machine_type:
            Dataflow worker options. ``None`` omits the flag so the service
            default applies.
        iterations: forwarded as ``--iterations`` when truthy.
        flexrs_goal: forwarded as ``--flexrs_goal`` when set.

    Returns:
        ``(job_name, output_prefix, job_id)``.

    Raises:
        RuntimeError: submission failed, or the job id could not be found.
    """
    dataset_tag = dataset_tag_from_uri(input_uri)
    # datetime.utcnow() is deprecated since Python 3.12; same strftime output.
    timestamp = dt.datetime.now(dt.timezone.utc).strftime('%Y%m%d-%H%M%S')
    # By keyword, so the name follows build_job_name's "<...>-<ts>-<run>" layout.
    job = build_job_name(experiment.lower(), variant, dataset_tag, run=run_id, ts=timestamp)
    output_prefix = build_output_prefix(output_subdir, dataset_tag, job)

    label_flags = [
        f"--labels={key}={apply_limit(value)}"
        for key, value in (
            ("experiment", experiment),
            ("variant", variant),
            ("run", str(run_id)),
            ("folder", FOLDER),
            ("dataset", dataset_tag),
            ("job", job),
        )
    ]

    flags = [
        '--runner=DataflowRunner',
        f'--project={PROJECT_ID}',
        f'--region={REGION}',
        f"--temp_location=gs://{BUCKET}/{FOLDER}/temp",
        f"--staging_location=gs://{BUCKET}/{FOLDER}/staging",
        f'--job_name={job}',
        f'--input={input_uri}',
        f'--output={output_prefix}',
    ]
    # Send worker options only when set; formatting None would pass the
    # literal string "None", which the pipeline options cannot parse.
    worker_options = {
        "autoscaling_algorithm": autoscaling_algorithm,
        "num_workers": num_workers,
        "max_num_workers": max_num_workers,
        "worker_machine_type": worker_machine_type,
    }
    flags += [f"--{name}={value}" for name, value in worker_options.items() if value is not None]
    flags += [
        '--no_pipeline_monitoring',
        f'--dataflow_service_options=max_workflow_runtime_walltime_seconds={MAX_WALLTIME_SECONDS}',
        *label_flags,
    ]

    if HAS_HEADER:
        flags.append('--has_header')
    if iterations:
        flags.append(f'--iterations={iterations}')
    if flexrs_goal:
        flags.append(f'--flexrs_goal={flexrs_goal}')
    if aditional_flags:
        flags += aditional_flags

    sh("python mbapipeline.py " + " ".join(flags))

    job_id = get_dataflow_job_id_by_name(job)
    if not job_id:
        raise RuntimeError("Não foi possível obter o job_id pelo nome.")
    state, t0, t1 = wait_dataflow_job(job_id)
    duration_s = (t1 - t0).total_seconds()

    try:
        metrics_cost = calcular_custos(PROJECT_ID, REGION, job_id)
    except Exception as e:
        # Best effort: a metrics/cost failure must not lose the run record.
        print("Falha ao calcular as métricas", e)
        metrics_cost = {}

    append_run(experiment, variant, job, t0, t1, {
        "runner": "dataflow",
        "job_id": job_id,
        "state": state,
        "duration_s": duration_s,
        "worker_type": worker_machine_type,
        "num_workers": num_workers,
        "max_num_workers": max_num_workers,
        "flexrs_goal": flexrs_goal,
        "out_prefix": output_prefix,
        "input_uri": input_uri,
        "dataset_tag": dataset_tag,
        **metrics_cost
    })
    return job, output_prefix, job_id

In [None]:
# Replace the per-file URIs returned by prepare_gcs_inputs with one directory
# prefix per normalized dataset (presumably mbapipeline.py reads every object
# under the prefix — confirm). Built from BASE_IN so the list follows
# BUCKET/FOLDER instead of duplicating them.
NORM_INPUT_URIS = [
    f"{BASE_IN}/base_normalizada_{size}/"
    for size in ("5mb", "500mb", "1gb", "50mb")
]

In [None]:
print(DESNORM_INPUT_URIS, NORM_INPUT_URIS, sep="\n")

In [None]:
# T4 / denormalized inputs: REPS repetitions per dataset, same worker setup.
for dataset_uri in DESNORM_INPUT_URIS:
    for rep in range(1, REPS + 1):
        run_dataflow(
            experiment="T4",
            variant="desnormalizada",
            run_id=rep,
            input_uri=dataset_uri,
            output_subdir="t4-desnormalizada",
            num_workers=1,
            max_num_workers=4,
            worker_machine_type="n2-standard-4",
            iterations=100,
        )

In [None]:
# T4 / normalized inputs: REPS repetitions per dataset, same worker setup.
for dataset_uri in NORM_INPUT_URIS:
    for rep in range(1, REPS + 1):
        run_dataflow(
            experiment="T4",
            variant="normalizada",
            run_id=rep,
            input_uri=dataset_uri,
            output_subdir="t4-normalizada",
            num_workers=1,
            max_num_workers=4,
            worker_machine_type="n2-standard-4",
            iterations=100,
        )