In [1]:
using AMQPClient
using Random

Prepare the exchanges and server queues. We shall be using a direct exchange.

In [2]:
function prepare_queues(chan)
    @debug("declaring direct exchange named rpcexcg")
    @assert exchange_declare(chan, "rpcexcg", EXCHANGE_TYPE_DIRECT)

    @debug("declaring server queue")
    success, q_name, message_count, consumer_count = queue_declare(chan, "server")
    @assert success
    
    @debug("binding server queue to receive messages with routing key attribute server")
    @assert queue_bind(chan, "server", "rpcexcg", "server")
end

function teardown_queues(chan)
    @debug("unbinding server queue from exchange")
    @assert queue_unbind(chan, "server", "rpcexcg", "server")

    @debug("deleting server queue")
    success, message_count = queue_delete(chan, "server")
    @assert success
    
    @debug("deleting exchange")
    @assert exchange_delete(chan, "rpcexcg")
end;

The server process waits on the server queue.
It expects a message containing string data, which it reverses and responds back to the route mentioned in `reply_to` attribute of the message. Also attaches the `correlation_id` of the request message back with the response.

In [3]:
function server_process(chan)
    message_count = 0
    success, consumer_tag = basic_consume(chan, "server", (msg)->begin
        datastr = String(copy(msg.data))
        @debug("server got a message", datastr)

        reply_to = convert(String, msg.properties[:reply_to])
        correlation_id = convert(String, msg.properties[:correlation_id])

        responsestr = reverse(datastr)
        responsedata = convert(Vector{UInt8}, codeunits(responsestr))
        responsemsg = Message(responsedata,
            content_type="text/plain",
            correlation_id=correlation_id,
            delivery_mode=PERSISTENT)
        @debug("server responding", reply_to, correlation_id, datastr, responsestr)
        basic_publish(chan, responsemsg; exchange="rpcexcg", routing_key=reply_to)
        
        basic_ack(chan, msg.delivery_tag)
        message_count += 1
    end)
    @assert success

    while message_count < 10
        sleep(1)
    end
    @debug("cancelling server consumer")
    basic_cancel(chan, consumer_tag)
end;

The client process sends a bunch of requests to the server with string data, and when it receives a response, matches the corresponding request to it (based on correlation id) and prints the request and response out.

In [4]:
function client_process(chan, client_id)
    @debug("declaring client queue", client_id)
    success, q_name, message_count, consumer_count = queue_declare(chan, client_id)
    @assert success
    
    @debug("binding client queue", client_id)
    @assert queue_bind(chan, client_id, "rpcexcg", client_id)
    
    message_count = 0
    messages_sent = Dict{Int,String}()
    success, consumer_tag = basic_consume(chan, client_id, (msg)->begin
        response = String(copy(msg.data))
        @debug("got response", client_id, response)
        correlation_id = parse(Int, convert(String, msg.properties[:correlation_id]))
        request = messages_sent[correlation_id]
        @info("string reversed", client_id, request, response)
        
        basic_ack(chan, msg.delivery_tag)
        message_count += 1
    end)
    @assert success
    
    for idx in 1:5
        datastr = randstring(5)
        messages_sent[idx] = datastr
        data = convert(Vector{UInt8}, codeunits(datastr))
        msg = Message(data,
            content_type="text/plain",
            correlation_id=string(idx),
            reply_to=client_id,
            delivery_mode=PERSISTENT)   
        @debug("requesting string reverse", client_id, correlation_id=idx, data=datastr)
        basic_publish(chan, msg; exchange="rpcexcg", routing_key="server")
    end
    
    while message_count < 5
        sleep(1)
    end
    @debug("cancelling $client_id consumer")
    basic_cancel(chan, consumer_tag)

    @debug("unbinding client queue from exchange")
    @assert queue_unbind(chan, client_id, "rpcexcg", client_id)
    
    @debug("deleting client queue", client_id)
    success, message_count = queue_delete(chan, client_id)
    @assert success
end;

In [5]:
connection(; virtualhost="/",
        host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT,
        auth_params=AMQPClient.DEFAULT_AUTH_PARAMS) do conn
    @sync begin
        exchange_prepared = false
        clients_done = 0
        @async begin
            channel(conn, AMQPClient.UNUSED_CHANNEL, true) do chan
                prepare_queues(chan)
                exchange_prepared = true
                server_process(chan)
                
                while clients_done < 2
                    sleep(1)
                end
                teardown_queues(chan)
            end
        end
        
        while !exchange_prepared
            sleep(1)
        end
        
        @async begin
            channel(conn, AMQPClient.UNUSED_CHANNEL, true) do chan
                client_process(chan, "client1")
            end
            clients_done += 1
        end
        @async begin
            channel(conn, AMQPClient.UNUSED_CHANNEL, true) do chan
                client_process(chan, "client2")
            end
            clients_done += 1
        end
    end
end

┌ Info: string reversed
│   client_id = client1
│   request = ool9x
│   response = x9loo
└ @ Main In[4]:16
┌ Info: string reversed
│   client_id = client2
│   request = 3PGFH
│   response = HFGP3
└ @ Main In[4]:16
┌ Info: string reversed
│   client_id = client1
│   request = NN3ie
│   response = ei3NN
└ @ Main In[4]:16
┌ Info: string reversed
│   client_id = client2
│   request = hHdR8
│   response = 8RdHh
└ @ Main In[4]:16
┌ Info: string reversed
│   client_id = client1
│   request = DIJx7
│   response = 7xJID
└ @ Main In[4]:16
┌ Info: string reversed
│   client_id = client2
│   request = AYUSv
│   response = vSUYA
└ @ Main In[4]:16
┌ Info: string reversed
│   client_id = client1
│   request = xjUkU
│   response = UkUjx
└ @ Main In[4]:16
┌ Info: string reversed
│   client_id = client2
│   request = gapgL
│   response = Lgpag
└ @ Main In[4]:16
┌ Info: string reversed
│   client_id = client1
│   request = Cexpa
│   response = apxeC
└ @ Main In[4]:16
┌ Info: string reversed
│   client_id

Task (done) @0x00007f172ef6ba90