New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add operator bufferTimedWithPressure with sizeOf on Observable #739

Merged
merged 5 commits into from Oct 17, 2018

Conversation

Projects
None yet
2 participants
@ybr
Contributor

ybr commented Oct 8, 2018

  • Changed the contract of BufferWithSelectorObservable to take an extra paramter sizeOf: A => Int
  • Changed the way BufferWithSelectorObservable computes the size of the buffer using sizeOf
  • Added the operator Observable[A].bufferTimedWithPressure(period: FiniteDuration, maxSize: Int, sizeOf: A => Int): Observable[Seq[A]]
  • Changed operators using BufferWithSelectorObservable with default sizeOf set to 1 to keep the same behaviour
  • Added myself to AUTHORS

Monix issue:
#735

ybr added some commits Oct 5, 2018

Add operator bufferTimedWithPressure with sizeOf sizeOf on Observable
- Changed the contract of BufferWithSelectorObservable to take an extra paramter sizeOf: A => Int
- Changed the way BufferWithSelectorObservable computes the size of the buffer using sizeOf
- Added the operator Observable[A].bufferTimedWithPressure(period: FiniteDuration, maxSize: Int, sizeOf: A => Int): Observable[Seq[A]]
- Changed operators using BufferWithSelectorObservable with default sizeOf set to 1 to keep the same behaviour
- Added myself to AUTHORS

Monix issue:
 #735
Add operator bufferTimedWithPressure with sizeOf sizeOf on Observable…
…, fix PR

- Add exclusion to mima filters for DirectMissingMethodProblem on monix.reactive.internal.operators.BufferWithSelectorObservable.this

Thank you Piotr Gawryś @Avasil

Monix issue:
 #735
@@ -739,7 +739,47 @@ abstract class Observable[+A] extends Serializable { self =>
*/
final def bufferTimedWithPressure(period: FiniteDuration, maxSize: Int): Observable[Seq[A]] = {

This comment has been minimized.

@Avasil

Avasil Oct 15, 2018

Collaborator

If we piggyback on new implementation I think we could just leave one method with default sizeOf argument. It should also fix doc ambiguity issue

This comment has been minimized.

@ybr

ybr Oct 16, 2018

Contributor

Ok I'm going to change that. Thank you.

@@ -53,7 +54,8 @@ private[reactive] final class BufferWithSelectorObservable[+A,S](
upstreamSubscriber.synchronized {
if (downstreamIsDone) Stop else {
buffer += elem
if (maxSize > 0 && buffer.length >= maxSize)
if (maxSize > 0 &&
buffer.foldLeft(0)(_ + sizeOf(_)) >= maxSize) // sum size of elements in the buffer

This comment has been minimized.

@Avasil

Avasil Oct 15, 2018

Collaborator

I wonder about performance penalty for bigger buffers (goes from O(1) to O(N)). Maybe we should track buffer size? What do you think @alexandru - is it something to worry about?

This comment has been minimized.

@ybr

ybr Oct 16, 2018

Contributor

@Avasil I agree. The point of having the sizeOf paremeter and in our case was to decrease dramatically the count of elements in the buffer we reach at most 10 elements each containing x100K values so the penalty is very low. But since sizeOf can be hard to compute it certainly worths to care about that.

This comment has been minimized.

@ybr

ybr Oct 16, 2018

Contributor

@Avasil Yes I agree with you. Furthermore, sizeOf could be resource consuming so it will surely becomes unacceptable anyway.

Add Observable.bufferTimedWithPressure with default value for sizeOf
- sizeOf default value is a function computing a weight of 1 for each element in the buffer
- keep the buffer weight as an internal state of BufferWithSelectorObservable to save calls to sizeOf for optimization purpose
- add an exclusion to mima filters on Observable.bufferTimedWithPressure

Monix issue:
 #735
@codecov

This comment has been minimized.

codecov bot commented Oct 16, 2018

Codecov Report

Merging #739 into master will increase coverage by <.01%.
The diff coverage is 85.71%.

@@            Coverage Diff             @@
##           master     #739      +/-   ##
==========================================
+ Coverage   90.51%   90.51%   +<.01%     
==========================================
  Files         398      398              
  Lines       11256    11259       +3     
  Branches     2077     2071       -6     
==========================================
+ Hits        10188    10191       +3     
  Misses       1068     1068

@ybr ybr changed the title from Add operator bufferTimedWithPressure with sizeOf sizeOf on Observable to Add operator bufferTimedWithPressure with sizeOf on Observable Oct 16, 2018

@Avasil

Avasil approved these changes Oct 16, 2018

Looks good @ybr thanks for contribution!
If neither @alexandru or @oleg-py have any comments I'll merge in the evening!

@Avasil Avasil merged commit 7c97870 into monix:master Oct 17, 2018

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed
Details
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment