Skip to content
This repository has been archived by the owner on Jul 1, 2021. It is now read-only.

Commit

Permalink
Merge pull request #21 from wellcometrust/use-storage-type-classes
Browse files Browse the repository at this point in the history
Use the new storage lib; more type classes
  • Loading branch information
alexwlchan committed May 21, 2019
2 parents b20dd4a + 342953b commit 09ed8cf
Show file tree
Hide file tree
Showing 27 changed files with 695 additions and 590 deletions.
@@ -0,0 +1,27 @@
package uk.ac.wellcome.messaging

import grizzled.slf4j.Logging
import io.circe.Decoder
import uk.ac.wellcome.json.JsonUtil.fromJson
import uk.ac.wellcome.messaging.message.{
InlineNotification,
MessageNotification,
RemoteNotification
}
import uk.ac.wellcome.storage.ObjectStore

import scala.util.Try

trait BigMessageReader[T] extends Logging {
val objectStore: ObjectStore[T]

implicit val decoder: Decoder[T]

def read(notification: MessageNotification): Try[T] =
notification match {
case inlineNotification: InlineNotification =>
fromJson[T](inlineNotification.jsonString)
case remoteNotification: RemoteNotification =>
objectStore.get(remoteNotification.location)
}
}
@@ -0,0 +1,62 @@
package uk.ac.wellcome.messaging

import java.text.SimpleDateFormat
import java.util.Date

import grizzled.slf4j.Logging
import io.circe.Encoder
import uk.ac.wellcome.json.JsonUtil._
import uk.ac.wellcome.messaging.message.{
InlineNotification,
MessageNotification,
RemoteNotification
}
import uk.ac.wellcome.storage.{KeyPrefix, ObjectStore}

import scala.util.{Success, Try}

trait BigMessageSender[Destination, T] extends Logging {
val messageSender: MessageSender[Destination]
val objectStore: ObjectStore[T]

val namespace: String

implicit val encoder: Encoder[T]

val maxMessageSize: Int

private val dateFormat = new SimpleDateFormat("YYYY/MM/dd")

protected def getKeyPrefix: String = {
val currentTime = new Date()
s"${messageSender.destination}/${dateFormat.format(currentTime)}/${currentTime.getTime.toString}"
}

def sendT(t: T): Try[MessageNotification] =
for {
jsonString <- toJson(t)
inlineNotification = InlineNotification(jsonString)

encodedInlineNotification <- toJson(inlineNotification)

notification <- if (encodedInlineNotification
.getBytes("UTF-8")
.length > maxMessageSize) {
createRemoteNotification(t)
} else {
Success(inlineNotification)
}

_ <- messageSender.sendT[MessageNotification](notification)
} yield notification

private def createRemoteNotification(t: T): Try[RemoteNotification] =
for {
location <- objectStore.put(namespace)(
t,
keyPrefix = KeyPrefix(getKeyPrefix)
)
_ = info(s"Successfully stored message in location: $location")
notification = RemoteNotification(location = location)
} yield notification
}
@@ -0,0 +1,27 @@
package uk.ac.wellcome.messaging

import io.circe.Encoder
import uk.ac.wellcome.json.JsonUtil.toJson

import scala.util.Try

trait IndividualMessageSender[Destination] {
def send(body: String)(subject: String, destination: Destination): Try[Unit]

def sendT[T](t: T)(subject: String, destination: Destination)(
implicit encoder: Encoder[T]): Try[Unit] =
toJson(t).flatMap { send(_)(subject, destination) }
}

trait MessageSender[Destination] {
protected val underlying: IndividualMessageSender[Destination]

val subject: String
val destination: Destination

def send(body: String): Try[Unit] =
underlying.send(body)(subject, destination)

def sendT[T](t: T)(implicit encoder: Encoder[T]): Try[Unit] =
underlying.sendT[T](t)(subject, destination)
}
@@ -0,0 +1,28 @@
package uk.ac.wellcome.messaging.memory

import io.circe.Encoder
import uk.ac.wellcome.messaging.BigMessageSender
import uk.ac.wellcome.storage.{ObjectStore, SerialisationStrategy}
import uk.ac.wellcome.storage.memory.MemoryObjectStore

