Try #3397:
bors[bot] committed May 4, 2021
2 parents e6a208e + 1f70dd1 commit 21d9186
Showing 18 changed files with 496 additions and 426 deletions.
123 changes: 0 additions & 123 deletions casper/src/main/scala/coop/rchain/casper/ReportMemStore.scala

This file was deleted.

42 changes: 42 additions & 0 deletions casper/src/main/scala/coop/rchain/casper/ReportStore.scala
@@ -0,0 +1,42 @@
package coop.rchain.casper

import cats.effect.Sync
import com.google.protobuf.ByteString
import coop.rchain.blockstorage.dag.codecs
import coop.rchain.casper.protocol.BlockEventInfo
import coop.rchain.shared.Compression
import coop.rchain.shared.syntax._
import coop.rchain.store.{KeyValueStoreManager, KeyValueTypedStore}
import net.jpountz.lz4.{LZ4CompressorWithLength, LZ4DecompressorWithLength}
import scodec.SizeBound.unknown
import scodec.{Attempt, Codec, DecodeResult, SizeBound}
import scodec.bits.BitVector

object ReportStore {

val compressor = new LZ4CompressorWithLength(Compression.factory.fastCompressor())
// val compressor = new LZ4CompressorWithLength(factory.highCompressor(17)) // Max compression
val decompressor = new LZ4DecompressorWithLength(Compression.factory.fastDecompressor())

def compressBytes(bytes: Array[Byte]): Array[Byte] = compressor.compress(bytes)

type ReportStore[F[_]] = KeyValueTypedStore[F, ByteString, BlockEventInfo]

val blockEventInfoCodec = new Codec[BlockEventInfo] {
override def encode(value: BlockEventInfo): Attempt[BitVector] =
Attempt.successful(BitVector(compressor.compress(value.toByteArray)))

override def decode(bits: BitVector): Attempt[DecodeResult[BlockEventInfo]] =
Attempt.successful(
DecodeResult(
BlockEventInfo.parseFrom(decompressor.decompress(bits.toByteArray)),
BitVector.empty
)
)

override def sizeBound: SizeBound = unknown
}

def store[F[_]: Sync](kvm: KeyValueStoreManager[F]): F[ReportStore[F]] =
kvm.database("reporting-cache", codecs.codecByteString, blockEventInfoCodec)
}
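
The new ReportStore persists per-block reporting data as LZ4-compressed protobuf blobs behind a custom scodec Codec. The sketch below is a hypothetical, self-contained round trip of that same codec pattern: it swaps rchain's Compression.factory wrapper for lz4-java's LZ4Factory and uses a plain Array[Byte] payload in place of BlockEventInfo, so names like Lz4CodecSketch and bytesCodec are illustrative only, not project code.

import net.jpountz.lz4.{LZ4CompressorWithLength, LZ4DecompressorWithLength, LZ4Factory}
import scodec.{Attempt, Codec, DecodeResult, SizeBound}
import scodec.bits.BitVector

object Lz4CodecSketch {
  private val factory      = LZ4Factory.fastestInstance()
  private val compressor   = new LZ4CompressorWithLength(factory.fastCompressor())
  private val decompressor = new LZ4DecompressorWithLength(factory.fastDecompressor())

  // Compress the whole payload on encode and decompress on decode -- the same
  // shape as blockEventInfoCodec above, with Array[Byte] standing in for the
  // protobuf BlockEventInfo.
  val bytesCodec: Codec[Array[Byte]] = new Codec[Array[Byte]] {
    override def encode(value: Array[Byte]): Attempt[BitVector] =
      Attempt.successful(BitVector(compressor.compress(value)))

    override def decode(bits: BitVector): Attempt[DecodeResult[Array[Byte]]] =
      Attempt.successful(
        DecodeResult(decompressor.decompress(bits.toByteArray), BitVector.empty)
      )

    override def sizeBound: SizeBound = SizeBound.unknown
  }

  def main(args: Array[String]): Unit = {
    val payload = ("reporting event " * 64).getBytes("UTF-8")
    val encoded = bytesCodec.encode(payload).require
    val decoded = bytesCodec.decode(encoded).require.value
    println(s"raw=${payload.length} compressed=${encoded.bytes.size} roundTrip=${decoded.sameElements(payload)}")
  }
}

ReportStore.store registers the real codec for the "reporting-cache" database, so each block's reporting data becomes a single compressed record in the key-value store, keyed by a ByteString (presumably the block hash).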
148 changes: 64 additions & 84 deletions casper/src/main/scala/coop/rchain/casper/ReportingCasper.scala
@@ -1,16 +1,21 @@
package coop.rchain.casper

import java.nio.file.{Files, Path}

import cats.data.EitherT
import cats.effect.concurrent.{MVar, MVar2, Ref}
import cats.effect.{Concurrent, ContextShift, Sync}
import cats.implicits._
import cats.{Monad, Parallel}
import com.google.protobuf.ByteString
import coop.rchain.blockstorage.BlockStore
import coop.rchain.blockstorage.dag.{BlockDagRepresentation, BlockDagStorage}
import coop.rchain.casper.ReportingCasper.RhoReportingRspace
import coop.rchain.casper.protocol.{BlockMessage, ProcessedDeploy, ProcessedSystemDeploy}
import coop.rchain.casper.protocol.{
BlockMessage,
ProcessedDeploy,
ProcessedSystemDeploy,
SystemDeployData
}
import coop.rchain.casper.util.{EventConverter, ProtoUtil}
import coop.rchain.casper.util.rholang.RuntimeManager.StateHash
import coop.rchain.casper.syntax._
@@ -38,136 +43,94 @@ import coop.rchain.rspace.RSpace.RSpaceStore
import scala.collection.concurrent.TrieMap

sealed trait ReportError
final case class ReportBlockNotFound(hash: BlockHash) extends ReportError
final case class ReportReplayError(error: ReplayFailure) extends ReportError
final case class DeployReportResult(
processedDeploy: ProcessedDeploy,
events: Seq[Seq[ReportingEvent]]
)
final case class SystemDeployReportResult(
processedSystemDeploy: SystemDeployData,
events: Seq[Seq[ReportingEvent]]
)
final case class ReplayResult(
deployReportResult: List[DeployReportResult],
systemDeployReportResult: List[SystemDeployReportResult],
postStateHash: ByteString
)

trait ReportingCasper[F[_]] {
def trace(
hash: BlockHash
): F[Either[ReportError, List[(ProcessedDeploy, Seq[Seq[ReportingEvent]])]]]
block: BlockMessage
): F[Either[ReportError, ReplayResult]]
}

object ReportingCasper {
def noop[F[_]: Sync]: ReportingCasper[F] = new ReportingCasper[F] {

override def trace(
hash: BlockHash
): F[Either[ReportError, List[(ProcessedDeploy, Seq[Seq[ReportingEvent]])]]] =
Sync[F].delay(Right(List.empty[(ProcessedDeploy, Seq[Seq[ReportingEvent]])]))
block: BlockMessage
): F[Either[ReportError, ReplayResult]] =
Sync[F].delay(Right(ReplayResult(List.empty, List.empty, ByteString.copyFromUtf8("empty"))))
}

type RhoReportingRspace[F[_]] =
ReportingRspace[F, Par, BindPattern, ListParWithRandom, TaggedContinuation]

def rhoReporter[F[_]: ContextShift: Concurrent: Log: Metrics: Span: Parallel: BlockStore: BlockDagStorage](
memStore: ReportMemStore[F],
rspaceStore: RSpaceStore[F]
)(implicit scheduler: ExecutionContext): ReportingCasper[F] =
new ReportingCasper[F] {
implicit val source = Metrics.Source(CasperMetricsSource, "report-replay")

val blockLockMap = TrieMap[BlockHash, (MetricsSemaphore[F], Boolean)]()

private def replayGetReport(
override def trace(
block: BlockMessage
): F[Either[ReportError, List[(ProcessedDeploy, Seq[Seq[ReportingEvent]])]]] =
): F[Either[ReportError, ReplayResult]] =
for {
reportingRspace <- ReportingRuntime.setupReportingRSpace(rspaceStore)
reportingRuntime <- ReportingRuntime.createReportingRuntime(reportingRspace)
dag <- BlockDagStorage[F].getRepresentation
// TODO approvedBlock is not equal to genesisBlock
genesis <- BlockStore[F].getApprovedBlock
isGenesis = genesis.exists(a => block.blockHash == a.candidate.block.blockHash)
invalidBlocksSet <- dag.invalidBlocks
invalidBlocks = invalidBlocksSet
.map(block => (block.blockHash, block.sender))
.toMap
preStateHash = ProtoUtil.preStateHash(block)
_ <- reportingRuntime.setBlockData(BlockData.fromBlock(block))
blockdata = BlockData.fromBlock(block)
_ <- reportingRuntime.setBlockData(blockdata)
_ <- reportingRuntime.setInvalidBlocks(invalidBlocks)
res <- replayDeploys(reportingRuntime, preStateHash, block.body.deploys, !isGenesis)
res <- replayDeploys(
reportingRuntime,
preStateHash,
block.body.deploys,
block.body.systemDeploys,
!isGenesis,
blockdata
)
result <- res match {
case Left(replayError) =>
Log[F].info(
s"Relay ${PrettyPrinter.buildStringNoLimit(block.blockHash)} error ${replayError} from reporting"
) >> Concurrent[F].delay(
ReportReplayError(replayError)
.asLeft[List[(ProcessedDeploy, Seq[Seq[ReportingEvent]])]]
.asLeft[ReplayResult]
)
case Right(r) =>
for {
_ <- Log[F].info(
s"Cache ${PrettyPrinter.buildStringNoLimit(block.blockHash)}reporting data into mem."
s"Cache ${PrettyPrinter.buildStringNoLimit(block.blockHash)} reporting data into mem."
)
_ <- r.traverse(data => memStore.put(data._1.deploy.sig, data._2))
} yield r.asRight[ReportError]
}
} yield result

private def traceBlock(
hash: BlockHash
): F[Either[ReportError, List[(ProcessedDeploy, Seq[Seq[ReportingEvent]])]]] =
for {
maybeBlock <- BlockStore[F].get(hash)
_ <- Log[F].info(s"trace block ${maybeBlock}")
result <- maybeBlock match {
case None =>
Concurrent[F].delay(
ReportBlockNotFound(hash)
.asLeft[List[(ProcessedDeploy, Seq[Seq[ReportingEvent]])]]
)
case Some(block) =>
for {
cached <- block.body.deploys.traverse(
pd =>
for {
data <- memStore.get(pd.deploy.sig)
re = data.map((pd, _))
} yield re
)
maybeCached = cached.sequence
outcome <- maybeCached match {
case None =>
for {
_ <- Log[F].info(
s"No ${PrettyPrinter.buildStringNoLimit(block.blockHash)} reporting data in cached, going to replay"
)
result <- replayGetReport(block)
} yield result
case Some(cached) =>
for {
_ <- Log[F].info(
s"Find ${PrettyPrinter.buildStringNoLimit(block.blockHash)} reporting data in cached"
)
} yield cached.asRight[ReportError]
}
} yield outcome
}
} yield result

override def trace(
hash: BlockHash
): F[Either[ReportError, List[(ProcessedDeploy, Seq[Seq[ReportingEvent]])]]] =
for {
semaphore <- MetricsSemaphore.single
lockWithDone = blockLockMap.getOrElseUpdate(hash, (semaphore, false))
result <- if (lockWithDone._2) {
traceBlock(hash)
} else {
lockWithDone._1.withPermit[Either[ReportError, List[
(ProcessedDeploy, Seq[Seq[ReportingEvent]])
]]](for {
re <- traceBlock(hash)
_ = blockLockMap.update(hash, (lockWithDone._1, true))
} yield re)
}
} yield result

private def replayDeploys(
runtime: ReportingRuntime[F],
startHash: StateHash,
terms: Seq[ProcessedDeploy],
withCostAccounting: Boolean
): F[Either[ReplayFailure, List[(ProcessedDeploy, Seq[Seq[ReportingEvent]])]]] =
systemDeploys: Seq[ProcessedSystemDeploy],
withCostAccounting: Boolean,
blockData: BlockData
): F[Either[ReplayFailure, ReplayResult]] =
(for {
_ <- EitherT.right(runtime.reset(Blake2b256Hash.fromByteString(startHash)))
res <- EitherT.right(terms.toList.traverse { term =>
@@ -180,10 +143,27 @@ object ReportingCasper {
case Left(_) => Seq.empty[Seq[ReportingEvent]]
case Right(s) => s
}
} yield (term, r)
} yield DeployReportResult(term, r)
})
_ <- EitherT.right[ReplayFailure](runtime.createCheckpoint)
} yield res).value
sysRes <- EitherT.right(
systemDeploys.toList.traverse { term =>
for {
rd <- runtime
.replaySystemDeployE(blockData)(term)
.map(_.semiflatMap(_ => runtime.getReport))
res <- rd.value
r = res match {
case Left(_) => Seq.empty[Seq[ReportingEvent]]
case Right(s) => s
}
} yield SystemDeployReportResult(term.systemDeploy, r)
}
)
checkPoint <- EitherT.right[ReplayFailure](runtime.createCheckpoint)
result <- EitherT.right[ReplayFailure](
ReplayResult(res, sysRes, checkPoint.root.toByteString).pure[F]
)
} yield result).value
}

}
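
With this change, trace takes the full BlockMessage and returns a ReplayResult (the event groups for every user deploy and every system deploy, plus the replayed post-state hash) instead of the old List[(ProcessedDeploy, Seq[Seq[ReportingEvent]])]. Below is a hypothetical consumer of that shape, using simplified stand-in case classes rather than the rchain protobuf and reporting types.

object ReplayResultSketch {
  // Stand-ins for DeployReportResult, SystemDeployReportResult and ReplayResult;
  // events stay grouped, mirroring Seq[Seq[ReportingEvent]].
  final case class DeployReport(sig: String, events: Seq[Seq[String]])
  final case class SystemDeployReport(kind: String, events: Seq[Seq[String]])
  final case class Replay(
      deploys: List[DeployReport],
      systemDeploys: List[SystemDeployReport],
      postStateHash: String
  )

  // One total event count per user deploy, flattening the nested groups.
  def eventCounts(r: Replay): Map[String, Int] =
    r.deploys.map(d => d.sig -> d.events.map(_.size).sum).toMap

  def main(args: Array[String]): Unit = {
    val replay = Replay(
      deploys = List(DeployReport("sig-1", Seq(Seq("PRODUCE", "COMM"), Seq("CONSUME")))),
      systemDeploys = List(SystemDeployReport("CloseBlock", Seq(Seq("COMM")))),
      postStateHash = "post-state-hash"
    )
    println(eventCounts(replay)) // Map(sig-1 -> 3)
  }
}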
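replayDeploys itself composes the replay as an EitherT pipeline: each effectful step is lifted with EitherT.right, the user deploys and then the system deploys are traversed, a checkpoint supplies the post-state hash, and .value collapses everything back to F[Either[ReplayFailure, ReplayResult]]. A minimal sketch of that composition, assuming cats-effect 2 IO and stand-in types (replayOne and checkpoint are placeholders for the runtime's replay and createCheckpoint calls, not rchain APIs):

import cats.data.EitherT
import cats.effect.IO
import cats.syntax.all._

object ReplayCompositionSketch {
  final case class Failure(msg: String)
  final case class DeployEvents(sig: String, events: Seq[String])
  final case class Result(deploys: List[DeployEvents], postStateHash: String)

  // Placeholder for replaying one deploy and collecting its reporting events.
  private def replayOne(sig: String): IO[DeployEvents] =
    IO.pure(DeployEvents(sig, Seq("PRODUCE", "COMM")))

  // Placeholder for createCheckpoint returning the post-state root.
  private def checkpoint: IO[String] =
    IO.pure("post-state-hash")

  def replayAll(sigs: List[String]): IO[Either[Failure, Result]] =
    (for {
      _       <- EitherT.right[Failure](IO.unit)                  // reset to the pre-state hash
      deploys <- EitherT.right[Failure](sigs.traverse(replayOne)) // one replay per processed deploy
      post    <- EitherT.right[Failure](checkpoint)               // checkpoint gives the post-state hash
    } yield Result(deploys, post)).value

  def main(args: Array[String]): Unit =
    println(replayAll(List("sig-1", "sig-2")).unsafeRunSync())
}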
