In [8]:
import struct
from docker import DockerClient
import docker
import os
from rich import print
import re
import time
import queue
import select



import threading
import queue
import os
import select
import struct

class DockerStreamWrapper:
    """Expose a Docker exec socket through stdin/stdout/stderr style endpoints.

    A background thread reads the multiplexed exec stream from ``sock``,
    splits it into its stdout and stderr payloads and forwards each into an
    OS pipe.  ``stdout`` and ``stderr`` are :class:`Stream` readers over those
    pipes; the wrapper object itself plays the role of stdin through
    :meth:`write` and :meth:`flush`.

    Args:
        exec_id: Identifier of the exec instance (stored, not otherwise used).
        sock: Connected socket-like object offering ``recv`` and ``sendall``
            and usable with ``select.select``.
    """

    def __init__(self, exec_id, sock):
        self.exec_id = exec_id
        self._sock = sock
        self._stdout_r, self._stdout_w = os.pipe()
        self._stderr_r, self._stderr_w = os.pipe()
        self.stdout = self.Stream(self, self._stdout_r)
        self.stderr = self.Stream(self, self._stderr_r)

        # Bytes of a frame whose header or payload has not fully arrived yet.
        self._pending = b""

        self._stop_event = threading.Event()
        # Daemon thread: a forgotten close() must not keep the interpreter alive.
        self._thread = threading.Thread(target=self._listen, daemon=True)
        self._thread.start()

    class Stream:
        """Line-oriented reader over the read end of a pipe."""

        def __init__(self, parent, read_fd):
            self.parent = parent
            self._read_fd = read_fd
            # Held as bytes so that a multi-byte UTF-8 character split across
            # two reads is only decoded once its whole line is available.
            self._buffer = b""

        def readline(self, timeout=3):
            """Return the next line without its trailing newline.

            Args:
                timeout: Seconds to wait for more data each time the buffer
                    holds no complete line.

            Returns:
                The decoded line, or ``None`` when no complete line arrived
                within ``timeout``.  Once the pipe reaches end-of-file any
                unterminated remainder is returned, then ``None``.
            """
            while b'\n' not in self._buffer:
                ready_to_read, _, _ = select.select([self._read_fd], [], [], timeout)
                if not ready_to_read:
                    return None
                chunk = os.read(self._read_fd, 1024)
                if not chunk:
                    # End-of-file: select() would report "readable" forever,
                    # so hand back what is left instead of spinning.
                    if not self._buffer:
                        return None
                    remainder, self._buffer = self._buffer, b""
                    return remainder.decode('utf-8', errors='replace')
                self._buffer += chunk

            line, _, self._buffer = self._buffer.partition(b'\n')
            return line.decode('utf-8', errors='replace')

    def _listen(self):
        """Pump socket data into the stdout/stderr pipes until stopped or EOF."""
        while not self._stop_event.is_set():
            ready_to_read, _, _ = select.select([self._sock], [], [], 1)
            if not ready_to_read:
                continue
            raw_data = self._sock.recv(2048)
            if not raw_data:
                # Peer closed the connection; without this the loop would
                # spin because the socket stays readable at EOF.
                break
            stdout, stderr = self._demux_frames(raw_data)
            if stdout:
                os.write(self._stdout_w, stdout)
            if stderr:
                os.write(self._stderr_w, stderr)

    def _demux_frames(self, data):
        """Split newly received bytes into (stdout_bytes, stderr_bytes).

        Each frame is an 8-byte header (stream type byte, three padding
        bytes, big-endian 32-bit payload length) followed by the payload.
        Incomplete trailing frames are kept in ``self._pending`` and finished
        on a later call, so frames may straddle ``recv`` boundaries.
        """
        buffered = self._pending + data
        stdout_parts = []
        stderr_parts = []
        offset = 0
        while offset + 8 <= len(buffered):
            stream_type, length = struct.unpack('>BxxxL', buffered[offset:offset + 8])
            frame_end = offset + 8 + length
            if frame_end > len(buffered):
                break  # payload not complete yet
            payload = buffered[offset + 8:frame_end]
            if stream_type == 1:
                stdout_parts.append(payload)
            elif stream_type == 2:
                stderr_parts.append(payload)
            offset = frame_end
        self._pending = buffered[offset:]
        return b"".join(stdout_parts), b"".join(stderr_parts)

    def demux_docker_stream(self, data):
        """Decode the complete frames contained in ``data``.

        Args:
            data: Raw bytes read from the exec socket.

        Returns:
            ``(stdout, stderr)`` as text.  Bytes belonging to a frame that is
            not complete yet are buffered on the instance and contribute to
            the result of a later call; undecodable bytes are replaced.
        """
        stdout, stderr = self._demux_frames(data)
        return (stdout.decode('utf-8', errors='replace'),
                stderr.decode('utf-8', errors='replace'))

    def write(self, data):
        """Send ``data`` to the socket as one or more newline-terminated commands.

        Backslash sequences in ``data`` are interpreted character by
        character: a literal ``\\n`` ends the current command, a literal
        ``\\r`` discards the current command, and any other escaped character
        is kept without its backslash.  Trailing whitespace is stripped from
        each command and commands that end up empty are not sent.
        """
        commands = []
        current = []
        escaped = False  # True when the previous character was a backslash

        for char in data:
            if escaped:
                escaped = False
                if char == 'n':
                    finished = "".join(current).rstrip()
                    if finished:
                        commands.append(finished)
                    current = []
                elif char == 'r':
                    current = []
                else:
                    # Covers an escaped backslash as well as \t, \', \" ...
                    current.append(char)
            elif char == '\\':
                escaped = True
            else:
                current.append(char)

        last = "".join(current).rstrip()
        if last:
            commands.append(last)

        # rstrip() above guarantees no command ends with a newline already.
        for cmd in commands:
            self._sock.sendall(cmd.encode('utf-8') + b'\n')

    def flush(self):
        """No-op; :meth:`write` sends data to the socket immediately."""
        pass

    def close(self):
        """Stop the listener thread and close both pipes."""
        self._stop_event.set()
        self._thread.join()
        os.close(self._stdout_r)
        os.close(self._stdout_w)
        os.close(self._stderr_r)
        os.close(self._stderr_w)



