Skip to content
This repository has been archived by the owner on Jul 1, 2021. It is now read-only.

Use the new storage lib; more type classes #21

Merged
merged 16 commits into from May 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,27 @@
package uk.ac.wellcome.messaging

import grizzled.slf4j.Logging
import io.circe.Decoder
import uk.ac.wellcome.json.JsonUtil.fromJson
import uk.ac.wellcome.messaging.message.{
InlineNotification,
MessageNotification,
RemoteNotification
}
import uk.ac.wellcome.storage.ObjectStore

import scala.util.Try

trait BigMessageReader[T] extends Logging {
val objectStore: ObjectStore[T]

implicit val decoder: Decoder[T]

def read(notification: MessageNotification): Try[T] =
notification match {
case inlineNotification: InlineNotification =>
fromJson[T](inlineNotification.jsonString)
case remoteNotification: RemoteNotification =>
objectStore.get(remoteNotification.location)
}
}
@@ -0,0 +1,62 @@
package uk.ac.wellcome.messaging

import java.text.SimpleDateFormat
import java.util.Date

import grizzled.slf4j.Logging
import io.circe.Encoder
import uk.ac.wellcome.json.JsonUtil._
import uk.ac.wellcome.messaging.message.{
InlineNotification,
MessageNotification,
RemoteNotification
}
import uk.ac.wellcome.storage.{KeyPrefix, ObjectStore}

import scala.util.{Success, Try}

trait BigMessageSender[Destination, T] extends Logging {
val messageSender: MessageSender[Destination]
val objectStore: ObjectStore[T]

val namespace: String

implicit val encoder: Encoder[T]

val maxMessageSize: Int

private val dateFormat = new SimpleDateFormat("YYYY/MM/dd")

protected def getKeyPrefix: String = {
val currentTime = new Date()
s"${messageSender.destination}/${dateFormat.format(currentTime)}/${currentTime.getTime.toString}"
}

def sendT(t: T): Try[MessageNotification] =
for {
jsonString <- toJson(t)
inlineNotification = InlineNotification(jsonString)

encodedInlineNotification <- toJson(inlineNotification)

notification <- if (encodedInlineNotification
.getBytes("UTF-8")
.length > maxMessageSize) {
createRemoteNotification(t)
} else {
Success(inlineNotification)
}

_ <- messageSender.sendT[MessageNotification](notification)
} yield notification

private def createRemoteNotification(t: T): Try[RemoteNotification] =
for {
location <- objectStore.put(namespace)(
t,
keyPrefix = KeyPrefix(getKeyPrefix)
)
_ = info(s"Successfully stored message in location: $location")
notification = RemoteNotification(location = location)
} yield notification
}
@@ -0,0 +1,27 @@
package uk.ac.wellcome.messaging

import io.circe.Encoder
import uk.ac.wellcome.json.JsonUtil.toJson

import scala.util.Try

trait IndividualMessageSender[Destination] {
def send(body: String)(subject: String, destination: Destination): Try[Unit]

def sendT[T](t: T)(subject: String, destination: Destination)(
implicit encoder: Encoder[T]): Try[Unit] =
toJson(t).flatMap { send(_)(subject, destination) }
}

trait MessageSender[Destination] {
protected val underlying: IndividualMessageSender[Destination]

val subject: String
val destination: Destination

def send(body: String): Try[Unit] =
underlying.send(body)(subject, destination)

def sendT[T](t: T)(implicit encoder: Encoder[T]): Try[Unit] =
underlying.sendT[T](t)(subject, destination)
}
@@ -0,0 +1,28 @@
package uk.ac.wellcome.messaging.memory

import io.circe.Encoder
import uk.ac.wellcome.messaging.BigMessageSender
import uk.ac.wellcome.storage.{ObjectStore, SerialisationStrategy}
import uk.ac.wellcome.storage.memory.MemoryObjectStore

