Skip to content

Commit

Permalink
CCtx: add ability to configure multiple worker threads
Browse files Browse the repository at this point in the history
  • Loading branch information
luben committed Sep 20, 2021
1 parent 6030208 commit 440fa44
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 13 deletions.
8 changes: 8 additions & 0 deletions src/main/java/com/github/luben/zstd/ZstdCompressCtx.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ public ZstdCompressCtx setChecksum(boolean checksumFlag) {
}
private native void setChecksum0(boolean checksumFlag);


public ZstdCompressCtx setWorkers(int workers) {
acquireSharedLock();
Zstd.setCompressionWorkers(nativePtr, workers);
releaseSharedLock();
return this;
}

/**
* Enable or disable content size
* @param contentSizeFlag Content size will be written into frame header _whenever known_, default: true
Expand Down
45 changes: 32 additions & 13 deletions src/test/scala/Perf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,29 @@ class ZstdPerfSpec extends FlatSpec {
val output: Array[Byte] = Array.fill[Byte](input.size)(0)
var compressedSize = 0;

for (i <- 1 to cycles) {
nsc.timeAndAlloc {
compressedSize = c_ctx.compress(compressed, input)
}
nsd.timeAndAlloc {
d_ctx.decompressByteArray(output, 0, output.size, compressed, 0, compressedSize)
}
}

report(name, compressedSize, input.size, cycles, nsc, nsd)
assert (input.toSeq == output.toSeq)
}

def benchMT(name: String, input: Array[Byte], level: Int = 1): Unit = {
var nsc = new AllocTracker
var nsd = new AllocTracker
val c_ctx = new ZstdCompressCtx()
c_ctx.setLevel(level)
c_ctx.setWorkers(2)
val d_ctx = new ZstdDecompressCtx()
val compressed: Array[Byte] = Array.fill[Byte](input.size)(0)
val output: Array[Byte] = Array.fill[Byte](input.size)(0)
var compressedSize = 0;

for (i <- 1 to cycles) {
nsc.timeAndAlloc {
Expand All @@ -181,8 +204,8 @@ class ZstdPerfSpec extends FlatSpec {
assert (input.toSeq == output.toSeq)
}


def benchStream(name: String, input: Array[Byte], level: Int = 1): Unit = {
val cycles = 50
val size = input.length

val os = new ByteArrayOutputStream(Zstd.compressBound(size.toLong).toInt)
Expand Down Expand Up @@ -219,7 +242,6 @@ class ZstdPerfSpec extends FlatSpec {
}

def benchStreamWithBufferPool(name: String, input: Array[Byte], level: Int = 1): Unit = {
val cycles = 50
val size = input.length

val os = new ByteArrayOutputStream(Zstd.compressBound(size.toLong).toInt)
Expand Down Expand Up @@ -258,7 +280,6 @@ class ZstdPerfSpec extends FlatSpec {


def benchStreamMT(name: String, input: Array[Byte], level: Int = 1): Unit = {
val cycles = 50
val size = input.length

val os = new ByteArrayOutputStream(Zstd.compressBound(size.toLong).toInt)
Expand Down Expand Up @@ -297,7 +318,6 @@ class ZstdPerfSpec extends FlatSpec {


def benchDirectBufferStream(name: String, input: Array[Byte], level: Int = 1): Unit = {
val cycles = 50

val compressedBuffer = ByteBuffer.allocateDirect(Zstd.compressBound(input.length.toLong).toInt);
val inputBuffer = ByteBuffer.allocateDirect(input.length)
Expand Down Expand Up @@ -342,7 +362,6 @@ class ZstdPerfSpec extends FlatSpec {
}

def benchStreamLDM(name: String, input: Array[Byte], level: Int = 1): Unit = {
val cycles = 50
val size = input.length

val os = new ByteArrayOutputStream(Zstd.compressBound(size.toLong).toInt)
Expand Down Expand Up @@ -380,27 +399,27 @@ class ZstdPerfSpec extends FlatSpec {
}


val cycles = 200
val cycles = 50

val levels = List(-3, -1, 1, 3, 6, 9)
val buff = Source.fromFile("src/test/resources/xml")(Codec.ISO8859).map{_.toByte }.take(1024 * 1024).toArray
val buff = Source.fromFile("src/test/resources/xml")(Codec.ISO8859).map{_.toByte }.take(10 * 1024 * 1024).toArray
for (level <- levels) {
it should s"be fast for compressable data at level $level" in {
bench(s"Compressable data at $level", buff, level)
benchDirectByteBuffer(s"Compressable data at $level in a direct ByteBuffer", buff, level)
benchDirectByteBufferWithDict(s"Compressable data at $level in a direct ByteBuffer and pre-allocated contexts", buff, level)
benchMT(s"Compressable data at $level with multi-threaded", buff, level)
benchLDM(s"Compressable data at $level with Long Distance Matching", buff, level)
}
}

val buff1 = Source.fromFile("src/test/resources/xmlx2")(Codec.ISO8859).map{_.toByte }.take(10 * 1024 * 1024).toArray
for (level <- levels) {
it should s"be fast with streaming at level $level" in {
benchStream(s"Streaming at $level", buff1, level)
benchStreamWithBufferPool(s"Streaming with BufferPool at $level", buff1, level)
benchStreamMT(s"Streaming (multi-threaded) at $level", buff1, level)
benchDirectBufferStream(s"Streaming at $level to direct ByteBuffers", buff1, level)
benchStreamLDM(s"Streaming at $level with Long Distance Matching", buff1, level)
benchStream(s"Streaming at $level", buff, level)
benchStreamWithBufferPool(s"Streaming with BufferPool at $level", buff, level)
benchStreamMT(s"Streaming (multi-threaded) at $level", buff, level)
benchDirectBufferStream(s"Streaming at $level to direct ByteBuffers", buff, level)
benchStreamLDM(s"Streaming at $level with Long Distance Matching", buff, level)
}
}
}

0 comments on commit 440fa44

Please sign in to comment.