In [1]:
from environment import Environment, VMConfig
from smolagents import CodeAgent, HfApiModel


In [2]:
# Identifier of the model handed to HfApiModel below.
model_id = "meta-llama/Llama-3.3-70B-Instruct"

# Agent with no custom tools of its own; add_base_tools=True is passed through to CodeAgent.
agent = CodeAgent(
    name="TestAgent",
    tools=[],
    model=HfApiModel(model_id=model_id),
    add_base_tools=True,
)


## Setting up the Environment

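This notebook demonstrates how to set up a virtualized environment for testing: a QEMU virtual machine running inside a Docker container, backed by its own copy of a base VM storage directory and driven over SSH.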


In [None]:
import logging
import os
import posixpath
import shlex
import shutil
import time
from typing import Optional, Dict, Any, Union

import docker
import paramiko
from docker.types import Mount

# Configure logging once for the whole notebook; every class below logs through `logger`.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("sandbox-test")


class QemuVMManager:
    """
    A class to manage QEMU virtual machines in Docker containers with SSH capabilities.
    """

    def __init__(self, docker_client: Optional[docker.DockerClient] = None):
        """Initialize the VM manager with a Docker client."""
        self.client = docker_client or docker.from_env()

    def get_or_create_container(
        self,
        storage_path: str,
        image: str = "qemux/qemu",
        container_name: str = "qemu",
        boot: str = "ubuntu",
        debug: str = "Y",
        ram_size: str = "4G",
        cpu_cores: str = "4",
        disk_size: str = "16G",
        novnc_port: int = 8006,
        ssh_port: int = 2222,
        restart_policy: str = "always",
        start_container: bool = True,
        recreate_if_exists: bool = False,
    ) -> docker.models.containers.Container:
        """
        Get an existing container or create a new one.

        Args:
            recreate_if_exists: If True, remove existing container and create a new one

        Returns:
            The container object
        """
        # Check if container already exists
        if self.container_exists(container_name):
            if recreate_if_exists:
                logger.info(f"Container {container_name} already exists, removing it")
                try:
                    container = self.client.containers.get(container_name)
                    if container.status == "running":
                        container.stop()
                    container.remove()
                    logger.info(f"Container {container_name} removed")
                except Exception as e:
                    logger.error(f"Error removing existing container: {str(e)}")
                    raise
            else:
                logger.info(f"Container {container_name} already exists, returning it")
                container = self.client.containers.get(container_name)

                # Start container if requested and not already running
                if start_container and container.status != "running":
                    container.start()
                    logger.info(f"Started existing container {container_name}")

                return container

        # Create new container
        return self.create_container(
            storage_path=storage_path,
            image=image,
            container_name=container_name,
            boot=boot,
            debug=debug,
            ram_size=ram_size,
            cpu_cores=cpu_cores,
            disk_size=disk_size,
            novnc_port=novnc_port,
            ssh_port=ssh_port,
            restart_policy=restart_policy,
            start_container=start_container,
        )

    def create_container(
        self,
        storage_path: str,
        image: str = "qemux/qemu",
        container_name: str = "qemu",
        boot: str = "ubuntu",
        debug: str = "Y",
        ram_size: str = "4G",
        cpu_cores: str = "4",
        disk_size: str = "16G",
        novnc_port: int = 8006,
        ssh_port: int = 2222,
        restart_policy: str = "always",
        start_container: bool = True,
    ) -> docker.models.containers.Container:
        """Create a QEMU container with the specified configuration."""
        storage_path = os.path.abspath(storage_path)
        logger.info(f"Creating container {container_name} with storage at {storage_path}")

        # Ensure the image exists
        if not self.ensure_image_exists(image):
            raise Exception(f"Failed to ensure image {image} exists")

        environment = {
            "BOOT": boot,
            "DEBUG": debug,
            "RAM_SIZE": ram_size,
            "CPU_CORES": cpu_cores,
            "DISK_SIZE": disk_size,
            "DISPLAY": ":0",
        }

        ports = {"8006/tcp": novnc_port, "22/tcp": ssh_port}
        mounts = [Mount(target="/storage", source=storage_path, type="bind")]
        devices = ["/dev/kvm", "/dev/net/tun"]

        try:
            container = self.client.containers.create(
                image=image,
                name=container_name,
                environment=environment,
                devices=devices,
                cap_add=["NET_ADMIN"],
                ports=ports,
                mounts=mounts,
                restart_policy={"Name": restart_policy},
                detach=True,
            )

            if start_container:
                container.start()
                logger.info(f"Container {container_name} created and started")
            else:
                logger.info(f"Container {container_name} created but not started")

            return container

        except docker.errors.APIError as e:
            logger.error(f"Docker API Error: {e}")
            logger.error(f"Storage path: {storage_path} (is absolute: {os.path.isabs(storage_path)})")
            raise

    def copy_storage(self, source_path: str, target_path: str) -> bool:
        """Copy VM storage from source to target path."""
        source_path = os.path.abspath(source_path)
        target_path = os.path.abspath(target_path)

        logger.info(f"Copying storage from {source_path} to {target_path}")

        try:
            if not os.path.exists(source_path):
                logger.error(f"Source directory {source_path} does not exist")
                return False

            os.makedirs(target_path, exist_ok=True)

            for item in os.listdir(source_path):
                source_item = os.path.join(source_path, item)
                target_item = os.path.join(target_path, item)

                if os.path.isdir(source_item):
                    shutil.copytree(source_item, target_item, dirs_exist_ok=True)
                else:
                    shutil.copy2(source_item, target_item)

            logger.info("Successfully copied storage")
            return True

        except Exception as e:
            logger.error(f"Error copying storage: {str(e)}")
            return False

    def wait_for_ssh(
        self,
        hostname: str = "localhost",
        port: int = 2222,
        username: str = "sandbox-user",
        password: str = "password",
        key_filename: Optional[str] = None,
        max_attempts: int = 10,
        initial_delay: int = 5,
    ) -> bool:
        """
        Wait for SSH to become available on the container.
        Returns True when SSH is ready, False if it times out.
        """
        logger.info(f"Waiting for SSH service at {hostname}:{port}...")
        time.sleep(initial_delay)  # Initial delay to let system boot

        for attempt in range(1, max_attempts + 1):
            try:
                client = paramiko.SSHClient()
                client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                client.connect(
                    hostname=hostname,
                    port=port,
                    username=username,
                    password=password,
                    key_filename=key_filename,
                    timeout=10,
                )
                client.close()
                logger.info("SSH is now available")
                return True
            except Exception as e:
                wait_time = min(2**attempt, 60)  # Cap at 60 seconds
                logger.info(f"SSH not ready (attempt {attempt}/{max_attempts}): {e}")
                if attempt < max_attempts:
                    logger.info(f"Waiting {wait_time} seconds before retry")
                    time.sleep(wait_time)
                else:
                    logger.error("Timed out waiting for SSH")
                    return False

    def ensure_image_exists(self, image_name: str) -> bool:
        """
        Check if an image exists locally, and pull it if not.

        Args:
            image_name: Name of the Docker image (e.g., 'qemux/qemu')

        Returns:
            bool: True if image exists or was successfully pulled, False otherwise
        """
        try:
            # Try to get the image
            logger.info(f"Checking if image {image_name} exists locally...")
            try:
                self.client.images.get(image_name)
                logger.info(f"Image {image_name} already exists locally")
                return True
            except docker.errors.ImageNotFound:
                # Image doesn't exist, need to pull it
                logger.info(f"Image {image_name} not found locally, pulling from registry...")

                # Pull with progress reporting
                pull_output = self.client.api.pull(image_name, stream=True, decode=True)
                for line in pull_output:
                    if "progress" in line:
                        status = line.get("status", "")
                        progress = line.get("progress", "")
                        logger.info(f"Pulling {image_name}: {status} {progress}")
                    elif "status" in line:
                        logger.info(f"Pulling {image_name}: {line['status']}")

                # Verify the image was pulled successfully
                try:
                    self.client.images.get(image_name)
                    logger.info(f"Successfully pulled image {image_name}")
                    return True
                except docker.errors.ImageNotFound:
                    logger.error(f"Failed to pull image {image_name}")
                    return False

        except Exception as e:
            logger.error(f"Error ensuring image exists: {str(e)}")
        return False

    def container_exists(self, container_name: str) -> bool:
        """
        Check if a container with the given name already exists.

        Args:
            container_name: Name of the container to check

        Returns:
            bool: True if container exists, False otherwise
        """
        try:
            containers = self.client.containers.list(all=True, filters={"name": container_name})
            return len(containers) > 0
        except Exception as e:
            logger.error(f"Error checking if container exists: {str(e)}")
            return False

    def stop_container(
        self,
        container_name: str,
        timeout: int = 120,  # 2 minutes default timeout (same as your stop_grace_period)
        force: bool = False,
        remove: bool = False,
    ) -> bool:
        """
        Stop a running container gracefully.

        Args:
            container_name: Name of the container to stop
            timeout: Time to wait for container to stop gracefully (in seconds)
            force: If True, force kill the container if graceful stop fails
            remove: If True, remove the container after stopping

        Returns:
            bool: True if container was stopped successfully, False otherwise
        """
        try:
            # Check if container exists
            try:
                container = self.client.containers.get(container_name)
            except docker.errors.NotFound:
                logger.warning(f"Container {container_name} not found")
                return False

            # Check if container is running
            if container.status != "running":
                logger.info(f"Container {container_name} is not running (status: {container.status})")

                # Remove if requested even if not running
                if remove and container.status != "removing":
                    logger.info(f"Removing container {container_name}")
                    container.remove()
                    logger.info(f"Container {container_name} removed")

                return True

            # Try to stop container gracefully
            logger.info(f"Stopping container {container_name} with {timeout}s timeout...")
            try:
                container.stop(timeout=timeout)
                logger.info(f"Container {container_name} stopped gracefully")
            except docker.errors.APIError as e:
                logger.error(f"Error stopping container gracefully: {e}")

                # Force kill if requested
                if force:
                    logger.warning(f"Force killing container {container_name}")
                    try:
                        container.kill()
                        logger.info(f"Container {container_name} killed")
                    except docker.errors.APIError as kill_error:
                        logger.error(f"Failed to kill container: {kill_error}")
                        return False
                else:
                    return False

            # Refresh container status
            container.reload()

            # Remove if requested
            if remove and container.status != "removing":
                logger.info(f"Removing container {container_name}")
                try:
                    container.remove()
                    logger.info(f"Container {container_name} removed")
                except docker.errors.APIError as remove_error:
                    logger.error(f"Failed to remove container: {remove_error}")
                    return False

            return True

        except Exception as e:
            logger.error(f"Error stopping container {container_name}: {str(e)}")
            return False

    def cleanup_vm(
        self, container_name: str, storage_path: Optional[str] = None, delete_storage: bool = False, force: bool = False
    ) -> bool:
        """
        Stop container and optionally delete its storage for complete cleanup.

        Args:
            container_name: Name of the container to clean up
            storage_path: Path to the storage directory
            delete_storage: If True, delete the storage directory
            force: If True, force kill the container if graceful stop fails

        Returns:
            bool: True if cleanup was successful, False otherwise
        """
        # Stop and remove container
        if not self.stop_container(container_name, force=force, remove=True):
            if not force:
                logger.warning("Graceful stop failed, trying force cleanup")
                if not self.stop_container(container_name, force=True, remove=True):
                    logger.error(f"Failed to stop container {container_name} even with force")
                    return False

        # Delete storage if requested
        if delete_storage and storage_path:
            try:
                storage_path = os.path.abspath(storage_path)
                if os.path.exists(storage_path):
                    logger.info(f"Deleting storage directory: {storage_path}")
                    shutil.rmtree(storage_path)
                    logger.info(f"Storage directory deleted: {storage_path}")
                else:
                    logger.warning(f"Storage directory not found: {storage_path}")
            except Exception as e:
                logger.error(f"Error deleting storage directory: {str(e)}")
                return False

        logger.info(f"Cleanup of VM {container_name} completed successfully")
        return True


class SSHManager:
    """
    A class to manage SSH connections and operations.
    """

    def __init__(
        self,
        hostname: str = "localhost",
        port: int = 2222,
        username: str = "sandbox-user",
        password: Optional[str] = "password",
        key_filename: Optional[str] = None,
    ):
        """Initialize SSH manager with connection details."""
        self.hostname = hostname
        self.port = port
        self.username = username
        self.password = password
        self.key_filename = key_filename
        self.client = None

    def connect(self, timeout: int = 30) -> paramiko.SSHClient:
        """Establish an SSH connection with retry logic."""
        if self.client is not None:
            try:
                # Test if connection is still alive
                self.client.exec_command("echo test", timeout=5)
                return self.client
            except:
                # Connection is dead, close it
                self.close()

        self.client = paramiko.SSHClient()
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        max_attempts = 5
        for attempt in range(1, max_attempts + 1):
            try:
                logger.info(f"SSH connection attempt {attempt}/{max_attempts}...")
                self.client.connect(
                    hostname=self.hostname,
                    port=self.port,
                    username=self.username,
                    password=self.password,
                    key_filename=self.key_filename,
                    timeout=timeout,
                )
                logger.info("SSH connection established")
                return self.client
            except Exception as e:
                if attempt == max_attempts:
                    logger.error(f"Failed to establish SSH connection: {e}")
                    raise
                else:
                    wait_time = min(2**attempt, 30)  # Cap at 30 seconds
                    logger.info(f"Connection attempt failed: {e}. Retrying in {wait_time}s")
                    time.sleep(wait_time)

    def close(self):
        """Close the SSH connection."""
        if self.client:
            self.client.close()
            self.client = None
            logger.info("SSH connection closed")

    def transfer_directory(self, local_path: str, remote_path: str) -> bool:
        """Transfer a local directory to remote via SFTP."""
        try:
            client = self.connect()
            local_path = os.path.abspath(local_path)

            if not os.path.exists(local_path):
                logger.error(f"Local directory '{local_path}' does not exist")
                return False

            # Create remote directory
            sftp = client.open_sftp()
            self.exec_command(f"mkdir -p {remote_path}")

            # Upload files
            for root, dirs, files in os.walk(local_path):
                rel_path = os.path.relpath(root, local_path)
                if rel_path == ".":
                    rel_path = ""

                remote_dir = os.path.join(remote_path, rel_path).replace("\\", "/")
                if rel_path:
                    try:
                        sftp.mkdir(remote_dir)
                    except IOError:
                        # Directory might already exist
                        pass

                for file in files:
                    local_file = os.path.join(root, file)
                    remote_file = os.path.join(remote_dir, file).replace("\\", "/")
                    sftp.put(local_file, remote_file)
                    logger.debug(f"Uploaded: {local_file} -> {remote_file}")

            sftp.close()
            logger.info(f"Successfully transferred {local_path} to {remote_path}")
            return True

        except Exception as e:
            logger.error(f"Error transferring directory: {str(e)}")
            return False

    def exec_command(self, command: str, directory: str = "", timeout: int = 60) -> Dict[str, Any]:
        """Run a command on the remote machine."""
        try:
            client = self.connect()

            full_command = command
            if directory:
                full_command = f"cd {directory} && {command}"

            logger.info(f"Executing: {full_command}")
            stdin, stdout, stderr = client.exec_command(full_command, timeout=timeout)

            exit_status = stdout.channel.recv_exit_status()
            stdout_str = stdout.read().decode("utf-8")
            stderr_str = stderr.read().decode("utf-8")

            if exit_status == 0:
                logger.info("Command executed successfully")
                if stdout_str.strip():
                    logger.debug(f"Output: {stdout_str.strip()}")
            else:
                logger.error(f"Command failed with exit status {exit_status}")
                if stderr_str.strip():
                    logger.error(f"Error: {stderr_str.strip()}")

            return {"status": exit_status, "stdout": stdout_str, "stderr": stderr_str}

        except Exception as e:
            logger.error(f"Error executing command: {str(e)}")
            return {"status": -1, "stdout": "", "stderr": str(e)}

    def run_uv_command(self, local_dir: str, remote_dir: str, command: str = "main.py") -> bool:
        """Transfer directory and run uv command."""
        try:
            # Transfer directory
            if not self.transfer_directory(local_dir, remote_dir):
                return False

            # Run the command
            uv_cmd = f"uv run {command}"
            logger.info(f"Running: {uv_cmd} in {remote_dir}")
            result = self.exec_command(uv_cmd, directory=remote_dir)

            return result["status"] == 0

        except Exception as e:
            logger.error(f"Error in run_uv_command: {str(e)}")
            return False

        finally:
            self.close()


In [None]:
# Host-side locations: the base VM storage and the per-environment copy that the container mounts.
base_dir = os.path.abspath(os.getcwd())
storage_path = os.path.join(base_dir, "docker/vms/ubuntu/")
target_storage_path = os.path.join(base_dir, "docker/environments/ubuntu1")

# Initialize managers
vm = QemuVMManager()

# Create a VM with custom settings; stop early if its storage could not be prepared.
if not vm.copy_storage(storage_path, target_storage_path):
    raise RuntimeError(f"Could not copy VM storage from {storage_path} to {target_storage_path}")
container = vm.get_or_create_container(
    storage_path=target_storage_path, container_name="ubuntu-environment-1", ram_size="4G", cpu_cores="4"
)

# Wait for VM to boot; the SSH steps below cannot work before that.
if not vm.wait_for_ssh():
    raise RuntimeError("VM did not become reachable over SSH")

# Set up SSH (sandbox credentials; they match the defaults used by wait_for_ssh)
ssh = SSHManager(username="sandbox-user", password="password")

project_dir = "./server"
remote_dir = "/home/sandbox-user/Desktop/server"
try:
    # Transfer the project
    if not ssh.transfer_directory(project_dir, remote_dir):
        raise RuntimeError(f"Could not transfer {project_dir} to {remote_dir}")

    # Run the project with uv. This cell does not install uv, so uv must already
    # be available inside the VM.
    result = ssh.exec_command("uv run main.py", directory=remote_dir)
    print(f"Command result: {result['status']}")
finally:
    # Always release the SSH connection, also when a step above fails.
    ssh.close()