class DockerProcWrapper:
    """Run a command inside a per-session Docker container with piped I/O.

    The session container is looked up by its ``session_id`` label (derived
    from the basename of ``session_path``) or created from ``image_name``
    with ``session_path`` bind-mounted at ``/mnt/data``.  The command is then
    started as an exec instance whose socket is wrapped by
    ``DockerStreamWrapper``; the resulting endpoints are exposed as
    ``stdin``, ``stdout`` and ``stderr``.

    Args:
        command: Command line to execute inside the container.
        session_path: Host directory of the session.
    """

    def __init__(self, command, session_path):
        self.client = docker.APIClient()
        self.image_name = "openinterpreter-runtime-container:latest"
        self.session_path = session_path
        self.id = os.path.basename(session_path)
        self.lang = self.extract_language_from_command(command)
        self.exec_id = None
        self.exec_socket = None

        # Initialize container
        self.init_container()

        self.init_exec_instance(command)

        self.wrapper = DockerStreamWrapper(self.exec_id, self.exec_socket)
        self.stdout = self.wrapper.stdout
        self.stderr = self.wrapper.stderr
        self.stdin = self.wrapper

    def _container_id(self):
        """Return the id of ``self.container`` whatever form it is stored in.

        ``init_container`` stores a plain dict when it reuses a container
        listed by the low-level client and a container object (with an ``id``
        attribute) when it creates a new one.
        """
        container = self.container
        if isinstance(container, dict):
            return container.get('Id')
        return container.id

    def init_container(self):
        """Locate or create the session container and make sure it is running.

        Any exception is printed rather than raised, so ``self.container``
        may still be ``None`` when this method returns.
        """
        self.container = None
        try:
            containers = self.client.containers(
                filters={"label": f"session_id={self.id}"}, all=True)
            if containers:
                self.container = containers[0]
                container_id = self.container.get('Id')
                container_info = self.client.inspect_container(container_id)
                if container_info.get('State', {}).get('Running') is False:
                    print(container_info.get('State', {}))
                    self.client.start(container=container_id)
                    print("waiting for container start")
                    self.wait_for_container_start(container_id)
                    print("container started")
            else:
                volume_binding = {
                    self.session_path: {'bind': '/mnt/data', 'mode': 'rw'}
                }
                env_client = docker.from_env()
                self.container = env_client.containers.run(
                    self.image_name,
                    detach=True,
                    command="/bin/bash -i",  # Keep the container running with bash
                    labels={'session_id': self.id},
                    volumes=volume_binding,
                    user="nobody",  # Run as non-root user for security,
                    stdin_open = True,
                    tty=True
                )
                self.client.start(container=self.container.id)
                print("waiting for container start")
                self.wait_for_container_start(self.container.id)
                print("container started")

        except Exception as e:
            print(f"An error occurred: {e}")

    def init_exec_instance(self, command):
        """Create and start an exec instance for ``command``.

        Sets ``self.exec_id`` and ``self.exec_socket``.  Does nothing when no
        container is available.
        """
        if self.container:
            self.exec_id = self.client.exec_create(
                # Works for both the dict and the object form of the container.
                self._container_id(),
                cmd=command,
                stdin=True,
                stdout=True,
                stderr=True,
                workdir="/mnt/data",
                user="nobody",
                tty=False

            )['Id']
            # tty must agree with exec_create: only a non-TTY stream carries
            # the 8-byte frame headers that DockerStreamWrapper demultiplexes.
            self.exec_socket = self.client.exec_start(
                self.exec_id, socket=True, tty=False)._sock

    @staticmethod
    def extract_language_from_command(command):
        """Guess the language a command line runs.

        Checks are substring based and evaluated in order: python, R (a
        standalone upper-case ``R`` word), shell, node.  Note that the shell
        check matches any command containing ``sh``.

        Returns:
            ``"python"``, ``"r"``, ``"shell"``, ``"javascript"`` or
            ``"unknown"``.
        """
        # Normalize the command to lower case for easier searching
        command_lower = command.lower()

        # Extract Python
        if "python" in command_lower:
            return "python"

        # Extract R
        if re.search(r'\bR\b', command):
            return "r"

        # Extract Shell
        if any(shell in command_lower for shell in ["bash", "sh", "zsh", "fish"]):
            return "shell"

        # Extract Node.js
        if "node" in command_lower:
            return "javascript"

        # Return unknown if we can't determine the language
        return "unknown"

    def wait_for_container_start(self, container_id, timeout=30):
        """Poll once per second until the container reports ``Running``.

        Args:
            container_id: Id of the container to inspect.
            timeout: Maximum number of seconds to wait.

        Returns:
            ``True`` once the container is running.

        Raises:
            TimeoutError: If the container is not running within ``timeout``.
        """
        # monotonic() is immune to wall-clock adjustments during the wait.
        start_time = time.monotonic()
        while True:
            container_info = self.client.inspect_container(container_id)
            if container_info.get('State', {}).get('Running') is True:
                return True
            elif time.monotonic() - start_time > timeout:
                raise TimeoutError(
                    "Container did not start within the specified timeout.")
            time.sleep(1)

import threading
import uuid
import uuid
def test_docker_wrapper():
    """Interactive smoke test: forward typed lines to a container process.

    Starts ``python3`` through ``DockerProcWrapper`` in a fresh session
    directory, then repeatedly reads a line from the user, sends it to the
    process and prints whatever arrives on stdout and stderr until each
    stream's ``readline`` times out.  Typing ``exit`` ends the loop.  The
    stream wrapper is always closed on the way out, including on
    ``KeyboardInterrupt``.
    """
    # Start python3 inside a container dedicated to a new random session.
    wrapper = DockerProcWrapper("python3 ", session_path="/workspaces/open-interpreter/interpreter/dockerfiles/ses-" + str(uuid.uuid4()))

    def drain(stream, label):
        # readline() returns None on timeout; an empty string is a real
        # (blank) output line and must not end the drain early.
        while True:
            line = stream.readline()
            if line is None:
                break
            print(f"{label}: {line}")

    print("Interactive Docker shell. Type 'exit' to quit.")

    try:
        while True:
            # Read user input
            command = input(">>> ")

            # Check for exit command
            if command.lower() == 'exit':
                break

            # Send the command to the Docker container
            wrapper.stdin.write(command + "\n")
            wrapper.stdin.flush()

            print("Reading from stdout:")
            drain(wrapper.stdout, "STDOUT")

            print("Reading from stderr:")
            drain(wrapper.stderr, "STDERR")
    finally:
        # `del wrapper` alone would leave the listener thread running and
        # the pipes open; close() stops the thread and releases them.
        wrapper.wrapper.close()

# Start the interactive session when this code runs as the main module.
if __name__ =='__main__':
    test_docker_wrapper()

KeyboardInterrupt: 

In [2]:
import struct
from docker import DockerClient
import docker
import os
from rich import print
import re
import time
import queue
import select



import threading
import queue
import os
import select
import struct

class DockerStreamWrapper:
    """Bridge a Docker exec socket to pipe-backed stdin/stdout/stderr.

    One background thread services both directions: it reads the multiplexed
    exec stream from ``sock`` and forwards the stdout and stderr payloads
    into OS pipes, and it reads what callers wrote to ``stdin`` and sends it
    to the socket one line at a time.  ``stdout`` and ``stderr`` are
    :class:`Stream` readers; ``stdin`` is a text file object that callers
    must ``flush()`` for their data to reach the thread.

    Args:
        exec_id: Identifier of the exec instance (stored, not otherwise used).
        sock: Connected socket-like object offering ``recv`` and ``sendall``
            and usable with ``select.select``.
    """

    def __init__(self, exec_id, sock):
        self.exec_id = exec_id
        self._sock = sock
        self._stdout_r, self._stdout_w = os.pipe()
        self._stderr_r, self._stderr_w = os.pipe()
        self.stdout = self.Stream(self, self._stdout_r)
        self.stderr = self.Stream(self, self._stderr_r)

        # stdin travels through a pipe so callers get an ordinary file object
        # and the listener thread can wait on it with select().
        self._stdin_r, self._stdin_w = os.pipe()
        self.stdin = os.fdopen(self._stdin_w, 'w')
        self._stdin_buffer = b""  # processed stdin bytes not yet ending in a newline
        self._stdin_carry = b""   # trailing backslash held back until the next read

        # Bytes of a socket frame whose header or payload is still incomplete.
        self._pending = b""

        # Start the thread that watches the socket and the stdin pipe.
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._listen, daemon=True)
        self._thread.start()

    class Stream:
        """Line-oriented reader over the read end of a pipe."""

        def __init__(self, parent, read_fd):
            self.parent = parent
            self._read_fd = read_fd
            # Held as bytes so that a multi-byte UTF-8 character split across
            # two reads is only decoded once its whole line is available.
            self._buffer = b""

        def readline(self, timeout=3):
            """Return the next line without its trailing newline.

            Args:
                timeout: Seconds to wait for more data each time the buffer
                    holds no complete line.

            Returns:
                The decoded line, or ``None`` when no complete line arrived
                within ``timeout``.  Once the pipe reaches end-of-file any
                unterminated remainder is returned, then ``None``.
            """
            while b'\n' not in self._buffer:
                ready_to_read, _, _ = select.select([self._read_fd], [], [], timeout)
                if not ready_to_read:
                    return None
                chunk = os.read(self._read_fd, 1024)
                if not chunk:
                    # End-of-file: select() would report "readable" forever,
                    # so hand back what is left instead of spinning.
                    if not self._buffer:
                        return None
                    remainder, self._buffer = self._buffer, b""
                    return remainder.decode('utf-8', errors='replace')
                self._buffer += chunk

            line, _, self._buffer = self._buffer.partition(b'\n')
            return line.decode('utf-8', errors='replace')

    def _listen(self):
        """Service the socket and the stdin pipe until stopped or socket EOF."""
        watched = [self._sock, self._stdin_r]
        while not self._stop_event.is_set():
            ready_to_read, _, _ = select.select(watched, [], [], 1)

            for source in ready_to_read:
                if source == self._sock:
                    raw_data = self._sock.recv(2048)
                    if not raw_data:
                        # Peer closed the connection; nothing more can be
                        # received or sent, so end the thread.
                        return
                    stdout, stderr = self._demux_frames(raw_data)
                    if stdout:
                        os.write(self._stdout_w, stdout)
                    if stderr:
                        os.write(self._stderr_w, stderr)
                elif source == self._stdin_r:
                    chunk = os.read(self._stdin_r, 2048)
                    if not chunk:
                        # stdin was closed by the writer: stop polling it,
                        # otherwise select() reports it readable forever.
                        watched.remove(self._stdin_r)
                        continue
                    self._forward_stdin(chunk)

    def _forward_stdin(self, chunk):
        """Unescape ``chunk`` and send every completed line to the socket.

        ``\\'`` and ``\\"`` lose their backslash and a literal ``\\n`` becomes
        a real newline; other backslashes are left untouched.  The work is
        done on bytes (all characters involved are ASCII), so multi-byte
        UTF-8 characters split across reads pass through unharmed.
        """
        data = self._stdin_carry + chunk
        self._stdin_carry = b""
        if data.endswith(b'\\'):
            # The escape may be completed by the next read; decide then.
            data, self._stdin_carry = data[:-1], b'\\'

        # Remove escape characters for quotes but leave other backslashes untouched
        data = re.sub(rb'\\([\'"])', rb'\1', data)
        data = data.replace(b'\\n', b'\n')

        self._stdin_buffer += data

        # Send line by line, newline included.
        while b'\n' in self._stdin_buffer:
            line, newline, self._stdin_buffer = self._stdin_buffer.partition(b'\n')
            self._sock.sendall(line + newline)

    def _demux_frames(self, data):
        """Split newly received bytes into (stdout_bytes, stderr_bytes).

        Each frame is an 8-byte header (stream type byte, three padding
        bytes, big-endian 32-bit payload length) followed by the payload.
        Incomplete trailing frames are kept in ``self._pending`` and finished
        on a later call, so frames may straddle ``recv`` boundaries.
        """
        buffered = self._pending + data
        stdout_parts = []
        stderr_parts = []
        offset = 0
        while offset + 8 <= len(buffered):
            stream_type, length = struct.unpack('>BxxxL', buffered[offset:offset + 8])
            frame_end = offset + 8 + length
            if frame_end > len(buffered):
                break  # payload not complete yet
            payload = buffered[offset + 8:frame_end]
            if stream_type == 1:
                stdout_parts.append(payload)
            elif stream_type == 2:
                stderr_parts.append(payload)
            offset = frame_end
        self._pending = buffered[offset:]
        return b"".join(stdout_parts), b"".join(stderr_parts)

    def demux_docker_stream(self, data):
        """Decode the complete frames contained in ``data``.

        Args:
            data: Raw bytes read from the exec socket.

        Returns:
            ``(stdout, stderr)`` as text.  Bytes belonging to a frame that is
            not complete yet are buffered on the instance and contribute to
            the result of a later call; undecodable bytes are replaced.
        """
        stdout, stderr = self._demux_frames(data)
        return (stdout.decode('utf-8', errors='replace'),
                stderr.decode('utf-8', errors='replace'))

    def flush(self):
        """No-op kept for file-like compatibility of the wrapper object."""
        pass

    def close(self):
        """Stop the listener thread and close all three pipes."""
        self._stop_event.set()
        self._thread.join()
        # Closing the file object also closes the write end of the stdin
        # pipe; it is a no-op if the caller closed stdin already.
        self.stdin.close()
        os.close(self._stdin_r)
        os.close(self._stdout_r)
        os.close(self._stdout_w)
        os.close(self._stderr_r)
        os.close(self._stderr_w)



