intro stompize
STOMP 一套应用于消息中间件的简单文本协议, 它的优势在于 简单 和 灵活.
主流的消息中间件(像 RabbitMQ, ActiveMQ)都以实现对它的支持, 同时主流语言也都有其对应的客户端实现.
使用 STOMP 的场景中, 或多或少都会对其协议进行一些扩展定制以满足特殊的需求, 如消息安全级别, 投递优先策略, 局部顺序等.
协议的(书面上)扩展是很容易的, 但落实到代码实现可能变得很随意和凌乱, 滋生Bug.
Stompize 就是为避免这样的问题而设计的, 它定义了几条编码约定, 同时提供了少许框架代码, 旨在让协议扩展实现的代码优雅易懂, 且更易维护测试.
这里的 规格说明 并非指文档, 而是一个继承了Specification
的interface
, 如:
public interface Stomp extends Specification {}
其中, 我们可以定义哪些用以通讯的 Frame. 如:
public interface Stomp extends Specification {
@Command(optionals = {Login.class, Passcode.class, Heartbeat.class})
void connect(AcceptVersion acceptVersion, Host host, Header... optionals);
}
上面的代码就定义了一个 Frame : CONNECT
, 由@Command
标注的方法, 其中:
- 必要的 Header :
accept-version
和host
, 通过常规参数声明 - 可选的 Header :
login
,passcode
以及heartbeat
, 通过@Command
的optionals
属性声明, 且以可变参数来表示.
在 STOMP 中 Header 无一例外都是文本, 为何不使用
java.lang.String
作为其声明类型, 而是自定义的领域类型?
STOMP 是文本协议, 自然地 Header 的首选类型是字符串. 可是, 字符串却是最丰富和自由的数据类型, 换句话说, 它可以是任何数据类型.
在代码实现上, 若是使用字符串来定义, 则带来的问题是: 哪些对 Header 值进行校验和访问的代码该如何组织?
就好像 Header accept-version
代表一组可接受的协议版本, 它们由逗号进行分割.
这类 Header 值对象需要有自己的类型, 如:
class AcceptVersion extends Header {
public AcceptVersion(String... versions) {
super(Joiner.on(',').join(versions));
this.versions = ImmutableList.copyOf(versions);
}
public AcceptVersion(String versions) {
super(versions);
this.versions = ImmutableList.copyOf(Splitter.on(',').split(versions));
}
public final ImmutableList<String> versions;
}
请注意, AcceptVersion
中定义了两个构造器, 前者是编写代码使用的, 后者是留个框架代码回调的, 说白了这两个构造器就是实现了accept-version
的编解码.
因此, 请务必保证每个 Header 值对象类型有唯一一个参数是一个字符串类型的构造器.
STOMP之所以灵活, 也就在于允许 Header 自由的扩展, 而它却是最易让代码混乱的原因.
Content 业务数据的主要载体, 尽管消息中间件( Broker )是不关心它, 但它仍然不能使用字符串来表示. 原因在于, content-type
这个 Header 决定了客户端对 Content 中数据的解读方式, 那怕content-type
没有被声明, STOMP 也定义了默认为text/plain
. 因此, Content 类型的声明应类似:
class Text extends Content<String> {
public Text(String value) { this(value, null); }
public Text(Object value, String type) { super(value.toString(), type == null ? "text/plain" : type); }
@Override
protected String value() { return value; }
}
聪明的你一定发现, 上面的代码与 Header 类型定义非常类似, 是的, 两个构造器的作用也是一致的.
因此, 请务必保证 Content 类型有唯一一个参数是对象(0)和字符串(1)的构造器.
协议定义完了, 就看客户端和服务端的业务逻辑怎么实现了.
这里以客户端的视角为例, 请见示例代码如下:
public abstract class StompClient extends Stompizeble implements Stomp {
private final Channel output;
protected StompClient(Channel output) {
this.output = output;
}
@Override
public void receipt(ReceiptId receiptId) { … }
@Override
public void message(Destination destination, MessageId messageId, Subscription subscription, Text content, Header... optionals) { … }
@Override
public void connected(Version version, Header... optionals) { … }
@Override
public void error(Text content, Header... optionals) { … }
@Override
protected void out(String command, Iterable<Header> headers, Content<?> content) {
channel.write(…);
channel.flush();
}
}
作为STOMP v1.2客户端仅需要关心四个服务端的 Frame : CONNECTED
, RECEIPT
, ERROR
和 MESSAGE
, 所以上述代码 override 了这四个 Frame 对应的方法, 其中省略的部分就是应该是相应的处理逻辑.
它们是会被框架代码回调的, 能被回调的关键在于继承了 Stompizeble
, 它声明了一个名为apply
的方法, 会将收到 Frame 转变为对对应方法的调用.
方法
apply
不需要也不应该被你 override, 它是由框架代码动态生成的.
Stompizeble
还定义了一个方法out
, 这需要你来 override 的. 它会在任何未 override 方法(如 connect
, send
) 调用时回调, 自动将那些方法的调用转换成 Frame 的组成部分, 由你来决定如何将它发给服务端.
你应该猜到了,
StompClient
中未 override 的@Command
方法也将由框架代码动态生成.
请注意, StompClient
是 abstract 的, 这是有意而为之的, 因为你需要借助框架代码来实例化这个类.
客户端写好了, 怎么用? 很简单, 请看下面示例代码:
Channel channel = ...;
Stomp client = Stompize.create(Stomp.class, StompClient.class, channel);
client.send(new Destination("/topic/a"), new Text("Hello, stompize.")); // 发送消息
Stompizeble stompizeble = (Stompizeble)client;
String command = "ERROR";
Map<String, String> headers = Collections.singletonMap("message","Illegal state.");
String content = "bla bla bla."
stompizeble.apply(command, headers, content); // 回调error方法
这里省略网络相关的代码, 考虑到使用的灵活性, Stompize并不限定或依赖网络层的实现, 你可以自由选择Netty,Grizzly等.
若你不愿折腾, 而STOMP v1.2能够满足你的场景要求, 那么Stompize已经提供了一个Client实现给你使用, 它依赖Netty4.
测试代码永远是最好的文档!
请详见它们: