Skip to content

Commit

Permalink
First commit
Browse files Browse the repository at this point in the history
  • Loading branch information
killme2008 committed Mar 14, 2012
0 parents commit a7909fd
Show file tree
Hide file tree
Showing 20 changed files with 1,345 additions and 0 deletions.
1 change: 1 addition & 0 deletions README
@@ -0,0 +1 @@
Metamorphosis实例工程
27 changes: 27 additions & 0 deletions conf/tail4j.ini
@@ -0,0 +1,27 @@
[system]
checkpoint_path=/home/dennis/meta/tail4j/
max_buf_size=131073

[meta]
server_url=
diamond_zk_data_id=
diamond_zk_group=

[local_test]
test_path=/home/dennis/meta

[topic_1]
topic=test
log_base_path=/home/dennis/programming/java/metamorphosis/trunk/metamorphosis/metamorphosis-example
tmp_log_fullpath=/home/dennis/programming/java/metamorphosis/trunk/metamorphosis/metamorphosis-example/config.client.log
log_name_regx=/home/dennis/programming/java/metamorphosis/trunk/metamorphosis/metamorphosis-example/config\\.client\\.log.*
encoding=utf-8
compress=false
ordered=true
timeout=10000
checkpoint_name=test_cp





70 changes: 70 additions & 0 deletions pom.xml
@@ -0,0 +1,70 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">


<modelVersion>4.0.0</modelVersion>
<groupId>com.taobao.metamorphosis</groupId>
<packaging>jar</packaging>
<artifactId>metamorphosis-example</artifactId>
<name>metamorphosis-example</name>
<version>1.1.0-SNAPSHOT</version>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.taobao.metamorphosis</groupId>
<artifactId>metamorphosis-client</artifactId>
<version>1.4.0.2</version>
</dependency>
<dependency>
<groupId>com.taobao.metamorphosis</groupId>
<artifactId>metamorphosis-client-extension</artifactId>
<version>1.4.0.2</version>
</dependency>
<dependency>
<groupId>org.ini4j</groupId>
<artifactId>ini4j</artifactId>
<version>0.5.1</version>
</dependency>
<dependency>
<groupId>com.atomikos</groupId>
<artifactId>transactions-jta</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>com.atomikos</groupId>
<artifactId>transactions-jta</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>com.atomikos</groupId>
<artifactId>atomikos-util</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>com.atomikos</groupId>
<artifactId>transactions</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>jta</groupId>
<artifactId>jta</artifactId>
<version>1.0.1b</version>
</dependency>
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.5</version>
</dependency>
</dependencies>
</project>
70 changes: 70 additions & 0 deletions src/main/java/com/taobao/metamorphosis/example/AsyncConsumer.java
@@ -0,0 +1,70 @@
/*
* (C) 2007-2012 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Authors:
* wuhua <wq163@163.com> , boyan <killme2008@gmail.com>
*/
package com.taobao.metamorphosis.example;

import static com.taobao.metamorphosis.example.Help.initMetaConfig;

import java.util.concurrent.Executor;

import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
import com.taobao.metamorphosis.client.consumer.MessageConsumer;
import com.taobao.metamorphosis.client.consumer.MessageListener;


/**
* 异步消息消费者
*
* @author boyan
* @Date 2011-5-17
*
*/
public class AsyncConsumer {
public static void main(final String[] args) throws Exception {
// New session factory,强烈建议使用单例
final MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(initMetaConfig());

// subscribed topic
final String topic = "meta-test";
// consumer group
final String group = "meta-example";
// create consumer,强烈建议使用单例
final MessageConsumer consumer = sessionFactory.createConsumer(new ConsumerConfig(group));
// subscribe topic
consumer.subscribe(topic, 1024 * 1024, new MessageListener() {

@Override
public void recieveMessages(final Message message) {
System.out.println("Receive message " + new String(message.getData()));
}


@Override
public Executor getExecutor() {
// Thread pool to process messages,maybe null.
return null;
}
});
// complete subscribe
consumer.completeSubscribe();

}

}
@@ -0,0 +1,83 @@
package com.taobao.metamorphosis.example;

