Skip to content

Commit

Permalink
release 1.6.1 (#209) (#229)
Browse files Browse the repository at this point in the history
* release 1.6.1 (#209)

* (feat) add flush consolidation handler (#161)

* (feat) add flush consolidation option

* 1. upgrade version to 1.5.6 (#169)

2. fix NPE in channelInactive
3. handling reconnection situation with no connection binding with channel
4. du not scan connection pool associated with task that has not been don
5. add CONNECT_FAILED event type and print user event log

* Change org from alipay to sofatack. (#172)

* Update .travis.yml (#177)

* modify ReconnectManager to be compatible with version 1.5.x

* upgrade netty version to fix CNNVD-201909-1264 vulnerability (#189)

* 应该一个是序列化一个是反序列化 (#191)

fix unit test bug in class NormalStringCustomSerializer.

* (feat) Supports SSL for rpc server/client, #149 (#197)

* support server random port constructor. fix #196 (#199)

* fix AbstractLifeCycle concurrency problem Fixes gh-163 (#188)

* fix  AsynMultiInterestUserProcessor dispatch to sync handleRequest  method problem. gh-157

* Revert "fix  AsynMultiInterestUserProcessor dispatch to sync handleRequest  method problem. gh-157"

This reverts commit 2784f9e.

* support server random port constructor

* add rejection-processable InvokeCallback to provide flexible options when user executor rejected task. see #178(#178) (#202)

* complete tcp related config. see #171(#171) (#205)

* optimize DefaultConnectionManager contructor (#201)

* support availability check with life cycle. #195 (#204)

* support availability check with life cycle. #123(#123)

* complete unit test of life cycle available check

* fix CI error

* Fix/multi processor bug (#207)

* fix spell mistake and some test codes (#194)

* fix multi-processor bug

* fix PMD

* support user processor with life cycle interface fix #123 (#200)

* support user processor with life cycle interface

* make sure user processor startup if client/server has startup

* add state check before operate user processor in order to prevent from some unnecessary exception

* fix some codes (#208)

* change version

* update travis ci config

Co-authored-by: cytnju <muyun.cyt@antfin.com>
  • Loading branch information
dbl-x and cytnju committed Jul 6, 2020
1 parent 7815d10 commit 94c3623
Show file tree
Hide file tree
Showing 46 changed files with 1,674 additions and 276 deletions.
4 changes: 2 additions & 2 deletions .middleware-common/.travis.settings.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<repository>
<id>central</id>
<name>Central Repository</name>
<url>http://repo1.maven.org/maven2</url>
<url>https://repo1.maven.org/maven2</url>
<releases>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
Expand Down Expand Up @@ -108,7 +108,7 @@
<pluginRepository>
<id>central</id>
<!-- specify repo1 which support http -->
<url>http://repo1.maven.org/maven2</url>
<url>https://repo1.maven.org/maven2</url>
<releases>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
Expand Down
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ jdk:
- oraclejdk8
- openjdk7

dist: trusty

before_install:
- echo "Downloading Maven 3.2.5"
&& wget https://archive.apache.org/dist/maven/maven-3/3.2.5/binaries/apache-maven-3.2.5-bin.zip
Expand Down
14 changes: 13 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.alipay.sofa</groupId>
<artifactId>bolt</artifactId>
<version>1.6.0</version>
<version>1.6.1</version>
<packaging>jar</packaging>

<name>${project.groupId}:${project.artifactId}</name>
Expand Down Expand Up @@ -149,6 +149,18 @@
<version>1.8.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>1.20</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.20</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/com/alipay/remoting/AbstractLifeCycle.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,14 @@ public boolean isStarted() {
return isStarted.get();
}

/**
* ensure the component has been startup before providing service.
*/
protected void ensureStarted() {
if (!isStarted()) {
throw new LifeCycleException(String.format(
"Component(%s) has not been started yet, please startup first!", getClass()
.getSimpleName()));
}
}
}
16 changes: 15 additions & 1 deletion src/main/java/com/alipay/remoting/AbstractRemotingServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

/**
* Server template for remoting.
*
*
* @author jiangping
* @version $Id: AbstractRemotingServer.java, v 0.1 2015-9-5 PM7:37:48 tao Exp $
*/
Expand All @@ -56,6 +56,10 @@ public AbstractRemotingServer(int port) {
}

public AbstractRemotingServer(String ip, int port) {
if (port < 0 || port > 65535) {
throw new IllegalArgumentException(String.format(
"Illegal port value: %d, which should between 0 and 65535.", port));
}
this.ip = ip;
this.port = port;

Expand Down Expand Up @@ -123,6 +127,16 @@ public int port() {
return port;
}

/**
* override the random port zero with the actual binding port value.
* @param port local binding port
*/
protected void setLocalBindingPort(int port) {
if (port() == 0) {
this.port = port;
}
}

protected abstract void doInit();

protected abstract boolean doStart() throws InterruptedException;
Expand Down
84 changes: 45 additions & 39 deletions src/main/java/com/alipay/remoting/ConnectionEventHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,35 +139,55 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
Attribute attr = ctx.channel().attr(Connection.CONNECTION);
if (null != attr) {
// add reconnect task
if (this.globalSwitch != null
&& this.globalSwitch.isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
Connection conn = (Connection) attr.get();
if (reconnectManager != null) {
reconnectManager.reconnect(conn.getUrl());
}
Connection conn = (Connection) attr.get();
// if conn is null, means that channel has been inactive before binding with connection
// this situation will fire a CLOSE event in ConnectionFactory
if (conn != null) {
userEventTriggered(ctx, ConnectionEventType.CLOSE);
}
// trigger close connection event
onEvent((Connection) attr.get(), remoteAddress, ConnectionEventType.CLOSE);
}
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception {
if (event instanceof ConnectionEventType) {
switch ((ConnectionEventType) event) {
ConnectionEventType eventType = (ConnectionEventType) event;
Channel channel = ctx.channel();
if (channel == null) {
logger
.warn(
"channel null when handle user triggered event in ConnectionEventHandler! eventType: {}",
eventType.name());
return;
}

Connection connection = channel.attr(Connection.CONNECTION).get();
if (connection == null) {
logger
.error(
"[BUG]connection is null when handle user triggered event in ConnectionEventHandler! eventType: {}",
eventType.name());
return;
}

final String remoteAddress = RemotingUtil.parseRemoteAddress(ctx.channel());
final String localAddress = RemotingUtil.parseLocalAddress(ctx.channel());
logger.info("trigger user event, local[{}], remote[{}], event: {}", localAddress,
remoteAddress, eventType.name());

switch (eventType) {
case CONNECT:
Channel channel = ctx.channel();
if (null != channel) {
Connection connection = channel.attr(Connection.CONNECTION).get();
this.onEvent(connection, connection.getUrl().getOriginUrl(),
ConnectionEventType.CONNECT);
} else {
logger
.warn("channel null when handle user triggered event in ConnectionEventHandler!");
}
onEvent(connection, connection.getUrl().getOriginUrl(),
ConnectionEventType.CONNECT);
break;
case CONNECT_FAILED:
case CLOSE:
case EXCEPTION:
submitReconnectTaskIfNecessary(connection.getUrl());
onEvent(connection, connection.getUrl().getOriginUrl(), eventType);
break;
default:
logger.error("[BUG]unknown event: {}", eventType.name());
break;
}
} else {
Expand All @@ -186,6 +206,12 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
ctx.channel().close();
}

private void submitReconnectTaskIfNecessary(Url url) {
if (globalSwitch.isOn(GlobalSwitch.CONN_RECONNECT_SWITCH) && reconnectManager != null) {
reconnectManager.reconnect(url);
}
}

private void onEvent(final Connection conn, final String remoteAddress,
final ConnectionEventType type) {
if (this.eventListener != null) {
Expand All @@ -198,20 +224,10 @@ public void run() {
}
}

/**
* Getter method for property <tt>listener</tt>.
*
* @return property value of listener
*/
public ConnectionEventListener getConnectionEventListener() {
return eventListener;
}

/**
* Setter method for property <tt>listener</tt>.
*
* @param listener value to be assigned to property listener
*/
public void setConnectionEventListener(ConnectionEventListener listener) {
if (listener != null) {
this.eventListener = listener;
Expand All @@ -221,20 +237,10 @@ public void setConnectionEventListener(ConnectionEventListener listener) {
}
}

/**
* Getter method for property <tt>connectionManager</tt>.
*
* @return property value of connectionManager
*/
public ConnectionManager getConnectionManager() {
return connectionManager;
}

/**
* Setter method for property <tt>connectionManager</tt>.
*
* @param connectionManager value to be assigned to property connectionManager
*/
public void setConnectionManager(ConnectionManager connectionManager) {
this.connectionManager = connectionManager;
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/alipay/remoting/ConnectionEventType.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@
* @version $Id: ConnectionEventType.java, v 0.1 Mar 4, 2016 8:03:27 PM tao Exp $
*/
public enum ConnectionEventType {
CONNECT, CLOSE, EXCEPTION;
CONNECT, CONNECT_FAILED, CLOSE, EXCEPTION;
}
18 changes: 12 additions & 6 deletions src/main/java/com/alipay/remoting/DefaultConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ public DefaultConnectionManager() {
* @param connectionSelectStrategy connection selection strategy
*/
public DefaultConnectionManager(ConnectionSelectStrategy connectionSelectStrategy) {
this();
this.connTasks = new ConcurrentHashMap<String, RunStateRecordedFutureTask<ConnectionPool>>();
this.healTasks = new ConcurrentHashMap<String, FutureTask<Integer>>();
this.connectionSelectStrategy = connectionSelectStrategy;
}

Expand Down Expand Up @@ -283,10 +284,8 @@ public List<Connection> getAll(String poolKey) {
@Override
public Map<String, List<Connection>> getAll() {
Map<String, List<Connection>> allConnections = new HashMap<String, List<Connection>>();
Iterator<Map.Entry<String, RunStateRecordedFutureTask<ConnectionPool>>> iterator = this
.getConnPools().entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, RunStateRecordedFutureTask<ConnectionPool>> entry = iterator.next();
for (Map.Entry<String, RunStateRecordedFutureTask<ConnectionPool>> entry : this
.getConnPools().entrySet()) {
ConnectionPool pool = FutureTaskUtil.getFutureTaskResult(entry.getValue(), logger);
if (null != pool) {
allConnections.put(entry.getKey(), pool.getAll());
Expand Down Expand Up @@ -425,7 +424,14 @@ public void scan() {
Iterator<String> iter = this.connTasks.keySet().iterator();
while (iter.hasNext()) {
String poolKey = iter.next();
ConnectionPool pool = this.getConnectionPool(this.connTasks.get(poolKey));
RunStateRecordedFutureTask<ConnectionPool> task = this.connTasks.get(poolKey);
if (!task.isDone()) {
logger.info("task(poolKey={}) is not done, do not scan the connection pool",
poolKey);
continue;
}

ConnectionPool pool = this.getConnectionPool(task);
if (null != pool) {
pool.scan();
if (pool.isEmpty()) {
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/com/alipay/remoting/InvokeCallback.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,29 @@

/**
* Invoke callback.
*
*
* @author jiangping
* @version $Id: InvokeCallback.java, v 0.1 2015-9-30 AM10:24:26 tao Exp $
*/
public interface InvokeCallback {

/**
* Response received.
*
*
* @param result
*/
void onResponse(final Object result);

/**
* Exception caught.
*
*
* @param e
*/
void onException(final Throwable e);

/**
* User defined executor.
*
*
* @return
*/
Executor getExecutor();
Expand Down
21 changes: 17 additions & 4 deletions src/main/java/com/alipay/remoting/ReconnectManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

/**
* Reconnect manager.
*
*
* @author yunliang.shi
* @version $Id: ReconnectManager.java, v 0.1 Mar 11, 2016 5:20:50 PM yunliang.shi Exp $
*/
Expand All @@ -47,29 +47,39 @@ public ReconnectManager(ConnectionManager connectionManager) {
this.connectionManager = connectionManager;
this.tasks = new LinkedBlockingQueue<ReconnectTask>();
this.canceled = new CopyOnWriteArrayList<Url>();
// call startup in the constructor to be compatible with version 1.5.x
startup();
}

@Override
public void reconnect(Url url) {
ensureStarted();
tasks.add(new ReconnectTask(url));
}

@Override
public void disableReconnect(Url url) {
ensureStarted();
canceled.add(url);
}

@Override
public void enableReconnect(Url url) {
ensureStarted();
canceled.remove(url);
}

@Override
public void startup() throws LifeCycleException {
super.startup();
// make the startup method idempotent to be compatible with version 1.5.x
synchronized (this) {
if (!isStarted()) {
super.startup();

this.healConnectionThreads = new Thread(new HealConnectionRunner());
this.healConnectionThreads.start();
this.healConnectionThreads = new Thread(new HealConnectionRunner());
this.healConnectionThreads.start();
}
}
}

@Override
Expand All @@ -86,6 +96,7 @@ public void shutdown() throws LifeCycleException {
*/
@Deprecated
public void addCancelUrl(Url url) {
ensureStarted();
disableReconnect(url);
}

Expand All @@ -94,6 +105,7 @@ public void addCancelUrl(Url url) {
*/
@Deprecated
public void removeCancelUrl(Url url) {
ensureStarted();
enableReconnect(url);
}

Expand All @@ -102,6 +114,7 @@ public void removeCancelUrl(Url url) {
*/
@Deprecated
public void addReconnectTask(Url url) {
ensureStarted();
reconnect(url);
}

Expand Down
Loading

0 comments on commit 94c3623

Please sign in to comment.