# [0] DEV

In [2]:
import sys
import os

# restrict this kernel to GPU 0; CUDA_VISIBLE_DEVICES only takes effect if set before CUDA is initialized
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

# emulate `python main_train.py --hp <yaml> --devices 0` so that the CLI parsing used further below
# (parse_run_cli_args) sees these arguments instead of the notebook kernel's own argv
sys.argv = [
    "main_train.py",
    "--hp",
    "yamls/shapenetcar/upt/dim768_seq1024sdf512_cnext_lr5e4_sd02_reprcnn_grn_grid32.yaml",
    "--devices",
    "0",
]

#### TRAIN STAGE

In [3]:
import logging
import os
from pathlib import Path

import kappaprofiler as kp
import yaml
from torch.distributed import broadcast_object_list
from wandb.util import generate_id

from callbacks.base.callback_base import CallbackBase
from configs.cli_args import CliArgs
from configs.static_config import StaticConfig
from configs.wandb_config import WandbConfig
from datasets import dataset_from_kwargs
from datasets.dummy_dataset import DummyDataset
from distributed.config import is_rank0, is_distributed, get_rank, log_distributed_config
from models import model_from_kwargs
from models.dummy_model import DummyModel
from providers.dataset_config_provider import DatasetConfigProvider
from providers.path_provider import PathProvider
from summarizers.stage_summarizers import stage_summarizer_from_kwargs
from summarizers.summary_summarizers import summary_summarizer_from_kwargs
from trainers import trainer_from_kwargs
from utils.commands import command_from_kwargs
from utils.data_container import DataContainer
from utils.kappaconfig.util import save_unresolved_hp, save_resolved_hp, log_stage_hp
from utils.logging_util import add_global_handlers
from utils.memory_leak_util import get_tensors_in_memory
from utils.seed import set_seed, get_random_int
from utils.system_info import log_system_info, get_cli_command
from utils.version_check import check_versions
from utils.wandb_utils import init_wandb, finish_wandb