class MemoryBigMessageSender[T](
maxSize: Int = 100,
storeNamespace: String = "MemoryBigMessageSender",
destination: String = "MemoryBigMessageSender"
)(
implicit
val encoder: Encoder[T],
serialisationStrategy: SerialisationStrategy[T]
) extends BigMessageSender[String, T] {
override val messageSender: MemoryMessageSender = new MemoryMessageSender(
destination = destination,
subject = "Sent from MemoryBigMessageSender"
)

override val objectStore: ObjectStore[T] = new MemoryObjectStore[T]()
override val namespace: String = storeNamespace
override val maxMessageSize: Int = maxSize

def messages: List[messageSender.underlying.MemoryMessage] =
messageSender.messages
}
@@ -0,0 +1,30 @@
package uk.ac.wellcome.messaging.memory

import uk.ac.wellcome.messaging.{IndividualMessageSender, MessageSender}

import scala.util.Try

class MemoryIndividualMessageSender extends IndividualMessageSender[String] {
case class MemoryMessage(
body: String,
subject: String,
destination: String
)

var messages: List[MemoryMessage] = List.empty

override def send(body: String)(subject: String,
destination: String): Try[Unit] = Try {
messages = messages :+ MemoryMessage(body, subject, destination)
}
}

class MemoryMessageSender(
val destination: String,
val subject: String
) extends MessageSender[String] {
override val underlying: MemoryIndividualMessageSender =
new MemoryIndividualMessageSender()

def messages: List[underlying.MemoryMessage] = underlying.messages
}
Expand Up @@ -11,16 +11,25 @@ import uk.ac.wellcome.messaging.sqs.{SQSConfig, SQSStream}
import uk.ac.wellcome.monitoring.MetricsSender
import uk.ac.wellcome.storage.ObjectStore
import uk.ac.wellcome.json.JsonUtil._
import uk.ac.wellcome.messaging.BigMessageReader

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

class MessageStream[T](sqsClient: AmazonSQSAsync,
sqsConfig: SQSConfig,
metricsSender: MetricsSender)(
implicit actorSystem: ActorSystem,
objectStore: ObjectStore[T],
implicit
actorSystem: ActorSystem,
decoderT: Decoder[T],
objectStoreT: ObjectStore[T],
ec: ExecutionContext) {

private val bigMessageReader = new BigMessageReader[T] {
override val objectStore: ObjectStore[T] = objectStoreT
override implicit val decoder: Decoder[T] = decoderT
}

private val sqsStream = new SQSStream[NotificationMessage](
sqsClient = sqsClient,
sqsConfig = sqsConfig,
Expand All @@ -29,44 +38,39 @@ class MessageStream[T](sqsClient: AmazonSQSAsync,

def runStream(
streamName: String,
modifySource: Source[(Message, T), NotUsed] => Source[Message, NotUsed])(
implicit decoder: Decoder[T]): Future[Done] =
modifySource: Source[(Message, T), NotUsed] => Source[Message, NotUsed])
: Future[Done] =
sqsStream.runStream(
streamName,
source => modifySource(messageFromS3Source(source)))

def foreach(streamName: String, process: T => Future[Unit])(
implicit decoder: Decoder[T]): Future[Done] =
def foreach(streamName: String, process: T => Future[Unit]): Future[Done] =
sqsStream.foreach(
streamName = streamName,
process = (notification: NotificationMessage) =>
for {
body <- getBody(notification.body)
body <- Future.fromTry {
getBody(notification.body)
}
result <- process(body)
} yield result
)

private def messageFromS3Source(
source: Source[(Message, NotificationMessage), NotUsed])(
implicit decoder: Decoder[T]) = {
source: Source[(Message, NotificationMessage), NotUsed])
: Source[(Message, T), NotUsed] = {
source.mapAsyncUnordered(sqsConfig.parallelism) {
case (message, notification) =>
for {
deserialisedObject <- getBody(notification.body)
deserialisedObject <- Future.fromTry {
getBody(notification.body)
}
} yield (message, deserialisedObject)
}
}

private def getBody(messageString: String)(
implicit decoder: Decoder[T]): Future[T] =
for {
notification <- Future.fromTry(
fromJson[MessageNotification](messageString))
body <- notification match {
case inlineNotification: InlineNotification =>
Future.fromTry(fromJson[T](inlineNotification.jsonString))
case remoteNotification: RemoteNotification =>
objectStore.get(remoteNotification.location)
}
} yield body
private def getBody(messageString: String): Try[T] =
fromJson[MessageNotification](messageString).flatMap {
bigMessageReader.read
}
}
@@ -1,22 +1,14 @@
package uk.ac.wellcome.messaging.message

import java.text.SimpleDateFormat
import java.util.Date

import com.amazonaws.services.sns.AmazonSNS
import grizzled.slf4j.Logging
import io.circe.Encoder
import uk.ac.wellcome.messaging.sns.{
PublishAttempt,
SNSConfig,
SNSMessageWriter,
SNSWriter
}
import uk.ac.wellcome.messaging.sns.{SNSConfig, SNSMessageSender}
import uk.ac.wellcome.messaging.{BigMessageSender, MessageSender}
import uk.ac.wellcome.storage.ObjectStore
import uk.ac.wellcome.storage.s3.S3Config
import uk.ac.wellcome.storage.{KeyPrefix, ObjectStore}
import uk.ac.wellcome.json.JsonUtil._

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.Future

case class MessageWriterConfig(
snsConfig: SNSConfig,
Expand All @@ -26,65 +18,35 @@ case class MessageWriterConfig(
class MessageWriter[T](
messageConfig: MessageWriterConfig,
snsClient: AmazonSNS
)(implicit objectStore: ObjectStore[T], ec: ExecutionContext)
)(implicit objectStoreT: ObjectStore[T], encoderT: Encoder[T])
extends Logging {

private val snsMessageWriter = new SNSMessageWriter(snsClient = snsClient)

private val sns = new SNSWriter(
snsMessageWriter = snsMessageWriter,
snsConfig = messageConfig.snsConfig
)

private val dateFormat = new SimpleDateFormat("YYYY/MM/dd")

private def getKeyPrefix(): String = {
val topicName = messageConfig.snsConfig.topicArn.split(":").last
val currentTime = new Date()
s"$topicName/${dateFormat.format(currentTime)}/${currentTime.getTime.toString}"
private val underlying = new BigMessageSender[SNSConfig, T] {
override val messageSender: MessageSender[SNSConfig] = new SNSMessageSender(
snsClient = snsClient,
snsConfig = messageConfig.snsConfig,
subject = "Sent from MessageWriter"
)

override val objectStore: ObjectStore[T] = objectStoreT
override val namespace: String = messageConfig.s3Config.bucketName

implicit val encoder: Encoder[T] = encoderT

// If the encoded message is less than 250KB, we can send it inline
// in SNS/SQS (although the limit is 256KB, there's a bit of overhead
// caused by the notification wrapper, so we're conservative).
//
// Max SQS message size:
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-limits.html#limits-messages
//
// Max SNS message size:
// https://aws.amazon.com/sns/faqs/
//
override val maxMessageSize: Int = 250 * 100
}

def write(message: T, subject: String)(
implicit encoder: Encoder[T]): Future[PublishAttempt] =
for {
jsonString <- Future.fromTry(toJson(message))
encodedNotification <- Future.fromTry(
toJson[MessageNotification](InlineNotification(jsonString))
)

// If the encoded message is less than 250KB, we can send it inline
// in SNS/SQS (although the limit is 256KB, there's a bit of overhead
// caused by the notification wrapper, so we're conservative).
//
// Max SQS message size:
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-limits.html#limits-messages
//
// Max SNS message size:
// https://aws.amazon.com/sns/faqs/
//
notification: String <- if (encodedNotification
.getBytes("UTF-8")
.length > 250 * 1000) {
createRemoteNotification(message)
} else {
Future.successful(encodedNotification)
}

publishAttempt <- sns.writeMessage(
message = notification,
subject = subject
)
_ = debug(publishAttempt)
} yield publishAttempt

private def createRemoteNotification(message: T): Future[String] =
for {
location <- objectStore.put(messageConfig.s3Config.bucketName)(
message,
keyPrefix = KeyPrefix(getKeyPrefix())
)
_ = info(s"Successfully stored message in location: $location")
notification = RemoteNotification(location = location)
jsonString <- Future.fromTry(toJson[MessageNotification](notification))
} yield jsonString
def write(t: T): Future[MessageNotification] = Future.fromTry {
underlying.sendT(t)
}
}

This file was deleted.