Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change produceMany{,Async} to accept any batch with a Traverse #11

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.sc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ object Dependencies {

object libraries {
val catsCore = ivy"org.typelevel::cats-core:${version.catsCore}"
val alleyCatsCore = ivy"org.typelevel::alleycats-core:${version.catsCore}"
val effect = ivy"org.typelevel::cats-effect:${version.effect}"
val fs2 = ivy"co.fs2::fs2-core:${version.fs2}"

Expand Down Expand Up @@ -61,6 +62,7 @@ trait CommonModule extends SbtModule with PublishModule {
)

def httpDependencies = Agg(
Dependencies.libraries.alleyCatsCore,
Dependencies.libraries.http4sDsl,
Dependencies.libraries.http4sClient,
Dependencies.libraries.log4cats,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@ package com.permutive.pubsub.producer.grpc.internal
import java.util.UUID
import java.util.concurrent.Executor

import cats.Traverse
import cats.effect.Async
import cats.syntax.all._
import cats.instances.list._
import com.google.api.core.{ApiFutureCallback, ApiFutures}
import com.google.cloud.pubsub.v1.Publisher
import com.google.protobuf.ByteString
import com.google.pubsub.v1.PubsubMessage
import com.permutive.pubsub.producer.{Model, PubsubProducer}
import com.permutive.pubsub.producer.encoder.MessageEncoder
import fs2.Chunk

import scala.collection.JavaConverters._

Expand Down Expand Up @@ -52,9 +51,6 @@ private[pubsub] class DefaultPublisher[F[_], A: MessageEncoder](
}
}

override def produceMany(records: List[Model.Record[A]]): F[List[String]] =
records.traverse(r => produce(r.value, r.metadata, r.uniqueId))

override def produceMany(records: Chunk[Model.Record[A]]): F[List[String]] =
produceMany(records.toList)
override def produceMany[G[_]: Traverse](records: G[Model.Record[A]]): F[List[String]] =
records.traverse(r => produce(r.value, r.metadata, r.uniqueId)).map(_.toList)
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package com.permutive.pubsub.producer.http.internal

import cats.Traverse
import cats.effect.syntax.all._
import cats.effect.{Concurrent, Resource, Timer}
import cats.syntax.all._
import cats.effect.syntax.all._
import cats.instances.list._
import com.permutive.pubsub.producer.encoder.MessageEncoder
import com.permutive.pubsub.producer.http.BatchingHttpProducerConfig
import com.permutive.pubsub.producer.http.BatchingHttpPubsubProducer.Batch
import com.permutive.pubsub.producer.{AsyncPubsubProducer, Model, PubsubProducer}
import fs2.{Chunk, Stream}
import fs2.Chunk._
import fs2.concurrent.{Enqueue, Queue}
import fs2.{Chunk, Stream}

private[http] class BatchingHttpPublisher[F[_] : Concurrent : Timer, A: MessageEncoder] private(
queue: Enqueue[F, Model.AsyncRecord[F, A]],
Expand All @@ -24,7 +25,7 @@ private[http] class BatchingHttpPublisher[F[_] : Concurrent : Timer, A: MessageE
queue.enqueue1(Model.AsyncRecord(record, callback, metadata, uniqueId))
}

override def produceManyAsync(records: List[Model.AsyncRecord[F, A]]): F[Unit] =
override def produceManyAsync[G[_] : Traverse](records: G[Model.AsyncRecord[F, A]]): F[Unit] =
records.traverse(queue.enqueue1).void
}

Expand All @@ -48,11 +49,11 @@ private[http] object BatchingHttpPublisher {
): F[Unit] = {
val handler: Chunk[Model.AsyncRecord[F, A]] => F[Unit] =
if (config.retryTimes == 0) {
records => underlying.produceMany(records) >> records.traverse_(_.callback)
records => underlying.produceMany[Chunk](records) >> records.traverse_(_.callback)
} else {
records =>
Stream.retry(
underlying.produceMany(records),
underlying.produceMany[Chunk](records),
delay = config.retryInitialDelay,
nextDelay = config.retryNextDelay,
maxAttempts = config.retryTimes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,27 @@ package com.permutive.pubsub.producer.http.internal

import java.util.Base64

import alleycats.syntax.foldable._
import cats.effect._
import cats.effect.concurrent.Ref
import cats.instances.list._
import cats.syntax.all._
import com.github.plokhotnyuk.jsoniter_scala.core.{JsonValueCodec, _}
import cats.{Foldable, Traverse}
import com.github.plokhotnyuk.jsoniter_scala.core._
import com.github.plokhotnyuk.jsoniter_scala.macros._
import com.permutive.pubsub.producer.encoder.MessageEncoder
import com.permutive.pubsub.producer.http.PubsubHttpProducerConfig
import com.permutive.pubsub.http.oauth.{AccessToken, DefaultTokenProvider}
import com.permutive.pubsub.http.util.RefreshableRef
import com.permutive.pubsub.producer.encoder.MessageEncoder
import com.permutive.pubsub.producer.http.PubsubHttpProducerConfig
import com.permutive.pubsub.producer.{Model, PubsubProducer}
import fs2.Stream
import io.chrisdavenport.log4cats.Logger
import org.http4s.Method._
import org.http4s.Uri._
import org.http4s._
import org.http4s.client._
import org.http4s.client.dsl.Http4sClientDsl
import org.http4s.headers._
import fs2.{Chunk, Stream}

import scala.util.control.NoStackTrace

Expand All @@ -37,25 +39,17 @@ private[http] class DefaultHttpPublisher[F[_], A: MessageEncoder] private(
private[this] final val publishRoute = baseApiUrl.copy(path = baseApiUrl.path.concat(":publish"))

override final def produce(record: A, metadata: Map[String, String], uniqueId: String): F[String] = {
produceMany(List(Model.SimpleRecord(record, metadata, uniqueId))).map(_.head)
produceMany[List](List(Model.SimpleRecord(record, metadata, uniqueId))).map(_.head)
}

override final def produceMany(records: List[Model.Record[A]]): F[List[String]] = {
override final def produceMany[G[_]: Traverse](records: G[Model.Record[A]]): F[List[String]] = {
for {
msgs <- records.traverse(recordToMessage)
json <- F.delay(writeToArray(MessageBundle(msgs)))
resp <- sendHttpRequest(json)
} yield resp
}

override final def produceMany(records: Chunk[Model.Record[A]]): F[List[String]] = {
for {
msgs <- records.traverse(recordToMessage)
json <- F.delay(writeToArray(ChunkMessageBundle(msgs)))
resp <- sendHttpRequest(json)
} yield resp
}

private def sendHttpRequest(json: Array[Byte]): F[List[String]] = {
for {
token <- tokenRef.get
Expand Down Expand Up @@ -150,44 +144,33 @@ private[http] object DefaultHttpPublisher {
attributes: Map[String, String]
)

case class MessageBundle(
messages: Iterable[Message],
)

case class ChunkMessageBundle(
messages: Chunk[Message],
case class MessageBundle[G[_]](
messages: G[Message],
)

case class MessageIds(
messageIds: List[String],
)

final implicit val MessageCodec: JsonValueCodec[Message] =
JsonCodecMaker.make[Message](CodecMakerConfig())

final implicit val MessageBundleCodec: JsonValueCodec[MessageBundle] =
JsonCodecMaker.make[MessageBundle](CodecMakerConfig())

final implicit val MessageChunkCodec: JsonValueCodec[Chunk[Message]] =
new JsonValueCodec[Chunk[Message]] {
override def decodeValue(in: JsonReader, default: Chunk[Message]): Chunk[Message] = {
// in.arrayStartOrNullError()
// MessageCodec.decodeValue(in, MessageCodec.nullValue)
// in.arrayEndOrCommaError()
???
}
final implicit def foldableMessagesCodec[G[_]](implicit G: Foldable[G]): JsonValueCodec[G[Message]] =
new JsonValueCodec[G[Message]] {
override def decodeValue(in: JsonReader, default: G[Message]): G[Message] = ???

override def encodeValue(x: Chunk[Message], out: JsonWriter): Unit = {
override def encodeValue(x: G[Message], out: JsonWriter): Unit = {
out.writeArrayStart()
x.foreach(MessageCodec.encodeValue(_, out))
out.writeArrayEnd()
}

override def nullValue: Chunk[Message] = Chunk.empty
override def nullValue: G[Message] = ???
}

final implicit val ChunkMessageBundleCodec: JsonValueCodec[ChunkMessageBundle] =
JsonCodecMaker.make[ChunkMessageBundle](CodecMakerConfig())

final implicit val MessageCodec: JsonValueCodec[Message] =
JsonCodecMaker.make[Message](CodecMakerConfig())

final implicit def messageBundleCodec[G[_] : Foldable]: JsonValueCodec[MessageBundle[G]] =
JsonCodecMaker.make[MessageBundle[G]](CodecMakerConfig())

final implicit val MessageIdsCodec: JsonValueCodec[MessageIds] =
JsonCodecMaker.make[MessageIds](CodecMakerConfig())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ object ExampleBatching extends IOApp {
retryNextDelay = _ + 250.millis,
),
onPublishFailure = (batch, e) => {
Logger[IO].error(e)(s"Failed to publish ${batch.length} messages")
Logger[IO].error(e)(s"Failed to publish ${batch.size} messages")
},
_
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package com.permutive.pubsub.producer

import java.util.UUID

import cats.Traverse

trait AsyncPubsubProducer[F[_], A] {
def produceAsync(
record: A,
Expand All @@ -10,7 +12,7 @@ trait AsyncPubsubProducer[F[_], A] {
uniqueId: String = UUID.randomUUID().toString,
): F[Unit]

def produceManyAsync(
records: List[Model.AsyncRecord[F, A]],
def produceManyAsync[G[_] : Traverse](
records: G[Model.AsyncRecord[F, A]],
): F[Unit]
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.permutive.pubsub.producer

import java.util.UUID

import fs2.Chunk
import cats.Traverse

trait PubsubProducer[F[_], A] {
def produce(
Expand All @@ -11,7 +11,5 @@ trait PubsubProducer[F[_], A] {
uniqueId: String = UUID.randomUUID().toString,
): F[String]

def produceMany(records: List[Model.Record[A]]): F[List[String]]

def produceMany(records: Chunk[Model.Record[A]]): F[List[String]]
def produceMany[G[_] : Traverse](records: G[Model.Record[A]]): F[List[String]]
}