/
RocksDBStore.scala
137 lines (109 loc) · 4.51 KB
/
RocksDBStore.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
/*
* Copyright (C) 2011-2017 scalable minds UG (haftungsbeschränkt) & Co. KG. <http://scm.io>
*/
package com.scalableminds.fossildb.db
import java.nio.file.{Files, Path}
import java.util
import com.typesafe.scalalogging.LazyLogging
import org.rocksdb._
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.Future
/**
  * Summary of a single backup, as built in `RocksDBManager.backup` from the values
  * RocksDB reports for that backup.
  *
  * @param id        backup id
  * @param timestamp backup timestamp (unit as reported by RocksDB -- presumably seconds since epoch, confirm)
  * @param size      backup size (presumably in bytes, confirm)
  */
case class BackupInfo(
  id: Int,
  timestamp: Long,
  size: Long
)
/**
  * A key together with its associated value.
  *
  * @param key   the key
  * @param value the value stored under `key`
  * @tparam T type of the value
  */
case class KeyValuePair[T](
  key: String,
  value: T
)
/**
  * Opens a RocksDB database with a fixed set of column families and offers backup,
  * restore and shutdown operations on it.
  *
  * The database is opened eagerly when the manager is constructed. Missing databases
  * and missing column families are created.
  *
  * @param dataDir            directory holding the database files
  * @param columnFamilies     names of the column families to open; the default column family
  *                           is always opened in addition
  * @param optionsFilePathOpt optional path of a RocksDB options file to load DB options from
  */
class RocksDBManager(dataDir: Path, columnFamilies: List[String], optionsFilePathOpt: Option[String]) extends LazyLogging {

  /**
    * The opened database and a map from column family name to its handle.
    * The handle of the default column family is not part of the map.
    */
  val (db: RocksDB, columnFamilyHandles) = {
    RocksDB.loadLibrary()
    val columnOptions = new ColumnFamilyOptions()
      .setArenaBlockSize(4 * 1024 * 1024) // 4MB
      .setTargetFileSizeBase(1024 * 1024 * 1024) // 1GB
      // The Long literal is required: as an Int expression 10 * 1024 * 1024 * 1024
      // overflows and would configure a negative size.
      .setMaxBytesForLevelBase(10L * 1024 * 1024 * 1024) // 10GB
    val columnFamilyDescriptors = (columnFamilies.map(_.getBytes) :+ RocksDB.DEFAULT_COLUMN_FAMILY).map { columnFamily =>
      new ColumnFamilyDescriptor(columnFamily, columnOptions)
    }
    val openedHandles = new util.ArrayList[ColumnFamilyHandle]
    val options = new DBOptions()
    // Receives the column family descriptors found in the options file. They are not used
    // when opening the database; only the DB options loaded into `options` take effect.
    val cfListRef: mutable.Buffer[ColumnFamilyDescriptor] = mutable.Buffer()
    optionsFilePathOpt.foreach { optionsFilePath =>
      try {
        org.rocksdb.OptionsUtil.loadOptionsFromFile(optionsFilePath, Env.getDefault, options, cfListRef.asJava)
        logger.info("successfully loaded rocksdb options from " + optionsFilePath)
      } catch {
        case e: Exception =>
          throw new Exception("Failed to load rocksdb options from file " + optionsFilePath, e)
      }
    }
    options
      .setCreateIfMissing(true)
      .setCreateMissingColumnFamilies(true)
    logger.info("Opening RocksDB at " + dataDir.toAbsolutePath)
    val openedDb = RocksDB.open(
      options,
      dataDir.toAbsolutePath.toString,
      columnFamilyDescriptors.asJava,
      openedHandles)
    // zip stops at the shorter list, so the trailing default column family handle is dropped here
    (openedDb, columnFamilies.zip(openedHandles.asScala).toMap)
  }

  /**
    * Returns a store for the given column family.
    *
    * @param columnFamily name of the column family
    * @return the store, or None if no column family of that name was opened by this manager
    */
  def getStoreForColumnFamily(columnFamily: String): Option[RocksDBStore] = {
    columnFamilyHandles.get(columnFamily).map(new RocksDBStore(db, _))
  }

  /**
    * Creates a new backup of the database in `backupDir` and purges older backups so that
    * only one is kept. The directory is created if it does not exist.
    *
    * @param backupDir directory to store the backup in
    * @return info about the first backup listed by the backup engine after purging, if any
    */
  def backup(backupDir: Path): Option[BackupInfo] = {
    if (!Files.exists(backupDir) || !Files.isDirectory(backupDir))
      Files.createDirectories(backupDir)

    RocksDB.loadLibrary()
    val backupOptions = new BackupableDBOptions(backupDir.toString)
    try {
      val backupEngine = BackupEngine.open(Env.getDefault, backupOptions)
      try {
        backupEngine.createNewBackup(db)
        backupEngine.purgeOldBackups(1)
        backupEngine.getBackupInfo.asScala.headOption.map(info => BackupInfo(info.backupId, info.timestamp, info.size))
      } finally {
        // release the native backup engine even if the backup failed
        backupEngine.close()
      }
    } finally {
      backupOptions.close()
    }
  }