class MemoryBigMessageSender[T](
maxSize: Int = 100,
storeNamespace: String = "MemoryBigMessageSender",
destination: String = "MemoryBigMessageSender"
)(
implicit
val encoder: Encoder[T],
serialisationStrategy: SerialisationStrategy[T]
) extends BigMessageSender[String, T] {
override val messageSender: MemoryMessageSender = new MemoryMessageSender(
destination = destination,
subject = "Sent from MemoryBigMessageSender"
)

override val objectStore: ObjectStore[T] = new MemoryObjectStore[T]()
override val namespace: String = storeNamespace
override val maxMessageSize: Int = maxSize

def messages: List[messageSender.underlying.MemoryMessage] =
messageSender.messages
}
@@ -0,0 +1,30 @@
package uk.ac.wellcome.messaging.memory

import uk.ac.wellcome.messaging.{IndividualMessageSender, MessageSender}

import scala.util.Try

class MemoryIndividualMessageSender extends IndividualMessageSender[String] {
case class MemoryMessage(
body: String,
subject: String,
destination: String
)

var messages: List[MemoryMessage] = List.empty

override def send(body: String)(subject: String,
destination: String): Try[Unit] = Try {
messages = messages :+ MemoryMessage(body, subject, destination)
}
}

class MemoryMessageSender(
val destination: String,
val subject: String
) extends MessageSender[String] {
override val underlying: MemoryIndividualMessageSender =
new MemoryIndividualMessageSender()

def messages: List[underlying.MemoryMessage] = underlying.messages
}
Expand Up @@ -11,16 +11,25 @@ import uk.ac.wellcome.messaging.sqs.{SQSConfig, SQSStream}
import uk.ac.wellcome.monitoring.MetricsSender
import uk.ac.wellcome.storage.ObjectStore
import uk.ac.wellcome.json.JsonUtil._
import uk.ac.wellcome.messaging.BigMessageReader

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

