Skip to content

Commit

Permalink
fix(commons): fix resources is not reallocated when stream is restarted
Browse files Browse the repository at this point in the history
  • Loading branch information
fhussonnois committed Mar 2, 2021
1 parent 0a71b8e commit d63ad8e
Showing 1 changed file with 7 additions and 0 deletions.
Expand Up @@ -21,6 +21,8 @@
import io.streamthoughts.azkarra.commons.rocksdb.internal.OpaqueMemoryResource;
import io.streamthoughts.azkarra.commons.rocksdb.internal.ResourceDisposer;
import io.streamthoughts.azkarra.commons.rocksdb.internal.ResourceInitializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.Objects;
Expand All @@ -32,6 +34,8 @@
*/
final class RocksDBMemoryManager {

private static final Logger LOG = LoggerFactory.getLogger(RocksDBMemoryManager.class);

private final ReentrantLock lock = new ReentrantLock();

private LeasedResource<RocksDBSharedResources> leasedResource;
Expand All @@ -52,6 +56,7 @@ OpaqueMemoryResource<RocksDBSharedResources> getOrAllocateSharedResource(
lock.lock();
try {
if (leasedResource == null) {
LOG.info("Initializing RocksDB shared resources: Write-Buffer-Manager and Cache");
RocksDBSharedResources resource = initializer.apply();
leasedResource = new LeasedResource<>(resource);
}
Expand All @@ -72,7 +77,9 @@ void release(final Object leaseHolder) throws Exception {
}

if (leasedResource.removeLeaseHolder(leaseHolder)) {
LOG.info("Closing RocksDB shared resources: Write-Buffer-Manager and Cache");
leasedResource.close();
leasedResource = null;
}
} finally {
lock.unlock();
Expand Down

0 comments on commit d63ad8e

Please sign in to comment.