  /**
    * Closes the database and restores its data directory from the latest backup in `backupDir`.
    *
    * NOTE: this method does not reopen the database; `db` of this manager stays closed afterwards.
    * Presumably the caller creates a new manager after restoring -- confirm against callers.
    *
    * @param backupDir directory containing the backup to restore from
    */
  def restoreFromBackup(backupDir: Path): Unit = {
    logger.info("Restoring from backup. RocksDB temporarily unavailable")
    close()
    RocksDB.loadLibrary()
    val backupOptions = new BackupableDBOptions(backupDir.toString)
    try {
      val backupEngine = BackupEngine.open(Env.getDefault, backupOptions)
      try {
        val restoreOptions = new RestoreOptions(true)
        try {
          backupEngine.restoreDbFromLatestBackup(dataDir.toString, dataDir.toString, restoreOptions)
        } finally {
          restoreOptions.close()
        }
      } finally {
        backupEngine.close()
      }
    } finally {
      backupOptions.close()
    }
    logger.info("Restoring from backup complete. Reopening RocksDB")
  }

  /**
    * Closes the database handle.
    *
    * @return an already completed Future; the database is closed synchronously before it is returned
    */
  def close(): Future[Unit] = {
    logger.info("Closing RocksDB handle")
    Future.successful(db.close())
  }
}
/**
  * Iterates over the keys of a RocksDB iterator, starting at the iterator's current position.
  * Iteration ends when the underlying iterator becomes invalid or, if a prefix is given,
  * at the first key that does not start with that prefix.
  *
  * Keys are decoded with the platform default charset, mirroring the `getBytes()` encoding
  * RocksDBStore uses when writing and seeking keys.
  *
  * NOTE(review): the underlying RocksIterator is never closed by this class.
  *
  * @param it     underlying iterator, already positioned at the first key to return
  * @param prefix optional key prefix bounding the iteration
  */
class RocksDBKeyIterator(it: RocksIterator, prefix: Option[String]) extends Iterator[String] {

  // Encoded once. Comparing bytes with bytes (instead of bytes with the prefix's chars)
  // also matches prefixes containing non-ASCII characters.
  private val prefixBytes: Option[Array[Byte]] = prefix.map(_.getBytes())

  override def hasNext: Boolean =
    it.isValid && prefixBytes.forall(expected => it.key().startsWith(expected))

  /**
    * Returns the key at the current position and advances the underlying iterator.
    *
    * @throws NoSuchElementException if the underlying iterator is no longer valid
    */
  override def next(): String = {
    if (!it.isValid) throw new NoSuchElementException("next on exhausted RocksDB iterator")
    // Decode the whole byte sequence; a byte-by-byte toChar conversion corrupts multi-byte characters.
    val key = new String(it.key())
    it.next()
    key
  }
}
/**
  * Iterates over the key-value pairs of a RocksDB iterator, starting at the iterator's current
  * position. Iteration ends when the underlying iterator becomes invalid or, if a prefix is
  * given, at the first key that does not start with that prefix.
  *
  * Keys are decoded with the platform default charset, mirroring the `getBytes()` encoding
  * RocksDBStore uses when writing and seeking keys. Values are returned as raw bytes.
  *
  * NOTE(review): the underlying RocksIterator is never closed by this class.
  *
  * @param it     underlying iterator, already positioned at the first entry to return
  * @param prefix optional key prefix bounding the iteration
  */
class RocksDBIterator(it: RocksIterator, prefix: Option[String]) extends Iterator[KeyValuePair[Array[Byte]]] {

  // Encoded once. Comparing bytes with bytes (instead of bytes with the prefix's chars)
  // also matches prefixes containing non-ASCII characters.
  private val prefixBytes: Option[Array[Byte]] = prefix.map(_.getBytes())

  override def hasNext: Boolean =
    it.isValid && prefixBytes.forall(expected => it.key().startsWith(expected))

  /**
    * Returns the entry at the current position and advances the underlying iterator.
    *
    * @throws NoSuchElementException if the underlying iterator is no longer valid
    */
  override def next(): KeyValuePair[Array[Byte]] = {
    if (!it.isValid) throw new NoSuchElementException("next on exhausted RocksDB iterator")
    // Decode the whole byte sequence; a byte-by-byte toChar conversion corrupts multi-byte characters.
    val entry = KeyValuePair(new String(it.key()), it.value())
    it.next()
    entry
  }
}
/**
  * Key-value access to one column family of an open RocksDB database.
  * String keys are converted to bytes with `getBytes()`; values are raw byte arrays.
  *
  * @param db     the open database
  * @param handle handle of the column family all operations of this store work on
  */
class RocksDBStore(db: RocksDB, handle: ColumnFamilyHandle) {

  /** Looks up the value stored under `key` (presumably null if the key is absent -- confirm against the RocksDB API). */
  def get(key: String): Array[Byte] =
    db.get(handle, key.getBytes())

  /** Iterates over key-value pairs starting at the position `key` seeks to, optionally bounded by `prefix`. */
  def scan(key: String, prefix: Option[String]): Iterator[KeyValuePair[Array[Byte]]] =
    new RocksDBIterator(iteratorAt(key), prefix)

  /** Like `scan`, but yields the keys only. */
  def scanKeysOnly(key: String, prefix: Option[String]): Iterator[String] =
    new RocksDBKeyIterator(iteratorAt(key), prefix)

  /** Stores `value` under `key`. */
  def put(key: String, value: Array[Byte]) =
    db.put(handle, key.getBytes(), value)

  /** Deletes the entry stored under `key`. */
  def delete(key: String) =
    db.delete(handle, key.getBytes())

  // Creates a fresh iterator on this store's column family and seeks it to `key`.
  private def iteratorAt(key: String): RocksIterator = {
    val cursor = db.newIterator(handle)
    cursor.seek(key.getBytes())
    cursor
  }
}