Skip to content

Commit

Permalink
fix #491 [master]release 0.2.3 (#493)
Browse files Browse the repository at this point in the history
  • Loading branch information
slievrly committed Mar 1, 2019
1 parent 84cd529 commit 50af0be
Show file tree
Hide file tree
Showing 42 changed files with 855 additions and 112 deletions.
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -65,7 +65,7 @@ For more details about principle and design, please go to [Fescar wiki page](htt

## Maven dependency
```xml
<fescar.version>0.2.1</fescar.version>
<fescar.version>0.2.2</fescar.version>

<dependency>
<groupId>com.alibaba.fescar</groupId>
Expand Down
2 changes: 1 addition & 1 deletion common/pom.xml
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>fescar-all</artifactId>
<groupId>com.alibaba.fescar</groupId>
<version>0.2.2</version>
<version>0.2.3</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>fescar-common</artifactId>
Expand Down
@@ -0,0 +1,52 @@
package com.alibaba.fescar.common.thread;
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/**
* policys for RejectedExecutionHandler
*
* Created by guoyao on 2019/2/26.
*/
public final class RejectedPolicys {

/**
* when rejected happened ,add the new task and run the oldest task
* @return
*/
public static RejectedExecutionHandler runsOldestTaskPolicy() {
return new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (executor.isShutdown()) {
return;
}
BlockingQueue<Runnable> workQueue=executor.getQueue();
Runnable firstWork=workQueue.poll();
boolean newTaskAdd=workQueue.offer(r);
if (firstWork != null) {
firstWork.run();
}
if (!newTaskAdd) {
executor.execute(r);
}
}
};
}
}
Expand Up @@ -188,7 +188,11 @@ private static InetAddress getLocalAddress0() {
LOGGER.error("Could not get local host ip address, will use 127.0.0.1 instead.");
return localAddress;
}

public static void validAddress(InetSocketAddress address) {
if (null == address.getHostName() || 0 == address.getPort()) {
throw new IllegalArgumentException("invalid address:" + address);
}
}
private static boolean isValidAddress(InetAddress address) {
if (address == null || address.isLoopbackAddress()) {
return false;
Expand Down
@@ -0,0 +1,84 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fescar.common.thread;

import org.junit.Test;
import org.testng.Assert;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Created by guoyao on 2019/2/26.
*/
public class RejectedPolicysTest {

private final int DEFAULT_CORE_POOL_SIZE=1;
private final int DEFAULT_KEEP_ALIVE_TIME=10;
private final int MAX_QUEUE_SIZE=1;

@Test
public void testRunsOldestTaskPolicy() throws Exception {
AtomicInteger atomicInteger=new AtomicInteger();
ThreadPoolExecutor poolExecutor=
new ThreadPoolExecutor(DEFAULT_CORE_POOL_SIZE, DEFAULT_CORE_POOL_SIZE, DEFAULT_KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(MAX_QUEUE_SIZE),
new NamedThreadFactory("OldestRunsPolicy", DEFAULT_CORE_POOL_SIZE), RejectedPolicys.runsOldestTaskPolicy());
CountDownLatch downLatch1=new CountDownLatch(1);
CountDownLatch downLatch2=new CountDownLatch(1);
//task1
poolExecutor.execute(new Runnable() {
@Override
public void run() {
try {
//wait the oldest task of queue count down
downLatch1.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicInteger.getAndAdd(1);
}
});
Assert.assertEquals(atomicInteger.get(), 0);
//task2
poolExecutor.execute(new Runnable() {
@Override
public void run() {
// run second
atomicInteger.getAndAdd(2);
}
});
//task3
poolExecutor.execute(new Runnable() {
@Override
public void run() {
downLatch2.countDown();
atomicInteger.getAndAdd(3);
}
});
//only the task2 run which is the oldest task of queue
Assert.assertEquals(atomicInteger.get(), 2);
downLatch1.countDown();
downLatch2.await();
//run task3
Assert.assertEquals(atomicInteger.get(), 6);

}
}
6 changes: 5 additions & 1 deletion config/pom.xml
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>fescar-all</artifactId>
<groupId>com.alibaba.fescar</groupId>
<version>0.2.2</version>
<version>0.2.3</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>fescar-config</artifactId>
Expand Down Expand Up @@ -48,6 +48,10 @@
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
</dependency>
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-client</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Expand Up @@ -20,20 +20,15 @@
* The type Abstract configuration.
*
* @param <T> the type parameter
* @Author: jimin.jm @alibaba-inc.com
* @Project: fescar -all
* @DateTime: 2019 /2/1 2:18 PM
* @FileName: AbstractConfiguration
* @Description:
* @author: jimin.jm @alibaba-inc.com
* @date: 2019 /2/1
*/
public abstract class AbstractConfiguration<T> implements Configuration<T> {

/**
* The constant DEFAULT_CONFIG_TIMEOUT.
*/
protected static final long DEFAULT_CONFIG_TIMEOUT = 5 * 1000;
protected static final String FILE_ROOT_REGISTRY = "registry";
protected static final String FILE_CONFIG_SPLIT_CHAR = ".";

@Override
public int getInt(String dataId, int defaultValue, long timeoutMills) {
Expand Down
@@ -0,0 +1,153 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fescar.config;

import com.alibaba.fescar.common.exception.NotSupportYetException;
import com.alibaba.fescar.common.thread.NamedThreadFactory;
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigService;
import com.ctrip.framework.apollo.ConfigChangeListener;
import com.ctrip.framework.apollo.model.ConfigChangeEvent;
import com.google.common.collect.Lists;

import static com.alibaba.fescar.config.ConfigurationKeys.*;

import java.util.*;
import java.util.concurrent.*;

/**
* The type Apollo configuration.
*
* @author: kl @kailing.pub
* @date: 2019/2/27
*/
public class ApolloConfiguration extends AbstractConfiguration<ConfigChangeListener> {

private static final String REGISTRY_TYPE = "apollo";
private static final String APP_ID = "app.id";
private static final String APOLLO_META = "apollo.meta";
private static final Configuration FILE_CONFIG = ConfigurationFactory.FILE_INSTANCE;
private static volatile Config config;
private ExecutorService configOperateExecutor;
private static final int CORE_CONFIG_OPERATE_THREAD = 1;
private static final ConcurrentMap<String, ConfigChangeListener> LISTENER_SERVICE_MAP = new ConcurrentHashMap<>();
private static final int MAX_CONFIG_OPERATE_THREAD = 2;
private static volatile ApolloConfiguration instance;

private ApolloConfiguration() {
readyApolloConfig();
if (null == config) {
synchronized (ApolloConfiguration.class) {
if (null == config) {
config = ConfigService.getAppConfig();
configOperateExecutor = new ThreadPoolExecutor(CORE_CONFIG_OPERATE_THREAD, MAX_CONFIG_OPERATE_THREAD,
Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory("apolloConfigOperate", MAX_CONFIG_OPERATE_THREAD));
config.addChangeListener(new ConfigChangeListener() {
@Override
public void onChange(ConfigChangeEvent changeEvent) {
for (Map.Entry<String, ConfigChangeListener> entry : LISTENER_SERVICE_MAP.entrySet()) {
if (changeEvent.isChanged(entry.getKey())) {
entry.getValue().onChange(changeEvent);
}
}
}
});
}
}
}
}

public static ApolloConfiguration getInstance() {
if (null == instance) {
synchronized (ApolloConfiguration.class) {
if (null == instance) {
instance = new ApolloConfiguration();
}
}
}
return instance;
}

@Override
public String getConfig(String dataId, String defaultValue, long timeoutMills) {
ConfigFuture configFuture = new ConfigFuture(dataId, defaultValue, ConfigFuture.ConfigOperation.GET, timeoutMills);
configOperateExecutor.submit(new Runnable() {
@Override
public void run() {
String result = config.getProperty(dataId, defaultValue);
configFuture.setResult(result);
}
});
return (String) configFuture.get(timeoutMills, TimeUnit.MILLISECONDS);
}

@Override
public boolean putConfig(String dataId, String content, long timeoutMills) {
throw new NotSupportYetException("not support putConfig");
}

@Override
public boolean putConfigIfAbsent(String dataId, String content, long timeoutMills) {
throw new NotSupportYetException("not support putConfigIfAbsent");
}

@Override
public boolean removeConfig(String dataId, long timeoutMills) {
throw new NotSupportYetException("not support removeConfig");
}

@Override
public void addConfigListener(String dataId, ConfigChangeListener listener) {
LISTENER_SERVICE_MAP.put(dataId, listener);
}

@Override
public void removeConfigListener(String dataId, ConfigChangeListener listener) {
LISTENER_SERVICE_MAP.remove(dataId, listener);
}

@Override
public List<ConfigChangeListener> getConfigListeners(String dataId) {
return Lists.newArrayList(LISTENER_SERVICE_MAP.values());
}

private void readyApolloConfig(){
Properties properties = System.getProperties();
if(!properties.containsKey(APP_ID)){
System.setProperty(APP_ID,FILE_CONFIG.getConfig(getApolloAppIdFileKey()));
}
if(!properties.containsKey(APOLLO_META)){
System.setProperty(APOLLO_META,FILE_CONFIG.getConfig(getApolloMetaFileKey()));
}
}

@Override
public String getTypeName() {
return REGISTRY_TYPE;
}

private static String getApolloMetaFileKey() {
return FILE_ROOT_CONFIG + FILE_CONFIG_SPLIT_CHAR + REGISTRY_TYPE + FILE_CONFIG_SPLIT_CHAR
+ APOLLO_META;
}

private static String getApolloAppIdFileKey() {
return FILE_ROOT_CONFIG + FILE_CONFIG_SPLIT_CHAR + REGISTRY_TYPE + FILE_CONFIG_SPLIT_CHAR
+ APP_ID;
}
}
Expand Up @@ -21,11 +21,8 @@
/**
* The interface Config change listener.
*
* @Author: jimin.jm @alibaba-inc.com
* @Project: fescar -all
* @DateTime: 2018 /12/20 14:41
* @FileName: ConfigChangeListener
* @Description:
* @author: jimin.jm @alibaba-inc.com
* @date: 2018 /12/20
*/
public interface ConfigChangeListener {

Expand Down
Expand Up @@ -25,11 +25,8 @@
/**
* The type Config future.
*
* @Author: jimin.jm @alibaba-inc.com
* @Project: fescar -all
* @DateTime: 2018 /12/20 16:30
* @FileName: ConfigFuture
* @Description:
* @author: jimin.jm @alibaba-inc.com
* @date: 2018 /12/20
*/
public class ConfigFuture {
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigFuture.class);
Expand Down

0 comments on commit 50af0be

Please sign in to comment.