Skip to content
This repository has been archived by the owner on Jun 11, 2021. It is now read-only.

Allow monitoring of RpcClient work #64

Closed
wants to merge 2 commits into from

Conversation

Falmarri
Copy link

@Falmarri Falmarri commented Sep 1, 2015

I ran into a case where my Rpc jobs were quite large and take upwards of 30+ minutes. I wanted to be able to monitor the status of the jobs without having to re-write the RpcClient that you wrote. I just wanted to make sure jobs were actually being completed and being sent back, not just spinning.

Also I'd like to be able to update my downstream clients about the percentage of the job that's done, instead of having to monitor the rabbit queue directly.

in.

Change-Id: Id6a3559e9805420f99eea91f2460fda40ac80964
responses

Change-Id: I5e3cc54fd1f0f4c42042231b4875334cfdb9065a
@sstone
Copy link
Owner

sstone commented Sep 2, 2015

I don't have much time right now but I'll try and review your PR asap.
Thanks!

@sstone
Copy link
Owner

sstone commented Sep 16, 2015

Hi, Sorry for taking so long to reply.

I'm not sure that I understand your use case: the one request/several responses RPC pattern is typically used to process jobs which are split over different shards/partitions, and executed in parallel. In this case, there would be no monitoring feedback for most of the job's duration, and all 'job completed' message would arrive nearly at the same time when the job is completed, so it would not be very useful.
Besides, long running jobs are usually handled differently: upon job submission, users receive a uid that is checked at regular interval against a monitoring 'database/repository' that gets updated by the running job. Why do you think that using the messaging layer is a better choice in your case ?

Thanks.
ps: I know that it's a bit annoying when you propose a change and someone asks "why are you doing this" :)

@Falmarri
Copy link
Author

I understand that it's probably not ideal. But this is my use case:

My RPC producer parses a file and generates 1000+ messages to send to various consumers who do the work and persist the data. The work that the consumers do isn't sent back to the producer, nor does the producer have access to the persistence layer where the consumers save the results. And these 1000 messages can take 30-60 minutes to process.

All I'm basically trying to do is be able to report on the progress of the consumers, particularly I'm interested in seeing if they've stalled. If I send off 1000 message, and then 300 of them come back and then I stop getting them, I'd like to be able to tell that. But I can't use strict timeouts fro the whole batch because sometimes it may take 30 minutes, sometimes 60, sometimes more.

So to summarize, the main reason for this is because the producer doesn't have access to the data that the consumers persist.

And actually, maybe I misread your question. I'm actually using this RPC pattern in a 1 request, 1 response scenario. It's just that it's in batches. So a batch can have 1000 requests, which I need to know when those specific 1000 are done. And it can have multiple batches running at once. But always 1 response for 1 rabbit message.

@sstone
Copy link
Owner

sstone commented Sep 21, 2015

Ok, I understand your use case but not the code in your PR (does it really do what you expect ?).
If you cannot use a external "monitoring db" (the producer does not need to access the actual data, just this monitoring db) then what you could do is:

  • when a job is submitted, create a new rand job id
  • split the data to process and send each part using this job id as a correlation id
  • when you receive a response (just a simple ACK in your case for example), retrieve the amqp correlation id, use it to find which job this response belongs to, and it will give you a basic progress metric for this job

Basically, your client would be very close to the existing RpcClient and could work like this:

package com.github.sstone.amqp

import java.util.UUID

import akka.actor.{ActorLogging, Actor, ActorSystem, Props}
import com.github.sstone.amqp.Amqp._
import com.github.sstone.amqp.RpcServer.{IProcessor, ProcessResult}
import com.rabbitmq.client.{AMQP, BasicProperties, ConnectionFactory}

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object Test1 extends App {
  implicit val system = ActorSystem("mySystem")
  val conn = system.actorOf(ConnectionOwner.props(new ConnectionFactory(), reconnectionDelay = 5 seconds), "connection")

  // create 2 servers
  val processor = new IProcessor {
    override def onFailure(delivery: Delivery, e: Throwable): ProcessResult = ???

    override def process(delivery: Delivery): Future[ProcessResult] = {
      Thread.sleep(100)
      Future.successful(ProcessResult(Some(delivery.body))) // here we could send anything back
    }
  }
  val queueParams = QueueParameters("request_queue", passive = false, durable = false, exclusive = false, autodelete = true)

  val server1 = ConnectionOwner.createChildActor(
    conn,
    RpcServer.props(queueParams, StandardExchanges.amqDirect,  "my_key", processor, ChannelParameters(qos = 1)))

  val server2 = ConnectionOwner.createChildActor(
    conn,
    RpcServer.props(queueParams, StandardExchanges.amqDirect,  "my_key", processor, ChannelParameters(qos = 1)))

  val producer = ConnectionOwner.createChildActor(conn, ChannelOwner.props())

  // create a client that will handle job batches
  case class Job(expected: Int, received: Vector[Array[Byte]])

  class MyClient extends Actor with ActorLogging {
    val jobs = new collection.mutable.HashMap[String, Job]

    def receive = {
      case ('work, data: Array[Byte]) =>
        val items = data.grouped(20).toList
        val correlationId = UUID.randomUUID().toString
        val properties = new AMQP.BasicProperties.Builder().correlationId(correlationId).replyTo("response_queue").build()
        items.map(item => producer ! Publish("", "request_queue", item, Some(properties)))
        jobs += correlationId -> Job(items.size, Vector.empty[Array[Byte]])
      case Delivery(tag, envelope, properties, body) if !jobs.contains(properties.getCorrelationId) =>
        log.warning(s"received response for unknown job ${properties.getCorrelationId}")
      case Delivery(tag, envelope, properties, body) =>
        val job = jobs(properties.getCorrelationId)
        val received = job.received :+ body
        if (received.size == job.expected) {
          log.info(s"job ${properties.getCorrelationId} has been completed")
          jobs -= properties.getCorrelationId
        } else {
          log.info(s"job ${properties.getCorrelationId}: ${received.size} sub-jobs have been completed out of ${job.expected}")
          jobs.update(properties.getCorrelationId, job.copy(received = received))
        }
    }
  }
  val client = system.actorOf(Props[MyClient])
  val responseQueueParams = QueueParameters("response_queue", passive = false, durable = false, exclusive = false, autodelete = true)
  val consumer = ConnectionOwner.createChildActor(conn, Consumer.props(client, channelParams = None, autoack = true))

  // wait till everyone is actually connected to the broker
  Amqp.waitForConnection(system, consumer).await()
  consumer ! Record(AddQueue(responseQueueParams))

  Thread.sleep(1000)

  val data = new Array[Byte](100)
  client ! ('work, data)
  client ! ('work, data)
}

Does it help ? (I hope I've not completely missed the point...)
Thanks

@Falmarri
Copy link
Author

Yes that makes sense.

Basically, your client would be very close to the existing RpcClient

I guess this is what I was trying to avoid, re-implementing my own RPC. If you don't feel like this patch would be useful to others and that not using the RpcClient class is the better way, I completely understand and you can feel free to close it, I won't be offended at all. I just thought that it might fit other people's use cases ¯_(ツ)_/¯

@sstone
Copy link
Owner

sstone commented Sep 21, 2015

You could maybe start from the example above and tell me if it works for you. If it does, I'll add it to the existing samples but I'm not sure yet about adding a new rpc client. It's not that much code and people usually want custom error handling etc... so I don't know yet how to turn this into a generic tool. What do you think?

@Falmarri Falmarri closed this Mar 29, 2016
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants