Skip to content

Commit

Permalink
weakrefs to resolve cyclical dependencies that prevent garbage collec…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
Andrew Moffat committed Nov 10, 2012
1 parent 123a514 commit d2d1b05
Showing 1 changed file with 24 additions and 18 deletions.
42 changes: 24 additions & 18 deletions sh.py
Expand Up @@ -80,6 +80,7 @@
import resource
from collections import deque
import logging
import weakref


# this is ugly, but we've added a module-level logging kill switch. the reason
Expand Down Expand Up @@ -1002,14 +1003,14 @@ class NoStdinData(Exception): pass
class StreamWriter(object):
def __init__(self, name, process, stream, stdin, bufsize):
self.name = name
self.process = process
self.process = weakref.ref(process)
self.stream = stream
self.stdin = stdin

self.log = logging.getLogger(repr(self))


self.stream_bufferer = StreamBufferer(self.process.call_args["encoding"],
self.stream_bufferer = StreamBufferer(self.process().call_args["encoding"],
bufsize)

# determine buffering for reading from the input we set for stdin
Expand Down Expand Up @@ -1050,7 +1051,7 @@ def __init__(self, name, process, stream, stdin, bufsize):


def __repr__(self):
return "<StreamWriter %s for %r>" % (self.name, self.process)
return "<StreamWriter %s for %r>" % (self.name, self.process())

def fileno(self):
return self.stream
Expand Down Expand Up @@ -1087,7 +1088,7 @@ def write(self):
except DoneReadingStdin:
if logging_enabled: self.log.debug("done reading")

if self.process.call_args["tty_in"]:
if self.process().call_args["tty_in"]:
# EOF time
try: char = termios.tcgetattr(self.stream)[6][termios.VEOF]
except: char = chr(4).encode()
Expand All @@ -1101,7 +1102,7 @@ def write(self):

# if we're not bytes, make us bytes
if IS_PY3 and hasattr(chunk, "encode"):
chunk = chunk.encode(self.process.call_args["encoding"])
chunk = chunk.encode(self.process().call_args["encoding"])

for chunk in self.stream_bufferer.process(chunk):
if logging_enabled: self.log.debug("got chunk size %d: %r", len(chunk), chunk[:30])
Expand All @@ -1120,25 +1121,26 @@ def close(self):
if logging_enabled: self.log.debug("got chunk size %d to flush: %r", len(chunk), chunk[:30])
try:
if chunk: os.write(self.stream, chunk)
if not self.process.call_args["tty_in"]:
if not self.process().call_args["tty_in"]:
if logging_enabled: self.log.debug("we used a TTY, so closing the stream")
os.close(self.stream)
except OSError: pass



class StreamReader(object):

def __init__(self, name, process, stream, handler, buffer, bufsize, pipe_queue=None):
self.name = name
self.process = process
self.process = weakref.ref(process)
self.stream = stream
self.buffer = buffer
self.pipe_queue = pipe_queue

self.pipe_queue = None
if pipe_queue: self.pipe_queue = weakref.ref(pipe_queue)

self.log = logging.getLogger(repr(self))

self.stream_bufferer = StreamBufferer(self.process.call_args["encoding"],
self.stream_bufferer = StreamBufferer(self.process().call_args["encoding"],
bufsize)

# determine buffering
Expand Down Expand Up @@ -1183,24 +1185,28 @@ def __init__(self, name, process, stream, handler, buffer, bufsize, pipe_queue=N


self.handler_args = ()
if num_args == implied_arg + 2: self.handler_args = (self.process.stdin,)
elif num_args == implied_arg + 3: self.handler_args = (self.process.stdin, self.process)
if num_args == implied_arg + 2:
self.handler_args = (self.process().stdin,)
elif num_args == implied_arg + 3:
self.handler_args = (self.process().stdin, self.process())


def fileno(self):
return self.stream

def __repr__(self):
return "<StreamReader %s for %r>" % (self.name, self.process)
return "<StreamReader %s for %r>" % (self.name, self.process())

def close(self):
chunk = self.stream_bufferer.flush()
if logging_enabled: self.log.debug("got chunk size %d to flush: %r", len(chunk), chunk[:30])
if logging_enabled: self.log.debug("got chunk size %d to flush: %r",
len(chunk), chunk[:30])
if chunk: self.write_chunk(chunk)

if self.handler_type == "fd" and hasattr(self.handler, "close"):
self.handler.flush()

if self.pipe_queue: self.pipe_queue.put(None)
if self.pipe_queue: self.pipe_queue().put(None)
try: os.close(self.stream)
except OSError: pass

Expand All @@ -1211,20 +1217,20 @@ def write_chunk(self, chunk):
if self.handler_type == "fn" and not self.should_quit:
# try to use the encoding first, if that doesn't work, send
# the bytes
try: to_handler = chunk.decode(self.process.call_args["encoding"])
try: to_handler = chunk.decode(self.process().call_args["encoding"])
except UnicodeDecodeError: to_handler = chunk
self.should_quit = self.handler(to_handler, *self.handler_args)

elif self.handler_type == "stringio":
self.handler.write(chunk.decode(self.process.call_args["encoding"]))
self.handler.write(chunk.decode(self.process().call_args["encoding"]))

elif self.handler_type in ("cstringio", "fd"):
self.handler.write(chunk)


if self.pipe_queue:
if logging_enabled: self.log.debug("putting chunk onto pipe: %r", chunk[:30])
self.pipe_queue.put(chunk)
self.pipe_queue().put(chunk)
self.buffer.append(chunk)


Expand Down

0 comments on commit d2d1b05

Please sign in to comment.