I've been using a custom stream for some time. It works the same way as partition, but with two differences:
- You can specify a timeout option: the number of seconds after which the stream force-flushes the buffer, even if the number of elements in it is below the limit. This is useful when you have a data delivery time requirement but the data flow rate is not constant.
- You can specify a key option: a callable that returns a partitioning key from a message, effectively splitting the buffer into several independent buffers. This is useful when batches need to be grouped in some way. I use it to make sure the messages in each batch belong to the same source before passing them on to the sink that writes them to the appropriate database table. It's similar to the PartitionRecord processor in NiFi.
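To make the two options concrete, here is a minimal sketch of the buffering logic only. It is not the actual implementation and does not use the library's API: the class name KeyedTimedBuffer, its method names, and the injectable clock argument are all hypothetical, and it assumes the timeout is measured from the moment the first element lands in an empty buffer.

```python
import time
from collections import defaultdict


class KeyedTimedBuffer:
    """Hypothetical stand-in for the custom stream's buffering logic."""

    def __init__(self, n, timeout=None, key=None, clock=time.monotonic):
        self.n = n                  # batch size limit
        self.timeout = timeout      # seconds before a forced flush; None disables it
        self.key = key or (lambda msg: None)  # no key: one shared buffer
        self.clock = clock          # injectable so the timeout can be tested
        self._buffers = defaultdict(list)     # one independent buffer per key
        self._started = {}          # key -> time its first element arrived

    def add(self, msg):
        """Buffer msg; return a full batch as a tuple, otherwise None."""
        k = self.key(msg)
        buf = self._buffers[k]
        if not buf:
            # Assumption: the timeout counts from the first buffered element.
            self._started[k] = self.clock()
        buf.append(msg)
        if len(buf) >= self.n:
            return self._flush(k)
        return None

    def flush_expired(self):
        """Return the batches that have waited at least `timeout` seconds."""
        if self.timeout is None:
            return []
        now = self.clock()
        expired = [k for k, t in self._started.items() if now - t >= self.timeout]
        return [self._flush(k) for k in expired]

    def _flush(self, k):
        self._started.pop(k, None)
        return tuple(self._buffers.pop(k))
```

In this sketch something has to call flush_expired() periodically. A real stream node would more likely schedule that on the event loop, so treat the polling method as a simplification.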
I can make a PR, but the questions are:
- Should I? Does it make sense to add this to core?
- If yes, should this go into partition or some new operator? The standard behavior won't change, so it's not breaking, but implementation details will. Someone might rely on partition._buffer being a list, not a defaultdict.