Skip to content

Commit

Permalink
doc(fjagej): document Julia-fjåge gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
mchitre committed Jul 14, 2019
1 parent f283cc0 commit a9263d7
Showing 1 changed file with 154 additions and 28 deletions.
182 changes: 154 additions & 28 deletions src/main/julia/Fjage.jl
Original file line number Diff line number Diff line change
@@ -1,15 +1,39 @@
"""
Julia-fjåge Gateway.
Note: This implementation is not thread-safe.
# Examples
Assuming fjåge master container is running on `localhost` at port 1100:
```julia-repl
julia> using Fjage
julia> ShellExecReq = MessageClass("org.arl.fjage.shell.ShellExecReq");
julia> gw = Gateway("localhost", 1100);
julia> shell = agentforservice(gw, "org.arl.fjage.shell.Services.SHELL")
shell
julia> request(shell, ShellExecReq(cmd="ps"))
AGREE
julia> close(gw)
```
"""
module Fjage

# NOTES:
# - not threadsafe
# install and use dependencies
# using Pkg
# Pkg.add("JSON")

using Sockets, Distributed, JSON, Base64, UUIDs, Dates
using Sockets, Distributed, Base64, UUIDs, Dates, JSON

# exported symbols
export Performative, AgentID, Gateway, Message, GenericMessage, MessageClass
export agent, topic, send, receive, request, agentforservice, agentsforservice, subscribe, unsubscribe

# package settings
const MAX_QUEUE_LEN = 256

"An action represented by a message."
module Performative
const REQUEST = "REQUEST"
const AGREE = "AGREE"
Expand All @@ -25,11 +49,13 @@ module Performative
const CANCEL = "CANCEL"
end

# respond to master container
function _respond(gw, rq::Dict, rsp::Dict)
s = JSON.json(merge(Dict("id" => rq["id"], "inResponseTo" => rq["action"]), rsp))
println(gw.sock, s)
end

# ask master container a question, and wait for reply
function _ask(gw, rq::Dict)
id = string(uuid4())
s = JSON.json(merge(rq, Dict("id" => id)))
Expand All @@ -43,6 +69,7 @@ function _ask(gw, rq::Dict)
end
end

# update master container about changes to recipient watch list
function _update_watch(gw)
watch = [gw.agentID.name]
for s in keys(gw.subscriptions)
Expand All @@ -55,6 +82,7 @@ function _update_watch(gw)
println(gw.sock, s)
end

# task monitoring incoming JSON messages from master container
function _run(gw)
println(gw.sock, "{\"alive\": true}")
_update_watch(gw)
Expand Down Expand Up @@ -95,14 +123,35 @@ function _run(gw)
end
end

"Base class for messages transmitted by one agent to another."
abstract type Message end

"An identifier for an agent or a topic."
struct AgentID
name::String
istopic::Bool
owner
end

"""
aid = AgentID(name[, istopic])
Create an unowned AgentID.
See also: [`agent`](@ref), [`topic`](@ref)
"""
AgentID(name::String) = name[1] == '#' ? AgentID(name[2:end], true, nothing) : AgentID(name, false, nothing)
AgentID(name::String, istopic::Bool) = AgentID(name, istopic, nothing)
Base.show(io::IO, aid::AgentID) = print(io, aid.istopic ? "#"*aid.name : aid.name)
JSON.lower(aid::AgentID) = aid.istopic ? "#"*aid.name : aid.name

"""
gw = Gateway([name,] host, port)
Open a new TCP/IP gateway to communicate with fjåge agents from Julia.
See also: [`Fjage`](@ref)
"""
struct Gateway
agentID::AgentID
sock::TCPSocket
Expand All @@ -122,25 +171,32 @@ struct Gateway
end
end

AgentID(name::String) = name[1] == '#' ? AgentID(name[2:end], true, nothing) : AgentID(name, false, nothing)
AgentID(name::String, istopic::Bool) = AgentID(name, istopic, nothing)
AgentID(name::String, owner::Gateway) = name[1] == '#' ? AgentID(name[2:end], true, owner) : AgentID(name, false, owner)
Base.show(io::IO, aid::AgentID) = print(io, aid.istopic ? "#"*aid.name : aid.name)
JSON.lower(aid::AgentID) = aid.istopic ? "#"*aid.name : aid.name

Gateway(host::String, port::Integer) = Gateway("julia-gw-" * string(uuid1()), host, port)
Base.show(io::IO, gw::Gateway) = print(io, gw.agentID.name)

