Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ public class JobTrackerTest {
public static void main(String[] args) {

// 1. 使用mongo做任务队列
testMongoQueue();
// testMongoQueue();
// 2. 使用mysql做任务队列
// testMysqlQueue();
testMysqlQueue();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public void run(Job job) throws Throwable {
bizLogger.info("测试,业务日志啊啊啊啊啊");

try {
Thread.sleep(1000L);
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@
import com.lts.job.queue.domain.JobPo;
import com.lts.job.queue.exception.JobQueueException;
import com.lts.job.queue.mysql.support.ResultSetHandlerHolder;
import com.lts.job.store.jdbc.SqlExecutor;

import java.io.InputStream;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;

Expand Down Expand Up @@ -62,41 +60,83 @@ public boolean add(JobPo jobPo) {

@Override
public JobPo take(final String taskTrackerNodeGroup, final String taskTrackerIdentity) {
return getSqlTemplate().executeInTransaction(new SqlExecutor<JobPo>() {
@Override
public JobPo run(Connection conn) throws SQLException {
Long now = System.currentTimeMillis();
// select for update
String selectForUpdateSql = "SELECT *" +
" FROM `{tableName}` " +
" WHERE is_running = ? " +
" AND `trigger_time` < ? " +
" ORDER BY `trigger_time` ASC, `priority` ASC, `gmt_created` ASC " +
" LIMIT 0, 1 FOR UPDATE";
Object[] selectParams = new Object[]{false, now};
JobPo jobPo = getSqlTemplate().query(conn, getRealSql(selectForUpdateSql, taskTrackerNodeGroup),
ResultSetHandlerHolder.JOB_PO_RESULT_SET_HANDLER, selectParams
);
if (jobPo != null) {
String updateSql = "UPDATE `{tableName}` SET " +
"`is_running` = ?, " +
"`task_tracker_identity` = ?, " +
"`gmt_modified` = ?," +
"`prev_exe_time` = ? " +
" WHERE job_id = ?";
Object[] params = new Object[]{
true, taskTrackerIdentity, now, now, jobPo.getJobId()
};
getSqlTemplate().update(conn, getRealSql(updateSql, taskTrackerNodeGroup), params);
/**
* 这里从SELECT FOR UPDATE 优化为 CAS 乐观锁
*/
Long now = DateUtils.currentTimeMillis();
String selectSql = "SELECT *" +
" FROM `{tableName}` " +
" WHERE is_running = ? " +
" AND `trigger_time` < ? " +
" ORDER BY `trigger_time` ASC, `priority` ASC, `gmt_created` ASC " +
" LIMIT 0, 1";
Object[] selectParams = new Object[]{false, now};

jobPo.setIsRunning(true);
jobPo.setTaskTrackerIdentity(taskTrackerIdentity);
jobPo.setGmtModified(now);
jobPo.setPrevExeTime(now);
}
try {
JobPo jobPo = getSqlTemplate().query(getRealSql(selectSql, taskTrackerNodeGroup),
ResultSetHandlerHolder.JOB_PO_RESULT_SET_HANDLER, selectParams);
if (jobPo == null) {
return null;
}

String updateSql = "UPDATE `{tableName}` SET " +
"`is_running` = ?, " +
"`task_tracker_identity` = ?, " +
"`gmt_modified` = ?," +
"`prev_exe_time` = ? " +
" WHERE job_id = ? AND is_running = ?";
Object[] params = new Object[]{
true, taskTrackerIdentity, now, now, jobPo.getJobId(), false
};
// 返回影响的行数
int affectedRow = getSqlTemplate().update(getRealSql(updateSql, taskTrackerNodeGroup), params);
if (affectedRow == 0) {
return take(taskTrackerNodeGroup, taskTrackerIdentity);
} else {
jobPo.setIsRunning(true);
jobPo.setTaskTrackerIdentity(taskTrackerIdentity);
jobPo.setGmtModified(now);
jobPo.setPrevExeTime(now);
return jobPo;
}
});
} catch (SQLException e) {
throw new JobQueueException(e);
}
// return getSqlTemplate().executeInTransaction(new SqlExecutor<JobPo>() {
// @Override
// public JobPo run(Connection conn) throws SQLException {
// Long now = System.currentTimeMillis();
// // select for update
// String selectForUpdateSql = "SELECT *" +
// " FROM `{tableName}` " +
// " WHERE is_running = ? " +
// " AND `trigger_time` < ? " +
// " ORDER BY `trigger_time` ASC, `priority` ASC, `gmt_created` ASC " +
// " LIMIT 0, 1 FOR UPDATE";
// Object[] selectParams = new Object[]{false, now};
// JobPo jobPo = getSqlTemplate().query(conn, getRealSql(selectForUpdateSql, taskTrackerNodeGroup),
// ResultSetHandlerHolder.JOB_PO_RESULT_SET_HANDLER, selectParams
// );
// if (jobPo != null) {
// String updateSql = "UPDATE `{tableName}` SET " +
// "`is_running` = ?, " +
// "`task_tracker_identity` = ?, " +
// "`gmt_modified` = ?," +
// "`prev_exe_time` = ? " +
// " WHERE job_id = ?";
// Object[] params = new Object[]{
// true, taskTrackerIdentity, now, now, jobPo.getJobId()
// };
// getSqlTemplate().update(conn, getRealSql(updateSql, taskTrackerNodeGroup), params);
//
// jobPo.setIsRunning(true);
// jobPo.setTaskTrackerIdentity(taskTrackerIdentity);
// jobPo.setGmtModified(now);
// jobPo.setPrevExeTime(now);
// }
// return jobPo;
// }
// });
}

@Override
Expand Down