Use logback for all logging
Ian Streeter authored and chuwy committed Oct 5, 2020
1 parent 1df2913 commit 2b465ce
Showing 20 changed files with 197 additions and 90 deletions.
3 changes: 2 additions & 1 deletion build.sbt
@@ -25,7 +25,6 @@ lazy val common = project
.settings(
resolvers += Dependencies.SnowplowBintray,
libraryDependencies ++= Seq(
Dependencies.logger,
Dependencies.postgres,
Dependencies.catsEffect,
Dependencies.circe,
@@ -37,6 +36,8 @@ lazy val common = project
Dependencies.doobiePg,
Dependencies.doobiePgCirce,
Dependencies.doobieHikari,
Dependencies.log4s,
Dependencies.logback,
Dependencies.analyticsSdk,
Dependencies.badRows,
Dependencies.schemaDdl,
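For context, the removed Dependencies.logger entry is replaced by Dependencies.log4s and Dependencies.logback. A minimal sketch of how these entries might be declared in project/Dependencies.scala — the module coordinates are the standard ones, but the file layout and versions are assumptions, not taken from this diff:

// project/Dependencies.scala (sketch; versions are illustrative assumptions)
import sbt._

object Dependencies {
  object V {
    val log4s   = "1.8.2"
    val logback = "1.2.3"
  }

  // log4s is a thin Scala facade over slf4j; logback-classic is the runtime binding
  val log4s   = "org.log4s"      %% "log4s"           % V.log4s
  val logback = "ch.qos.logback" %  "logback-classic" % V.logback
}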
@@ -18,7 +18,6 @@ import cats.implicits._
import cats.effect.{Bracket, Clock, Sync}

import doobie.implicits._
import doobie.util.log.LogHandler
import doobie.util.transactor.Transactor

import com.snowplowanalytics.iglu.core.SchemaKey
@@ -86,18 +85,18 @@ object DB {
}
}

def interpreter[F[_]: Sync: Clock](resolver: Resolver[F], xa: Transactor[F], logger: LogHandler, schemaName: String): DB[F] =
def interpreter[F[_]: Sync: Clock](resolver: Resolver[F], xa: Transactor[F], schemaName: String): DB[F] =
new DB[F] {
def insert(event: List[Entity]): F[Unit] =
event.traverse_(sink.insertStatement(logger, schemaName, _)).transact(xa)
event.traverse_(sink.insertStatement(schemaName, _)).transact(xa)

def alter(schemaKey: SchemaKey): F[Unit] = {
val result = ddl.alterTable[F](resolver, logger, schemaName, schemaKey)
val result = ddl.alterTable[F](resolver, schemaName, schemaKey)
rethrow(result.semiflatMap(_.transact(xa)))
}

def create(schemaKey: SchemaKey, includeMeta: Boolean): F[Unit] = {
val result = ddl.createTable[F](resolver, logger, schemaName, schemaKey, includeMeta)
val result = ddl.createTable[F](resolver, schemaName, schemaKey, includeMeta)
rethrow(result.semiflatMap(_.transact(xa)))
}

@@ -0,0 +1,59 @@
/*
* Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.postgres.logging

import doobie.util.log._
import org.log4s.Logger

/** Log doobie events using the slf4j framework
*
* This is largely based on the JDK-based log handler supplied by doobie: https://github.com/tpolecat/doobie/blob/f04a7a3cab5aecb50be0d1ad10fbdae6b8db5ec2/modules/core/src/main/scala/doobie/util/log.scala#L57
* It uses slf4j as the logging abstraction, so the end user can control the log output using their preferred logging framework.
*/
object Slf4jLogHandler {
def apply(logger: Logger): LogHandler =
LogHandler {
case Success(s, a, e1, e2) =>
logger.debug(s"""Successful Statement Execution:
|
| ${s.linesIterator.dropWhile(_.trim.isEmpty).mkString("\n ")}
|
| arguments = [${a.mkString(", ")}]
| elapsed = ${e1.toMillis.toString} ms exec + ${e2.toMillis.toString} ms processing (${(e1 + e2).toMillis.toString} ms total)
""".stripMargin)

case ProcessingFailure(s, a, e1, e2, t) =>
logger.debug(s"""Failed Resultset Processing:
|
| ${s.linesIterator.dropWhile(_.trim.isEmpty).mkString("\n ")}
|
| arguments = [${a.mkString(", ")}]
| elapsed = ${e1.toMillis.toString} ms exec + ${e2.toMillis.toString} ms processing (failed) (${(e1 + e2).toMillis.toString} ms total)
| failure = ${t.getMessage}
""".stripMargin)
logger.error(s"Failed Resultset Processing: ${t.getMessage}")

case ExecFailure(s, a, e1, t) =>
logger.debug(s"""Failed Statement Execution:
|
| ${s.linesIterator.dropWhile(_.trim.isEmpty).mkString("\n ")}
|
| arguments = [${a.mkString(", ")}]
| elapsed = ${e1.toMillis.toString} ms exec (failed)
| failure = ${t.getMessage}
""".stripMargin)
logger.error(s"Failed StatementExecution: ${t.getMessage}")
}
}
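A brief usage sketch (not part of this commit) showing how the new handler is intended to be wired into doobie — the query below is hypothetical, but the pattern mirrors the call sites changed elsewhere in this diff:

// Hypothetical usage sketch: route doobie's statement logging through slf4j
import doobie.implicits._
import org.log4s.getLogger

import com.snowplowanalytics.snowplow.postgres.logging.Slf4jLogHandler

object ExampleUsage {
  // All statement logging now flows through slf4j, so logback controls the output
  private lazy val logHandler = Slf4jLogHandler(getLogger)

  def tableCount(schema: String): doobie.ConnectionIO[Long] =
    fr"SELECT count(*) FROM pg_catalog.pg_tables WHERE schemaname = $schema"
      .queryWithLogHandler[Long](logHandler)
      .unique
}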
@@ -20,10 +20,9 @@ import com.zaxxer.hikari.HikariConfig
import doobie.hikari._
import doobie.implicits._
import doobie.util.ExecutionContexts
import doobie.util.log.LogHandler
import doobie.util.transactor.Transactor

import io.circe.Json
import org.log4s.getLogger

import com.snowplowanalytics.iglu.client.Client

@@ -33,23 +32,21 @@ import com.snowplowanalytics.snowplow.postgres.config.DBConfig.JdbcUri

object resources {

private lazy val logger = getLogger

/** Initialise Blocking Thread Pool, Connection Pool, DB state and bad queue resources */
def initialize[F[_]: Concurrent: Clock: ContextShift](postgres: DBConfig, logger: LogHandler, iglu: Client[F, Json]) =
def initialize[F[_]: Concurrent: Clock: ContextShift](postgres: DBConfig, iglu: Client[F, Json]) =
for {
blocker <- Blocker[F]
xa <- resources.getTransactor[F](DBConfig.hikariConfig(postgres), blocker)
state <- Resource.liftF(initializeState(postgres.schema, logger, iglu, xa))
state <- Resource.liftF(initializeState(postgres.schema, iglu, xa))
} yield (blocker, xa, state)

def initializeState[F[_]: Concurrent: Clock](schema: String,
logger: LogHandler,
iglu: Client[F, Json],
xa: HikariTransactor[F]
): F[State[F]] =
def initializeState[F[_]: Concurrent: Clock](schema: String, iglu: Client[F, Json], xa: HikariTransactor[F]): F[State[F]] =
for {
ci <- storage.query.getComments(schema, logger).transact(xa).map(_.separate)
ci <- storage.query.getComments(schema).transact(xa).map(_.separate)
(issues, comments) = ci
_ <- issues.traverse_(issue => Sync[F].delay(println(issue)))
_ <- issues.traverse_(issue => Sync[F].delay(logger.warn(issue.show)))
initState = State.init[F](comments, iglu.resolver).value.flatMap {
case Left(error) =>
val exception = new RuntimeException(s"Couldn't initialise the state $error")
@@ -69,6 +66,7 @@ object resources {
val maxPoolSize = if (config.getMaximumPoolSize > 0) config.getMaximumPoolSize else 10
Math.min(maxPoolSize, Runtime.getRuntime.availableProcessors)
}
logger.debug(s"Using thread pool of size $threadPoolSize for Hikari transactor")

for {
ce <- ExecutionContexts.fixedThreadPool[F](threadPoolSize)
@@ -17,7 +17,7 @@ import cats.implicits._

import cats.effect.{Clock, Sync}

import doobie.{ConnectionIO, LogHandler}
import doobie.ConnectionIO
import doobie.implicits._
import doobie.util.fragment.Fragment

@@ -40,28 +40,23 @@ object ddl {
type Generator = DdlSchemaList => Fragment

def createTable[F[_]: Sync: Clock](resolver: Resolver[F],
logger: LogHandler,
schema: String,
entity: SchemaKey,
meta: Boolean
): EitherT[F, IgluErrors, Insert] = {
val generator: Generator = schemaList => sql.createTable(schema, entity, schemaList, meta)
manage(resolver, logger, schema, entity, generator)
manage(resolver, schema, entity, generator)
}

// TODO: tables need to be updated in a transaction, because a situation where one node tries to mutate a table
// after its state has been updated is entirely predictable
def alterTable[F[_]: Sync: Clock](resolver: Resolver[F],
logger: LogHandler,
schema: String,
entity: SchemaKey
): EitherT[F, IgluErrors, Insert] = {
def alterTable[F[_]: Sync: Clock](resolver: Resolver[F], schema: String, entity: SchemaKey): EitherT[F, IgluErrors, Insert] = {
val generator: Generator = schemaList => sql.migrateTable(schema, entity, schemaList)
manage(resolver, logger, schema, entity, generator)
manage(resolver, schema, entity, generator)
}

def createEventsTable(schema: String, logger: LogHandler): ConnectionIO[Unit] =
definitions.atomicSql(schema).update(logger).run.void
def createEventsTable(schema: String): ConnectionIO[Unit] =
definitions.atomicSql(schema).update().run.void

/**
* Perform some DB management: create or mutate the table according to current
@@ -74,15 +69,13 @@
* Note that it doesn't actually perform a DB action (no `Transactor`)
*
* @param resolver Iglu Resolver tied to Iglu Server (it needs schema list endpoint)
* @param logger doobie logger
* @param schema database schema
* @param entity an actual shredded entity that we manage tables for
* @param generator a function generating SQL from `DdlSchemaList`
* @return an action that is either failure because of Iglu subsystem
* or doobie IO
*/
def manage[F[_]: Sync: Clock](resolver: Resolver[F],
logger: LogHandler,
schema: String,
origin: SchemaKey,
generator: Generator
@@ -98,8 +91,8 @@
DdlSchemaList.fromSchemaList(list, fetch[F](resolver)).leftMap(IgluErrors.of).map { list =>
val statement = generator(list)
val tableName = getTableName(origin)
statement.update(logger).run.void *>
sql.commentTable(logger, schema, tableName, list.latest)
statement.update().run.void *>
sql.commentTable(schema, tableName, list.latest)
}
}
}
@@ -18,22 +18,25 @@ import cats.instances.list._

import doobie.ConnectionIO
import doobie.implicits._
import doobie.util.log.LogHandler
import org.log4s.getLogger

import com.snowplowanalytics.iglu.core.SchemaKey
import com.snowplowanalytics.snowplow.postgres.logging.Slf4jLogHandler

/** Functions to query the storage for state and metadata */
object query {

def tableExists(schema: String, name: String, logger: LogHandler): ConnectionIO[Boolean] =
private lazy val logger = Slf4jLogHandler(getLogger)

def tableExists(schema: String, name: String): ConnectionIO[Boolean] =
fr"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = $name AND table_schema = $schema);"
.queryWithLogHandler[Boolean](logger)
.unique

def listTables(schema: String): ConnectionIO[List[String]] =
fr"SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = $schema".query[String].to[List]

def getComment(schema: String, logger: LogHandler)(tableName: String): ConnectionIO[Either[CommentIssue, SchemaKey]] =
def getComment(schema: String)(tableName: String): ConnectionIO[Either[CommentIssue, SchemaKey]] =
(fr"""SELECT obj_description(oid) FROM pg_class WHERE relkind = 'r' AND relnamespace = (
SELECT oid
FROM pg_catalog.pg_namespace
@@ -51,6 +54,6 @@ object query {
CommentIssue.Missing(tableName).asLeft
}

def getComments(schema: String, logger: LogHandler): ConnectionIO[List[Either[CommentIssue, SchemaKey]]] =
listTables(schema).flatMap(_.traverse(getComment(schema, logger)))
def getComments(schema: String): ConnectionIO[List[Either[CommentIssue, SchemaKey]]] =
listTables(schema).flatMap(_.traverse(getComment(schema)))
}
@@ -17,7 +17,7 @@ import cats.syntax.functor._
import doobie.Fragment
import doobie.free.connection.ConnectionIO
import doobie.implicits._
import doobie.util.log.LogHandler
import org.log4s.getLogger

import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaMap}

@@ -28,9 +28,12 @@ import com.snowplowanalytics.iglu.schemaddl.migrations.{FlatSchema, Migration, S

import com.snowplowanalytics.snowplow.postgres.shredding.transform.Atomic
import com.snowplowanalytics.snowplow.postgres.shredding.{Type, schema, transform}
import com.snowplowanalytics.snowplow.postgres.logging.Slf4jLogHandler

object sql {

private lazy val logger = Slf4jLogHandler(getLogger)

val DefaultVarcharSize = 4096

/**
Expand Down Expand Up @@ -61,7 +64,7 @@ object sql {
Fragment.const(s"CREATE TABLE $table (\n${columns.mkString(",\n")}\n)")
}

def commentTable(logger: LogHandler, schema: String, tableName: String, schemaKey: SchemaMap): ConnectionIO[Unit] = {
def commentTable(schema: String, tableName: String, schemaKey: SchemaMap): ConnectionIO[Unit] = {
val uri = schemaKey.schemaKey.toSchemaUri
val table = s"$schema.$tableName"
Fragment.const(s"COMMENT ON TABLE $table IS '$uri'").update(logger).run.void
@@ -16,25 +16,27 @@ import cats.Monad
import cats.implicits._

import cats.effect.Sync
import org.log4s.getLogger

import doobie.ConnectionIO
import doobie.implicits._
import doobie.util.transactor.Transactor
import doobie.util.log.LogHandler

import query.tableExists

object utils {

def prepareEventsTable(schema: String, logger: LogHandler): ConnectionIO[Boolean] = {
val create = ddl.createEventsTable(schema, logger).as(false)
private lazy val logger = getLogger

def prepareEventsTable(schema: String): ConnectionIO[Boolean] = {
val create = ddl.createEventsTable(schema).as(false)
val exists = Monad[ConnectionIO].pure(true)
Monad[ConnectionIO].ifM(tableExists(schema, "events", logger))(exists, create)
Monad[ConnectionIO].ifM(tableExists(schema, "events"))(exists, create)
}

def prepare[F[_]: Sync](schema: String, xa: Transactor[F], logger: LogHandler): F[Unit] =
prepareEventsTable(schema, logger).transact(xa).flatMap {
case true => Sync[F].delay(println(s"$schema.events table already exists"))
case false => Sync[F].delay(println(s"$schema.events table created"))
def prepare[F[_]: Sync](schema: String, xa: Transactor[F]): F[Unit] =
prepareEventsTable(schema).transact(xa).flatMap {
case true => Sync[F].delay(logger.info(s"$schema.events table already exists"))
case false => Sync[F].delay(logger.info(s"$schema.events table created"))
}
}
@@ -15,14 +15,15 @@ package com.snowplowanalytics.snowplow.postgres.streaming
import cats.data.EitherT
import cats.implicits._

import cats.effect.{Blocker, Clock, ContextShift, Sync}
import cats.effect.{Clock, ContextShift, Sync}

import fs2.Pipe

import doobie._
import doobie.implicits._

import io.circe.Json
import org.log4s.getLogger

import com.snowplowanalytics.iglu.core.circe.implicits._

@@ -32,9 +33,13 @@ import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload, Processor}
import com.snowplowanalytics.snowplow.postgres.api.{DB, State}
import com.snowplowanalytics.snowplow.postgres.shredding.{Entity, transform}
import com.snowplowanalytics.snowplow.postgres.streaming.data.{BadData, Data}
import com.snowplowanalytics.snowplow.postgres.logging.Slf4jLogHandler

object sink {

private lazy val logger = getLogger
private lazy val logHandler = Slf4jLogHandler(logger)

type Insert = ConnectionIO[Unit]

/**
@@ -59,10 +64,10 @@ }
}

/** Sink bad data coming directly into the `Pipe` */
def badSink[F[_]: Sync: ContextShift](blocker: Blocker): Pipe[F, BadData, Unit] =
def badSink[F[_]: Sync: ContextShift]: Pipe[F, BadData, Unit] =
_.evalMap {
case BadData.BadEnriched(row) => blocker.delay[F, Unit](println(row.compact))
case BadData.BadJson(payload, error) => blocker.delay[F, Unit](println(s"Cannot parse $payload. $error"))
case BadData.BadEnriched(row) => Sync[F].delay(logger.warn(row.compact))
case BadData.BadJson(payload, error) => Sync[F].delay(logger.warn(s"Cannot parse $payload. $error"))
}

/** Implementation for [[goodSink]] */
@@ -95,7 +100,7 @@ object sink {
* Build an `INSERT` action for a single entity
* Multiple inserts later can be combined into a transaction
*/
def insertStatement(logger: LogHandler, schema: String, row: Entity): Insert = {
def insertStatement(schema: String, row: Entity): Insert = {
val length = row.columns.length

val columns = Fragment.const0(row.columns.map(c => s"""\"${c.name}\"""").mkString(","))
Expand All @@ -106,7 +111,7 @@ object sink {
case (acc, (cur, _)) => acc ++ cur.value.fragment
}

fr"""INSERT INTO $table ($columns) VALUES ($values)""".update(logger).run.void
fr"""INSERT INTO $table ($columns) VALUES ($values)""".update(logHandler).run.void
}

}
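Since all logging now goes through slf4j/log4s with logback on the classpath, downstream applications control verbosity with a standard logback configuration. A hypothetical logback.xml (not part of this commit) that surfaces the DEBUG-level statement logging emitted by Slf4jLogHandler might look like this:

<!-- Hypothetical logback.xml for an application embedding this library -->
<configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n</pattern>
    </encoder>
  </appender>

  <!-- Enable doobie statement logging emitted via Slf4jLogHandler -->
  <logger name="com.snowplowanalytics.snowplow.postgres" level="DEBUG"/>

  <root level="INFO">
    <appender-ref ref="STDOUT"/>
  </root>
</configuration>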