"""
aid = agent([gw,] name)
Creates an AgentID for a named agent, optionally owned by a gateway. AgentIDs that are
associated with gateways can be used directly in `send()` and `request()` calls.
"""
agent(name::String) = AgentID(name, false)
agent(gw::Gateway, name::String) = AgentID(name, false, gw)

"""
aid = topic([gw,] name[, subtopic])
Creates an AgentID for a named topic, optionally owned by a gateway. AgentIDs that are
associated with gateways can be used directly in `send()` and `request()` calls.
"""
topic(name::String) = AgentID(name, true)
topic(aid::AgentID) = aid.istopic ? aid : AgentID(aid.name*"__ntf", true)
topic(aid::AgentID, topic2::String) = AgentID(aid.name*"__"*topic2*"__ntf", true)

agent(gw::Gateway, name::String) = AgentID(name, false, gw)
topic(gw::Gateway, name::String) = AgentID(name, true, gw)
topic(gw::Gateway, aid::AgentID) = aid.istopic ? aid : AgentID(aid.name*"__ntf", true, gw)
topic(gw::Gateway, aid::AgentID, topic2::String) = AgentID(aid.name*"__"*topic2*"__ntf", true, gw)

"Find an agent that provides a named service."
function agentforservice(gw::Gateway, svc::String)
rq = Dict("action" => "agentForService", "service" => svc)
rsp = _ask(gw, rq)
Expand All @@ -151,30 +207,32 @@ function agentforservice(gw::Gateway, svc::String)
end
end

"Find all agents that provides a named service."
function agentsforservice(gw::Gateway, svc::String)
rq = Dict("action" => "agentsForService", "service" => svc)
rsp = _ask(gw, rq)
return [AgentID(a, false, gw) for a in rsp["agentIDs"]]
end

"Subscribe to receive all messages sent to the given topic."
function subscribe(gw::Gateway, aid::AgentID)
gw.subscriptions[string(topic(gw, aid))] = true
_update_watch(gw)
end

"Unsubscribe from receiving messages sent to the given topic."
function unsubscribe(gw::Gateway, aid::AgentID)
delete!(gw.subscriptions, string(topic(gw, aid)))
_update_watch(gw)
end

"Close a gateway connection to the master container."
function close(gw::Gateway)
println(gw.sock, "{\"alive\": false}")
Base.close(gw.sock)
end

Base.close(gw::Gateway) = close(gw)
Base.flush(gw::Gateway) = flush(gw)

# create a Message subclass from a qualified classname
macro _define_message(sname::Symbol, clazz, perf)
quote
struct $(esc(sname)) <: Message
Expand All @@ -194,6 +252,22 @@ macro _define_message(sname::Symbol, clazz, perf)
end
end

"""
mtype = MessageClass(clazz[, perf])
Create a message class from a fully qualified class name. If a performative is not
specified, it is guessed based on the class name. For class names ending with "Req",
the performative is assumed to be REQUEST, and for all other messages, INFORM.
# Examples
```julia-repl
julia> using Fjage
julia> ShellExecReq = MessageClass("org.arl.fjage.shell.ShellExecReq");
julia> req = ShellExecReq(cmd="ps")
ShellExecReq: REQUEST [cmd:"ps"]
```
"""
function MessageClass(clazz::String, perf=nothing)
if perf == nothing
if match(r"Req$", clazz) != nothing
Expand All @@ -206,6 +280,7 @@ function MessageClass(clazz::String, perf=nothing)
return @eval @_define_message($sname, $clazz, $perf)
end

# prepares a message to be sent to the server
function _prepare!(gw::Gateway, msg::Message)
msg.sender = gw.agentID
for k in keys(msg.__data__)
Expand All @@ -217,6 +292,7 @@ function _prepare!(gw::Gateway, msg::Message)
end
end

# converts Base64 encoded arrays to Julia arrays
function _b64toarray(v)
try
dtype = v["clazz"]
Expand All @@ -241,6 +317,7 @@ function _b64toarray(v)
end
end

# creates a message object from a JSON representation of the object
function _inflate(json)
if typeof(json) == String
json = JSON.parse(json)
Expand Down Expand Up @@ -269,6 +346,15 @@ function _inflate(json)
return obj
end

"""
send(gw, msg)
send(aid, msg)
Send a message via the gateway to the specified agent. If the gateway (`gw`) is specified then the
`recipient` field of the message must be populated with an agentID. If the agentID (`aid`) is specified,
it must be an "owned" agentID obtained from the `agent(gw, name)` function or returned by the
`agentforservice(gw, service)` function.
"""
function send(gw::Gateway, msg::Message)
_prepare!(gw, msg)
json = JSON.json(Dict("action" => "send", "relay" => true, "message" => msg))
Expand All @@ -283,6 +369,34 @@ function send(aid::AgentID, msg::Message)
send(aid.owner, msg)
end

