Add support for compression: Gzip and deflate #1244
bufferSize: Int = 32 * 1024,
chunkSize: Int = 32 * 1024,
compressionParameters: CompressionParameters = CompressionParameters.Default,
noWrap: Boolean = false
Should we keep it as Boolean or model it as in fs2: https://github.com/typelevel/fs2/blob/main/core/jvm/src/main/scala/fs2/compression.scala#L15 ?
I'm personally fine with the API without a DeflateParams overload.
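To make the two options in this thread concrete, here is a minimal stdlib-only sketch of what modelling the flag as a small ADT could look like. The names `DeflateHeader`, `Zlib`, `Raw` and `deflateAll` are hypothetical and are not part of this PR or of fs2; the only real API used is `java.util.zip.Deflater`, whose second constructor argument is the `nowrap` flag.

```scala
import java.io.ByteArrayOutputStream
import java.util.zip.Deflater

// Hypothetical ADT replacing a bare Boolean; names are illustrative only.
sealed abstract class DeflateHeader(val noWrap: Boolean)
object DeflateHeader {
  case object Zlib extends DeflateHeader(noWrap = false) // zlib header + Adler-32 trailer
  case object Raw extends DeflateHeader(noWrap = true)   // raw deflate data, no wrapper
}

// Compresses a whole array in one go (not streaming), just to show the flag's effect.
def deflateAll(input: Array[Byte], header: DeflateHeader): Array[Byte] = {
  val deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, header.noWrap)
  try {
    deflater.setInput(input)
    deflater.finish()
    val out = new ByteArrayOutputStream()
    val buf = new Array[Byte](1024)
    while (!deflater.finished()) {
      val n = deflater.deflate(buf)
      out.write(buf, 0, n)
    }
    out.toByteArray
  } finally deflater.end()
}

val payload = ("monix " * 100).getBytes("UTF-8")
val zlib = deflateAll(payload, DeflateHeader.Zlib)
val raw = deflateAll(payload, DeflateHeader.Raw)
```

The ADT mostly buys readability at the call site (`DeflateHeader.Raw` instead of `noWrap = true`); the Boolean keeps the signature smaller, which seems to be the preference expressed above.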
Thank you for the hard work, it looks very good and the test coverage is fantastic. :)
I've left some comments, mainly nitpicks.
I didn't fully review the compression logic and I only glanced at the tests. The PR is quite big, so I'll have another pass later.
monix-reactive/jvm/src/main/scala/monix/reactive/compress/CompressionException.scala
monix-reactive/jvm/src/main/scala/monix/reactive/compress/CompressionParameters.scala
monix-reactive/jvm/src/main/scala/monix/reactive/compress/CompressionParameters.scala
...reactive/jvm/src/main/scala/monix/reactive/compress/internal/operators/DeflateOperator.scala
...reactive/jvm/src/main/scala/monix/reactive/compress/internal/operators/DeflateOperator.scala
val DEFLATE: Byte = Deflater.DEFLATED.toByte
}

implicit class RichByteObservable(source: Observable[Byte]) {
Alternatively, we could use Transformer, which was introduced recently, but it would set a precedent. :D
I feel like it could be a bit nicer and more discoverable (source.transform(gzip)) than a syntax import.
I would also move the methods to the top of the file so users don't have to scroll through private fields when they explore the file.
WDYT?
If Transformer is the new way of doing such things, then we should go with it.
I don't know if it is nicer or more discoverable, but it would certainly be easier to use from Java.
I will take a look at Transformer and try to use it.
Also, transform might be a better fit because this operation doesn't change the shape of the stream as most operators do; it only affects its payload.
I don't use it too frequently myself, but many people like it. It's the equivalent of via / transduce / through, etc.
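For readers unfamiliar with the pattern, the sketch below shows the general shape of `source.transform(gzip)` using only the standard library. It assumes a transformer is essentially a function from one stream to another. `Source` and `gzipAll` are hypothetical stand-ins, not Monix types, and `gzipAll` buffers the whole input rather than streaming it as the operators in this PR do.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

// Hypothetical stand-in for a stream type; this is NOT Monix's Observable.
final case class Source[A](items: List[A]) {
  // Applies a stream-to-stream function, in the spirit of via / through.
  def transform[B](f: Source[A] => Source[B]): Source[B] = f(this)
}

// Hypothetical transformer: gzips all chunks into a single output chunk.
// It collects the whole input first, so it only illustrates the call shape.
val gzipAll: Source[Array[Byte]] => Source[Array[Byte]] = { in =>
  val bytes = new ByteArrayOutputStream()
  val gz = new GZIPOutputStream(bytes)
  in.items.foreach(chunk => gz.write(chunk))
  gz.close() // writes the gzip trailer
  Source(List(bytes.toByteArray))
}

val chunks = Source(List("hello ".getBytes("UTF-8"), "world".getBytes("UTF-8")))
val compressed = chunks.transform(gzipAll)
```

Because the transformer is an ordinary value, it can be passed around and composed with `andThen`, with no syntax import needed at the call site.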
fileName: Option[String] = None,
comment: Option[String] = None,
modificationTime: Option[Instant] = None
): Observable[Byte] = {
Is it desirable to return Observable[Byte]?
I feel like it would be better to take and return Observable[Array[Byte]], and if users need Observable[Byte] they could do the buffering and concatenation themselves.
fs2 and ZIO return Stream[Byte], but in reality it's a stream of Chunk[Byte], which has to be explicit in Observable.
When you read binary data in a streaming fashion (e.g. from a file) you mostly get Observable[Byte], so that would require users to always do buffering before using any compression transformer. They would also need to know what chunk size to choose, and choosing a value that is too small or too big can degrade performance. This explains my intention behind using Observable[Byte] as the input.
To avoid confusion and be transparent for end users, I decided not to change the type and to return Observable[Byte].
I was wrong, you might get Stream[Byte], but that's rather popular among libraries that chunk data under the hood (fs2/zio), which effectively means that you always deal with Stream[Array[Byte]], explicitly or not. That's good, as transforming bytes one by one might not be the optimal solution in terms of performance.
Also, monix-nio returns Observable[Array[Byte]], and if we should be compatible with anything, that should be monix-nio.
As another reason for Array, Akka Streams (which doesn't do chunking) uses ByteString, which extends IndexedSeq[Byte].
> When you read data in a streaming fashion in binary format (e.g. from file) you mostly get Observable[Byte], so that would require users to always do buffering before using any compression transformer.
Related operators, like Observable.fromInputStream, return Observable[Array[Byte]], and if we add more, I feel like we should emit Array[Byte] for performance reasons, as you mentioned.
> Also they would need to know what chunk size to choose.
That's a fair point. Maybe we could add a comment with a suggestion/example of how to use it with Observable[Byte]?
Ideally we would write a small benchmark on both to compare their performance :)
About that:
> That's a fair point, maybe we could add a comment with a suggestion/example how to use it with Observable[Byte]?
I thought that maybe a helpful extension method like this could be declared for those who wonder how to do it?
final def chunked[B >: A](chunkSize: Int = 8 * 1024)(implicit ev: B =:= Byte): Observable[Array[Byte]] = {
  bufferTumbling(chunkSize)
    .map(seq => ev.liftCo[Seq](seq).toArray)
}
To make it more discoverable I would define it on Observable itself, although that would set a precedent.
I think this operator might make sense for performance scenarios, not only for Byte.
bufferTumbling uses an Array as a buffer but converts it to Seq before sending it downstream.
We could copy-paste the implementation and send a copy of the array without the conversion.
It would be better to do that with benchmarks, though (there are some that use bufferTumbling and it's a bottleneck), so I feel it would be preferable to tackle it in another PR.
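As a rough illustration of why chunks are the natural unit here, the following stdlib-only sketch groups single bytes into arrays and feeds each array to one shared `java.util.zip.Deflater`. The function name `deflateChunked` is hypothetical, and a plain `Iterator` stands in for the stream; it is an eager sketch, not the operator implemented in this PR.

```scala
import java.util.zip.{Deflater, Inflater}

// Hypothetical helper: groups bytes into chunks and compresses them chunk by chunk.
// Assumes chunkSize > 0. Returns the compressed output as a sequence of arrays.
def deflateChunked(bytes: Iterator[Byte], chunkSize: Int): Vector[Array[Byte]] = {
  val deflater = new Deflater()
  val buf = new Array[Byte](chunkSize)
  val out = Vector.newBuilder[Array[Byte]]

  // Pulls whatever compressed output is currently available.
  def drain(): Unit = {
    var n = deflater.deflate(buf)
    while (n > 0) {
      out += java.util.Arrays.copyOf(buf, n)
      n = deflater.deflate(buf)
    }
  }

  try {
    bytes.grouped(chunkSize).foreach { group =>
      deflater.setInput(group.toArray) // one JNI call per chunk instead of per byte
      drain()
    }
    deflater.finish()
    drain() // flush the remaining data and the trailer
    out.result()
  } finally deflater.end()
}

val input = Array.tabulate[Byte](20000)(i => (i % 7).toByte)
val compressedChunks = deflateChunked(input.iterator, 4096)
```

The per-chunk `setInput` call is the point of the discussion above: with an element type of `Byte`, the grouping step has to happen somewhere, either inside the operator or in user code.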
monix-reactive/jvm/src/test/scala/monix/reactive/compress/DeflateIntegrationTest.scala
monix-reactive/jvm/src/test/scala/monix/reactive/compress/InflateTest.scala
Last but not least fs2 returns |
monix-reactive/jvm/src/main/scala/monix/reactive/compress/internal/operators/GzipOperator.scala
isDone = true
if (ack == null) ack = Continue
ack.syncOnContinue {
  out.onNext(gzipper.finish())
gzipper.finish() never throws, right?
It seems that it can, should be fixed now.
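The concern in this thread is that an exception thrown while producing the final chunk would escape instead of being signalled downstream. A minimal sketch of the guarded shape follows, assuming a simplified downstream interface; `Downstream` and `completeWith` are hypothetical names, not Monix's `Subscriber` API, and this is not the actual fix applied in the PR.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

// Hypothetical minimal downstream interface, standing in for a real subscriber.
trait Downstream[A] {
  def onNext(a: A): Unit
  def onError(e: Throwable): Unit
  def onComplete(): Unit
}

// Emits the final element, then completes. If producing the final element
// throws, the failure is routed to onError and nothing else is emitted.
def completeWith[A](out: Downstream[A])(finish: () => A): Unit = {
  val last: Option[A] =
    try Some(finish())
    catch { case NonFatal(e) => out.onError(e); None }
  last.foreach { chunk =>
    out.onNext(chunk)
    out.onComplete()
  }
}

// Small recorder used to observe which callbacks fire.
final class Recorder extends Downstream[String] {
  val events = ListBuffer.empty[String]
  def onNext(a: String): Unit = events += s"next($a)"
  def onError(e: Throwable): Unit = events += s"error(${e.getMessage})"
  def onComplete(): Unit = events += "complete"
}

val ok = new Recorder
completeWith(ok)(() => "trailer")

val failing = new Recorder
completeWith(failing)(() => throw new IllegalStateException("boom"))
```

The property this aims for is that the downstream sees exactly one terminal event: `onComplete` or `onError`, never both and never neither.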
...-reactive/jvm/src/main/scala/monix/reactive/compress/internal/operators/GunzipOperator.scala
...reactive/jvm/src/main/scala/monix/reactive/compress/internal/operators/InflateOperator.scala
Sorry for the delay, I've made another pass - I've resolved cosmetic comments and elaborated on those that should be fixed. If you don't have time then let me know and I can finish it - it already looks fantastic!
Yeah, I think that's fine. If there's ever a demand, we could create an extra method, but working with a stream of streams could be awkward for the general use case, and I assume you're implementing it because you need it in sttp/tapir.
No problem, I know how it is ;) Thanks for the careful code review. I think the code should now comply with the monix-reactive protocol. One thing that bothers me is that this inconsistency with the existing protocol wasn't caught by the tests. Correct me if I am wrong, but I think you mentioned (during your last presentation) something about having a generic test for checking protocol compliance. Maybe I've just forgotten to implement some trait in the tests?
There is |
We can check it, but I am not sure how to combine this |
Many other operators are synchronous too so that shouldn't be a problem.
Usually, we just add test cases below implementation of …
I don't know how well the test suite will fit these operators, so feel free to skip it if there are too many problems.
It looks great to me - thanks once again :)
A few tests seem to fail, though.
This PR adds operators to compress/decompress a stream of bytes using the gzip and deflate algorithms.
Most of the heavy work was done by my colleague @LGLO while implementing this feature in ZIO (zio/zio#3825).
I've just adapted his work to monix. I also made some improvements based on the fs2 code (added the option to provide fileName, comment and modificationTimestamp when compressing a stream).
Not sure if the packages are OK; looking forward to your review.
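For context on the optional header fields mentioned above, here is a small sketch of how a gzip member header carries a file name, a comment and a modification time, following the layout in RFC 1952. The function `gzipHeader` is a hypothetical helper written for illustration; it is not the code from this PR, and the parameter names merely mirror the ones discussed in the review.

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets.ISO_8859_1
import java.time.Instant

// Hypothetical helper: builds only the gzip header bytes (RFC 1952 layout).
def gzipHeader(
  fileName: Option[String] = None,
  comment: Option[String] = None,
  modificationTime: Option[Instant] = None
): Array[Byte] = {
  val FNAME = 0x08    // flag bit: original file name present
  val FCOMMENT = 0x10 // flag bit: comment present
  val flags = fileName.fold(0)(_ => FNAME) | comment.fold(0)(_ => FCOMMENT)
  val mtime = modificationTime.fold(0L)(_.getEpochSecond) // 0 means "no timestamp"

  val out = new ByteArrayOutputStream()
  out.write(0x1f); out.write(0x8b) // magic bytes ID1, ID2
  out.write(8)                     // CM = 8 (deflate)
  out.write(flags)                 // FLG
  (0 until 4).foreach(i => out.write(((mtime >> (8 * i)) & 0xff).toInt)) // MTIME, little-endian
  out.write(0)                     // XFL: no extra flags
  out.write(255)                   // OS = 255 (unknown)
  // Optional fields are zero-terminated ISO-8859-1 strings: name first, then comment.
  (fileName.toList ++ comment.toList).foreach { s =>
    out.write(s.getBytes(ISO_8859_1))
    out.write(0)
  }
  out.toByteArray
}

val header = gzipHeader(fileName = Some("a.txt"), modificationTime = Some(Instant.ofEpochSecond(1)))
```

With no optional fields the header is the fixed 10 bytes; each optional string adds its length plus one terminating zero byte.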