In [3]:
import os
os.chdir('/home/sagemaker-user/ml-ops/lab3')
os.listdir()

['.env-sample',
 '.gitignore',
 'Dockerfile',
 'Dockerfile.airflow',
 'Dockerfile.mlflow',
 'Dockerfile.ms',
 'Dockerfile.train',
 'README.md',
 'dags',
 'data',
 'docker-compose.yaml',
 'requirements-airflow.txt',
 'requirements.txt',
 'scripts',
 'src',
 'sm']

In [4]:
import importlib
import src.sm.processing.data_prep as dp

importlib.reload(dp)

dv = dp.build_training_dataset(data_version="debug")
dv

TypeError: build_training_dataset() missing 2 required positional arguments: 'input_dir' and 'output_dir'

In [1]:
# ETL 
import os
import sagemaker
from datetime import datetime, UTC
import uuid
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

def generate_run_id(prefix: str = "run") -> str:
    """Generate a unique run_id using UTC timestamp + short UUID."""
    timestamp = datetime.now(UTC).strftime("%Y-%m-%d-%H-%M-%S")
    short_uuid = uuid.uuid4().hex[:6]
    return f"{prefix}-{timestamp}-{short_uuid}"

# SageMaker session and execution role
sess = sagemaker.Session()
role = sagemaker.get_execution_role()

# Explicit project bucket (no default bucket)
BUCKET = "mlops-project-sm"

# Project S3 root prefix (your objects live under s3://bucket/data/...)
S3_ROOT = "data"
RAW_PREFIX = f"{S3_ROOT}/raw"
PROCESSED_PREFIX = f"{S3_ROOT}/processed"

# Generate run_id for this processing run
run_id = generate_run_id("banking-prep")
print("Using run_id:", run_id)

# Input must point to a NON-empty S3 prefix
raw_input_s3 = f"s3://{BUCKET}/{RAW_PREFIX}/"

# Output for this run_id
processed_output_s3 = f"s3://{BUCKET}/{PROCESSED_PREFIX}/runs/{run_id}/"

# Absolute path to your processing script
script_path = os.path.abspath("processing/data_prep.py")
reqs_path = os.path.abspath("processing/requirements.txt")

print("Script path:", script_path)

processor = SKLearnProcessor(
    framework_version="1.2-1",
    role=role,
    instance_type="ml.t3.medium",
    instance_count=1,
    base_job_name="banking-data-prep",
    sagemaker_session=sess,
)

processor.run(
    code=script_path,
    arguments=[
        "--data_version", "auto",
        "--input_dir", "/opt/ml/processing/input",
        "--output_dir", "/opt/ml/processing/output",
        "--run_id", run_id,
    ],
    inputs=[
        ProcessingInput(
            source=raw_input_s3,
            destination="/opt/ml/processing/input",
        )
    ],
    outputs=[
        ProcessingOutput(
            source="/opt/ml/processing/output",
            destination=processed_output_s3,
        )
    ],
    logs=True,
)


sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
Using run_id: banking-prep-2025-12-29-17-05-46-7943a9
Script path: /home/sagemaker-user/ml-ops/lab3/src/sm/processing/data_prep.py


INFO:sagemaker:Creating processing-job with name banking-data-prep-2025-12-29-17-05-46-299