# helper function to see if a message matches a filter
function _matches(filt, msg)
if msg == nothing
return true
end
if typeof(filt) == DataType
return typeof(msg) <: filt
elseif typeof(filt) <: Message
return msg.inReplyTo == filt.msgID
elseif typeof(filt) <: Function
return filt(msg)
end
return false
end

"""
msg = receive(gw[, filter][, timeout])
Receive an incoming message from other agents or topics. Timeout is specified in
milliseconds. If no timeout is specified, the call is non-blocking.
If a `filter` is specified, only messages matching the filter are retrieved. A filter
may be a message type, a message or a function. If it is a message type, only messages
of that type or a subtype are retrieved. If it is a message, any message whose `inReplyTo`
field is set to the `msgID` of the specified message is retrieved. If it is a function,
it must take in a message and return `true` or `false`. A message for which it returns
`true` is retrieved.
"""
function receive(gw::Gateway)
if isready(gw.queue)
return take!(gw.queue)
Expand All @@ -309,20 +423,6 @@ function receive(gw::Gateway, timeout::Integer)
return rv
end

function _matches(filt, msg)
if msg == nothing
return true
end
if typeof(filt) == DataType
return typeof(msg) <: filt
elseif typeof(filt) <: Message
return msg.inReplyTo == filt.msgID
elseif typeof(filt) <: Function
return filt(msg)
end
return false
end

function receive(gw::Gateway, filt, timeout::Integer)
t1 = now() + Millisecond(timeout)
cache = []
Expand All @@ -343,6 +443,16 @@ function receive(gw::Gateway, filt, timeout::Integer)
end
end

"""
rsp = request(gw, msg[, timeout])
rsp = request(aid, msg[, timeout])
Send a request via the gateway to the specified agent, and wait for a response. The response is returned.
If the gateway (`gw`) is specified then the `recipient` field of the request message (`msg`) must be
populated with an agentID. If the agentID (`aid`) is specified, it must be an "owned" agentID obtained
from the `agent(gw, name)` function or returned by the `agentforservice(gw, service)` function. The timeout
is specified in milliseconds, and defaults to 1 second if unspecified.
"""
function request(gw::Gateway, msg::Message, timeout::Integer=1000)
send(gw, msg)
return receive(gw, msg, timeout)
Expand All @@ -353,12 +463,14 @@ function request(aid::AgentID, msg::Message, timeout::Integer=1000)
return receive(aid.owner, msg, timeout)
end

"Flush the incoming message queue."
function flush(gw::Gateway)
while isready(gw.queue)
take!(gw.queue)
end
end

# adds notation message.field
function Base.getproperty(s::Message, p::Symbol)
if p == :__clazz__
return getfield(s, :clazz)
Expand All @@ -380,6 +492,7 @@ function Base.getproperty(s::Message, p::Symbol)
end
end

# adds notation message.field
function Base.setproperty!(s::Message, p::Symbol, v)
if p == :__clazz__ || p == :__data__
error("read-only property cannot be set")
Expand All @@ -394,6 +507,7 @@ function Base.setproperty!(s::Message, p::Symbol, v)
end
end

# pretty prints arrays without type names
function _repr(x)
x = repr(x)
m = match(r"[A-Za-z0-9]+(\[.+\])", x)
Expand All @@ -403,6 +517,7 @@ function _repr(x)
return x
end

# pretty printing of messages
function Base.show(io::IO, msg::Message)
ndx = findlast(".", msg.__clazz__)
s = ndx == nothing ? msg.__clazz__ : msg.__clazz__[ndx[1]+1:end]
Expand Down Expand Up @@ -461,8 +576,19 @@ function Base.show(io::IO, msg::Message)
print(io, s)
end

"Generic message type that can carry arbitrary name-value pairs as data."
GenericMessage = MessageClass("org.arl.fjage.GenericMessage", Performative.INFORM)

"""
msg = Message([perf])
Create a message with just a performative (`perf`) and no data. If the performative
is not specified, it defaults to INFORM.
"""
Message(perf::String=Performative.INFORM) = GenericMessage(performative=perf)

# Base functions to add local methods
Base.close(gw::Gateway) = close(gw)
Base.flush(gw::Gateway) = flush(gw)

end

0 comments on commit a9263d7

Please sign in to comment.