Skip to content

Commit

Permalink
Merge pull request #123 from thomasj02/output_locking
Browse files Browse the repository at this point in the history
Add locking around output buffer
  • Loading branch information
minrk committed Apr 15, 2016
2 parents bc4fe89 + 26a12db commit d6dd8a8
Showing 1 changed file with 11 additions and 7 deletions.
18 changes: 11 additions & 7 deletions ipykernel/iostream.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import os
import threading
import sys
import threading
import uuid
import warnings
from io import StringIO, UnsupportedOperation
Expand Down Expand Up @@ -222,6 +223,7 @@ def __init__(self, session, pub_thread, name, pipe=None):
self._flush_lock = threading.Lock()
self._flush_timeout = None
self._io_loop = pub_thread.io_loop
self._buffer_lock = threading.Lock()
self._new_buffer()

def _is_master_process(self):
Expand Down Expand Up @@ -314,7 +316,8 @@ def write(self, string):
string = string.decode(self.encoding, 'replace')

is_child = (not self._is_master_process())
self._buffer.write(string)
with self._buffer_lock:
self._buffer.write(string)
if is_child:
# newlines imply flush in subprocesses
# mp.Pool cannot be trusted to flush promptly (or ever),
Expand All @@ -333,12 +336,13 @@ def writelines(self, sequence):

def _flush_buffer(self):
"""clear the current buffer and return the current buffer data"""
data = u''
if self._buffer is not None:
buf = self._buffer
self._new_buffer()
data = buf.getvalue()
buf.close()
with self._buffer_lock:
data = u''
if self._buffer is not None:
buf = self._buffer
self._new_buffer()
data = buf.getvalue()
buf.close()
return data

def _new_buffer(self):
Expand Down

0 comments on commit d6dd8a8

Please sign in to comment.