...............[34m>>> INPUT_DIR:  /opt/ml/processing/input[0m
[34m>>> OUTPUT_DIR: /opt/ml/processing/output[0m
[34m>>> DATA VERSION: banking-prep-2025-12-29-17-05-46-7943a9[0m
[34m>>> Loading GOLDEN train from: /opt/ml/processing/input/historical/train.csv
    GOLDEN train shape: (10003, 2)[0m
[34m>>> Loading GOLDEN test  from: /opt/ml/processing/input/historical/test.csv
    GOLDEN test shape:  (3080, 2)[0m
[34m>>> FEEDBACK inference dir:   /opt/ml/processing/input/logs/inference[0m
[34m>>> FEEDBACK corrections dir: /opt/ml/processing/input/logs/corrections
    Found inference JSONL files:   13
    Found corrections JSONL files: 3
    Reading /opt/ml/processing/input/logs/inference/2025-12-22/batch_231355_dc83da00-77aa-4467-8d8d-8f6098bf9ad1.jsonl
    Reading /opt/ml/processing/input/logs/inference/2025-12-22/0017b824-1d6c-43d1-ad53-0f9ee619c385.jsonl
    Reading /opt/ml/processing/input/logs/inference/2025-12-22/003f4307-3740-4b25-bc00-095ac31cc7d7.jsonl
    Reading /op

In [10]:
import sagemaker
sess = sagemaker.Session()
print("SageMaker region:", sess.boto_region_name)

SageMaker region: us-east-1


In [10]:
import boto3
import re

BUCKET = "mlops-project-sm"

def resolve_data_uris(data_version: str | None):
    """
    data_version:
      - None / "latest" => use processed/latest/*
      - otherwise       => use processed/runs/<data_version>/*
    """
    if not data_version or data_version == "latest":
        train_s3 = f"s3://{BUCKET}/data/processed/latest/train_latest.parquet"
        test_s3  = f"s3://{BUCKET}/data/processed/latest/test_latest.parquet"
        return train_s3, test_s3, "latest"

    # Assume it's a run_id
    train_s3 = f"s3://{BUCKET}/data/processed/runs/{data_version}/train.parquet"
    test_s3  = f"s3://{BUCKET}/data/processed/runs/{data_version}/test.parquet"
    return train_s3, test_s3, data_version

DATA_VERSION = "latest"  # or конкретный run_id
train_s3, test_s3, effective_version = resolve_data_uris(DATA_VERSION)
print("Using data_version:", effective_version)
print(train_s3)
print(test_s3)


Using data_version: latest
s3://mlops-project-sm/data/processed/latest/train_latest.parquet
s3://mlops-project-sm/data/processed/latest/test_latest.parquet


In [11]:
import boto3

def get_latest_run_id():
    s3 = boto3.client("s3")
    prefix = "data/processed/runs/"

    paginator = s3.get_paginator("list_objects_v2")
    run_ids = set()

    for page in paginator.paginate(Bucket=BUCKET, Prefix=prefix, Delimiter="/"):
        for cp in page.get("CommonPrefixes", []):
            # e.g. data/processed/runs/banking-prep-2025-12-25-.../
            run_prefix = cp["Prefix"]
            run_id = run_prefix[len(prefix):].strip("/")

            # Optional: filter only your runs
            if run_id.startswith("banking-prep-"):
                run_ids.add(run_id)

    if not run_ids:
        raise RuntimeError(f"No runs found under s3://{BUCKET}/{prefix}")

    # Your run_id starts with timestamp; lexical sort works if format is consistent.
    # If not consistent, better sort by LastModified of a known file (more complex).
    return sorted(run_ids)[-1]

DATA_VERSION = None  # None => auto
if DATA_VERSION is None:
    DATA_VERSION = get_latest_run_id()

train_s3, test_s3, effective_version = resolve_data_uris(DATA_VERSION)
print("Auto-selected run_id:", effective_version)
print("Using data_version:", effective_version)
print(train_s3)
print(test_s3)


Auto-selected run_id: banking-prep-2025-12-29-17-05-46-7943a9
Using data_version: banking-prep-2025-12-29-17-05-46-7943a9
s3://mlops-project-sm/data/processed/runs/banking-prep-2025-12-29-17-05-46-7943a9/train.parquet
s3://mlops-project-sm/data/processed/runs/banking-prep-2025-12-29-17-05-46-7943a9/test.parquet


In [13]:
# Train
import sagemaker
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.inputs import TrainingInput
from sagemaker.network import NetworkConfig
from datetime import datetime
import uuid
from pathlib import Path
import os


def generate_run_id(prefix="train"):
    ts = datetime.utcnow().strftime("%Y-%m-%d-%H-%M-%S")
    m = re.search(r"(\d{4}-\d{2}-\d{2}-\d{2}-\d{2}-\d{2}-[a-f0-9]+)$", effective_version)
    if not m:
        raise ValueError(f"Cannot parse data version from: {s}")

    data_version = m.group(1)
    return f"{prefix}-{data_version}-{ts}"


sess = sagemaker.Session()
role = sagemaker.get_execution_role()
print(role)

BUCKET = "mlops-project-sm"
run_id = generate_run_id("banking-train")
print("Training run_id:", run_id)
print("Using data_version:", effective_version)
print("Train S3:", train_s3)
print("Test  S3:", test_s3)

# Local paths in Studio
entry_point = os.path.abspath("training/train.py")
reqs_local = os.path.abspath("training/requirements.txt")

print("Entry point:", entry_point)
print("Reqs:", reqs_local)

metric_definitions = [
    {"Name": "accuracy", "Regex": r"\[METRIC\]\s+accuracy=([0-9]*\.?[0-9]+)"},
    {"Name": "f1_weighted", "Regex": r"\[METRIC\]\s+f1_weighted=([0-9]*\.?[0-9]+)"},
]

MLFLOW_TRACKING_URI = "http://mlflow:uWUeXJfpA2w6dkry@34.205.81.69"
MLFLOW_EXPERIMENT = "banking-support-classifier"


# net = NetworkConfig(
#     subnets=["subnet-00597ad7ed124d785", "subnet-0d4ff2e37f7573eb6"],
#     security_group_ids=["sg-0063e8fdc77aae1fe"],
# )

estimator = SKLearn(
    entry_point=entry_point,
    role=role,
    # network_config=net,
    framework_version="1.2-1",
    instance_type="ml.m5.large",
    instance_count=1,
    base_job_name="banking-training",
    sagemaker_session=sess,
    metric_definitions=metric_definitions,
    use_spot_instances=True,
    max_wait=3600,
    max_run=1800,
    hyperparameters={
        "data_version": effective_version,
        "requirements": "/opt/ml/input/data/requirements/requirements.txt",
        "train_s3": train_s3,
        "test_s3": test_s3,
        "train_file": "train.parquet",
        "test_file": "test.parquet",
        "max_features": 50000,
        "C": 2.0,
        "mlflow_tracking_uri": MLFLOW_TRACKING_URI,
        "mlflow_experiment": MLFLOW_EXPERIMENT,
        "mlflow_run_name": run_id,
    },
)

inputs = {
    "train": TrainingInput(train_s3, content_type="application/x-parquet"),
    "test": TrainingInput(test_s3, content_type="application/x-parquet"),
}

# Upload requirements.txt to S3 (simple and reliable)
reqs_s3_prefix = f"code/training/requirements/{run_id}"
reqs_s3_uri = sess.upload_data(path=reqs_local, bucket=BUCKET, key_prefix=reqs_s3_prefix)
print("Uploaded requirements to:", reqs_s3_uri)

inputs["requirements"] = TrainingInput(reqs_s3_uri, content_type="text/plain")

# Launch training
estimator.fit(inputs=inputs, job_name=run_id, wait=True, logs=True)


  ts = datetime.utcnow().strftime("%Y-%m-%d-%H-%M-%S")
INFO:sagemaker.telemetry.telemetry_logging:SageMaker Python SDK will collect telemetry to help us better understand our user's needs, diagnose issues, and deliver additional features.
To opt out of telemetry, please disable via TelemetryOptOut parameter in SDK defaults config. For more information, refer to https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk.


arn:aws:iam::191072691166:role/ml-ops-SageMaker-ExecutionRole
Training run_id: banking-train-2025-12-29-17-05-46-7943a9-2026-01-01-04-41-58
Using data_version: banking-prep-2025-12-29-17-05-46-7943a9
Train S3: s3://mlops-project-sm/data/processed/runs/banking-prep-2025-12-29-17-05-46-7943a9/train.parquet
Test  S3: s3://mlops-project-sm/data/processed/runs/banking-prep-2025-12-29-17-05-46-7943a9/test.parquet
Entry point: /home/sagemaker-user/ml-ops/lab3/src/sm/training/train.py
Reqs: /home/sagemaker-user/ml-ops/lab3/src/sm/training/requirements.txt
Uploaded requirements to: s3://mlops-project-sm/code/training/requirements/banking-train-2025-12-29-17-05-46-7943a9-2026-01-01-04-41-58/requirements.txt


INFO:sagemaker:Creating training-job with name: banking-train-2025-12-29-17-05-46-7943a9-2026-01-01-04-41-58


2026-01-01 04:42:00 Starting - Starting the training job...
2026-01-01 04:42:15 Starting - Preparing the instances for training...
2026-01-01 04:42:37 Downloading - Downloading input data...
2026-01-01 04:43:18 Downloading - Downloading the training image......
  import pkg_resources[0m
[34m2026-01-01 04:44:22,423 sagemaker-containers INFO     Imported framework sagemaker_sklearn_container.training[0m
[34m2026-01-01 04:44:22,428 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2026-01-01 04:44:22,432 sagemaker-training-toolkit INFO     No Neurons detected (normal if no neurons installed)[0m
[34m2026-01-01 04:44:22,459 sagemaker_sklearn_container.training INFO     Invoking user training script.[0m
[34m2026-01-01 04:44:22,840 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2026-01-01 04:44:22,844 sagemaker-training-toolkit INFO     No Neurons detected (normal if no neurons installed)[0m
[34m2