From f0f533a5d12fdd3a40af25c77290612eeb8b76f4 Mon Sep 17 00:00:00 2001 From: sihuazhou Date: Sat, 7 Apr 2018 21:52:05 +0800 Subject: [PATCH] improve the RocksDBMapState.clear() via iterating the records directly and deleting records using WriteBatch. --- .../streaming/state/RocksDBMapState.java | 51 ++++++++++++------- 1 file changed, 32 insertions(+), 19 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java index 56a7cc499214f..a2cc063b2e2e5 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java @@ -35,6 +35,7 @@ import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; +import org.rocksdb.WriteBatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -225,11 +226,23 @@ public Map.Entry next() { @Override public void clear() { try { - Iterator> iterator = iterator(); + try (RocksIterator iterator = backend.db.newIterator(columnFamily); + WriteBatch writeBatch = new WriteBatch(128)) { - while (iterator.hasNext()) { - iterator.next(); - iterator.remove(); + final byte[] keyPrefixBytes = serializeCurrentKeyAndNamespace(); + iterator.seek(keyPrefixBytes); + + while (iterator.isValid()) { + byte[] keyBytes = iterator.key(); + if (startWithKeyPrefix(keyPrefixBytes, keyBytes)) { + writeBatch.remove(columnFamily, keyBytes); + } else { + break; + } + iterator.next(); + } + + backend.db.write(writeOptions, writeBatch); } } catch (Exception e) { LOG.warn("Error while cleaning the state.", e); @@ -356,6 +369,20 @@ private UV deserializeUserValue(byte[] rawValueBytes, TypeSerializer valueSe return isNull ? null : valueSerializer.deserialize(in); } + private boolean startWithKeyPrefix(byte[] keyPrefixBytes, byte[] rawKeyBytes) { + if (rawKeyBytes.length < keyPrefixBytes.length) { + return false; + } + + for (int i = keyPrefixBytes.length; --i >= backend.getKeyGroupPrefixBytes(); ) { + if (rawKeyBytes[i] != keyPrefixBytes[i]) { + return false; + } + } + + return true; + } + // ------------------------------------------------------------------------ // Internal Classes // ------------------------------------------------------------------------ @@ -572,7 +599,7 @@ private void loadCache() { } while (true) { - if (!iterator.isValid() || !underSameKey(iterator.key())) { + if (!iterator.isValid() || !startWithKeyPrefix(keyPrefixBytes, iterator.key())) { expired = true; break; } @@ -594,19 +621,5 @@ private void loadCache() { } } } - - private boolean underSameKey(byte[] rawKeyBytes) { - if (rawKeyBytes.length < keyPrefixBytes.length) { - return false; - } - - for (int i = keyPrefixBytes.length; --i >= backend.getKeyGroupPrefixBytes(); ) { - if (rawKeyBytes[i] != keyPrefixBytes[i]) { - return false; - } - } - - return true; - } } }