-
Notifications
You must be signed in to change notification settings - Fork 44
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Application level ACK of in-only message exchanges #40
Comments
Where does this ticket lie on the development roadmap? I have a project requiring application-level JMS ack, and need to decide between waiting for this ticket or implementing another solution. Thanks. |
I also need this feature! how can I help implementing it? |
@igreenfield glad to hear 😃. Essentially, this is going to be a special case of (a more generic) EndpointConsumerReplier which is going to replace EndpointConsumer. I still plan working on it in September but PR are welcome of course! |
I could not find where the actual ack is happening. could you point me to it? |
What do you think is the right way of giving the callback object to the application? |
What are you trying to achieve? |
How the application can ack the exchange without getting it? |
By adding an |
ok. thanks. |
What exactly is unclear? |
ok I start adding this to the DSL: class AckDsl[A, M](val self: Flow[A, A, M]) {
def ack: RunnableGraph[M] = self.joinMat(Flow[A])(Keep.left)
}
implicit def ackDsl[A, M](self: Flow[A, A, M]): AckDsl[A, M] =
new AckDsl(self)
def ackExchange[A](exchange: Exchange, parallelism: Int = 1)(implicit context: StreamContext): Graph[FlowShape[StreamMessage[A], StreamMessage[A]], NotUsed] =
Flow[StreamMessage[A]].mapAsync(parallelism)(ack[A](exchange))
private def ack[A](exchange: Exchange)(implicit context: StreamContext): Future[StreamMessage[A]] = {
val promise = Promise[StreamMessage[A]]()
Future {
try {
val message = StreamMessage.from[A](exchange.getIn)
context.consumerTemplate.doneUoW(exchange)
promise.success(message)
} catch {
case e: Exception => promise.failure(e)
}
}(ExecutionContext.fromExecutorService(context.executorService))
promise.future
} but this can't work... hence I don't have the exchange... |
They'll have much logic in common, so a proper abstraction is needed here. |
I see I don't have time to work on this now. sorry, |
No problem, thanks for your interest. |
@krasserm: I'm interested in taking a shot at this and I have some questions:
It's unclear to me what the new types should be, given that the current |
@chunjef sorry for the late reply. I didn't think through the design yet but, in a first step, I'd start re-implementing only |
In addition to replying to consumed in-out message exchanges also support delayed ack of consumed in-only message exchanges e.g. by extending the DSL with an
.ack
element. Implementation of this features should also cover #32.Stream errors (see also difference between errors and failures) should be translatable to negative acknowledgements (exceptions or faults) on individual message exchanges while stream failures should fail all message exchanges that are currently being processed.
The text was updated successfully, but these errors were encountered: