Commit

finish
seaswalker committed Jul 3, 2018
1 parent 7147bad commit 282e394
Showing 4 changed files with 254 additions and 50 deletions.
206 changes: 158 additions & 48 deletions notes/源码阅读.md
@@ -391,32 +391,6 @@ result.put("org.quartz.threadPool.threadCount", "1");

This means that **if a job does not finish within the configured interval, the next run will not execute**.

## Misfire Records

Misfire means a missed execution. Consider this scenario: a job is still running and has not finished when another execution of it is triggered. How should the system handle this?

There are really only two options: run the new execution in parallel, or skip it. Elastic-job's strategy is:

Record the state at that moment, i.e. mark the execution as misfired.

### Determination

The condition for declaring a misfire is:

**any one of the job's shards is currently executing**

From an implementation standpoint, while a shard is executing a node such as `/elasticjob/simpleElasticJob/simpleElasticJob/sharding/0/running` exists in Zookeeper, so it suffices to check, shard by shard, whether the corresponding running node exists.
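That shard-by-shard existence check can be sketched in plain Java; `existingNodes` below is a hypothetical stand-in for the set of znodes present in Zookeeper, not an elastic-job API:

```java
import java.util.Set;

public class MisfireCheck {

    // Hypothetical stand-in for Zookeeper: the set of znode paths that currently exist.
    static boolean anyShardRunning(Set<String> existingNodes, String jobName, int shardCount) {
        for (int item = 0; item < shardCount; item++) {
            // e.g. /elasticjob/simpleElasticJob/simpleElasticJob/sharding/0/running
            String runningPath = "/elasticjob/" + jobName + "/" + jobName + "/sharding/" + item + "/running";
            if (existingNodes.contains(runningPath)) {
                return true; // a single running shard is enough to declare a misfire
            }
        }
        return false;
    }
}
```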

### Recording

A node is created for **every** shard:

`/elasticjob/simpleElasticJob/simpleElasticJob/sharding/0/misfire`

### Triggering

Under what scenario would a job actually end up being executed repeatedly? From the single-threaded scheduling section above, it is certainly not the local Quartz scheduler; it is presumably events such as a leader change or a node going down. This remains to be verified.

## Execution

### Startup Record
@@ -467,27 +441,6 @@ private void process(final ShardingContexts shardingContexts, final JobExecution

That is, the ephemeral node created for the startup record is deleted: `/elasticjob/simpleElasticJob/simpleElasticJob/sharding/0/running`.

## Misfire Execution

This part of the source is again located in AbstractElasticJobExecutor.execute:

```java
while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
    jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
    execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
}
```

The isExecuteMisfired method determines the conditions under which a misfire execution runs, which can be summarized as:

- The sharding flag is set; as mentioned in the earlier Registration - Startup Info - Sharding Flag section, this condition can be satisfied.
- The feature has not been disabled in the configuration (it is enabled by default).
- Misfire records exist, corresponding to the Misfire Records section above.

In summary:

The misfire mechanism triggers only a single execution no matter how many records exist, and it may cause two consecutive executions of the job (the first normal, the second the misfire run).
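That collapse of many records into a single extra run can be seen in a tiny pure-Java simulation of the loop shown above (the flag set and counter are illustrative stand-ins, not elastic-job types):

```java
import java.util.Set;

public class MisfireLoop {

    // Simulates: while (isExecuteMisfired) { clearMisfire; execute(..., MISFIRE); }
    static int totalExecutions(Set<Integer> misfireFlags) {
        int executions = 1;                // the normal run has already happened
        while (!misfireFlags.isEmpty()) {  // isExecuteMisfired: any flag present?
            misfireFlags.clear();          // clearMisfire removes all flags at once
            executions++;                  // one extra MISFIRE-sourced execution
        }
        return executions;
    }
}
```

However many shards were flagged, the flags are cleared in one batch, so the loop body runs once and the job executes exactly twice.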

# Sharding Execution Mechanism

## Trigger
@@ -534,7 +487,164 @@ Elastic-job supports automatic failover: when a node goes down, the component will automatically…

It, too, can only be an ephemeral node.

Event listening is handled by JobCrashedJobListener; its core logic is as follows:

```java
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
    if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {
        String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
        // If this is the current node itself, return immediately. How could that even happen? Only via manual deletion.
        if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
            return;
        }

        // Get the shards that belonged to the removed node and prepare the failover.
        for (int each : shardingService.getShardingItems(jobInstanceId)) {
            failoverService.setCrashedFailoverFlag(each);
            failoverService.failoverIfNecessary();
        }
    }
}
```
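The jobInstanceId extraction above is simple string slicing; a quick check with hypothetical paths (the instance path layout here is an assumption for illustration):

```java
public class InstancePath {

    // Mirrors path.substring(instanceFullPath.length() + 1) from the listener above.
    static String jobInstanceId(String instanceFullPath, String path) {
        return path.substring(instanceFullPath.length() + 1);
    }
}
```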

## Failover Flag

This step creates the following node in Zookeeper:

`/elastic-job/simpleElasticJob/leader/failover/items/0`

## Execution

The core logic is in FailoverService.failoverIfNecessary:

```java
public void failoverIfNecessary() {
    if (needFailover()) {
        jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
    }
}
```

executeInLeader causes the failover to be executed on a leader node.

```java
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
    try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
        latch.start();
        latch.await();
        callback.execute();
    } catch (final Exception ex) {
        handleException(ex);
    }
}
```

Note that the "leader" here is not the elastic-job leader but the winner of a separate election (a different latch node is used for it). Also, because a try-with-resources statement is used, leadership is released automatically once the logic completes, so every node gets its turn to enter the callback's execute method.
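The auto-release behavior that lets every node take a turn can be mimicked with any `AutoCloseable`; the `SimpleLatch` below is a made-up, JDK-only stand-in for Curator's `LeaderLatch`, not its real API:

```java
import java.util.concurrent.Semaphore;

public class LeadershipDemo {

    // Made-up stand-in for LeaderLatch: await acquires leadership, close releases it.
    static class SimpleLatch implements AutoCloseable {
        private final Semaphore leadership;

        SimpleLatch(Semaphore leadership) { this.leadership = leadership; }

        void await() { leadership.acquireUninterruptibly(); }

        @Override
        public void close() { leadership.release(); }
    }

    static final Semaphore LEADERSHIP = new Semaphore(1);

    // try-with-resources releases leadership even if the callback throws,
    // so the next caller can become "leader" and run the callback as well.
    static void executeInLeader(Runnable callback) {
        try (SimpleLatch latch = new SimpleLatch(LEADERSHIP)) {
            latch.await();
            callback.run();
        }
    }
}
```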

FailoverLeaderExecutionCallback:

```java
@Override
public void execute() {
    // If the job has been shut down, or the failover already completed by the time this node became leader, simply return.
    if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {
        return;
    }

    // The shard to fail over.
    int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));

    // Remove the failover flag for this shard.
    jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));

    // Trigger the job manually.
    JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
    if (null != jobScheduleController) {
        jobScheduleController.triggerJob();
    }
}
```

Self-explanatory.

# Misfire

First, to be clear: elastic-job's misfire handling is essentially a wrapper around Quartz's mechanism of the same name. Once Quartz's misfire mechanism is understood, elastic-job's follows naturally.

A Quartz misfire can arise in two situations:

1. The business logic runs on a Quartz worker thread and takes so long that it has not finished when the next fire time arrives; that is one misfire.
2. No idle worker thread is available when the fire time arrives.

## Threshold

Quartz has the following parameter:

```properties
org.quartz.jobStore.misfireThreshold=1
```

Below is the definition from its official documentation:

> The number of milliseconds the scheduler will ‘tolerate’ a trigger to pass its next-fire-time by, before being considered “misfired”. The default value (if you don’t make an entry of this property in your configuration) is 60000 (60 seconds).

Elastic-job configures this parameter in JobScheduler:

```java
private Properties getBaseQuartzProperties() {
    Properties result = new Properties();
    result.put("org.quartz.jobStore.misfireThreshold", "1");
    return result;
}
```

In other words, exceeding the scheduled fire time by just 1 millisecond already counts as a misfire; this is quite strict.
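The threshold test itself is one comparison; a hedged sketch (the method name and values are illustrative, not the Quartz API):

```java
public class MisfireThreshold {

    // A trigger is considered misfired once "now" is more than
    // misfireThreshold milliseconds past its scheduled fire time.
    static boolean isMisfired(long scheduledFireTimeMillis, long nowMillis, long thresholdMillis) {
        return nowMillis - scheduledFireTimeMillis > thresholdMillis;
    }
}
```

With elastic-job's threshold of 1 ms, almost any delay counts; with Quartz's default of 60000 ms, a run may start up to a minute late without being treated as misfired.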

## Listener

Quartz provides the TriggerListener interface for observing misfires, and elastic-job defines its own listener to record them. JobScheduler.createScheduler:

```java
private Scheduler createScheduler() {
    Scheduler result;
    try {
        StdSchedulerFactory factory = new StdSchedulerFactory();
        factory.initialize(getBaseQuartzProperties());
        result = factory.getScheduler();
        result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
    } catch (final SchedulerException ex) {
        throw new JobSystemException(ex);
    }
    return result;
}
```

The core logic of the listener, JobTriggerListener:

```java
@Override
public void triggerMisfired(final Trigger trigger) {
    if (null != trigger.getPreviousFireTime()) {
        executionService.setMisfire(shardingService.getLocalShardingItems());
    }
}
```

In effect it creates a misfire flag node for every shard owned by the current node:

`/elasticjob/simpleElasticJob/simpleElasticJob/sharding/0/misfire`
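Setting the flag for every locally owned shard boils down to building one such path per shard item; a sketch (the path layout is taken from the examples in these notes):

```java
import java.util.List;
import java.util.stream.Collectors;

public class MisfirePaths {

    // One misfire flag node per shard item owned by the current instance.
    static List<String> misfireNodes(String jobName, List<Integer> localShardingItems) {
        return localShardingItems.stream()
                .map(item -> "/elasticjob/" + jobName + "/" + jobName + "/sharding/" + item + "/misfire")
                .collect(Collectors.toList());
    }
}
```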

## Execution

This part of the source is located in AbstractElasticJobExecutor.execute:

```java
while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
    jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
    execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
}
```



5 changes: 4 additions & 1 deletion src/main/java/application/job/SimpleElasticJob.java
@@ -16,7 +16,10 @@ public class SimpleElasticJob implements SimpleJob {
public void execute(ShardingContext shardingContext) {
    System.out.println(shardingContext);

    throw new RuntimeException();
    try {
        TimeUnit.SECONDS.sleep(10);
    } catch (InterruptedException ignore) {
    }
}

}
2 changes: 1 addition & 1 deletion src/main/resources/application.properties
@@ -1,5 +1,5 @@
spring.elasticjob.serverList=127.0.0.1:2181
simpleJob.cron=0 0 /1 * * ? *
simpleJob.cron=0/5 * * * * ?
simpleJob.shardingTotalCount=2
simpleJob.shardingParameters=0=A,1=B
spring.elasticjob.namespace=elastic-job
91 changes: 91 additions & 0 deletions src/test/java/zookeeper/ZookeeperTest.java
@@ -0,0 +1,91 @@
package zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

/**
 * Hello, administrator.
*
* @author skywalker
*/
public class ZookeeperTest {

    private CuratorFramework client;

    @Before
    public void before() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(500, 4);
        client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
        client.start();
        System.out.println("Connected to Zookeeper.");
    }

    /**
     * When an event occurs, will every listener be notified? Answer: yes.
     */
    @Test
    public void testListener() throws Exception {
        prepareTestData();
        System.out.println("Test data created");

        createListeners();
        System.out.println("Listeners created");

        client.delete().forPath("/skywalker/age");
        System.out.println("Node deleted");

        TimeUnit.SECONDS.sleep(10);
    }

    @Test
    public void testLeaderElection() throws Exception {
        LeaderLatch latch = new LeaderLatch(client, "/skywalker/latch1");
        latch.start();
        latch.await();
        System.out.println("Latch1 election result: " + latch.hasLeadership());

        LeaderLatch latch2 = new LeaderLatch(client, "/skywalker/latch2");
        latch2.start();
        latch2.await();
        System.out.println("Latch2 election result: " + latch2.hasLeadership());

        latch.close();
        latch2.close();
    }

    private void prepareTestData() throws Exception {
        client.create().creatingParentContainersIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath("/skywalker/name", "name".getBytes());

        client.create().creatingParentContainersIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath("/skywalker/age", new byte[] {20});
    }

    private void createListeners() throws Exception {
        TreeCache treeCache = new TreeCache(client, "/skywalker");

        treeCache.getListenable().addListener((client, event) -> System.out.println("Listener1 event type: " + event));

        treeCache.getListenable().addListener((client, event) -> System.out.println("Listener2 event type: " + event));

        treeCache.start();
    }

    @After
    public void after() {
        this.client.close();
    }

}
