Skip to content

Commit

Permalink
Merge commit '493b2756bd15e2776fdf74a715df7a52d83c1af7' into li_trunk
Browse files Browse the repository at this point in the history
  • Loading branch information
Brett Konold committed Oct 17, 2019
2 parents d728510 + 493b275 commit bf9f0f5
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ object RocksDbKeyValueStore extends Logging {
"rocksdb.cur-size-active-mem-table", // approximate active memtable size in bytes
"rocksdb.cur-size-all-mem-tables", // approximate active and unflushed memtable size in bytes
"rocksdb.size-all-mem-tables", // approximate active, unflushed and pinned memtable size in bytes
"rocksdb.estimate-num-keys", // approximate number keys in the active and unflushed memtable and storage
"rocksdb.total-sst-files-size" // size of all sst files on disk
"rocksdb.estimate-num-keys" // approximate number keys in the active and unflushed memtable and storage
)

val configuredMetrics = storeConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,27 +57,23 @@ class SerializedKeyValueStore[K, V](
}

def put(key: K, value: V) {
metrics.puts.inc
val keyBytes = toBytesOrNull(key, keySerde)
val valBytes = toBytesOrNull(value, msgSerde)
store.put(keyBytes, valBytes)
val valSizeBytes = if (valBytes == null) 0 else valBytes.length
updatePutMetrics(1, valSizeBytes)
}

def putAll(entries: java.util.List[Entry[K, V]]) {
val list = new java.util.ArrayList[Entry[Array[Byte], Array[Byte]]](entries.size())
val iter = entries.iterator
var newMaxRecordSizeBytes = 0
while (iter.hasNext) {
val curr = iter.next
val keyBytes = toBytesOrNull(curr.getKey, keySerde)
val valBytes = toBytesOrNull(curr.getValue, msgSerde)
val valSizeBytes = if (valBytes == null) 0 else valBytes.length
newMaxRecordSizeBytes = Math.max(newMaxRecordSizeBytes, valSizeBytes)
list.add(new Entry(keyBytes, valBytes))
}
store.putAll(list)
updatePutMetrics(list.size, newMaxRecordSizeBytes)
metrics.puts.inc(list.size)
}

def delete(key: K) {
Expand Down Expand Up @@ -155,14 +151,6 @@ class SerializedKeyValueStore[K, V](
bytes
}

private def updatePutMetrics(batchSize: Long, newMaxRecordSizeBytes: Long) = {
metrics.puts.inc(batchSize)
var max = metrics.maxRecordSizeBytes.getValue
while (newMaxRecordSizeBytes > max && !metrics.maxRecordSizeBytes.compareAndSet(max, newMaxRecordSizeBytes)) {
max = metrics.maxRecordSizeBytes.getValue
}
}

override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = {
val fromBytes = toBytesOrNull(from, keySerde)
val toBytes = toBytesOrNull(to, keySerde)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ class SerializedKeyValueStoreMetrics(
val flushes = newCounter("flushes")
val bytesSerialized = newCounter("bytes-serialized")
val bytesDeserialized = newCounter("bytes-deserialized")
val maxRecordSizeBytes = newGauge("max-record-size-bytes", 0L)

override def getPrefix = storeName + "-"
}

0 comments on commit bf9f0f5

Please sign in to comment.