class MessageStream[T](sqsClient: AmazonSQSAsync,
sqsConfig: SQSConfig,
metricsSender: MetricsSender)(
implicit actorSystem: ActorSystem,
objectStore: ObjectStore[T],
implicit
actorSystem: ActorSystem,
decoderT: Decoder[T],
objectStoreT: ObjectStore[T],
ec: ExecutionContext) {

private val bigMessageReader = new BigMessageReader[T] {
override val objectStore: ObjectStore[T] = objectStoreT
override implicit val decoder: Decoder[T] = decoderT
}

private val sqsStream = new SQSStream[NotificationMessage](
sqsClient = sqsClient,
sqsConfig = sqsConfig,
Expand All @@ -29,44 +38,39 @@ class MessageStream[T](sqsClient: AmazonSQSAsync,

def runStream(
streamName: String,
modifySource: Source[(Message, T), NotUsed] => Source[Message, NotUsed])(
implicit decoder: Decoder[T]): Future[Done] =
modifySource: Source[(Message, T), NotUsed] => Source[Message, NotUsed])
: Future[Done] =
sqsStream.runStream(
streamName,
source => modifySource(messageFromS3Source(source)))

def foreach(streamName: String, process: T => Future[Unit])(
implicit decoder: Decoder[T]): Future[Done] =
def foreach(streamName: String, process: T => Future[Unit]): Future[Done] =
sqsStream.foreach(
streamName = streamName,
process = (notification: NotificationMessage) =>
for {
body <- getBody(notification.body)
body <- Future.fromTry {
getBody(notification.body)
}
result <- process(body)
} yield result
)

private def messageFromS3Source(
source: Source[(Message, NotificationMessage), NotUsed])(
implicit decoder: Decoder[T]) = {
source: Source[(Message, NotificationMessage), NotUsed])
: Source[(Message, T), NotUsed] = {
source.mapAsyncUnordered(sqsConfig.parallelism) {
case (message, notification) =>
for {
deserialisedObject <- getBody(notification.body)
deserialisedObject <- Future.fromTry {
getBody(notification.body)
}
} yield (message, deserialisedObject)
}
}

private def getBody(messageString: String)(
implicit decoder: Decoder[T]): Future[T] =
for {
notification <- Future.fromTry(
fromJson[MessageNotification](messageString))
body <- notification match {
case inlineNotification: InlineNotification =>
Future.fromTry(fromJson[T](inlineNotification.jsonString))
case remoteNotification: RemoteNotification =>
objectStore.get(remoteNotification.location)
}
} yield body
private def getBody(messageString: String): Try[T] =
fromJson[MessageNotification](messageString).flatMap {
bigMessageReader.read
}
}
@@ -1,22 +1,14 @@
package uk.ac.wellcome.messaging.message

import java.text.SimpleDateFormat
import java.util.Date

import com.amazonaws.services.sns.AmazonSNS
import grizzled.slf4j.Logging
import io.circe.Encoder
import uk.ac.wellcome.messaging.sns.{
PublishAttempt,
SNSConfig,
SNSMessageWriter,
SNSWriter
}
import uk.ac.wellcome.messaging.sns.{SNSConfig, SNSMessageSender}
import uk.ac.wellcome.messaging.{BigMessageSender, MessageSender}
import uk.ac.wellcome.storage.ObjectStore
import uk.ac.wellcome.storage.s3.S3Config
import uk.ac.wellcome.storage.{KeyPrefix, ObjectStore}
import uk.ac.wellcome.json.JsonUtil._

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.Future

case class MessageWriterConfig(
snsConfig: SNSConfig,
Expand All @@ -26,65 +18,35 @@ case class MessageWriterConfig(
class MessageWriter[T](
messageConfig: MessageWriterConfig,
snsClient: AmazonSNS
)(implicit objectStore: ObjectStore[T], ec: ExecutionContext)
)(implicit objectStoreT: ObjectStore[T], encoderT: Encoder[T])
extends Logging {

private val snsMessageWriter = new SNSMessageWriter(snsClient = snsClient)

private val sns = new SNSWriter(
snsMessageWriter = snsMessageWriter,
snsConfig = messageConfig.snsConfig
)

private val dateFormat = new SimpleDateFormat("YYYY/MM/dd")

private def getKeyPrefix(): String = {
val topicName = messageConfig.snsConfig.topicArn.split(":").last
val currentTime = new Date()
s"$topicName/${dateFormat.format(currentTime)}/${currentTime.getTime.toString}"
private val underlying = new BigMessageSender[SNSConfig, T] {
override val messageSender: MessageSender[SNSConfig] = new SNSMessageSender(
snsClient = snsClient,
snsConfig = messageConfig.snsConfig,
subject = "Sent from MessageWriter"
)

override val objectStore: ObjectStore[T] = objectStoreT
override val namespace: String = messageConfig.s3Config.bucketName

implicit val encoder: Encoder[T] = encoderT

// If the encoded message is less than 250KB, we can send it inline
// in SNS/SQS (although the limit is 256KB, there's a bit of overhead
// caused by the notification wrapper, so we're conservative).
//
// Max SQS message size:
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-limits.html#limits-messages
//
// Max SNS message size:
// https://aws.amazon.com/sns/faqs/
//
override val maxMessageSize: Int = 250 * 100
}

def write(message: T, subject: String)(
implicit encoder: Encoder[T]): Future[PublishAttempt] =
for {
jsonString <- Future.fromTry(toJson(message))
encodedNotification <- Future.fromTry(
toJson[MessageNotification](InlineNotification(jsonString))
)

// If the encoded message is less than 250KB, we can send it inline
// in SNS/SQS (although the limit is 256KB, there's a bit of overhead
// caused by the notification wrapper, so we're conservative).
//
// Max SQS message size:
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-limits.html#limits-messages
//
// Max SNS message size:
// https://aws.amazon.com/sns/faqs/
//
notification: String <- if (encodedNotification
.getBytes("UTF-8")
.length > 250 * 1000) {
createRemoteNotification(message)
} else {
Future.successful(encodedNotification)
}

publishAttempt <- sns.writeMessage(
message = notification,
subject = subject
)
_ = debug(publishAttempt)
} yield publishAttempt

private def createRemoteNotification(message: T): Future[String] =
for {
location <- objectStore.put(messageConfig.s3Config.bucketName)(
message,
keyPrefix = KeyPrefix(getKeyPrefix())
)
_ = info(s"Successfully stored message in location: $location")
notification = RemoteNotification(location = location)
jsonString <- Future.fromTry(toJson[MessageNotification](notification))
} yield jsonString
def write(t: T): Future[MessageNotification] = Future.fromTry {
underlying.sendT(t)
}
}

This file was deleted.

0 comments on commit 09ed8cf

Please sign in to comment.