# DDP_KBIT Jupyter Notebook Interface

This notebook provides a simple interface to run the DDP_KBIT distributed deep learning system without using command line arguments. It wraps the existing `main.py` functionality for easy experimentation.

## Setup and Imports

## Session Initialization (run every session)

Run the cell below first in every session to connect to the local modules.

In [1]:
import os
import sys
import json
from pathlib import Path

def find_ddp_kbit_path():
    """Find DDP_KBIT directory in common locations."""
    current_dir = Path.cwd()
    possible_paths = [
        current_dir,
        current_dir.parent,
        Path("/mnt/data/DDP_KBIT"),
        Path(r"D:\Nextcloud3\kbit\DDP_KBIT"),
    ]
    
    for path in possible_paths:
        if (path / "main.py").exists():
            return path
    return None

def setup_module_path():
    """Setup Python path for DDP_KBIT modules."""
    ddp_path = find_ddp_kbit_path()
    if not ddp_path:
        print("❌ Could not find DDP_KBIT directory")
        return None
    
    # Add both the directory and its parent to handle package imports
    parent_path = str(ddp_path.parent)
    ddp_str = str(ddp_path)
    
    if parent_path not in sys.path:
        sys.path.insert(0, parent_path)
    if ddp_str not in sys.path:
        sys.path.insert(0, ddp_str)
        
    print(f"✓ Found DDP_KBIT at: {ddp_path}")
    return ddp_path

# Initialize DDP_KBIT
if setup_module_path():
    try:
        from main import (
            setup_logging, 
            load_external_config,
            run_training_mode,
            run_experiment_mode, 
            create_sample_config
        )
        print("✓ Successfully imported DDP_KBIT modules")
        print("🎉 DDP_KBIT notebook interface ready!")
    except ImportError as e:
        print(f"❌ Error importing DDP_KBIT modules: {e}")
        print("⚠️  DDP_KBIT setup incomplete. Some features may not work.")
else:
    print("⚠️  DDP_KBIT setup incomplete. Some features may not work.")

✓ Found DDP_KBIT at: /mnt/data/DDP_KBIT


  from torch.distributed.optim import ZeroRedundancyOptimizer


✓ Successfully imported DDP_KBIT modules
🎉 DDP_KBIT notebook interface ready!
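
The candidate list in `find_ddp_kbit_path()` is hard-coded, so the notebook only works on machines that use one of those four locations. As a minimal sketch, the same lookup can take an override from an environment variable. The name `DDP_KBIT_HOME` is a hypothetical choice for illustration, not something the project defines, and `find_project_dir` is likewise an illustrative helper.

```python
import os
from pathlib import Path
from typing import Iterable, Optional


def find_project_dir(
    candidates: Iterable[Path],
    marker: str = "main.py",
    env_var: str = "DDP_KBIT_HOME",  # hypothetical variable name
) -> Optional[Path]:
    """Return the first directory that contains `marker`.

    If the environment variable is set, its value is checked before
    the other candidates.
    """
    override = os.environ.get(env_var)
    ordered = ([Path(override)] if override else []) + list(candidates)
    for path in ordered:
        if (path / marker).is_file():
            return path
    return None


# Same search order as the cell above, minus the machine-specific paths.
found = find_project_dir([Path.cwd(), Path.cwd().parent])
print(found)
```

Checking the override first keeps the existing fallbacks intact while letting a user point the notebook at a checkout in any location.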


## Configuration Setup

In [2]:
# Setup logging
setup_logging("INFO")

# Create a mock args object to simulate command line arguments
class NotebookArgs:
    def __init__(self):
        self.config_path = "sample_config.json"
        self.distributed = False
        self.experiment_type = "single"
        self.iterations = 3
        self.log_level = "INFO"

# Initialize default arguments
args = NotebookArgs()

print("✓ Configuration setup complete")
print(f"Config path: {args.config_path}")
print(f"Distributed: {args.distributed}")
print(f"Iterations: {args.iterations}")

✓ Configuration setup complete
Config path: sample_config.json
Distributed: False
Iterations: 3
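
The `NotebookArgs` class stands in for the namespace that the command line parser would normally produce. A dataclass can express the same five fields and defaults more compactly and allows per-run overrides at construction time. This is only a sketch: it assumes `main.py` reads these values as plain attributes, which the notebook does not confirm, and `NotebookArgsSketch` is an illustrative name.

```python
from dataclasses import asdict, dataclass


@dataclass
class NotebookArgsSketch:
    """Same fields and defaults as NotebookArgs in the cell above."""

    config_path: str = "sample_config.json"
    distributed: bool = False
    experiment_type: str = "single"
    iterations: int = 3
    log_level: str = "INFO"


# Override only the fields that differ from the defaults.
sketch_args = NotebookArgsSketch(iterations=5)
print(asdict(sketch_args))
```

The generated `__repr__` and `asdict` make it easy to log exactly which settings a run used.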


## Create Sample Configuration (Run this first)

In [3]:
# Create a sample configuration file
create_sample_config()
print("✓ Sample configuration created!")

# Display the configuration
if os.path.exists("sample_config.json"):
    with open("sample_config.json", 'r') as f:
        config = json.load(f)
    print("\nCurrent configuration:")
    print(json.dumps(config, indent=2))

Created sample_config.json - customize this file for your needs.
✓ Sample configuration created!

Current configuration:
{
  "training_config": {
    "base_model_type": "NeuralNetwork",
    "optimizer_class": "torch.optim.adam.Adam",
    "optimizer_params": {
      "lr": 0.001
    },
    "loss_fn": "torch.nn.modules.loss.CrossEntropyLoss",
    "perform_validation": true,
    "num_epochs": 1,
    "batch_size": 32,
    "metrics": {
      "loss": "Loss",
      "accuracy": "Accuracy"
    }
  },
  "mongo_config": {
    "connection_id": "my-mongo-1",
    "mongo_database": "kbit-db",
    "collection": "mnist_train_avro"
  },
  "kafka_config": {
    "bootstrap_servers": [
      "155.230.35.200:32100",
      "155.230.35.213:32100",
      "155.230.35.215:32100"
    ],
    "data_load_topic": "kbit-p3r1"
  },
  "data_loader_config": {
    "data_loader_type": "kafka",
    "local_data_path": "/root/processed_mnist",
    "offsets_data": [
      "0:0:19999",
      "1:0:19999",
      "2:0:19999"
    ],
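
The `offsets_data` entries above look like colon-separated triples. The sketch below parses them under the assumption that each entry means `partition:start_offset:end_offset` with an inclusive end. The notebook does not document this format, so treat the field names and the inclusive-end reading as guesses.

```python
from typing import List, NamedTuple


class OffsetRange(NamedTuple):
    partition: int
    start: int
    end: int  # assumed inclusive


def parse_offsets(entries: List[str]) -> List[OffsetRange]:
    """Parse 'partition:start:end' strings into OffsetRange tuples."""
    ranges = []
    for entry in entries:
        parts = entry.split(":")
        if len(parts) != 3:
            raise ValueError(f"expected three ':'-separated fields, got {entry!r}")
        partition, start, end = (int(p) for p in parts)
        if start > end:
            raise ValueError(f"start offset exceeds end offset in {entry!r}")
        ranges.append(OffsetRange(partition, start, end))
    return ranges


ranges = parse_offsets(["0:0:19999", "1:0:19999", "2:0:19999"])
# Record count if both ends are inclusive.
total_records = sum(r.end - r.start + 1 for r in ranges)
print(total_records)  # 60000
```

Under the inclusive reading, the three partitions cover 60,000 records, which matches the size of the MNIST training set and is at least consistent with the `mnist_train_avro` collection name.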

## Training Mode

Run single-node or distributed training.

In [4]:
# Single node training
print("🚀 Starting single node training...")
args.distributed = False

try:
    run_training_mode(args)
    print("✅ Training completed successfully!")
except Exception as e:
    print(f"❌ Training failed: {e}")

2025-08-20 05:48:47 - root - INFO - Starting training mode...


🚀 Starting single node training...
FILES IN THIS DIRECTORY
['DDP_KBIT', 'jars', 'config.json', 'mnist_pb2.py', 'spark_DL_checkpoints']


25/08/20 05:48:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/20 05:48:49 WARN ResourceUtils: The configuration of cores (exec = 5 task = 1, runnable tasks = 5) will result in wasted resources due to resource gpu limiting the number of runnable tasks per executor to: 1. Please adjust your configuration.
25/08/20 05:48:50 WARN RapidsPluginUtils: RAPIDS Accelerator 24.06.1 using cudf 24.06.0, private revision 755b4dd03c753cacb7d141f3b3c8ff9f83888b69
25/08/20 05:48:50 WARN RapidsPluginUtils: spark.rapids.sql.multiThreadedRead.numThreads is set to 20.
25/08/20 05:48:50 WARN RapidsPluginUtils: The current setting of spark.task.resource.gpu.amount (1.0) is not ideal to get the best performance from the RAPIDS Accelerator plugin. It's recommended to be 1/{executor core count} unl

Current Spark configuration:
spark.app.id = app-20250820054850-0023
spark.app.initial.file.urls = spark://192.168.141.56:39337/files/mnist_pb2.py
spark.app.initial.jar.urls = spark://192.168.141.56:39337/jars/jsr305-3.0.0.jar,spark://192.168.141.56:39337/jars/spark-sql-kafka-0-10_2.12-3.5.1.jar,spark://192.168.141.56:39337/jars/commons-pool2-2.11.1.jar,spark://192.168.141.56:39337/jars/rapids-4-spark_2.12-24.06.1.jar,spark://192.168.141.56:39337/jars/commons-logging-1.1.3.jar,spark://192.168.141.56:39337/jars/spark-token-provider-kafka-0-10_2.12-3.5.1.jar,spark://192.168.141.56:39337/jars/hadoop-client-runtime-3.3.4.jar,spark://192.168.141.56:39337/jars/lz4-java-1.8.0.jar,spark://192.168.141.56:39337/jars/kafka-clients-3.4.1.jar,spark://192.168.141.56:39337/jars/spark-streaming-kafka-0-10_2.12-3.5.1.jar,spark://192.168.141.56:39337/jars/slf4j-api-2.0.7.jar,spark://192.168.141.56:39337/jars/snappy-java-1.1.10.3.jar,spark://192.168.141.56:39337/jars/hadoop-client-api-3.3.4.jar
spark.app.

In [5]:
# Distributed training (uncomment to run)
# print("🚀 Starting distributed training...")
# args.distributed = True

# try:
#     run_training_mode(args)
#     print("✅ Distributed training completed successfully!")
# except Exception as e:
#     print(f"❌ Distributed training failed: {e}")

## Experiment Mode

Run a single experiment, or repeat it several times and analyze the results statistically.

In [6]:
# Single experiment
print("🧪 Running single experiment...")
args.experiment_type = "single"

try:
    run_experiment_mode(args)
    print("✅ Single experiment completed successfully!")
except Exception as e:
    print(f"❌ Single experiment failed: {e}")

2025-08-20 05:48:51 - root - INFO - Starting experiment mode: single


🧪 Running single experiment...
FILES IN THIS DIRECTORY
['DDP_KBIT', 'jars', 'config.json', 'mnist_pb2.py', 'spark_DL_checkpoints']
Current Spark configuration:


2025-08-20 05:48:51 - root - ERROR - Experiments failed: exp_fn() missing 3 required positional arguments: 'training_config', 'kafka_config', and 'data_loader_config'


spark.app.id = app-20250820054851-0024
spark.app.initial.file.urls = spark://192.168.141.56:39337/files/mnist_pb2.py
spark.app.initial.jar.urls = spark://192.168.141.56:39337/jars/jsr305-3.0.0.jar,spark://192.168.141.56:39337/jars/spark-sql-kafka-0-10_2.12-3.5.1.jar,spark://192.168.141.56:39337/jars/commons-pool2-2.11.1.jar,spark://192.168.141.56:39337/jars/rapids-4-spark_2.12-24.06.1.jar,spark://192.168.141.56:39337/jars/commons-logging-1.1.3.jar,spark://192.168.141.56:39337/jars/spark-token-provider-kafka-0-10_2.12-3.5.1.jar,spark://192.168.141.56:39337/jars/hadoop-client-runtime-3.3.4.jar,spark://192.168.141.56:39337/jars/lz4-java-1.8.0.jar,spark://192.168.141.56:39337/jars/kafka-clients-3.4.1.jar,spark://192.168.141.56:39337/jars/spark-streaming-kafka-0-10_2.12-3.5.1.jar,spark://192.168.141.56:39337/jars/slf4j-api-2.0.7.jar,spark://192.168.141.56:39337/jars/snappy-java-1.1.10.3.jar,spark://192.168.141.56:39337/jars/hadoop-client-api-3.3.4.jar
spark.app.name = DDP_KBIT_Experiments
s
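
The error above reports that `exp_fn()` was called without `training_config`, `kafka_config`, and `data_loader_config`. One way to catch this before the call is to compare a function's required parameters with the sections available in the loaded config. The sketch below uses `inspect.signature` from the standard library. The `exp_fn` defined here is a hypothetical stand-in that only mirrors the three names from the error message; the real signature is not shown in this notebook.

```python
import inspect
from typing import Callable, List


def required_parameters(fn: Callable) -> List[str]:
    """Names of parameters that have no default value."""
    named_kinds = (
        inspect.Parameter.POSITIONAL_ONLY,
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )
    return [
        name
        for name, param in inspect.signature(fn).parameters.items()
        if param.kind in named_kinds and param.default is inspect.Parameter.empty
    ]


# Hypothetical stand-in for the real exp_fn.
def exp_fn(training_config, kafka_config, data_loader_config, verbose=False):
    return len(training_config)


loaded = {
    "training_config": {"num_epochs": 1},
    "kafka_config": {"data_load_topic": "kbit-p3r1"},
}
missing = [name for name in required_parameters(exp_fn) if name not in loaded]
print(missing)  # ['data_loader_config']
```

Reporting the missing section names up front gives a clearer message than the `TypeError` raised deep inside the experiment runner.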

In [7]:
# Multiple experiments with statistical analysis
print("🧪 Running multiple experiments...")
args.experiment_type = "multiple"
args.iterations = 5  # You can change this number

try:
    run_experiment_mode(args)
    print(f"✅ {args.iterations} experiments completed successfully!")
except Exception as e:
    print(f"❌ Multiple experiments failed: {e}")

2025-08-20 05:48:52 - root - INFO - Starting experiment mode: multiple


🧪 Running multiple experiments...
FILES IN THIS DIRECTORY
['DDP_KBIT', 'jars', 'config.json', 'mnist_pb2.py', 'spark_DL_checkpoints']


2025-08-20 05:48:52 - root - ERROR - Experiments failed: run_multiple_experiments() got an unexpected keyword argument 'iterations'


Current Spark configuration:
spark.app.id = app-20250820054852-0025
spark.app.initial.file.urls = spark://192.168.141.56:39337/files/mnist_pb2.py
spark.app.initial.jar.urls = spark://192.168.141.56:39337/jars/jsr305-3.0.0.jar,spark://192.168.141.56:39337/jars/spark-sql-kafka-0-10_2.12-3.5.1.jar,spark://192.168.141.56:39337/jars/commons-pool2-2.11.1.jar,spark://192.168.141.56:39337/jars/rapids-4-spark_2.12-24.06.1.jar,spark://192.168.141.56:39337/jars/commons-logging-1.1.3.jar,spark://192.168.141.56:39337/jars/spark-token-provider-kafka-0-10_2.12-3.5.1.jar,spark://192.168.141.56:39337/jars/hadoop-client-runtime-3.3.4.jar,spark://192.168.141.56:39337/jars/lz4-java-1.8.0.jar,spark://192.168.141.56:39337/jars/kafka-clients-3.4.1.jar,spark://192.168.141.56:39337/jars/spark-streaming-kafka-0-10_2.12-3.5.1.jar,spark://192.168.141.56:39337/jars/slf4j-api-2.0.7.jar,spark://192.168.141.56:39337/jars/snappy-java-1.1.10.3.jar,spark://192.168.141.56:39337/jars/hadoop-client-api-3.3.4.jar
spark.app.
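
The failure above is a keyword mismatch: the caller passes `iterations`, but `run_multiple_experiments()` does not accept a parameter with that name. As a diagnostic sketch, the keywords can be split into accepted and rejected sets before the call. The stand-in function and its `num_runs` parameter are invented for illustration; the real parameter name is not shown in this notebook.

```python
import inspect
from typing import Any, Callable, Dict, Tuple


def split_kwargs(fn: Callable, **kwargs: Any) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Split kwargs into (accepted, rejected) according to fn's signature."""
    params = inspect.signature(fn).parameters
    # A **kwargs parameter means every keyword is accepted.
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()):
        return dict(kwargs), {}
    keyword_kinds = (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )
    accepted, rejected = {}, {}
    for name, value in kwargs.items():
        if name in params and params[name].kind in keyword_kinds:
            accepted[name] = value
        else:
            rejected[name] = value
    return accepted, rejected


# Hypothetical stand-in; `num_runs` is an invented parameter name.
def run_multiple_experiments(num_runs=3):
    return num_runs


accepted, rejected = split_kwargs(run_multiple_experiments, iterations=5)
print(accepted, rejected)  # {} {'iterations': 5}
```

A non-empty `rejected` dict points directly at the argument name that needs to be aligned between the notebook wrapper and the experiment function.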

## Custom Configuration

Modify configuration parameters for your specific needs.

In [8]:
# Customize configuration
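custom_config = {
    # spark_config keys are kept as written; the sample output above is
    # truncated before any Spark section, so these names are unverified.
    "spark_config": {
        "master": "local[*]",
        "app_name": "DDP_KBIT_Custom",
        "executor_instances": 4,
        "executor_cores": 2,
        "executor_memory": "8g"
    },
    # Key names below follow sample_config.json as printed earlier.
    "training_config": {
        "num_epochs": 10,
        "batch_size": 128,
        "optimizer_params": {"lr": 0.0001}
    },
    "kafka_config": {
        "bootstrap_servers": ["localhost:9092"],
        "data_load_topic": "custom_topic"
    }
}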

# Save custom configuration
custom_config_path = "custom_config.json"
with open(custom_config_path, "w") as f:
    json.dump(custom_config, f, indent=2)

# Update args to use custom config
args.config_path = custom_config_path

print(f"✓ Custom configuration saved to: {custom_config_path}")
print("\nCustom configuration:")
print(json.dumps(custom_config, indent=2))

PermissionError: [Errno 13] Permission denied: 'custom_config.json'
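
The `PermissionError` above means the notebook process could not write `custom_config.json` in the current working directory. A minimal workaround, sketched below, is to fall back to the system temporary directory when the preferred location is not writable. `write_config` is an illustrative helper, not part of DDP_KBIT.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional


def write_config(config: dict, filename: str, preferred_dir: Optional[Path] = None) -> Path:
    """Write config as JSON and return the path that was actually used.

    Tries preferred_dir (default: current working directory) first and
    falls back to the system temp directory on PermissionError.
    """
    target_dir = Path(preferred_dir) if preferred_dir is not None else Path.cwd()
    path = target_dir / filename
    try:
        with open(path, "w") as f:
            json.dump(config, f, indent=2)
    except PermissionError:
        path = Path(tempfile.gettempdir()) / filename
        with open(path, "w") as f:
            json.dump(config, f, indent=2)
    return path


demo_config = {"training_config": {"num_epochs": 10, "batch_size": 128}}
saved_path = write_config(demo_config, "custom_config_demo.json", Path(tempfile.gettempdir()))
print(saved_path)
```

In the cell above, the returned path would then be assigned to `args.config_path` so that later runs read the file from wherever it was written.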

## Utility Functions

Helper functions for notebook usage.

In [None]:
def quick_train(distributed=False, config_path="sample_config.json"):
    """Quick training function for easy execution."""
    args.distributed = distributed
    args.config_path = config_path
    
    print(f"🚀 Quick training - Distributed: {distributed}")
    try:
        run_training_mode(args)
        print("✅ Training completed!")
    except Exception as e:
        print(f"❌ Training failed: {e}")

def quick_experiment(experiment_type="single", iterations=3):
    """Quick experiment function for easy execution."""
    args.experiment_type = experiment_type
    args.iterations = iterations
    
    print(f"🧪 Quick experiment - Type: {experiment_type}, Iterations: {iterations}")
    try:
        run_experiment_mode(args)
        print("✅ Experiment completed!")
    except Exception as e:
        print(f"❌ Experiment failed: {e}")

print("✓ Utility functions loaded!")
print("\nUse these functions for quick execution:")
print("- quick_train(distributed=False)")
print("- quick_experiment(experiment_type='multiple', iterations=5)")
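
Both helpers mutate the shared `args` object, so a call such as `quick_experiment(iterations=5)` leaves `args.iterations` at 5 for every later cell. One possible way to avoid that, sketched below, is a context manager that restores the original attribute values on exit. `overridden` and `demo_args` are illustrative names, and `demo_args` is only a stand-in for the notebook's `args`.

```python
from contextlib import contextmanager
from types import SimpleNamespace


@contextmanager
def overridden(obj, **overrides):
    """Temporarily set attributes on obj and restore them afterwards.

    Raises AttributeError if an overridden attribute does not exist yet.
    """
    saved = {name: getattr(obj, name) for name in overrides}
    try:
        for name, value in overrides.items():
            setattr(obj, name, value)
        yield obj
    finally:
        for name, value in saved.items():
            setattr(obj, name, value)


demo_args = SimpleNamespace(distributed=False, iterations=3)  # stand-in for args

with overridden(demo_args, distributed=True, iterations=5):
    print(demo_args.distributed, demo_args.iterations)  # True 5

print(demo_args.distributed, demo_args.iterations)  # False 3
```

Because the restore step runs in a `finally` block, the original values come back even when the wrapped training or experiment call raises.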

## Quick Execution Examples

Use the utility functions for quick execution.

In [None]:
# Example: Quick single training
# quick_train()

# Example: Quick multiple experiments
# quick_experiment(experiment_type="multiple", iterations=3)

print("💡 Uncomment the lines above to run quick examples!")