def train_stage(stage_hp: dict, static_config: StaticConfig, cli_args: CliArgs, device: str):
    """Run one complete training stage described by ``stage_hp``.

    Steps: apply env variables, set up a resume initializer, determine the stage_id, set up
    logging, wandb and seeding, create datasets/trainer/model, train, run stage and summary
    summarizers, log profiler times, execute ``on_finish`` commands and clean up.

    Args:
        stage_hp: hyperparameters of this stage (as returned by ``get_stage_hp``). NOTE: mutated in
            place -- meta keys (``name``, ``seed``, ``wandb``, ``tags``, ``notes``, ``group``,
            ``group_tags``, ``prefetch_factor``, ``max_num_workers``, ``max_batch_size``) are popped
            and a resume initializer may be injected into ``stage_hp["trainer"]``.
        static_config: static configuration (output/model/temp paths, dataset paths, defaults,
            wandb account name).
        cli_args: parsed command line arguments.
        device: torch device string the trainer runs on (e.g. ``"cuda"`` or ``"cpu"``).
    """
    # set environment variables (os.environ only accepts strings)
    for key, value in stage_hp.get("env", {}).items():
        os.environ[key] = value if isinstance(value, str) else str(value)

    # resume: translate --resume_checkpoint into a resume_initializer for the trainer
    if cli_args.resume_stage_id is not None:
        # an initializer defined in the yaml would conflict with the injected resume_initializer
        assert "initializer" not in stage_hp["trainer"]
        # None -> "latest"; E<n>/U<n>/S<n> -> epoch/update/sample n; anything else is used verbatim
        if cli_args.resume_checkpoint is None:
            checkpoint = "latest"
        elif cli_args.resume_checkpoint.startswith("E"):
            checkpoint = dict(epoch=int(cli_args.resume_checkpoint[1:]))
        elif cli_args.resume_checkpoint.startswith("U"):
            checkpoint = dict(update=int(cli_args.resume_checkpoint[1:]))
        elif cli_args.resume_checkpoint.startswith("S"):
            checkpoint = dict(sample=int(cli_args.resume_checkpoint[1:]))
        else:
            # any checkpoint (like cp=last or cp=best.accuracy1.test.main)
            checkpoint = cli_args.resume_checkpoint
        stage_hp["trainer"]["initializer"] = dict(
            kind="resume_initializer",
            stage_id=cli_args.resume_stage_id,
            checkpoint=checkpoint,
        )

    # retrieve stage_id from hp (allows queueing up dependent stages by hardcoding stage_ids in the yamls) e.g.:
    # - pretrain MAE with stageid abcdefgh
    # - finetune MAE where the backbone is initialized with the backbone from stage_id abcdefgh
    stage_id = stage_hp.get("stage_id", None)
    # generate stage_id and sync across devices (rank0's id is broadcast so all ranks use the same stage_id)
    if stage_id is None:
        stage_id = generate_id()
        if is_distributed():
            object_list = [stage_id] if is_rank0() else [None]
            broadcast_object_list(object_list)
            stage_id = object_list[0]
    stage_name = stage_hp.get("stage_name", "default_stage")

    # initialize logging (adds handlers that also write to this stage's log file)
    path_provider = PathProvider(
        output_path=static_config.output_path,
        model_path=static_config.model_path,
        stage_name=stage_name,
        stage_id=stage_id,
        temp_path=static_config.temp_path,
    )
    message_counter = add_global_handlers(log_file_uri=path_provider.logfile_uri)

    # run name: the CLI takes precedence over the yaml; "name" is always popped (like the other meta keys)
    # so it never lingers in stage_hp -- and thus in the logged/saved hp -- when overridden via the CLI
    hp_run_name = stage_hp.pop("name", None)
    run_name = cli_args.name or hp_run_name

    # init seed
    seed = stage_hp.pop("seed", None)
    if seed is None:
        seed = 0
        logging.info(f"no seed specified -> using seed={seed}")

    # initialize wandb
    wandb_config_uri = stage_hp.pop("wandb", None)
    # `wandb: disabled` in the yaml is a mode switch, not a config file name
    wandb_disabled_via_yaml = wandb_config_uri == "disabled"
    if wandb_disabled_via_yaml:
        wandb_mode = "disabled"
    else:
        wandb_mode = cli_args.wandb_mode or static_config.default_wandb_mode
    if wandb_mode == "disabled":
        wandb_config_dict = {}
        if cli_args.wandb_config is not None:
            logging.warning(f"wandb_config is defined via CLI but mode is disabled -> wandb_config is not used")
        # only warn about an actual config file from the yaml, not about the "disabled" switch itself
        if wandb_config_uri is not None and not wandb_disabled_via_yaml:
            logging.warning(f"wandb_config is defined via yaml but mode is disabled -> wandb_config is not used")
    else:
        # priority: "wandb" entry in the yaml > --wandb_config cli arg > default wandb_config.yaml
        # retrieve wandb config from yaml
        if wandb_config_uri is not None:
            wandb_config_uri = Path("wandb_configs") / wandb_config_uri
            if cli_args.wandb_config is not None:
                logging.warning(f"wandb_config is defined via CLI and via yaml -> wandb_config from yaml is used")
        # retrieve wandb config from --wandb_config cli arg
        elif cli_args.wandb_config is not None:
            wandb_config_uri = Path("wandb_configs") / cli_args.wandb_config
        # use default wandb_config file
        else:
            wandb_config_uri = Path("wandb_config.yaml")
        with open(wandb_config_uri.with_suffix(".yaml")) as f:
            wandb_config_dict = yaml.safe_load(f)
    wandb_config = WandbConfig(mode=wandb_mode, **wandb_config_dict)
    config_provider, summary_provider = init_wandb(
        device=device,
        run_name=run_name,
        stage_hp=stage_hp,
        wandb_config=wandb_config,
        path_provider=path_provider,
        account_name=static_config.account_name,
        tags=stage_hp.pop("tags", None),
        notes=stage_hp.pop("notes", None),
        group=stage_hp.pop("group", None),
        group_tags=stage_hp.pop("group_tags", None),
    )
    # log codebase "high-level" version name (git commit is logged anyway)
    config_provider["code/mlp"] = "CVSim"
    # the context manager closes the pipe instead of leaving it to the garbage collector
    with os.popen("git describe --abbrev=0") as git_describe:
        config_provider["code/tag"] = git_describe.read().strip()
    config_provider["code/name"] = "initial"

    # log setup
    logging.info("------------------")
    logging.info(f"stage_id: {stage_id}")
    logging.info(get_cli_command())
    check_versions(verbose=True)
    log_system_info()
    static_config.log()
    cli_args.log()
    log_distributed_config()
    log_stage_hp(stage_hp)
    if is_rank0():
        save_unresolved_hp(cli_args.hp, path_provider.stage_output_path / "hp_unresolved.yaml")
        save_resolved_hp(stage_hp, path_provider.stage_output_path / "hp_resolved.yaml")

    logging.info("------------------")
    logging.info(f"training stage '{path_provider.stage_name}'")
    if is_distributed():
        # using a different seed for every rank to ensure that stochastic processes are different across ranks
        # for large batch_sizes this shouldn't matter too much
        # this is relevant for:
        # - augmentations (augmentation parameters of sample0 of rank0 == augparams of sample0 of rank1 == ...)
        # - the masks of a MAE are the same for every rank
        # NOTE: DDP syncs the parameters in its __init__ method -> same initial parameters independent of seed
        seed += get_rank()
        logging.info(f"using different seeds per process (seed+rank) ")
    set_seed(seed)

    # init datasets
    logging.info("------------------")
    logging.info("initializing datasets")
    datasets = {}
    dataset_config_provider = DatasetConfigProvider(
        global_dataset_paths=static_config.get_global_dataset_paths(),
        local_dataset_path=static_config.get_local_dataset_path(),
        data_source_modes=static_config.get_data_source_modes(),
    )
    if "datasets" not in stage_hp:
        logging.info(f"no datasets found -> initialize dummy dataset")
        datasets["train"] = DummyDataset(
            size=256,
            x_shape=(2,),
            n_classes=2,
        )
    else:
        for dataset_key, dataset_kwargs in stage_hp["datasets"].items():
            logging.info(f"initializing {dataset_key}")
            datasets[dataset_key] = dataset_from_kwargs(
                dataset_config_provider=dataset_config_provider,
                path_provider=path_provider,
                **dataset_kwargs,
            )
    # optional DataContainer settings from the yaml (only passed when present)
    data_container_kwargs = {}
    if "prefetch_factor" in stage_hp:
        data_container_kwargs["prefetch_factor"] = stage_hp.pop("prefetch_factor")
    if "max_num_workers" in stage_hp:
        data_container_kwargs["max_num_workers"] = stage_hp.pop("max_num_workers")
    data_container = DataContainer(
        **datasets,
        num_workers=cli_args.num_workers,
        pin_memory=cli_args.pin_memory,
        config_provider=config_provider,
        # drawn after set_seed, so presumably reproducible for a given seed
        seed=get_random_int(),
        **data_container_kwargs,
    )

    # init trainer
    logging.info("------------------")
    logging.info("initializing trainer")
    trainer_kwargs = {}
    if "max_batch_size" in stage_hp:
        trainer_kwargs["max_batch_size"] = stage_hp.pop("max_batch_size")
    trainer = trainer_from_kwargs(
        data_container=data_container,
        device=device,
        sync_batchnorm=cli_args.sync_batchnorm or static_config.default_sync_batchnorm,
        config_provider=config_provider,
        summary_provider=summary_provider,
        path_provider=path_provider,
        **stage_hp["trainer"],
        **trainer_kwargs,
    )
    # register datasets of callbacks (e.g. for ImageNet-C the dataset never changes so its pointless to specify)
    for callback in trainer.callbacks:
        callback.register_root_datasets(
            dataset_config_provider=dataset_config_provider,
            is_mindatarun=cli_args.testrun or cli_args.mindatarun,
        )

    # init model
    logging.info("------------------")
    logging.info("creating model")
    if "model" not in stage_hp:
        logging.info(f"no model defined -> use dummy model")
        model = DummyModel(
            input_shape=trainer.input_shape,
            output_shape=trainer.output_shape,
            update_counter=trainer.update_counter,
            path_provider=path_provider,
            is_frozen=True,
        )
    else:
        model = model_from_kwargs(
            **stage_hp["model"],
            input_shape=trainer.input_shape,
            output_shape=trainer.output_shape,
            update_counter=trainer.update_counter,
            path_provider=path_provider,
            data_container=data_container,
        )

    logging.info(f"model architecture:\n{model}")
    # moved to trainer as initialization on cuda is different than on cpu
    # model = model.to(stage_config.run_config.device)

    # train model
    trainer.train_model(model)

    # finish callbacks
    CallbackBase.finish()

    # summarize logvalues
    logging.info("------------------")
    logging.info(f"summarize logvalues")
    summary_provider.summarize_logvalues()

    # summarize stage (rank0 only)
    if "stage_summarizers" in stage_hp and is_rank0():
        logging.info("------------------")
        logging.info("summarize stage")
        for kwargs in stage_hp["stage_summarizers"]:
            summarizer = stage_summarizer_from_kwargs(
                summary_provider=summary_provider,
                path_provider=path_provider,
                **kwargs,
            )
            summarizer.summarize()
    # summarize summary (rank0 only)
    if "summary_summarizers" in stage_hp and is_rank0():
        summary_provider.flush()
        logging.info("------------------")
        for kwargs in stage_hp["summary_summarizers"]:
            summary_summarizer = summary_summarizer_from_kwargs(
                summary_provider=summary_provider,
                **kwargs,
            )
            summary_summarizer.summarize()
    summary_provider.flush()

    # add profiling times to summary_provider
    def try_log_profiler_time(summary_key, profiler_query):
        """Store the total time of a profiler node in the summary; skip nodes that cannot be retrieved."""
        try:
            summary_provider[summary_key] = kp.profiler.get_node(profiler_query).total_time
        except AssertionError:
            # presumably raised by get_node when no such node was recorded
            pass

    try_log_profiler_time("profiler/train", "train")
    try_log_profiler_time("profiler/train/iterator", "train.iterator")
    try_log_profiler_time("profiler/train/data_loading", "train.data_loading")
    try_log_profiler_time("profiler/train/update", "train.update")
    try_log_profiler_time("profiler/train/to_device", "train.update.forward.to_device")
    try_log_profiler_time("profiler/train/forward", "train.update.forward")
    try_log_profiler_time("profiler/train/backward", "train.update.backward")
    summary_provider.flush()
    # log profiler times
    logging.info(f"full profiling times:\n{kp.profiler.to_string()}")
    kp.reset()

    # execute commands (rank0 only, best-effort: a failing command is logged and the stage continues)
    if "on_finish" in stage_hp and is_rank0():
        logging.info("------------------")
        logging.info("ON_FINISH COMMANDS")
        for command_kwargs in stage_hp["on_finish"]:
            command = command_from_kwargs(**command_kwargs, stage_id=stage_id)
            try:
                command.execute()
            except Exception:
                # Exception (not a bare except) so KeyboardInterrupt/SystemExit still abort the run
                logging.exception(f"failed to execute {command}")

    # cleanup
    logging.info("------------------")
    logging.info(f"CLEANUP")
    data_container.dispose()
    message_counter.log()
    finish_wandb(wandb_config)

    # log how many tensors remain to be aware of potential memory leaks
    all_tensors, cuda_tensors = get_tensors_in_memory()
    logging.info("------------------")
    logging.info(f"{len(all_tensors)} tensors remaining in memory (cpu+gpu)")
    logging.info(f"{len(all_tensors) - len(cuda_tensors)} tensors remaining in memory (cpu)")
    logging.info(f"{len(cuda_tensors)} tensors remaining in memory (gpu)")

#### MAIN TRAIN

In [4]:
from utils.version_check import check_versions

# run the version check before the remaining imports of this cell (e.g. torch);
# a verbose check is done later inside train_stage via check_versions(verbose=True)
check_versions(verbose=False)

import logging
import os

import kappaprofiler as kp
import torch

from configs.cli_args import parse_run_cli_args
from configs.static_config import StaticConfig
from distributed.config import barrier, get_rank, get_local_rank, get_world_size, is_managed
from distributed.run import run_single_or_multiprocess, run_managed
#from train_stage import train_stage
from utils.kappaconfig.util import get_stage_hp
from utils.logging_util import add_global_handlers, log_from_all_ranks
from utils.pytorch_cuda_timing import cuda_start_event, cuda_end_event

from main_train import main_single

def main():
    """Entry point: parse the CLI, load the static config and launch training.

    Runs that are managed by an external launcher (``is_managed()``) go through ``run_managed``;
    everything else goes through ``run_single_or_multiprocess``, which uses ``--devices`` to decide
    between a single process and spawning one process per device.
    """
    # parse the CLI first so that invalid arguments fail fast
    cli_args = parse_run_cli_args()
    static_config = StaticConfig(uri="static_config.yaml", datasets_were_preloaded=cli_args.datasets_were_preloaded)
    # loggers for the setup phase (no log file yet; the stage's log file is attached later)
    add_global_handlers(log_file_uri=None)

    if not is_managed():
        run_single_or_multiprocess(
            accelerator=cli_args.accelerator,
            devices=cli_args.devices,
            main_single=main_single,
            master_port=cli_args.master_port or static_config.master_port,
            mig_devices=static_config.mig_config,
        )
        return

    run_managed(
        accelerator=cli_args.accelerator,
        devices=cli_args.devices,
        main_single=main_single,
    )

executable: /home/ubuntu/UPT/upt_venv/bin/python
executable: /home/ubuntu/UPT/upt_venv/bin/python


### Set Up CLI Args

In [5]:
# run the full entry point (parse CLI -> launch training); in the recorded output below the run was
# stopped with a KeyboardInterrupt while wandb was being initialized
main()

11-10 18:57:49 I ------------------
11-10 18:57:49 I running single process training
11-10 18:57:49 I device 0: NVIDIA A10G (id=0)
11-10 18:57:49 I initialized process rank=0 local_rank=0 pid=3082396
11-10 18:57:49 I initialized 1 processes
11-10 18:57:49 I initialized profiler to call sync cuda
11-10 18:57:50 I log file: results/stage1/xsm5phdu/log.txt
11-10 18:57:50 I no seed specified -> using seed=0
11-10 18:57:50 I ------------------
11-10 18:57:50 I initializing wandb (mode=online)


