In [4]:
##%overwritefile
##%file:../src/RealTimeSubprocess.py
##%noruncode
class RealTimeSubprocess(subprocess.Popen):
    """
    A subprocess that allows to read its stdout and stderr in real time
    """

    inputRequest = "<inputRequest>"
    kobj=None
    def setkobj(self,k=None):
        self.kobj=k
    def __init__(self, cmd, write_to_stdout, write_to_stderr, read_from_stdin,cwd=None,shell=False,env=None,kobj=None):
        """
        :param cmd: the command to execute
        :param write_to_stdout: a callable that will be called with chunks of data from stdout
        :param write_to_stderr: a callable that will be called with chunks of data from stderr
        """
        self.kobj=kobj
        self._write_to_stdout = write_to_stdout
        self._write_to_stderr = write_to_stderr
        self._read_from_stdin = read_from_stdin
        if env!=None and len(env)<1:env=None
        
        super().__init__(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE,
                            bufsize=0,cwd=cwd,shell=shell,env=env)

        self._stdout_queue = Queue()
        self._stdout_thread = Thread(target=RealTimeSubprocess._enqueue_output, args=(self.stdout, self._stdout_queue))
        self._stdout_thread.daemon = True
        self._stdout_thread.start()

        self._stderr_queue = Queue()
        self._stderr_thread = Thread(target=RealTimeSubprocess._enqueue_output, args=(self.stderr, self._stderr_queue))
        self._stderr_thread.daemon = True
        self._stderr_thread.start()

    @staticmethod
    def _enqueue_output(stream, queue):
        """
        Add chunks of data from a stream to a queue until the stream is empty.
        """
        for line in iter(lambda: stream.read(4096), b''):
            queue.put(line)
        stream.close()

    def write_contents(self,magics=None):
        """
        Write the available content from stdin and stderr where specified when the instance was created
        :return:
        """

        def read_all_from_queue(queue):
            res = b''
            size = queue.qsize()
            while size != 0:
                res += queue.get_nowait()
                size -= 1
            return res
        stderr_contents = read_all_from_queue(self._stderr_queue)
        if stderr_contents:
            if self.kobj!=None:
                self.kobj._logln(stderr_contents.decode('UTF-8', errors='ignore'),3)
            else:
                self._write_to_stderr(stderr_contents.decode('UTF-8', errors='ignore'))
        stdout_contents = read_all_from_queue(self._stdout_queue)
        if stdout_contents:
            if self.kobj.get_magicsSvalue(magics,"outputtype").startswith("image"):
                self._write_to_stdout(stdout_contents,magics)
                ##reset outputtype
                magics['_st']["outputtype"]="text/plain"
                return

            contents = stdout_contents.decode('UTF-8', errors='ignore')
            # if there is input request, make output and then
            # ask frontend for input
            start = contents.find(self.__class__.inputRequest)
            if(start >= 0):
                contents = contents.replace(self.__class__.inputRequest, '')
                if(len(contents) > 0):
                    self._write_to_stdout(contents,magics)
                readLine = ""
                while(len(readLine) == 0):
                    readLine = self._read_from_stdin()
                # need to add newline since it is not captured by frontend
                readLine += "\n"
                self.stdin.write(readLine.encode())
            else:
                self._write_to_stdout(contents,magics)

    def wait_end(self,magics):
        while self.poll() is None:
            if self.kobj.get_magicsSvalue(magics,"outputtype").startswith("image"):
                continue
            self.write_contents(magics)
        self.write_contents(magics)
        self._write_to_stdout("The process end:"+str(self.pid)+"\n",magics)
        ############################################
        self.write_contents(magics)
        # wait for threads to finish, so output is always shown
        self._stdout_thread.join()
        self._stderr_thread.join()
        self.write_contents(magics)
        return self.returncode

[MyPython] Info:file h:\Jupyter\Myjupyter-kernel\base\../src/RealTimeSubprocess.py created successfully
