Skip to content
dennis zhuang edited this page Jun 2, 2013 · 14 revisions

本指南以1.4.5版本的java客户端为起点编写。

在阅读本节之前,建议您先阅读前面的章节,系统了解客户端API的使用。从1.4.5开始,MetaQ提供了一套API用来支持Spring框架,基本形式类似spring对JMS的支持。

#配置消息会话工厂

在Sring容器内配置一个MessageSessionFactory

    <!--  message session factory -->
    <bean id="sessionFactory" class="com.taobao.metamorphosis.client.extension.spring.MetaQMessageSessionFactoryBean">
        <property name="zkConnect" value="127.0.0.1:2181"/>
        <property name="zkSessionTimeoutMs" value="30000"/>
        <property name="zkConnectionTimeoutMs" value="30000"/>
        <property name="zkSyncTimeMs" value="5000"/>
    </bean>

主要是zookeeper参数配置,需要跟服务端的zk配置保持一致。更多参数参见AbstractMetaQMessageSessionFactory的javadoc。

##配置XA事务消息会话工厂

简单地将MetaQMessageSessionFactoryBean替换为XAMetaQMessageSessionFactoryBean即可,创建的对象将是XAMessageSessionFactory

#消息体转换器

MetaQ的消息载荷是一个byte数组,你需要将具体的消息内容序列化成byte数组才能发送。MessageBodyConverter接口用于做这个转换:

/**
 * Messge body object converter.
 * 
 * @author dennis<killme2008@gmail.com>
 * @since 1.4.5
 * @param <T>
 */
public interface MessageBodyConverter<T> {
    /**
     * Convert a message object to byte array.
     * 
     * @param body
     * @return
     * @throws MetaClientException
     */
    public byte[] toByteArray(T body) throws MetaClientException;


    /**
     * Convert a byte array to message object.
     * 
     * @param bs
     * @return
     * @throws MetaClientException
     */
    public T fromByteArray(byte[] bs) throws MetaClientException;
}

MetaQ客户端提供了Java序列化实现的JavaSerializationMessageBodyConverter给你使用,你也可以自定义自己的消息body转换器,比如采用其他序列化协议,如protobufs,hessian等。

配置一个消息body转换器,比如我们就使用Java序列化:

 <!--  message body converter using java serialization. -->
    <bean id="messageBodyConverter"  class="com.taobao.metamorphosis.client.extension.spring.JavaSerializationMessageBodyConverter"/>

#使用MetaqTemplate发送消息

##配置MetaqTemplate:

MetaqTemplate用于发送MetaQ消息,基本配置:

    <!--  template to send messages. -->
    <bean id ="metaqTemplate" class="com.taobao.metamorphosis.client.extension.spring.MetaqTemplate">    
        <property name="messageSessionFactory" ref="sessionFactory"/>
        <property name="messageBodyConverter" ref="messageBodyConverter"/>
    </bean>

metaqTemplate用到了上面配置的messageSessionFactorymessageBodyConverter,用来创建producer和转换消息体。

配置了metaqTemplate之后,你将可以在你要发送消息的JavaBean里引用这个对象,并使用它发送消息。

##MessageBuilder创建消息并发送 MetaqTemplate的send方法不直接接收Message对象,而是MessageBuilder构建器,用来构建消息:

 final String topic = "date";
 final SendResult sendResult =
                    template.send(MessageBuilder.withTopic(topic).withBody(new Date());

上面的例子发送topic为date的消息,消息体为java.util.Date对象,MetaqTemplate将调用messageBodyConverter将消息体的date对象转换为byte数组,构建message对象再发送。

因为我们使用了JavaSerializationMessageBodyConverter做消息体转换,因此任何可序列化的JavaBean其实都可以作为消息体发送,使用MessageBuilder.withBody方法设置即可。如果还是想发送自定义的byte数组,可以使用MessageBuilder.withPayload(byte [])设置。但是两者不能同时设置。

send(MessageBuilder)方法同样有重载方法用于异步发送和设置请求超时,具体见javadoc。

##共享MessageProducer

MetaqTemplate会为每个发送的topic创建一个MessageProducer并发布topic,你也可以共享一个MessageProducer来发送多个topic的消息,只要在配置metqTempalte的设置属性shareProducer为true即可:

    <!--  template to send messages. -->
    <bean id ="metaqTemplate" class="com.taobao.metamorphosis.client.extension.spring.MetaqTemplate">    
        <property name="messageSessionFactory" ref="sessionFactory"/>
        <property name="messageBodyConverter" ref="messageBodyConverter"/>
        <property name="shareProducer" value="true"/>
    </bean>

#订阅消息

##配置订阅Topic

例如你想订阅

##继承DefaultMessageListener

##配置MessageListenerContainer

#资源释放