This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add decent impl of a FileConsumer #2804

Merged
erikjohnston merged 8 commits into develop from erikj/file_consumer on Jan 18, 2018

erikjohnston

Twisted core doesn't have a general-purpose FileConsumer, so we need to write
one ourselves.

Features:

  • All writing happens in a background thread
  • Supports both push and pull producers
  • Push producers get paused if the consumer falls behind

@@ -0,0 +1,158 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vecotr Ltd

vecotr

it's almost as if you're typing this by hand instead of having your editor generate it...

    self._notify_empty_deferred = None

def registerProducer(self, producer, streaming):
    """Part of IProducer interface

IConsumer?

if not streaming:
    self.producer.resumeProducing()

self.paused_producer = False

can we declare this in the constructor?

_RESUME_ON_QUEUE_SIZE = 2

def __init__(self, file_obj):
    self.file_obj = file_obj

is there a reason that some fields get underscores and some do not? I'd say they all should have one


self.bytes_queue.put_nowait(bytes)

# If this is a pushed based consumer and the queue is getting behind

s/pushed based consumer/PushProducer/ ?

streaming (bool): True if push based producer, False if pull
based.
"""
self.producer = producer

should we have some sanity checks to prevent registering twice?
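One possible shape for such a check, as a minimal sketch rather than what the PR ended up doing. Twisted's `IConsumer.registerProducer` documentation describes raising `RuntimeError` when a producer is already registered, so the sketch assumes the same; `GuardedConsumer` is a hypothetical name.

```python
class GuardedConsumer:
    """Hypothetical consumer that refuses a second registration."""

    def __init__(self):
        self._producer = None
        self._streaming = False

    def registerProducer(self, producer, streaming):
        # Fail loudly instead of silently replacing the current producer.
        if self._producer is not None:
            raise RuntimeError("a producer is already registered")
        self._producer = producer
        self._streaming = streaming

    def unregisterProducer(self):
        # After this, a new producer may be registered again.
        self._producer = None
```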

if self.bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE:
    reactor.callFromThread(self._resume_paused_producer)

if self._notify_empty and self.bytes_queue.empty():

self._notify_empty is always truthy
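The point is presumably that `_notify_empty` names a method while the attribute that can be `None` is `_notify_empty_deferred`; that reading is an assumption based on the excerpts above. A bound method is always truthy, as this small sketch with a hypothetical `Example` class shows:

```python
class Example:
    def __init__(self):
        self._notify_empty_deferred = None

    def _notify_empty(self):
        pass


ex = Example()

# A bound method object is truthy whether or not it has ever been called,
# so it cannot act as a flag.
assert bool(ex._notify_empty) is True

# The attribute is the thing that can actually be unset.
assert ex._notify_empty_deferred is None
```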

# producer.
if self.producer and self.paused_producer:
    if self.bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE:
        reactor.callFromThread(self._resume_paused_producer)

does this block until the callee finishes, or not? if not we could end up resuming multiple times - is that a problem?

erikjohnston (author):

I'm fairly sure it doesn't, and it's not a problem, as _resume_paused_producer checks self.paused_producer again anyway.
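A minimal sketch of why a duplicate scheduled call is harmless when the callee re-checks the flag. The class and producer here are hypothetical stand-ins, and the calls are made directly instead of through `reactor.callFromThread`:

```python
class CountingProducer:
    """Hypothetical producer that counts how often it is resumed."""

    def __init__(self):
        self.resume_count = 0

    def resumeProducing(self):
        self.resume_count += 1


class ResumeSketch:
    def __init__(self, producer):
        self.producer = producer
        self.paused_producer = True

    def _resume_paused_producer(self):
        # Re-check the flag: a second scheduled call finds it already
        # cleared and does nothing.
        if self.producer and self.paused_producer:
            self.paused_producer = False
            self.producer.resumeProducing()


counting = CountingProducer()
sketch = ResumeSketch(counting)
sketch._resume_paused_producer()
sketch._resume_paused_producer()  # duplicate call, as if scheduled twice
assert counting.resume_count == 1
```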

def wait(self):
    """Returns a deferred that resolves when finished writing to file
    """
    return make_deferred_yieldable(self.finished_deferred)

why does this make_deferred_yieldable when nothing else does?

consumer.registerProducer(producer, True)

consumer.write("Foo")
yield consumer.wait_for_writes()

I kinda feel like this might be better done by having the test file object let you wait for writes, rather than having the extra functionality in the consumer solely for testing, but ymmv
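A sketch of what such a test file object could look like, using only `threading`. The name `WaitableFile` and its `wait_for_writes(count, timeout)` signature are hypothetical, and a Twisted test would more likely want something that returns a Deferred than a blocking wait:

```python
import threading


class WaitableFile:
    """Hypothetical test double: records writes and lets a test wait for them."""

    def __init__(self):
        self._cond = threading.Condition()
        self.chunks = []

    def write(self, data):
        with self._cond:
            self.chunks.append(data)
            self._cond.notify_all()

    def wait_for_writes(self, count, timeout=5.0):
        # Blocks until at least `count` writes have happened.
        # Returns False if the timeout expires first.
        with self._cond:
            return self._cond.wait_for(
                lambda: len(self.chunks) >= count, timeout
            )


test_file = WaitableFile()
writer = threading.Thread(target=test_file.write, args=("Foo",))
writer.start()
assert test_file.wait_for_writes(1)
writer.join()
assert test_file.chunks == ["Foo"]
```

This keeps the waiting logic in the test helper, so the consumer itself needs no test-only method.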

richvdh assigned erikjohnston and unassigned richvdh Jan 17, 2018

richvdh left a comment

lgtm

richvdh assigned erikjohnston and unassigned richvdh Jan 18, 2018
erikjohnston merged commit b6dc704 into develop Jan 18, 2018
erikjohnston deleted the erikj/file_consumer branch March 5, 2018 15:56