Skip to content

Commit

Permalink
feat: mediator added more logs and associated message Hash and struct…
Browse files Browse the repository at this point in the history
…ured logging (#16)

* Added more logging and formatted the logging for console, with msgHasH

* Added console json formatter

* code cleanup

* structured login with logstash

* updated the package in buildinfo

* downgraded the version for logback

* updated the version for logback

* address pr comments

Signed-off-by: Shailesh <Patil>
  • Loading branch information
mineme0110 authored and Shailesh committed Apr 30, 2024
1 parent 95ddff2 commit ff13097
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 42 deletions.
21 changes: 17 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ lazy val V = new {
// val zioMunitTest = "0.1.1"
val zioHttp = "0.0.5"
val zioConfig = "4.0.0-RC16"
val zioLogging = "2.1.13"
val zioSl4j = "2.1.13"
val logback = "1.3.8"
val logstash = "7.4"
val jansi = "2.4.0"
val mongo = "1.1.0-RC10"
val embedMongo = "4.7.0"
val munitZio = "0.1.1"
Expand Down Expand Up @@ -55,11 +59,18 @@ lazy val D = new {
// val zio = Def.setting("dev.zio" %%% "zio" % V.zio)
// val zioStreams = Def.setting("dev.zio" %%% "zio-streams" % V.zio)
// val zioJson = Def.setting("dev.zio" %%% "zio-json" % V.zioJson)

val zioHttp = Def.setting("dev.zio" %% "zio-http" % V.zioHttp)
val zioConfig = Def.setting("dev.zio" %% "zio-config" % V.zioConfig)
val zioConfigMagnolia = Def.setting("dev.zio" %% "zio-config-magnolia" % V.zioConfig) // For deriveConfig
val zioConfigTypesafe = Def.setting("dev.zio" %% "zio-config-typesafe" % V.zioConfig) // For HOCON
val zioLogging = Def.setting("dev.zio" %% "zio-logging" % V.zioLogging)
val zioLoggingSl4j = Def.setting("dev.zio" %% "zio-logging-slf4j" % V.zioSl4j)
val logback = Def.setting("ch.qos.logback" % "logback-classic" % V.logback)
val logstash = Def.setting("net.logstash.logback" % "logstash-logback-encoder" % V.logstash)

val jansi = Def.setting("org.fusesource.jansi" % "jansi" % V.jansi)

val mongo = Def.setting("org.reactivemongo" %% "reactivemongo" % V.mongo)
// // For munit https://scalameta.org/munit/docs/getting-started.html#scalajs-setup
val munit = Def.setting("org.scalameta" %%% "munit" % V.munit % Test)
Expand Down Expand Up @@ -136,7 +147,7 @@ lazy val scalaJSBundlerConfigure: Project => Project =

lazy val buildInfoConfigure: Project => Project = _.enablePlugins(BuildInfoPlugin)
.settings(
buildInfoPackage := "fmgp",
buildInfoPackage := "io.iohk.atala.mediator",
// buildInfoObject := "BuildInfo",
buildInfoKeys := Seq[BuildInfoKey](
name,
Expand All @@ -147,8 +158,6 @@ lazy val buildInfoConfigure: Project => Project = _.enablePlugins(BuildInfoPlugi
),
)

// lazy val core = RootProject(file(".."))

lazy val httpUtils = crossProject(JSPlatform, JVMPlatform) // project
.in(file("http-utils"))
.settings(publish / skip := true)
Expand All @@ -172,7 +181,11 @@ lazy val mediator = project
D.zioConfig.value,
D.zioConfigMagnolia.value,
D.zioConfigTypesafe.value,
D.zioLoggingSl4j.value
D.zioLogging.value,
D.zioLoggingSl4j.value,
D.logback.value,
D.jansi.value,
D.logstash.value,
),
libraryDependencies += D.mongo.value,
libraryDependencies ++= Seq(
Expand Down
22 changes: 22 additions & 0 deletions mediator/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<withJansi>true</withJansi>
<encoder>
<!-- TODO include mediator release version in the logs />-->
<pattern>%d{yyyy-MM-dd_HH:mm:ss.SSS} [%highlight(%-5level)] %cyan(%logger{5}@[%-4.30thread]) msgHash[%X{msgHash}] - %msg%xException%n</pattern>
</encoder>
</appender>
<appender name="logstash" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
</appender>
<turboFilter class="ch.qos.logback.classic.turbo.MarkerFilter">
<Name>CONFIDENTIAL_FILTER</Name>
<Marker>CONFIDENTIAL</Marker>
<OnMatch>DENY</OnMatch>
</turboFilter>
<root level="info">
<appender-ref ref="STDOUT" />
<!-- <appender-ref ref="logstash" />-->
</root>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -114,36 +114,39 @@ case class MediatorAgent(
MediatorError,
Option[EncryptedMessage]
] =
ZIO
.logAnnotate("msgHash", msg.hashCode.toString) {
for {
_ <- ZIO.log(s"receiveMessage with hashCode: ${msg.hashCode}")
maybeSyncReplyMsg <-
if (!msg.recipientsSubject.contains(id))
ZIO.logError(s"This mediator '${id.string}' is not a recipient")
*> ZIO.none
else
for {
messageItemRepo <- ZIO.service[MessageItemRepo]
_ <- messageItemRepo.insert(MessageItem(msg)) // store all message
plaintextMessage <- decrypt(msg)
_ <- didSocketManager.get.flatMap { m => // TODO HACK REMOVE !!!!!!!!!!!!!!!!!!!!!!!!
ZIO.foreach(m.tapSockets)(_.socketOutHub.publish(TapMessage(msg, plaintextMessage).toJson))
}
_ <- mSocketID match
case None => ZIO.unit
case Some(socketID) =>
plaintextMessage.from match
case None => ZIO.unit
case Some(from) => didSocketManager.update { _.link(from.asFROMTO, socketID) }
// TODO Store context of the decrypt unwarping
// TODO SreceiveMessagetore context with MsgID and PIURI
protocolHandler <- ZIO.service[ProtocolExecuter[Services]]
ret <- protocolHandler
.execute(plaintextMessage)
.tapError(ex => ZIO.logError(s"Error when execute Protocol: $ex"))
} yield ret
} yield maybeSyncReplyMsg
ZIO
.logAnnotate("msgHash", msg.hashCode.toString) {
for {
_ <- ZIO.log("receivedMessage")
maybeSyncReplyMsg <-
if (!msg.recipientsSubject.contains(id))
ZIO.logError(s"This mediator '${id.string}' is not a recipient")
*> ZIO.none
else
for {
messageItemRepo <- ZIO.service[MessageItemRepo]
_ <- messageItemRepo.insert(MessageItem(msg)) // store all message
plaintextMessage <- decrypt(msg)
_ <- didSocketManager.get.flatMap { m => // TODO HACK REMOVE !!!!!!!!!!!!!!!!!!!!!!!!
ZIO.foreach(m.tapSockets)(_.socketOutHub.publish(TapMessage(msg, plaintextMessage).toJson))
}
_ <- mSocketID match
case None => ZIO.unit
case Some(socketID) =>
plaintextMessage.from match
case None => ZIO.unit
case Some(from) =>
didSocketManager.update {
_.link(from.asFROMTO, socketID)
}
// TODO Store context of the decrypt unwarping
// TODO SreceiveMessagetore context with MsgID and PIURI
protocolHandler <- ZIO.service[ProtocolExecuter[Services]]
ret <- protocolHandler
.execute(plaintextMessage)
.tapError(ex => ZIO.logError(s"Error when execute Protocol: $ex"))
} yield ret
} yield maybeSyncReplyMsg
}
.provideSomeLayer( /*resolverLayer ++ indentityLayer ++*/ protocolHandlerLayer)

Expand Down Expand Up @@ -249,11 +252,13 @@ object MediatorAgent {

// TODO [return_route extension](https://github.com/decentralized-identity/didcomm-messaging/blob/main/extensions/return_route/main.md)
case req @ Method.POST -> !! =>
ZIO.succeed(
Response
.text(s"The content-type must be ${MediaTypes.SIGNED.typ} or ${MediaTypes.ENCRYPTED.typ}")
.copy(status = Status.BadRequest)
)
ZIO
.logError(s"Request Headers : ${req.headers.mkString(",")}")
.as(
Response
.text(s"The content-type must be ${MediaTypes.SIGNED.typ} or ${MediaTypes.ENCRYPTED.typ}")
.setStatus(Status.BadRequest)
)
}: Http[
Operations & Resolver & MessageDispatcher & MediatorAgent & MessageItemRepo & UserAccountRepo,
Throwable,
Expand All @@ -267,13 +272,14 @@ object MediatorAgent {
allowedMethods = Some(Set(Method.GET, Method.POST, Method.OPTIONS)),
)
)
@@ HttpAppMiddleware.updateHeaders(headers =>
@@
HttpAppMiddleware.updateHeaders(headers =>
Headers(
headers.map(h =>
if (h.key == HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN) {
Header(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*")
} else h
)
)
)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@ import zio.http.ZClient.ClientLive
import zio.http.model.*
import zio.http.socket.*
import zio.json.*
import zio.logging.LogFormat.*
import zio.logging.backend.SLF4J
import zio.logging.*
import zio.stream.*

import java.time.format.DateTimeFormatter
import scala.io.Source
case class MediatorConfig(endpoint: java.net.URI, keyAgreement: OKPPrivateKey, keyAuthentication: OKPPrivateKey) {
val did = DIDPeer2.makeAgent(
Expand All @@ -45,6 +49,14 @@ case class DataBaseConfig(
}

object MediatorStandalone extends ZIOAppDefault {
val mediatorColorFormat: LogFormat =
fiberId.color(LogColor.YELLOW) |-|
line.highlight |-|
allAnnotations |-|
cause.highlight

override val bootstrap: ZLayer[ZIOAppArgs, Any, Any] =
Runtime.removeDefaultLoggers >>> SLF4J.slf4j(mediatorColorFormat)

val app: HttpApp[ // type HttpApp[-R, +Err] = Http[R, Err, Request, Response]
Hub[String] & Operations & MessageDispatcher & MediatorAgent & Resolver & MessageItemRepo & UserAccountRepo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,21 @@ class MessageItemRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionCon

def insert(value: MessageItem): IO[StorageError, WriteResult] = {
for {
_ <- ZIO.logInfo("insert")
coll <- collection
result <- ZIO
.fromFuture(implicit ec => coll.insert.one(value))
.tapError(err => ZIO.logError(s"insert : ${err.getMessage}"))
.mapError(ex => StorageThrowable(ex))
} yield result
}

def findById(id: HASH): IO[StorageError, Option[MessageItem]] = {
def selector: BSONDocument = BSONDocument("_id" -> id)
def projection: Option[BSONDocument] = None

for {
_ <- ZIO.logInfo("findById")
coll <- collection
result <- ZIO
.fromFuture(implicit ec =>
Expand All @@ -46,17 +50,21 @@ class MessageItemRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionCon
.cursor[MessageItem]()
.collect[Seq](1, Cursor.FailOnError[Seq[MessageItem]]())
)
.tapError(err => ZIO.logError(s"findById : ${err.getMessage}"))
.mapError(ex => StorageThrowable(ex))

} yield result.headOption

}

def findByIds(ids: Seq[HASH]): IO[StorageError, Seq[MessageItem]] = {
def selector: BSONDocument = {
println(s""" {"_id": {"$$in" -> $ids}} """)
BSONDocument("_id" -> BSONDocument("$in" -> ids))
}
def projection: Option[BSONDocument] = None

for {
_ <- ZIO.logInfo("findByIds")
coll <- collection
result <- ZIO
.fromFuture(implicit ec =>
Expand All @@ -65,8 +73,10 @@ class MessageItemRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionCon
.cursor[MessageItem]()
.collect[Seq](-1, Cursor.FailOnError[Seq[MessageItem]]())
)
.tapError(err => ZIO.logError(s"findByIds : ${err.getMessage}"))
.mapError(ex => StorageThrowable(ex))
} yield result

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class UserAccountRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionCon

def collection: IO[StorageCollection, BSONCollection] = reactiveMongoApi.database
.map(_.collection(collectionName))
.tapError(err => ZIO.logError(s"Couldn't get collection ${err.getMessage}"))
.mapError(ex => StorageCollection(ex))

def newDidAccount(did: DIDSubject): IO[StorageError, WriteResult] = {
Expand All @@ -35,17 +36,22 @@ class UserAccountRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionCon
messagesRef = Seq.empty
)
for {
_ <- ZIO.logInfo("newDidAccount")
coll <- collection
result <- ZIO
.fromFuture(implicit ec => coll.insert.one(value))
.tapError(err => ZIO.logError(s"Insert newDidAccount : ${err.getMessage}"))
.mapError(ex => StorageThrowable(ex))

} yield result
}

def getDidAccount(did: DIDSubject): IO[StorageError, Option[DidAccount]] = {
def selector: BSONDocument = BSONDocument("did" -> did)
def projection: Option[BSONDocument] = None

for {
_ <- ZIO.logInfo("getDidAccount")
coll <- collection
result <- ZIO
.fromFuture(implicit ec =>
Expand All @@ -54,24 +60,30 @@ class UserAccountRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionCon
.cursor[DidAccount]()
.collect[Seq](1, Cursor.FailOnError[Seq[DidAccount]]()) // Just one
)
.tapError(err => ZIO.logError(s"getDidAccount : ${err.getMessage}"))
.mapError(ex => StorageThrowable(ex))
} yield result.headOption

}

def addAlias(owner: DIDSubject, newAlias: DIDSubject): ZIO[Any, StorageError, Either[String, Unit]] = {
def selector: BSONDocument = BSONDocument("did" -> owner)

def update: BSONDocument = BSONDocument(
"$push" -> BSONDocument(
"alias" -> newAlias
)
)

for {
_ <- ZIO.logInfo("addAlias")
coll <- collection
result <- ZIO
.fromFuture(implicit ec =>
coll.update
.one(selector, update) // Just one
)
.tapError(err => ZIO.logError(s"addAlias : ${err.getMessage}"))
.mapError(ex => StorageThrowable(ex))
} yield Right(())

Expand All @@ -84,15 +96,19 @@ class UserAccountRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionCon
"alias" -> newAlias
)
)

for {
_ <- ZIO.logInfo("removeAlias")
coll <- collection
result <- ZIO
.fromFuture(implicit ec =>
coll.update
.one(selector, update) // Just one
)
.tapError(err => ZIO.logError(s"removeAlias : ${err.getMessage}"))
.mapError(ex => StorageThrowable(ex))
} yield Right(())

}

/** @return
Expand Down Expand Up @@ -125,24 +141,29 @@ class UserAccountRepo(reactiveMongoApi: ReactiveMongoApi)(using ec: ExecutionCon
)

for {
_ <- ZIO.logInfo("addToInboxes")
coll <- collection
result <- ZIO
.fromFuture(implicit ec =>
coll.update
.one(selector, update) // Just one
)
.tapError(err => ZIO.logError(s"addToInboxes : ${err.getMessage}"))
.mapError(ex => StorageThrowable(ex))
} yield result.nModified

}

def markAsDelivered(didAccount: DIDSubject, hashes: Seq[HASH]): ZIO[Any, StorageError, Int] = {
def selector = BSONDocument("did" -> didAccount.did, "messagesRef.hash" -> BSONDocument("$in" -> hashes))
def update: BSONDocument = BSONDocument("$set" -> BSONDocument("messagesRef.$.state" -> true))

for {
_ <- ZIO.logInfo("markAsDelivered")
coll <- collection
result <- ZIO
.fromFuture(implicit ec => coll.update.one(selector, update)) // Just one
.tapError(err => ZIO.logError(s"markAsDelivered : ${err.getMessage}"))
.mapError(ex => StorageThrowable(ex))
} yield result.nModified
}
Expand Down

0 comments on commit ff13097

Please sign in to comment.