diff --git a/saturnfs/client/file_transfer.py b/saturnfs/client/file_transfer.py index 9408015..0121ff6 100644 --- a/saturnfs/client/file_transfer.py +++ b/saturnfs/client/file_transfer.py @@ -461,7 +461,7 @@ def __init__( def upload_chunks( self, chunks: Iterable[UploadChunk], callback: Optional[Callback] = None ) -> Tuple[List[ObjectStorageCompletePart], bool]: - first_part_number = self._producer(chunks) + first_part_number = self._producer_init(chunks) if first_part_number == -1: # No chunks given return [], True @@ -475,27 +475,31 @@ def close(self): for _ in range(self.num_workers): self.upload_queue.put(None) - def _producer(self, chunks: Iterable[UploadChunk]) -> int: + def _producer_init(self, chunks: Iterable[UploadChunk]) -> int: + # Grab first chunk from iterable to determine the starting part_number first_chunk = next(iter(chunks), None) if first_chunk is None: return -1 self.upload_queue.put(first_chunk) - def _producer_thread(): - all_chunks_read: bool = False - for chunk in chunks: - if not self._put_chunk(chunk): - break - else: - all_chunks_read = True + # Start producer thread + Thread(target=self._producer, kwargs={"chunks": chunks}, daemon=True).start() + return first_chunk.part.part_number - uploads_finished = self._wait() + def _producer(self, chunks: Iterable[UploadChunk]) -> int: + # Iterate chunks onto the upload_queue until completed or error detected + all_chunks_read: bool = False + for chunk in chunks: + if not self._put_chunk(chunk): + break + else: + all_chunks_read = True - # Signal end of upload to the collector - self.completed_queue.put(UploadStop(error=not (uploads_finished and all_chunks_read))) + # Wait for workers to finish processing the queue + uploads_finished = self._wait() - Thread(target=_producer_thread, daemon=True).start() - return first_chunk.part.part_number + # Signal end of upload to the collector + self.completed_queue.put(UploadStop(error=not (uploads_finished and all_chunks_read))) def _put_chunk(self, chunk: UploadChunk, poll_interval: int = 5) -> bool: while True: