Skip to content

Commit

Permalink
refactor historyStore, rootsStore, coldStore, RSpaceImporterStore, RSpaceExporterStore to use keyValueStore
Browse files Browse the repository at this point in the history
  • Loading branch information
zsluedem committed Jan 12, 2021
1 parent b685a29 commit a9c0f7d
Show file tree
Hide file tree
Showing 11 changed files with 108 additions and 160 deletions.
41 changes: 7 additions & 34 deletions rspace/src/main/scala/coop/rchain/rspace/history/ColdStore.scala
Expand Up @@ -3,22 +3,21 @@ package coop.rchain.rspace.history
import cats.implicits._
import cats.effect.Sync
import coop.rchain.rspace.Blake2b256Hash
import coop.rchain.rspace.Blake2b256Hash.codecPureBlake2b256Hash
import scodec.Codec
import scodec.bits.ByteVector
import scodec.codecs.{discriminated, uint2}
import coop.rchain.rspace.internal.codecByteVector
import coop.rchain.store.{KeyValueStore, KeyValueTypedStore}
import coop.rchain.shared.AttemptOpsF.RichAttempt
import coop.rchain.shared.syntax._

trait ColdStore[F[_]] {
def put(hash: Blake2b256Hash, data: PersistedData): F[Unit]
def put(data: List[(Blake2b256Hash, PersistedData)]): F[Unit]

def get(hash: Blake2b256Hash): F[Option[PersistedData]]
object ColdStoreInstances {
type ColdKeyValueStore[F[_]] = KeyValueTypedStore[F, Blake2b256Hash, PersistedData]

def close(): F[Unit]
}
def coldStore[F[_]: Sync](store: KeyValueStore[F]): ColdKeyValueStore[F] =
store.toTypedStore(codecPureBlake2b256Hash, codecPersistedData)

object ColdStoreInstances {
val codecPersistedData: Codec[PersistedData] =
discriminated[PersistedData]
.by(uint2)
Expand All @@ -31,32 +30,6 @@ object ColdStoreInstances {
.subcaseP(2) {
case c: ContinuationsLeaf => c
}(codecByteVector.as[ContinuationsLeaf])

def coldStore[F[_]: Sync](store: Store[F]): ColdStore[F] = new ColdStore[F] {
private val codec = codecPersistedData

override def put(key: Blake2b256Hash, d: PersistedData): F[Unit] =
for {
encoded <- codec.encode(d).get
data <- store.put(key, encoded)
} yield data

override def get(key: Blake2b256Hash): F[Option[PersistedData]] =
for {
maybeBytes <- store.get(key)
maybeDecoded <- maybeBytes.map(bytes => codec.decode(bytes).get).sequence
} yield (maybeDecoded.map(_.value))

override def close(): F[Unit] = store.close()

override def put(data: List[(Blake2b256Hash, PersistedData)]): F[Unit] =
data
.traverse {
case (key, data) => codec.encode(data).get.map((key, _))
}
.flatMap(encoded => store.put(encoded))

}
}

sealed trait PersistedData {
Expand Down
11 changes: 5 additions & 6 deletions rspace/src/main/scala/coop/rchain/rspace/history/History.scala
Expand Up @@ -13,7 +13,6 @@ trait History[F[_]] {
def process(actions: List[HistoryAction]): F[History[F]]
def root: Blake2b256Hash
def find(key: KeyPath): F[(TriePointer, Vector[Trie])]
def close(): F[Unit]
def reset(root: Blake2b256Hash): History[F]
}

Expand Down Expand Up @@ -128,23 +127,23 @@ object Trie {
}(provide(EmptyPointer))
.subcaseP(1) {
case p: LeafPointer => p
}(Blake2b256Hash.codecBlake2b256Hash.as[LeafPointer])
}(Blake2b256Hash.codecWithBytesStringBlake2b256Hash.as[LeafPointer])
.subcaseP(2) {
case p: SkipPointer => p
}(Blake2b256Hash.codecBlake2b256Hash.as[SkipPointer])
}(Blake2b256Hash.codecWithBytesStringBlake2b256Hash.as[SkipPointer])
.subcaseP(3) {
case p: NodePointer => p
}(Blake2b256Hash.codecBlake2b256Hash.as[NodePointer])
}(Blake2b256Hash.codecWithBytesStringBlake2b256Hash.as[NodePointer])

implicit def codecTrieValuePointer: Codec[ValuePointer] =
discriminated[ValuePointer]
.by(uint(1))
.subcaseP(0) {
case p: LeafPointer => p
}(Blake2b256Hash.codecBlake2b256Hash.as[LeafPointer])
}(Blake2b256Hash.codecWithBytesStringBlake2b256Hash.as[LeafPointer])
.subcaseP(1) {
case p: NodePointer => p
}(Blake2b256Hash.codecBlake2b256Hash.as[NodePointer])
}(Blake2b256Hash.codecWithBytesStringBlake2b256Hash.as[NodePointer])

}

