Skip to content

Commit

Permalink
Split producer into separate funcs so the first chunk doesn't stay in…
Browse files Browse the repository at this point in the history
… scope and can be garbage collected after it is consumed by a worker
  • Loading branch information
bhperry committed Feb 27, 2024
1 parent 8deb199 commit 8df4718
Showing 1 changed file with 18 additions and 14 deletions.
32 changes: 18 additions & 14 deletions saturnfs/client/file_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ def __init__(
def upload_chunks(
self, chunks: Iterable[UploadChunk], callback: Optional[Callback] = None
) -> Tuple[List[ObjectStorageCompletePart], bool]:
first_part_number = self._producer(chunks)
first_part_number = self._producer_init(chunks)
if first_part_number == -1:
# No chunks given
return [], True
Expand All @@ -475,27 +475,31 @@ def close(self):
for _ in range(self.num_workers):
self.upload_queue.put(None)

def _producer(self, chunks: Iterable[UploadChunk]) -> int:
def _producer_init(self, chunks: Iterable[UploadChunk]) -> int:
# Grab first chunk from iterable to determine the starting part_number
first_chunk = next(iter(chunks), None)
if first_chunk is None:
return -1
self.upload_queue.put(first_chunk)

def _producer_thread():
all_chunks_read: bool = False
for chunk in chunks:
if not self._put_chunk(chunk):
break
else:
all_chunks_read = True
# Start producer thread
Thread(target=self._producer, kwargs={"chunks": chunks}, daemon=True).start()
return first_chunk.part.part_number

uploads_finished = self._wait()
def _producer(self, chunks: Iterable["UploadChunk"]) -> None:
    """Feed ``chunks`` onto the upload queue, then signal end-of-upload.

    Runs on a daemon thread started by ``_producer_init``. After the chunks
    are exhausted (or a worker error stops production early), waits for the
    workers to drain the queue and posts an ``UploadStop`` marker to the
    collector. The marker's ``error`` flag is True unless every chunk was
    enqueued AND all worker uploads finished cleanly.

    Note: annotated ``-> None`` — the function returns nothing (the original
    ``-> int`` annotation was stale, carried over from the pre-split producer
    that returned the first part number).
    """
    # Iterate chunks onto the upload_queue until completed or error detected
    all_chunks_read: bool = False
    for chunk in chunks:
        if not self._put_chunk(chunk):
            # _put_chunk reported failure: stop producing further chunks
            break
    else:
        all_chunks_read = True

    # Wait for workers to finish processing the queue
    uploads_finished = self._wait()

    # Signal end of upload to the collector
    self.completed_queue.put(UploadStop(error=not (uploads_finished and all_chunks_read)))

def _put_chunk(self, chunk: UploadChunk, poll_interval: int = 5) -> bool:
while True:
Expand Down

0 comments on commit 8df4718

Please sign in to comment.