class DockerProcWrapper:
    """Run a command inside a per-session Docker container with piped I/O.

    The session container is looked up by its ``session_id`` label (derived
    from the basename of ``session_path``) or created from ``image_name``
    with ``session_path`` bind-mounted at ``/mnt/data``.  A ``/bin/bash``
    exec instance is started in it, its socket is wrapped by
    ``DockerStreamWrapper`` and ``command`` is written to that shell's stdin.
    The wrapper's endpoints are exposed as ``stdin``, ``stdout`` and
    ``stderr``.

    Args:
        command: Command line sent to the shell once it is running.
        session_path: Host directory of the session.
    """

    def __init__(self, command, session_path):
        self.client = docker.APIClient()
        self.image_name = "openinterpreter-runtime-container:latest"
        self.session_path = session_path
        self.id = os.path.basename(session_path)
        self.lang = self.extract_language_from_command(command)
        self.exec_id = None
        self.exec_socket = None

        # Initialize container
        self.init_container()

        self.init_exec_instance(command)

        self.wrapper = DockerStreamWrapper(self.exec_id, self.exec_socket)
        self.stdout = self.wrapper.stdout
        self.stderr = self.wrapper.stderr
        self.stdin = self.wrapper.stdin

        # stdin is a buffered text stream: without the flush the launch
        # command would sit in the buffer until the caller's first flush().
        self.stdin.write(command + "\n")
        self.stdin.flush()

    def _container_id(self):
        """Return the id of ``self.container`` whatever form it is stored in.

        ``init_container`` stores a plain dict when it reuses a container
        listed by the low-level client and a container object (with an ``id``
        attribute) when it creates a new one.
        """
        container = self.container
        if isinstance(container, dict):
            return container.get('Id')
        return container.id

    def init_container(self):
        """Locate or create the session container and make sure it is running.

        Any exception is printed rather than raised, so ``self.container``
        may still be ``None`` when this method returns.
        """
        self.container = None
        try:
            containers = self.client.containers(
                filters={"label": f"session_id={self.id}"}, all=True)
            if containers:
                self.container = containers[0]
                container_id = self.container.get('Id')
                container_info = self.client.inspect_container(container_id)
                if container_info.get('State', {}).get('Running') is False:
                    print(container_info.get('State', {}))
                    self.client.start(container=container_id)
                    print("waiting for container start")
                    self.wait_for_container_start(container_id)
                    print("container started")
            else:
                volume_binding = {
                    self.session_path: {'bind': '/mnt/data', 'mode': 'rw'}
                }
                env_client = docker.from_env()
                self.container = env_client.containers.run(
                    self.image_name,
                    detach=True,
                    command="/bin/bash -i",  # Keep the container running with bash
                    labels={'session_id': self.id},
                    volumes=volume_binding,
                    user="nobody",  # Run as non-root user for security,
                    stdin_open = True,
                    tty=False
                )
                self.client.start(container=self.container.id)
                print("waiting for container start")
                self.wait_for_container_start(self.container.id)
                print("container started")

        except Exception as e:
            print(f"An error occurred: {e}")

    def init_exec_instance(self, command):
        """Create and start a ``/bin/bash`` exec instance in the container.

        ``command`` is not executed here; ``__init__`` writes it to the
        shell's stdin afterwards.  Sets ``self.exec_id`` and
        ``self.exec_socket``.  Does nothing when no container is available.
        """
        if self.container:
            self.exec_id = self.client.exec_create(
                # Works for both the dict and the object form of the container.
                self._container_id(),
                cmd="/bin/bash",
                stdin=True,
                stdout=True,
                stderr=True,
                workdir="/mnt/data",
                user="nobody",
                tty=False

            )['Id']
            self.exec_socket = self.client.exec_start(
                self.exec_id, socket=True, tty=False, demux=False)._sock

    @staticmethod
    def extract_language_from_command(command):
        """Guess the language a command line runs.

        Checks are substring based and evaluated in order: python, R (a
        standalone upper-case ``R`` word), shell, node.  Note that the shell
        check matches any command containing ``sh``.

        Returns:
            ``"python"``, ``"r"``, ``"shell"``, ``"javascript"`` or
            ``"unknown"``.
        """
        # Normalize the command to lower case for easier searching
        command_lower = command.lower()

        # Extract Python
        if "python" in command_lower:
            return "python"

        # Extract R
        if re.search(r'\bR\b', command):
            return "r"

        # Extract Shell
        if any(shell in command_lower for shell in ["bash", "sh", "zsh", "fish"]):
            return "shell"

        # Extract Node.js
        if "node" in command_lower:
            return "javascript"

        # Return unknown if we can't determine the language
        return "unknown"

    def wait_for_container_start(self, container_id, timeout=30):
        """Poll once per second until the container reports ``Running``.

        Args:
            container_id: Id of the container to inspect.
            timeout: Maximum number of seconds to wait.

        Returns:
            ``True`` once the container is running.

        Raises:
            TimeoutError: If the container is not running within ``timeout``.
        """
        # monotonic() is immune to wall-clock adjustments during the wait.
        start_time = time.monotonic()
        while True:
            container_info = self.client.inspect_container(container_id)
            if container_info.get('State', {}).get('Running') is True:
                return True
            elif time.monotonic() - start_time > timeout:
                raise TimeoutError(
                    "Container did not start within the specified timeout.")
            time.sleep(1)

import threading
import uuid
import uuid
def test_docker_wrapper():
    """Interactive smoke test: forward typed lines to a container process.

    Starts ``python3 -i -q -u`` through ``DockerProcWrapper`` in a fresh
    session directory, then repeatedly reads a line from the user, sends it
    to the process and prints whatever arrives on stdout and stderr until
    each stream's ``readline`` times out.  Typing ``exit`` ends the loop.
    The stream wrapper is always closed on the way out, including on
    ``KeyboardInterrupt``.
    """
    # Start an interactive python3 inside a container dedicated to a new
    # random session.
    wrapper = DockerProcWrapper("python3 -i -q -u", session_path="/workspaces/open-interpreter/interpreter/dockerfiles/ses-" + str(uuid.uuid4()))

    def drain(stream, label):
        # readline() returns None on timeout; an empty string is a real
        # (blank) output line and must not end the drain early.
        while True:
            line = stream.readline()
            if line is None:
                break
            print(f"{label}: {line}")

    print("Interactive Docker shell. Type 'exit' to quit.")

    try:
        while True:
            # Read user input
            command = input(">>> ")

            # Check for exit command
            if command.lower() == 'exit':
                break

            # Send the command to the Docker container
            wrapper.stdin.write(command + "\n")
            wrapper.stdin.flush()

            print("Reading from stdout:")
            drain(wrapper.stdout, "STDOUT")

            print("Reading from stderr:")
            drain(wrapper.stderr, "STDERR")
    finally:
        # `del wrapper` alone would leave the listener thread running and
        # the pipes open; close() stops the thread and releases them.
        wrapper.wrapper.close()

# Start the interactive session when this code runs as the main module.
if __name__ =='__main__':
    test_docker_wrapper()

KeyboardInterrupt: Interrupted by user

In [None]:
# Terminate the process running this cell immediately with status 0.
# os._exit() exits without running cleanup handlers or flushing stdio buffers.
import os
os._exit(0)


: 