Expand Down
Expand Up @@ -5,6 +5,7 @@ import cats.effect.Sync
import cats.implicits._
import coop.rchain.rspace.Blake2b256Hash
import coop.rchain.rspace.history.History._
import coop.rchain.shared.syntax._
import scodec.bits.ByteVector

import Ordering.Implicits.seqDerivedOrdering
Expand Down Expand Up @@ -555,8 +556,6 @@ object HistoryInstances {

}

override def close(): F[Unit] = historyStore.close()

override def reset(root: Blake2b256Hash): History[F] = this.copy(root = root)

}
Expand Down Expand Up @@ -587,8 +586,6 @@ object HistoryInstances {
}
} yield result

override def close(): F[Unit] = historyStore.close()

def clear(): F[Unit] = Sync[F].delay {
cache.clear()
}
Expand Down
Expand Up @@ -6,7 +6,8 @@ import cats.Parallel
import coop.rchain.rspace.state.{RSpaceExporter, RSpaceImporter}
import coop.rchain.rspace.state.instances.{RSpaceExporterStore, RSpaceImporterStore}
import coop.rchain.rspace.{Blake2b256Hash, HistoryReader, HotStoreAction}
import coop.rchain.store.KeyValueStoreManager
import coop.rchain.shared.Serialize
import coop.rchain.store.{KeyValueStore, KeyValueStoreManager}
import org.lmdbjava.EnvFlags
import scodec.Codec

Expand All @@ -17,8 +18,6 @@ trait HistoryRepository[F[_], C, P, A, K] extends HistoryReader[F, C, P, A, K] {

def history: History[F]

def close(): F[Unit]

def exporter: F[RSpaceExporter[F]]

def importer: F[RSpaceImporter[F]]
Expand All @@ -32,16 +31,12 @@ final case class LMDBStorageConfig(
flags: List[EnvFlags] = Nil,
dbNamePrefix: String = "db"
)
// Bundle of per-store LMDB configurations for one RSpace instance.
// NOTE(review): this class is the one removed by this commit — callers now pass
// three KeyValueStore instances to HistoryRepositoryInstances.lmdbRepository directly.
final case class LMDBRSpaceStorageConfig(
coldStore: StoreConfig,
historyStore: StoreConfig,
rootsStore: StoreConfig
)

object HistoryRepositoryInstances {

def lmdbRepository[F[_]: Concurrent: Parallel, C, P, A, K](
config: LMDBRSpaceStorageConfig
rootsKeyValueStore: KeyValueStore[F],
coldKeyValueStore: KeyValueStore[F],
historyKeyValueStore: KeyValueStore[F]
)(
implicit codecC: Codec[C],
codecP: Codec[P],
Expand All @@ -50,19 +45,18 @@ object HistoryRepositoryInstances {
): F[HistoryRepository[F, C, P, A, K]] =
for {
// Roots store
rootsLMDBStore <- StoreInstances.lmdbStore[F](config.rootsStore)
rootsRepository = new RootRepository[F](RootsStoreInstances.rootsStore[F](rootsLMDBStore))
currentRoot <- rootsRepository.currentRoot()
rootsRepository <- new RootRepository[F](
RootsStoreInstances.rootsStore[F](rootsKeyValueStore)
).pure[F]
currentRoot <- rootsRepository.currentRoot()
// Cold store
coldLMDBStore <- StoreInstances.lmdbStore[F](config.coldStore)
coldStore = ColdStoreInstances.coldStore[F](coldLMDBStore)
coldStore = ColdStoreInstances.coldStore[F](coldKeyValueStore)
// History store
historyLMDBStore <- StoreInstances.lmdbStore[F](config.historyStore)
historyStore = HistoryStoreInstances.historyStore[F](historyLMDBStore)
history = HistoryInstances.merging(currentRoot, historyStore)
historyStore = HistoryStoreInstances.historyStore[F](historyKeyValueStore)
history = HistoryInstances.merging(currentRoot, historyStore)
// RSpace importer/exporter / directly operates on Store (lmdb)
exporter = RSpaceExporterStore[F](historyLMDBStore, coldLMDBStore, rootsLMDBStore)
importer = RSpaceImporterStore[F](historyLMDBStore, coldLMDBStore, rootsLMDBStore)
exporter = RSpaceExporterStore[F](historyKeyValueStore, coldKeyValueStore, rootsKeyValueStore)
importer = RSpaceImporterStore[F](historyKeyValueStore, coldKeyValueStore, rootsKeyValueStore)
} yield HistoryRepositoryImpl[F, C, P, A, K](
history,
rootsRepository,
Expand Down
Expand Up @@ -8,6 +8,7 @@ import cats.{Applicative, Parallel}
import com.typesafe.scalalogging.Logger
import coop.rchain.rspace.history.HistoryRepositoryImpl._
import coop.rchain.rspace.internal._
import coop.rchain.rspace.history.ColdStoreInstances.ColdKeyValueStore
import coop.rchain.rspace.state.{RSpaceExporter, RSpaceImporter}
import coop.rchain.rspace.{
internal,
Expand All @@ -23,13 +24,14 @@ import coop.rchain.rspace.{
RSpace
}
import coop.rchain.shared.Serialize
import coop.rchain.shared.syntax._
import scodec.Codec
import scodec.bits.{BitVector, ByteVector}

final case class HistoryRepositoryImpl[F[_]: Sync: Parallel, C, P, A, K](
history: History[F],
rootsRepository: RootRepository[F],
leafStore: ColdStore[F],
leafStore: ColdKeyValueStore[F],
rspaceExporter: RSpaceExporter[F],
rspaceImporter: RSpaceImporter[F]
)(implicit codecC: Codec[C], codecP: Codec[P], codecA: Codec[A], codecK: Codec[K])
Expand Down Expand Up @@ -194,7 +196,7 @@ final case class HistoryRepositoryImpl[F[_]: Sync: Parallel, C, P, A, K](

private def storeLeaves(leafs: List[Result]): F[List[HistoryAction]] = {
val toBeStored = leafs.collect { case (key, Some(data), _) => (key, data) }
leafStore.put(toBeStored).map(_ => leafs.map(_._3))
leafStore.putIfAbsent(toBeStored).map(_ => leafs.map(_._3))
}

override def checkpoint(actions: List[HotStoreAction]): F[HistoryRepository[F, C, P, A, K]] = {
Expand All @@ -215,13 +217,6 @@ final case class HistoryRepositoryImpl[F[_]: Sync: Parallel, C, P, A, K](
next = history.reset(root = root)
} yield this.copy(history = next)

override def close(): F[Unit] =
for {
_ <- leafStore.close()
_ <- rootsRepository.close()
_ <- history.close()
} yield ()

override def exporter: F[RSpaceExporter[F]] = Sync[F].delay(rspaceExporter)

override def importer: F[RSpaceImporter[F]] = Sync[F].delay(rspaceImporter)
Expand Down
44 changes: 11 additions & 33 deletions rspace/src/main/scala/coop/rchain/rspace/history/HistoryStore.scala
@@ -1,9 +1,12 @@
package coop.rchain.rspace.history

import cats.Functor
import cats.implicits._
import cats.effect.Sync
import coop.rchain.rspace.Blake2b256Hash
import coop.rchain.shared.AttemptOpsF.RichAttempt
import coop.rchain.store.{KeyValueStore, KeyValueTypedStore}
import coop.rchain.shared.syntax._
import scodec.DecodeResult
import scodec.bits.BitVector

Expand All @@ -12,42 +15,17 @@ trait HistoryStore[F[_]] {

def get(key: Blake2b256Hash): F[Trie]

def close(): F[Unit]
}

object HistoryStoreInstances {
type KVData = (Blake2b256Hash, BitVector)
def historyStore[F[_]: Sync](store: Store[F]): HistoryStore[F] = new HistoryStore[F] {
// TODO put list
override def put(tries: List[Trie]): F[Unit] = {

def asEncoded(t: Trie): F[KVData] =
for {
// TODO: the key hash is not the hash of stored binary value but it's
// the hash of each case of Trie (see `Trie.hash`)
// - this makes it much harder to validate binary value because it must be encoded first
b <- Trie.codecTrie.encode(t).get
k = Trie.hash(t)
} yield (k, b)

for {
asKeyValue <- tries traverse asEncoded
storeRes <- store.put(asKeyValue)
} yield (storeRes)
}

override def get(key: Blake2b256Hash): F[Trie] =
for {
maybeBytes <- store.get(key)
result <- maybeBytes.traverse(decodeTrieUnsafe(key))
} yield (result.map(_.value).getOrElse(EmptyTrie))

private def decodeTrieUnsafe(key: Blake2b256Hash)(bytes: BitVector) =
Trie.codecTrie.decode(bytes).get.handleErrorWith { ex: Throwable =>
new Exception(s"Critical error: decoding value for key ${key.bytes.toHex} failed.", ex)
.raiseError[F, DecodeResult[Trie]]
}

override def close(): F[Unit] = store.close()

def historyStore[F[_]: Sync](store: KeyValueStore[F]): HistoryStore[F] = new HistoryStore[F] {
val typedStore = store.toTypedStore(Blake2b256Hash.codecPureBlake2b256Hash, Trie.codecTrie)

override def get(key: Blake2b256Hash): F[Trie] = typedStore.getOrElse(key, EmptyTrie)

override def put(tries: List[Trie]): F[Unit] =
typedStore.putIfAbsent(tries.map(t => (Trie.hash(t), t)))
}
}
Expand Up @@ -28,5 +28,4 @@ class RootRepository[F[_]: Sync](
case Some(_) => Applicative[F].pure(())
}

def close(): F[Unit] = rootsStore.close()
}
38 changes: 25 additions & 13 deletions rspace/src/main/scala/coop/rchain/rspace/history/RootsStore.scala
Expand Up @@ -8,45 +8,57 @@ import cats.effect.Sync
import coop.rchain.rspace.Blake2b256Hash
import coop.rchain.shared.AttemptOpsF.RichAttempt
import coop.rchain.shared.ByteVectorOps._
import coop.rchain.store.KeyValueStore
import scodec.bits.{BitVector, ByteVector}
import coop.rchain.shared.syntax._

// Persistence interface for history-trie root hashes, keyed by Blake2b256Hash.
// Semantics below are taken from the RootsStoreInstances implementation visible
// later in this diff; effects run in `F`.
trait RootsStore[F[_]] {
// Returns the hash stored under the "current-root" key, or None if never set.
def currentRoot(): F[Option[Blake2b256Hash]]
// Sets `key` as current only if it was previously recorded in the store;
// returns Some(key) on success, None if the root is unknown.
def validateAndSetCurrentRoot(key: Blake2b256Hash): F[Option[Blake2b256Hash]]
// Persists `key` as a known root and also marks it as the current root.
def recordRoot(key: Blake2b256Hash): F[Unit]

// Releases the underlying store. NOTE(review): this member is the line removed
// by this commit — lifecycle is now owned by the shared KeyValueStoreManager.
def close(): F[Unit]
}

object RootsStoreInstances {
def rootsStore[F[_]: Sync](store: Store[F]): RootsStore[F] = new RootsStore[F] {
def rootsStore[F[_]: Sync](store: KeyValueStore[F]): RootsStore[F] = new RootsStore[F] {
val tag: ByteBuffer = ByteVector("root".getBytes(StandardCharsets.UTF_8)).toDirectByteBuffer
val currentRootName: ByteBuffer =
ByteVector("current-root".getBytes(StandardCharsets.UTF_8)).toDirectByteBuffer

override def currentRoot(): F[Option[Blake2b256Hash]] =
for {
bytes <- store.get(currentRootName)
maybeDecoded <- bytes.map(Blake2b256Hash.codecBlake2b256Hash.decode(_).get).sequence
maybeHash = maybeDecoded.map(_.value)
} yield (maybeHash)
bytes <- store.get1(
currentRootName,
identity
)
maybeDecoded <- bytes
.map(
b =>
Blake2b256Hash.codecWithBytesStringBlake2b256Hash
.decode(BitVector(b))
.get
)
.sequence
maybeHash = maybeDecoded.map(_.value)
} yield maybeHash

override def validateAndSetCurrentRoot(key: Blake2b256Hash): F[Option[Blake2b256Hash]] =
for {
bits <- Blake2b256Hash.codecBlake2b256Hash.encode(key).get
bits <- Blake2b256Hash.codecWithBytesStringBlake2b256Hash.encode(key).get
bytes = bits.toByteVector.toDirectByteBuffer
byteBuf <- store.get(bytes)
result <- byteBuf.traverse(_ => store.put(currentRootName, bytes).as(key))
byteBuf <- store.get1(bytes, identity)
result <- byteBuf.traverse(
_ => store.put1(currentRootName, bytes, identity[ByteBuffer]).as(key)
)
} yield result

override def recordRoot(key: Blake2b256Hash): F[Unit] =
for {
bits <- Blake2b256Hash.codecBlake2b256Hash.encode(key).get
bits <- Blake2b256Hash.codecWithBytesStringBlake2b256Hash.encode(key).get
bytes = bits.toByteVector.toDirectByteBuffer
_ <- store.put(bytes, tag)
_ <- store.put(currentRootName, bytes)
_ <- store.put1(bytes, tag, identity[ByteBuffer])
_ <- store.put1(currentRootName, bytes, identity[ByteBuffer])
} yield ()

override def close(): F[Unit] = store.close()
}
}

0 comments on commit a9c0f7d

Please sign in to comment.