Skip to content
This repository has been archived by the owner on Aug 18, 2022. It is now read-only.

Commit

Permalink
Merge branch 'fix/popen-deadlock'
Browse files Browse the repository at this point in the history
  • Loading branch information
tmontaigu committed Mar 8, 2020
2 parents 77ce2c0 + 2e04c00 commit 4dfcb72
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 8 deletions.
11 changes: 8 additions & 3 deletions pylas/compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,17 @@ def stdin(self):
def stdout(self):
return self.prc.stdout

def wait(self):
    """Wait on the wrapped process and hand back the result of its ``wait()``.

    Thin delegate to ``self.prc.wait()``.
    NOTE(review): ``prc`` is presumably a ``subprocess.Popen``, in which case
    the returned value is the process exit code -- confirm against the
    constructor, which is not visible here.
    """
    exit_status = self.prc.wait()
    return exit_status

def communicate(self):
stdout_data, stderr_data = self.prc.communicate()
self._raise_if_bad_err_code(stderr_data.decode())
self.raise_if_bad_err_code(stderr_data.decode())
return stdout_data

def _raise_if_bad_err_code(self, error_msg):
def raise_if_bad_err_code(self, error_msg=None):
if error_msg is None:
error_msg = self.prc.stderr.read().decode()
if self.prc.returncode != 0:
raise RuntimeError(
"Laszip failed to {} with error code {}\n\t{}".format(
Expand All @@ -219,5 +224,5 @@ def _raise_if_bad_err_code(self, error_msg):
def wait_until_finished(self):
self.stdin.close()
self.prc.wait()
self._raise_if_bad_err_code(self.prc.stderr.read().decode())
self.raise_if_bad_err_code(self.prc.stderr.read().decode())

13 changes: 10 additions & 3 deletions pylas/lasdatas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
LasZipProcess
)
from ..point import record, dims, PointFormat
from ..utils import ConveyorThread
from ..vlrs import known, vlrlist

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -361,14 +362,20 @@ def write(self, destination, do_compress=None):
self.write_to(destination, do_compress=do_compress)

def _compress_with_laszip_executable(self, out_stream):
if self.vlrs.get("ExtraBytesVlr"):
raise errors.LazError("Compressing LAS that has extra bytes is not supported with LASzip")
try:
out_stream.fileno()
except OSError:
laszip_prc = LasZipProcess(LasZipProcess.Actions.Compress)
self.write_to(laszip_prc.stdin)
stdout_data = laszip_prc.communicate()
out_stream.seek(0)
out_stream.write(stdout_data)
t = ConveyorThread(laszip_prc.stdout, out_stream)
t.start()
self.write_to(laszip_prc.stdin, do_compress=False)
laszip_prc.stdin.close()
t.join()
laszip_prc.wait()
laszip_prc.raise_if_bad_err_code()
else:
# The output is a file
# let laszip write directly to it, to avoid copies
Expand Down
11 changes: 9 additions & 2 deletions pylas/lasreader.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .compression import lazrs_decompress_buffer, lazperf_decompress_buffer, LasZipProcess
from .lasdatas import las14, las12
from .point import record, PointFormat
from .utils import ConveyorThread
from .vlrs import rawvlr
from .vlrs.vlrlist import VLRList

Expand Down Expand Up @@ -162,9 +163,15 @@ def _decompress_with_laszip_executable(self):
fileno = self.stream.fileno()
except OSError:
laszip_prc = LasZipProcess(LasZipProcess.Actions.Decompress)
new_source = io.BytesIO()
t = ConveyorThread(laszip_prc.stdout, new_source)
t.start()
laszip_prc.stdin.write(self.stream.read())
stdout_data = laszip_prc.communicate()
new_source = io.BytesIO(stdout_data)
laszip_prc.stdin.close()
t.join()
laszip_prc.wait()
new_source.seek(0)
laszip_prc.raise_if_bad_err_code()
else:
# The input is a file
# let laszip read directly from it to avoid copying it
Expand Down
27 changes: 27 additions & 0 deletions pylas/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import threading


def ctypes_max_limit(byte_size, signed=False):
    """Return the largest value representable by an integer of ``byte_size`` bytes.

    The value is ``2 ** bits - 1`` where ``bits`` is ``byte_size * 8``,
    reduced by one bit when ``signed`` is truthy (the sign bit).
    """
    usable_bits = byte_size * 8
    if signed:
        # One bit is reserved for the sign.
        usable_bits = usable_bits - 1
    upper_bound = 2 ** usable_bits
    return upper_bound - 1
Expand All @@ -15,3 +18,27 @@ def files_have_same_dtype(las_files):
"""
dtypes = {las.points.dtype for las in las_files}
return len(dtypes) == 1


class ConveyorThread(threading.Thread):
    """Thread that conveys data from an input stream into an output stream.

    Call ``start()`` to begin copying in the background and ``join()`` to
    wait until the input stream is exhausted.

    This is used when piping data into laszip.exe using python's
    subprocess.Popen when both stdin & stdout are in-memory io objects:
    in such cases a deadlock occurs because the OS stdout buffer fills up,
    so the process' stdout has to be drained from another thread while the
    main thread keeps feeding stdin.

    :param input_stream: source of the data. If it has a ``read`` method it is
        read in blocks of ``chunk_size``; otherwise it is simply iterated over.
    :param output_stream: object with a ``write`` method receiving the data.
    :param chunk_size: size passed to each ``input_stream.read()`` call.
    """

    def __init__(self, input_stream, output_stream, chunk_size=65536):
        super().__init__()
        self.input_stream = input_stream
        self.output_stream = output_stream
        self.chunk_size = chunk_size

    def run(self) -> None:
        """Copy everything from the input stream to the output stream."""
        write = self.output_stream.write
        read = getattr(self.input_stream, "read", None)

        if read is None:
            # Not a file-like object: keep the historical behaviour of
            # iterating over it and stopping at the first empty item.
            for data in self.input_stream:
                if not data:
                    break
                write(data)
            return

        # Read fixed-size blocks instead of iterating "lines": the payload is
        # binary, so splitting on newline bytes gives arbitrarily small
        # (or arbitrarily large) pieces.
        while True:
            data = read(self.chunk_size)
            if not data:
                # Empty read means end of stream.
                break
            write(data)

0 comments on commit 4dfcb72

Please sign in to comment.