[34m[1mwandb[0m: Using wandb-core as the SDK backend. Please refer to https://wandb.me/wandb-core for more information.

KeyboardInterrupt



### Dissect the Train Stage

In [6]:
import logging
import os
import platform

import psutil
import torch
import yaml
from torch.distributed import init_process_group, destroy_process_group, barrier
from torch.multiprocessing import spawn

# the functions below come from the distributed package (cf. `from distributed.run import ...` above),
# where they use `from .config import (...)`; a relative import does not work in a notebook cell,
# so the same helpers are imported from distributed.config explicitly
from distributed.config import (
    is_managed,
    get_world_size_from_env,
    get_rank_from_env,
    get_local_rank,
    is_custom_managed_run,
    is_mpi_managed_run,
    get_nodes,
)

def run_managed(accelerator, devices, main_single):
    """Launch training inside an externally managed run (e.g. SLURM/MPI).

    For GPU runs, each process is pinned to a single GPU (its local rank) if needed. Without
    ``devices`` the world size from the environment decides between single- and multi-process
    training; with ``devices`` exactly one device must be given and a single process is run.
    """
    assert is_managed()

    if accelerator == "gpu":
        # custom managed run doesn't set CUDA_VISIBLE_DEVICES -> pin this process to its local rank
        needs_pinning = (
            is_custom_managed_run()
            or is_mpi_managed_run()
            or len(os.environ["CUDA_VISIBLE_DEVICES"].split(",")) > 1
        )
        if needs_pinning:
            os.environ["CUDA_VISIBLE_DEVICES"] = str(get_local_rank())
        _check_single_device_visible()

    if devices is not None:
        # use single GPU (e.g. run_folder from every GPU)
        world_size, device_ids = _parse_devices(accelerator, devices)
        assert world_size == 1 and len(device_ids) == 1
        _log_device_info(accelerator, device_ids)
        _run_managed_singleprocess(accelerator, main_single)
    elif get_world_size_from_env() == 1:
        _run_managed_singleprocess(accelerator, main_single)
    else:
        # use all GPUs for training
        _run_managed_multiprocess(accelerator, main_single)


def _run_managed_singleprocess(accelerator, main_single):
    """Run ``main_single`` once in the current (externally managed) process."""
    logging.info(f"running single process slurm training")
    main_single(device=_accelerator_to_device(accelerator))


def _run_managed_multiprocess(accelerator, main_single):
    """Join the process group of an externally managed multi-process run and train.

    The launcher must provide MASTER_ADDR/MASTER_PORT; rank and world size are read from the
    environment. The process group is destroyed after ``main_single`` returns.
    """
    # the rendezvous address has to be provided by the launcher
    for required_var in ("MASTER_ADDR", "MASTER_PORT"):
        assert required_var in os.environ

    world_size = get_world_size_from_env()
    rank = get_rank_from_env()

    logging.info(
        f"initializing rank={rank} local_rank={get_local_rank()} "
        f"nodes={get_nodes()} hostname={platform.uname().node} "
        f"master_addr={os.environ['MASTER_ADDR']} master_port={os.environ['MASTER_PORT']} "
        f"(waiting for all {world_size} processes to connect)"
    )
    backend = get_backend(accelerator)
    init_process_group(backend=backend, init_method="env://", world_size=world_size, rank=rank)
    barrier()

    main_single(device=_accelerator_to_device(accelerator))
    destroy_process_group()


def run_single_or_multiprocess(accelerator, devices, main_single, master_port, mig_devices):
    """Launch training on a single node.

    One device runs ``main_single`` in the current process; several devices spawn one process
    per device (``_run_multiprocess``) with a free master port starting at ``master_port``.
    """
    logging.info("------------------")
    # single node run
    assert devices is not None
    world_size, device_ids = _parse_devices(accelerator, devices, mig_devices)

    if world_size != 1:
        # spawn multi process training
        logging.info(
            f"running multi process training on {world_size} processes "
            f"(devices={devices} host={platform.uname().node})"
        )
        master_port = _get_free_port(master_port)
        logging.info(f"master port: {master_port}")
        # dont log device info as this would load torch on device0 and block the VRAM required for this
        # log_device_info(accelerator, device_ids)
        spawn(
            _run_multiprocess,
            nprocs=world_size,
            args=(accelerator, device_ids, master_port, world_size, main_single),
        )
        return

    # single process
    logging.info(f"running single process training")
    if accelerator == "gpu":
        os.environ["CUDA_VISIBLE_DEVICES"] = device_ids[0]
        _check_single_device_visible()
    _log_device_info(accelerator, device_ids)
    main_single(device=_accelerator_to_device(accelerator))


def _run_multiprocess(rank, accelerator, device_ids, master_port, world_size, main_single):
    """Per-rank entry point spawned by ``run_single_or_multiprocess``.

    Args:
        rank: process index supplied by ``torch.multiprocessing.spawn``.
        accelerator: ``"cpu"`` or ``"gpu"``.
        device_ids: device ids as strings (integer GPU indices or MIG ids), one per rank.
        master_port: port used for the ``env://`` rendezvous on localhost.
        world_size: total number of processes.
        main_single: training entry point, called with ``device=...``.
    """
    # currently only single node is supported
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = str(master_port)

    if accelerator == "gpu":
        device_id = device_ids[rank]
        # select this rank's GPU; only possible for integer ids (MIG ids are not integers), and only
        # done for the gpu accelerator so that cpu runs do not touch CUDA at all
        try:
            cuda_index = int(device_id)
        except ValueError:
            cuda_index = None
        if cuda_index is not None:
            torch.cuda.set_device(cuda_index)
        # NOTE(review): torch.cuda.set_device above likely initializes CUDA already, in which case this
        # variable no longer restricts device visibility for this process -- confirm the intended order
        os.environ["CUDA_VISIBLE_DEVICES"] = device_id
        # presumably disabled because all devices stay visible after set_device (see note above)
        #_check_single_device_visible()

    from torch.distributed import init_process_group, destroy_process_group
    init_process_group(
        backend=get_backend(accelerator, device_ids),
        init_method="env://",
        world_size=world_size,
        rank=rank,
    )
    device = _accelerator_to_device(accelerator)
    main_single(device=device)
    destroy_process_group()


def get_backend(accelerator, device_ids=None):
    """Pick the ``torch.distributed`` backend for the given accelerator and devices.

    Returns ``"gloo"`` for cpu runs, on Windows, and when any device id is not an integer
    (MIG devices); otherwise ``"nccl"``.
    See https://pytorch.org/docs/stable/distributed.html#which-backend-to-use
    """
    # gloo: recommended for cpu multiprocessing; windows doesn't support nccl (I think)
    if accelerator == "cpu" or os.name == "nt":
        return "gloo"

    def _is_integer_id(device_id):
        try:
            int(device_id)
        except ValueError:
            return False
        return True

    # MIG doesn't support NCCL
    if device_ids is not None and not all(_is_integer_id(device_id) for device_id in device_ids):
        return "gloo"
    # nccl is recommended for gpu multiprocessing
    return "nccl"


def _get_free_port(start_port):
    """Return the first port ``>= start_port`` that no local socket appears to use.

    A port counts as taken when psutil reports a socket bound to it on a loopback or wildcard
    address (a listener on ``0.0.0.0``/``::`` also occupies the port on ``localhost``), or a
    connection whose remote end is that port on the loopback address.

    Args:
        start_port: first port to consider.

    Returns:
        The lowest free port in ``[start_port, 65535]``.

    Raises:
        ValueError: if every port from ``start_port`` up to 65535 is taken.
    """
    # local addresses whose ports collide with a rendezvous on MASTER_ADDR=localhost
    loopback_ips = {"127.0.0.1", "::1", "::ffff:127.0.0.1"}
    blocking_bind_ips = loopback_ips | {"0.0.0.0", "::"}

    taken_ports = set()
    for connection in psutil.net_connections():
        # raddr is an empty tuple for unconnected sockets; laddr is guarded the same way defensively
        if connection.laddr and connection.laddr.ip in blocking_bind_ips:
            taken_ports.add(connection.laddr.port)
        if connection.raddr and connection.raddr.ip in loopback_ips:
            taken_ports.add(connection.raddr.port)

    # 65535 is a valid port and range() excludes its upper bound
    for port in range(start_port, 65536):
        if port not in taken_ports:
            return port
    raise ValueError(f"all ports starting from {start_port} are taken")


def _parse_devices(accelerator, devices, mig_devices=None):
    try:
        # single process
        device_ids = [int(devices)]
    except ValueError:
        # multi process
        device_ids = yaml.safe_load(f"[{devices}]")
        msg = f"invalid devices specification '{devices}' (specify multiple devices like this '0,1,2,3')"
        assert all(isinstance(d, int) for d in device_ids), msg
    # os.environ["CUDA_VISIBLE_DEVICES"] requires string
    device_ids = [str(device_id) for device_id in device_ids]

    if accelerator == "gpu" and mig_devices is not None:
        # map to MIG device ids
        hostname = platform.uname().node
        if hostname in mig_devices:
            for i in range(len(device_ids)):
                device_id = int(device_ids[i])
                if device_id in mig_devices[hostname]:
                    mig_device_id = mig_devices[hostname][device_id]
                    device_ids[i] = mig_device_id
                    logging.info(f"device_id is MIG device with id {mig_device_id}")

    return len(device_ids), device_ids


def _check_single_device_visible():
    assert "CUDA_VISIBLE_DEVICES" in os.environ
    visible_device_count = torch.cuda.device_count()
    assert visible_device_count <= 1, os.environ


def _log_device_info(accelerator, device_ids):
    if accelerator == "cpu":
        for i in range(len(device_ids)):
            logging.info(f"device {i}: cpu")
    elif accelerator == "gpu":
        # retrieve device names via nvidia-smi because CUDA_VISIBLE_DEVICES needs to be set before calling anything
        # in torch.cuda -> only 1 visible device
        all_devices = os.popen("nvidia-smi --query-gpu=gpu_name --format=csv,noheader").read().strip().split("\n")
        for i, device_id in enumerate(device_ids):
            try:
                device_id = int(device_id)
                logging.info(f"device {i}: {all_devices[device_id]} (id={device_id})")
            except ValueError:
                # MIG device
                logging.info(f"using MIG device")
    else:
        raise NotImplementedError


def _accelerator_to_device(accelerator):
    if accelerator == "cpu":
        return "cpu"
    elif accelerator == "gpu":
        return "cuda"
    raise NotImplementedError

In [7]:
import sys
import os

# restrict this kernel to GPU 0; CUDA_VISIBLE_DEVICES only takes effect if set before CUDA is initialized
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

# emulate `python main_train.py --hp <yaml> --devices 0` so that parse_run_cli_args in the next cells
# sees these arguments instead of the notebook kernel's own argv
sys.argv = [
    "main_train.py",
    "--hp",
    "yamls/shapenetcar/upt/dim768_seq1024sdf512_cnext_lr5e4_sd02_reprcnn_grn_grid32.yaml",
    "--devices",
    "0",
]

In [8]:
# parse the CLI first so that invalid arguments fail fast
cli_args = parse_run_cli_args()
static_config = StaticConfig(
    uri="static_config.yaml",
    datasets_were_preloaded=cli_args.datasets_were_preloaded,
)
# loggers for the setup phase (separate from the per-stage log file, hence no log_file_uri)
add_global_handlers(log_file_uri=None)

<MessageCounter (NOTSET)>

In [9]:
# the arguments main() passes to run_single_or_multiprocess; main_single itself is already in scope
# via `from main_train import main_single`, so it needs no (self-)assignment here
accelerator = cli_args.accelerator
devices = cli_args.devices
master_port = cli_args.master_port or static_config.master_port
mig_devices = static_config.mig_config

In [10]:
# inline the single-process branch of run_single_or_multiprocess (without starting training)
world_size, device_ids = _parse_devices(accelerator, devices, mig_devices)
# this cell only reproduces the single-process path -> fail loudly instead of silently using device_ids[0]
if world_size != 1:
    raise ValueError(f"expected a single device but got devices={devices!r} (world_size={world_size})")
logging.info(f"running single process training")
if accelerator == "gpu":
    os.environ["CUDA_VISIBLE_DEVICES"] = device_ids[0]
    # NOTE(review): the visibility check is disabled in this notebook -- reason not visible here
    #_check_single_device_visible()
_log_device_info(accelerator, device_ids)
device = _accelerator_to_device(accelerator)
# training is not started here; the following cells step through main_single / train_stage manually
#main_single(device=device)

11-10 18:57:54 I running single process training
11-10 18:57:54 I device 0: NVIDIA A10G (id=0)


In [11]:
# step through the setup part of main_single (presumably mirrors main_train.main_single)
cli_args = parse_run_cli_args()
static_config = StaticConfig(
    uri="static_config.yaml",
    datasets_were_preloaded=cli_args.datasets_were_preloaded,
)
add_global_handlers(log_file_uri=None)
with log_from_all_ranks():
    logging.info(f"initialized process rank={get_rank()} local_rank={get_local_rank()} pid={os.getpid()}")
# barrier() is skipped in this single-process notebook (no process group was initialized in the cells above)
#barrier()
logging.info(f"initialized {get_world_size()} processes")

# CUDA_LAUNCH_BLOCKING=1 for debugging
#os.environ["CUDA_LAUNCH_BLOCKING"] = str(1)

# cudnn: benchmark mode (incompatible with the deterministic default) or, without it, optional determinism
if cli_args.accelerator == "gpu" and (cli_args.cudnn_benchmark or static_config.default_cudnn_benchmark):
    torch.backends.cudnn.benchmark = True
    assert not static_config.default_cudnn_deterministic, "cudnn_benchmark can make things non-deterministic"
elif cli_args.accelerator == "gpu":
    logging.warning(f"disabled cudnn benchmark")
    if static_config.default_cudnn_deterministic:
        torch.backends.cudnn.deterministic = True
        os.environ["CUBLAS_WORKSPACE_CONFIG"] = ":4096:8"
        logging.warning(f"enabled cudnn deterministic")

# profiling: cpu runs always use setup_async_as_sync; gpu runs set up cuda events only when requested
if cli_args.accelerator != "gpu":
    kp.setup_async_as_sync()
elif cli_args.cuda_profiling or static_config.default_cuda_profiling:
    kp.setup_async(cuda_start_event, cuda_end_event)
    logging.info(f"initialized profiler to call sync cuda")

# load hyperparameters
stage_hp = get_stage_hp(
    cli_args.hp,
    template_path="zztemplates",
    testrun=cli_args.testrun,
    minmodelrun=cli_args.minmodelrun,
    mindatarun=cli_args.mindatarun,
    mindurationrun=cli_args.mindurationrun,
)

# train stage (stepped through manually in the next cells instead)
#train_stage(
#    stage_hp=stage_hp,
#    static_config=static_config,
#    cli_args=cli_args,
#    device=device,
#)

11-10 18:57:55 I initialized process rank=0 local_rank=0 pid=3082396
11-10 18:57:55 I initialized 1 processes
11-10 18:57:55 I initialized profiler to call sync cuda


### TRAIN STAGE

In [12]:
# export the optional "env" section of the hp as environment variables
# (os.environ only accepts strings; str() of a str is the string itself)
for key, value in stage_hp.get("env", {}).items():
    os.environ[key] = str(value)

# resume: turn --resume_stage_id / --resume_checkpoint into a resume_initializer of the trainer
if cli_args.resume_stage_id is not None:
    # the resume initializer is written into stage_hp["trainer"]["initializer"] below, which would overwrite
    # an initializer defined in the yaml -> refuse explicitly (an assert would be stripped under `python -O`)
    if "initializer" in stage_hp["trainer"]:
        raise ValueError(
            f"cannot resume stage '{cli_args.resume_stage_id}': "
            f"stage_hp['trainer'] already defines an 'initializer'"
        )
    # checkpoint formats: None -> "latest", E<n> -> epoch n, U<n> -> update n, S<n> -> sample n
    if cli_args.resume_checkpoint is None:
        checkpoint = "latest"
    elif cli_args.resume_checkpoint.startswith("E"):
        checkpoint = dict(epoch=int(cli_args.resume_checkpoint[1:]))
    elif cli_args.resume_checkpoint.startswith("U"):
        checkpoint = dict(update=int(cli_args.resume_checkpoint[1:]))
    elif cli_args.resume_checkpoint.startswith("S"):
        checkpoint = dict(sample=int(cli_args.resume_checkpoint[1:]))
    else:
        # any checkpoint (like cp=last or cp=best.accuracy1.test.main)
        checkpoint = cli_args.resume_checkpoint
    stage_hp["trainer"]["initializer"] = dict(
        kind="resume_initializer",
        stage_id=cli_args.resume_stage_id,
        checkpoint=checkpoint,
    )

# stage_id: use the one hardcoded in the hp if present (allows queueing up dependent stages), e.g.:
# - pretrain MAE with stage_id abcdefgh
# - finetune MAE where the backbone is initialized from stage_id abcdefgh
stage_id = stage_hp.get("stage_id")
if stage_id is None:
    # otherwise generate a fresh id; in distributed runs each rank generates its own,
    # so rank0's id is broadcast to make all ranks agree
    stage_id = generate_id()
    if is_distributed():
        object_list = [stage_id if is_rank0() else None]
        broadcast_object_list(object_list)
        stage_id = object_list[0]
stage_name = stage_hp.get("stage_name", "default_stage")

# initialize logging
# the PathProvider combines the static output/model/temp paths with the stage name and id
# (the recorded run logged its file as results/stage1/<stage_id>/log.txt)
path_provider = PathProvider(
    output_path=static_config.output_path,
    model_path=static_config.model_path,
    stage_name=stage_name,
    stage_id=stage_id,
    temp_path=static_config.temp_path,
)
# re-install the global handlers, now also writing to the stage log file;
# the returned message_counter is logged during cleanup
message_counter = add_global_handlers(log_file_uri=path_provider.logfile_uri)

# init seed
# always consume the yaml "name" (like seed/wandb/tags/notes/group are consumed) so that stage_hp -- which is
# passed to init_wandb and saved as hp_resolved.yaml -- does not depend on whether --name was given;
# the CLI name still takes precedence over the yaml name
yaml_run_name = stage_hp.pop("name", None)
run_name = cli_args.name or yaml_run_name
seed = stage_hp.pop("seed", None)
if seed is None:
    seed = 0
    logging.info(f"no seed specified -> using seed={seed}")

# initialize wandb
# the yaml key "wandb" is either the name of a config in wandb_configs/ or the sentinel "disabled"
wandb_config_uri = stage_hp.pop("wandb", None)
yaml_disables_wandb = wandb_config_uri == "disabled"
if yaml_disables_wandb:
    wandb_mode = "disabled"
    # the sentinel is not a config file -> don't treat it as one below
    wandb_config_uri = None
else:
    wandb_mode = cli_args.wandb_mode or static_config.default_wandb_mode
if wandb_mode == "disabled":
    wandb_config_dict = {}
    if cli_args.wandb_config is not None:
        logging.warning(f"wandb_config is defined via CLI but mode is disabled -> wandb_config is not used")
    if wandb_config_uri is not None:
        logging.warning(f"wandb_config is defined via yaml but mode is disabled -> wandb_config is not used")
else:
    # precedence: yaml > --wandb_config > default wandb_config.yaml
    # retrieve wandb config from yaml
    if wandb_config_uri is not None:
        wandb_config_uri = Path("wandb_configs") / wandb_config_uri
        if cli_args.wandb_config is not None:
            logging.warning(f"wandb_config is defined via CLI and via yaml -> wandb_config from yaml is used")
    # retrieve wandb config from --wandb_config cli arg
    elif cli_args.wandb_config is not None:
        wandb_config_uri = Path("wandb_configs") / cli_args.wandb_config
    # use default wandb_config file
    else:
        wandb_config_uri = Path("wandb_config.yaml")
    with open(wandb_config_uri.with_suffix(".yaml"), encoding="utf-8") as f:
        wandb_config_dict = yaml.safe_load(f)
wandb_config = WandbConfig(mode=wandb_mode, **wandb_config_dict)
# the tags/notes/group/group_tags keys are consumed from stage_hp while building the call arguments
config_provider, summary_provider = init_wandb(
    device=device,
    run_name=run_name,
    stage_hp=stage_hp,
    wandb_config=wandb_config,
    path_provider=path_provider,
    account_name=static_config.account_name,
    tags=stage_hp.pop("tags", None),
    notes=stage_hp.pop("notes", None),
    group=stage_hp.pop("group", None),
    group_tags=stage_hp.pop("group_tags", None),
)
# log codebase "high-level" version name (git commit is logged anyway)
config_provider["code/mlp"] = "CVSim"
# latest git tag of the working directory. `git describe` exits non-zero (with a message on stderr) when the
# repository has no tags, and git may not be installed at all -> store "" in both cases. stderr is captured so
# it doesn't leak into the output, and subprocess.run waits for/cleans up the child process.
import subprocess

try:
    git_describe = subprocess.run(
        ["git", "describe", "--abbrev=0"],
        capture_output=True,
        text=True,
        check=False,
    )
    config_provider["code/tag"] = git_describe.stdout.strip() if git_describe.returncode == 0 else ""
except OSError:
    config_provider["code/tag"] = ""
config_provider["code/name"] = "initial"

# log setup
# dump everything needed to reproduce the run: stage id, CLI command, package versions, system info,
# static config, CLI args, distributed config and the stage hyperparameters
logging.info("------------------")
logging.info(f"stage_id: {stage_id}")
logging.info(get_cli_command())
check_versions(verbose=True)
log_system_info()
static_config.log()
cli_args.log()
log_distributed_config()
log_stage_hp(stage_hp)
# only rank0 writes the hp files into the stage output directory
# (presumably to avoid concurrent writes in distributed runs)
if is_rank0():
    save_unresolved_hp(cli_args.hp, path_provider.stage_output_path / "hp_unresolved.yaml")
    save_resolved_hp(stage_hp, path_provider.stage_output_path / "hp_resolved.yaml")

logging.info("------------------")
logging.info(f"training stage '{path_provider.stage_name}'")
if is_distributed():
    # offset the seed by the rank so stochastic processes differ across ranks, e.g.
    # - augmentation parameters (otherwise sample0 of rank0 is augmented like sample0 of rank1, ...)
    # - MAE masks (otherwise identical on every rank)
    # with large batch sizes this matters less
    # NOTE: DDP syncs the parameters in its __init__ -> initial parameters are identical regardless of seed
    seed = seed + get_rank()
    logging.info("using different seeds per process (seed+rank) ")
set_seed(seed)

11-10 18:57:57 I log file: results/stage1/ugse6jw9/log.txt
11-10 18:57:57 I no seed specified -> using seed=0
11-10 18:57:57 I ------------------
11-10 18:57:57 I initializing wandb (mode=online)


[34m[1mwandb[0m: Currently logged in as: [33mpablo-hermoso-moreno[0m. Use [1m`wandb login --relogin`[0m to force relogin


11-10 18:57:57 I logged into wandb (host=https://api.wandb.ai/)


11-10 18:57:58 I ------------------
11-10 18:57:58 I stage_id: ugse6jw9
11-10 18:57:58 I python main_train.py --hp yamls/shapenetcar/upt/dim768_seq1024sdf512_cnext_lr5e4_sd02_reprcnn_grn_grid32.yaml --devices 0
11-10 18:57:58 I ------------------
11-10 18:57:58 I VERSION CHECK
11-10 18:57:58 I executable: /home/ubuntu/UPT/upt_venv/bin/python
11-10 18:57:58 I python version: 3.10.9
11-10 18:57:58 I torch version: 2.3.1+cu121
11-10 18:57:58 I torch.cuda version: 12.1
11-10 18:57:58 I torchvision.version: 0.18.1+cu121
11-10 18:57:58 I kappabenchmark version: 0.0.10
11-10 18:57:58 I kappaconfig version: 1.0.31
11-10 18:57:58 I kappadata version: 1.4.15
11-10 18:57:58 I kappamodules version: 0.1.99
11-10 18:57:58 I kappaprofiler version: 1.0.11
11-10 18:57:58 I kappaschedules version: 0.0.31
11-10 18:57:58 I torchmetrics version: 1.4.3
11-10 18:57:58 I ------------------
11-10 18:57:58 I SYSTEM INFO
11-10 18:57:58 I host name: ip-172-31-65-120
11-10 18:57:58 I OS: Linux-5.15.0-1058-aws-x86_

fatal: No names found, cannot describe anything.


11-10 18:58:00 I CUDA version: 12.2
11-10 18:58:00 W could not retrieve current git commit hash from ./.git/FETCH_HEAD
11-10 18:58:00 I initialized process rank=0 local_rank=0 pid=3082396 hostname=ip-172-31-65-120
11-10 18:58:00 I total_cpu_count: 192
11-10 18:58:00 I ------------------
11-10 18:58:00 I STATIC CONFIG
11-10 18:58:00 I account_name: dev
11-10 18:58:00 I output_path: results
11-10 18:58:00 I local_dataset_path: /home/ubuntu/UPT/data
11-10 18:58:00 I Filesystem      Size  Used Avail Use% Mounted on
11-10 18:58:00 I /dev/root        16T  8.3T  6.9T  55% /
11-10 18:58:00 I ------------------
11-10 18:58:00 I CLI ARGS
11-10 18:58:00 I hp: yamls/shapenetcar/upt/dim768_seq1024sdf512_cnext_lr5e4_sd02_reprcnn_grn_grid32.yaml
11-10 18:58:00 I accelerator: gpu
11-10 18:58:00 I devices: 0
11-10 18:58:00 I testrun: False
11-10 18:58:00 I minmodelrun: False
11-10 18:58:00 I mindatarun: False
11-10 18:58:00 I mindurationrun: False
11-10 18:58:00 I datasets_were_preloaded: False
11-10 1

In [13]:
# init datasets
logging.info("------------------")
logging.info("initializing datasets")
datasets = {}
dataset_config_provider = DatasetConfigProvider(
    global_dataset_paths=static_config.get_global_dataset_paths(),
    local_dataset_path=static_config.get_local_dataset_path(),
    data_source_modes=static_config.get_data_source_modes(),
)
if "datasets" in stage_hp:
    # one dataset per key of the hp "datasets" section (train/test in the recorded run)
    for dataset_key, dataset_kwargs in stage_hp["datasets"].items():
        logging.info(f"initializing {dataset_key}")
        datasets[dataset_key] = dataset_from_kwargs(
            dataset_config_provider=dataset_config_provider,
            path_provider=path_provider,
            **dataset_kwargs,
        )
else:
    logging.info("no datasets found -> initialize dummy dataset")
    datasets["train"] = DummyDataset(size=256, x_shape=(2,), n_classes=2)

# optional dataloading overrides are consumed (popped) from stage_hp and forwarded to the DataContainer
data_container_kwargs = {
    kwarg_name: stage_hp.pop(kwarg_name)
    for kwarg_name in ("prefetch_factor", "max_num_workers")
    if kwarg_name in stage_hp
}
data_container = DataContainer(
    **datasets,
    num_workers=cli_args.num_workers,
    pin_memory=cli_args.pin_memory,
    config_provider=config_provider,
    seed=get_random_int(),
    **data_container_kwargs,
)

11-10 18:58:00 I ------------------
11-10 18:58:00 I initializing datasets
11-10 18:58:00 I initializing train
11-10 18:58:01 I data_source (global): '/home/ubuntu/UPT/data/shapenet_car_processed'
11-10 18:58:01 I data_source (local): '/home/ubuntu/UPT/data/shapenet_car'
11-10 18:58:01 I initializing test
11-10 18:58:01 I data_source (global): '/home/ubuntu/UPT/data/shapenet_car_processed'
11-10 18:58:01 I data_source (local): '/home/ubuntu/UPT/data/shapenet_car'


In [27]:
# display the data container in the notebook output
data_container

<utils.data_container.DataContainer at 0x7fe731718700>

In [14]:
# init trainer
logging.info("------------------")
logging.info("initializing trainer")
# an optional top-level "max_batch_size" is consumed from stage_hp and forwarded to the trainer
trainer_kwargs = {"max_batch_size": stage_hp.pop("max_batch_size")} if "max_batch_size" in stage_hp else {}
trainer = trainer_from_kwargs(
    data_container=data_container,
    device=device,
    sync_batchnorm=cli_args.sync_batchnorm or static_config.default_sync_batchnorm,
    config_provider=config_provider,
    summary_provider=summary_provider,
    path_provider=path_provider,
    **stage_hp["trainer"],
    **trainer_kwargs,
)
# callbacks register their root datasets themselves
# (e.g. for ImageNet-C the dataset never changes, so it's pointless to specify it)
for callback in trainer.callbacks:
    callback.register_root_datasets(
        dataset_config_provider=dataset_config_provider,
        is_mindatarun=cli_args.testrun or cli_args.mindatarun,
    )

11-10 18:58:01 I ------------------
11-10 18:58:01 I initializing trainer
11-10 18:58:01 I using precision: torch.bfloat16 (desired=bfloat16 backup=float16)
11-10 18:58:01 I main_sampler: RandomSampler(num_repeats=1)


In [30]:
# display the data container object (notebook inspection only)
data_container

<utils.data_container.DataContainer at 0x7fe731718700>

In [29]:
# display the device used for training ('cuda' in the recorded run)
device

'cuda'

In [22]:
# init model
logging.info("------------------")
logging.info("creating model")
if "model" in stage_hp:
    # build the model from the hp "model" section; shapes and the update counter come from the trainer
    model = model_from_kwargs(
        **stage_hp["model"],
        input_shape=trainer.input_shape,
        output_shape=trainer.output_shape,
        update_counter=trainer.update_counter,
        path_provider=path_provider,
        data_container=data_container,
    )
else:
    # no model section -> frozen DummyModel
    logging.info("no model defined -> use dummy model")
    model = DummyModel(
        input_shape=trainer.input_shape,
        output_shape=trainer.output_shape,
        update_counter=trainer.update_counter,
        path_provider=path_provider,
        is_frozen=True,
    )

logging.info(f"model architecture:\n{model}")
# the model is deliberately not moved to the device here: the trainer does that,
# because initialization on cuda differs from initialization on cpu

11-10 18:47:12 I ------------------
11-10 18:47:12 I creating model
11-10 18:47:12 I model has no initializers -> not loading a checkpoint or an optimizer state
11-10 18:47:12 I model has no initializers -> not loading a checkpoint or an optimizer state
11-10 18:47:13 I model has no initializers -> not loading a checkpoint or an optimizer state
11-10 18:47:13 I model has no initializers -> not loading a checkpoint or an optimizer state
11-10 18:47:14 I model has no initializers -> not loading a checkpoint or an optimizer state
11-10 18:47:14 I model architecture:
RansSimformerNognnSdfModel(
  (grid_encoder): RansGridConvnext(
    (model): ConvNext(
      (stem): Sequential(
        (0): Conv3d(4, 192, kernel_size=(2, 2, 2), stride=(2, 2, 2))
        (1): LayerNorm3d(
          (layer): LayerNorm((192,), eps=1e-06, elementwise_affine=True)
        )
      )
      (stages): ModuleList(
        (0): ConvNextStage(
          (downsampling): Identity()
          (blocks): Sequential(
      

In [23]:
# train model
# NOTE(review): in the recorded output this call failed with a bare AssertionError (no message) raised inside
# train_model, followed by "can only test a child process" errors while DataLoader workers were shut down;
# the root cause is not visible from this notebook -- TODO investigate
trainer.train_model(model)

11-10 18:47:26 I rans_grid_convnext applying model specific initialization
11-10 18:47:26 I rans_perceiver applying model specific initialization
11-10 18:47:26 I transformer_model applying model specific initialization
11-10 18:47:26 I rans_perceiver applying model specific initialization
11-10 18:47:26 I applying model specific initialization
11-10 18:47:26 I rans_grid_convnext initialize optimizer
11-10 18:47:26 I base lr: 5e-4
11-10 18:47:26 I scaled lr: 1.95e-6
11-10 18:47:26 I lr_scaler=LinearLrScaler(divisor=256)
11-10 18:47:26 I lr_scale_factor=1
11-10 18:47:26 I group modifiers exclude_bias_from_wd=True exclude_norm_from_wd=True add_model_specific_param_group_modifiers=True [ExcludeFromWdByNameModifier(name=type_token)]
11-10 18:47:26 I using 2 param groups:
11-10 18:47:26 I weight_decay=0.0 len(params)=52
11-10 18:47:26 I len(params)=21
11-10 18:47:26 I rans_perceiver initialize optimizer
11-10 18:47:26 I base lr: 5e-4
11-10 18:47:26 I scaled lr: 1.95e-6
11-10 18:47:26 I lr_s

AssertionError: 

Exception ignored in: <function _MultiProcessingDataLoaderIter.__del__ at 0x7f8263ad9630>
Traceback (most recent call last):
  File "/home/ubuntu/UPT/upt_venv/lib/python3.10/site-packages/torch/utils/data/dataloader.py", line 1479, in __del__
    self._shutdown_workers()
  File "/home/ubuntu/UPT/upt_venv/lib/python3.10/site-packages/torch/utils/data/dataloader.py", line 1462, in _shutdown_workers
    if w.is_alive():
  File "/opt/conda/lib/python3.10/multiprocessing/process.py", line 160, in is_alive
    assert self._parent_pid == os.getpid(), 'can only test a child process'
AssertionError: can only test a child process
Exception ignored in: <function _MultiProcessingDataLoaderIter.__del__ at 0x7f8263ad9630>
Traceback (most recent call last):
  File "/home/ubuntu/UPT/upt_venv/lib/python3.10/site-packages/torch/utils/data/dataloader.py", line 1479, in __del__
    self._shutdown_workers()
  File "/home/ubuntu/UPT/upt_venv/lib/python3.10/site-packages/torch/utils/data/dataloader.py", line

[1;34mwandb[0m: 🚀 View run [33msnc-all-sdfpos-e1000-subsam1-lr5e4-sdfperconly-seqlen1024-sdf512-cnext-dim768-sd02unif-reprcnn-grn-grid32/stage1[0m at: [34mhttps://wandb.ai/pablo-hermoso-moreno/TEST-UPT/runs/x1alubub[0m


In [None]:
# finish callbacks
# class-level finish hook shared by all callbacks
CallbackBase.finish()

# summarize logvalues
logging.info("------------------")
logging.info(f"summarize logvalues")
summary_provider.summarize_logvalues()

# summarize stage (rank0 only, and only if the hp defines stage_summarizers)
if "stage_summarizers" in stage_hp and is_rank0():
    logging.info("------------------")
    logging.info("summarize stage")
    for kwargs in stage_hp["stage_summarizers"]:
        summarizer = stage_summarizer_from_kwargs(
            summary_provider=summary_provider,
            path_provider=path_provider,
            **kwargs,
        )
        summarizer.summarize()
# summarize summary (rank0 only, and only if the hp defines summary_summarizers)
if "summary_summarizers" in stage_hp and is_rank0():
    # presumably flushed first so the summary summarizers see the up-to-date summary
    summary_provider.flush()
    logging.info("------------------")
    for kwargs in stage_hp["summary_summarizers"]:
        summary_summarizer = summary_summarizer_from_kwargs(
            summary_provider=summary_provider,
            **kwargs,
        )
        summary_summarizer.summarize()
summary_provider.flush()

# add profiling times to summary_provider
def try_log_profiler_time(summary_key, profiler_query):
    """Store the total time of the profiler node matching ``profiler_query`` under ``summary_key``.

    Best effort: an ``AssertionError`` (presumably raised by kappaprofiler when no node matches the query,
    e.g. because that section was never profiled) is ignored and nothing is stored.
    """
    try:
        summary_provider[summary_key] = kp.profiler.get_node(profiler_query).total_time
    except AssertionError:
        return

# copy selected profiler totals into the summary; the queries appear to be dotted paths of nested
# profiler sections (e.g. "train.update.forward" inside "train.update")
try_log_profiler_time("profiler/train", "train")
try_log_profiler_time("profiler/train/iterator", "train.iterator")
try_log_profiler_time("profiler/train/data_loading", "train.data_loading")
try_log_profiler_time("profiler/train/update", "train.update")
try_log_profiler_time("profiler/train/to_device", "train.update.forward.to_device")
try_log_profiler_time("profiler/train/forward", "train.update.forward")
try_log_profiler_time("profiler/train/backward", "train.update.backward")
summary_provider.flush()
# log profiler times
logging.info(f"full profiling times:\n{kp.profiler.to_string()}")
# presumably clears the recorded timings so a subsequent stage starts fresh
kp.reset()

# execute commands
# run the hp "on_finish" commands (rank0 only); a failing command is logged and does not stop the
# remaining commands or the cleanup below
if "on_finish" in stage_hp and is_rank0():
    logging.info("------------------")
    logging.info("ON_FINISH COMMANDS")
    for command_kwargs in stage_hp["on_finish"]:
        command = command_from_kwargs(**command_kwargs, stage_id=stage_id)
        # catch Exception (not a bare except) so KeyboardInterrupt/SystemExit still propagate
        # noinspection PyBroadException
        try:
            command.execute()
        except Exception:
            logging.exception(f"failed to execute {command}")

# cleanup
logging.info("------------------")
logging.info(f"CLEANUP")
data_container.dispose()
# message_counter was returned by add_global_handlers when the stage log file was attached
message_counter.log()
finish_wandb(wandb_config)

# log how many tensors remain to be aware of potential memory leaks
# (the cpu count is derived as all - cuda, i.e. cuda_tensors is treated as a subset of all_tensors)
all_tensors, cuda_tensors = get_tensors_in_memory()
logging.info("------------------")
logging.info(f"{len(all_tensors)} tensors remaining in memory (cpu+gpu)")
logging.info(f"{len(all_tensors) - len(cuda_tensors)} tensors remaining in memory (cpu)")
logging.info(f"{len(cuda_tensors)} tensors remaining in memory (gpu)")