import static com.taobao.metamorphosis.example.Help.initMetaConfig;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.extension.AsyncMessageSessionFactory;
import com.taobao.metamorphosis.client.extension.AsyncMetaMessageSessionFactory;
import com.taobao.metamorphosis.client.producer.MessageProducer;
import com.taobao.metamorphosis.client.producer.SendMessageCallback;
import com.taobao.metamorphosis.client.producer.SendResult;


/**
* <pre>
* 异步单向消息发送者 *
*
*
* 用于创建异步单向发送消息的会话工厂.
*
* 使用场景:
* 对于发送可靠性要求不那么高,但要求提高发送效率和降低对宿主应用的影响,提高宿主应用的稳定性.
* 例如,收集日志或用户行为信息等场景.
* 注意:
* 发送消息后返回的结果中不包含准确的messageId,partition,offset,这些值都是-1
*
* @author 无花
* @Date 2012-2-27
*
*/
public class AsyncOnewayProducer {
public static void main(final String[] args) throws Exception {
// New session factory,强烈建议使用单例
final AsyncMessageSessionFactory sessionFactory = new AsyncMetaMessageSessionFactory(initMetaConfig());
// create producer,强烈建议使用单例
final MessageProducer producer = sessionFactory.createAsyncProducer();
// publish topic
final String topic = "slave-test";
producer.publish(topic);

final BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
String line = null;
while ((line = readLine(reader)) != null) {
// send message
try {
producer.sendMessage(new Message(topic, line.getBytes()), new SendMessageCallback() {

@Override
public void onMessageSent(final SendResult result) {
if (result.isSuccess()) {
System.out.println("Send message successfully,sent to " + result.getPartition());
}
else {
System.err.println("Send message failed,error message:" + result.getErrorMessage());
}

}


@Override
public void onException(final Throwable e) {
e.printStackTrace();

}
});

}
catch (final Exception e) {
e.printStackTrace();
}
// check result
}
}


private static String readLine(final BufferedReader reader) throws IOException {
System.out.println("Type a message to send:");
return reader.readLine();
}
}
73 changes: 73 additions & 0 deletions src/main/java/com/taobao/metamorphosis/example/AsyncProducer.java
@@ -0,0 +1,73 @@
package com.taobao.metamorphosis.example;

import static com.taobao.metamorphosis.example.Help.initMetaConfig;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.producer.MessageProducer;
import com.taobao.metamorphosis.client.producer.SendMessageCallback;
import com.taobao.metamorphosis.client.producer.SendResult;


/**
* 异步消息发送者
*
* @author 无花
* @Date 2012-2-27
*
*/
public class AsyncProducer {
public static void main(final String[] args) throws Exception {
// New session factory,强烈建议使用单例
final MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(initMetaConfig());
// create producer,强烈建议使用单例
final MessageProducer producer = sessionFactory.createProducer();
// publish topic
final String topic = "slave-test";
producer.publish(topic);

final BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
String line = null;
while ((line = readLine(reader)) != null) {
// send message
try {
producer.sendMessage(new Message(topic, line.getBytes()), new SendMessageCallback() {

@Override
public void onMessageSent(final SendResult result) {
if (result.isSuccess()) {
System.out.println("Send message successfully,sent to " + result.getPartition());

}
else {
System.err.println("Send message failed,error message:" + result.getErrorMessage());
}

}


@Override
public void onException(final Throwable e) {
e.printStackTrace();

}
});

}
catch (final Exception e) {
e.printStackTrace();
}
}
}


private static String readLine(final BufferedReader reader) throws IOException {
System.out.println("Type a message to send:");
return reader.readLine();
}
}

0 comments on commit a7909fd

Please sign in to comment.