Merged
55 commits
61022d6
supports get log for DLM job.
guowl3 Dec 4, 2023
e78e30d
supports get log for DLM job.
guowl3 Dec 5, 2023
b6af99f
supports get log for DLM job.
guowl3 Dec 5, 2023
cbd90fb
supports get log for DLM job.
guowl3 Dec 5, 2023
df65a7a
supports get log for DLM job.
guowl3 Dec 5, 2023
64a2fcf
update path pattern
guowl3 Dec 6, 2023
2d3b696
code format
guowl3 Dec 6, 2023
506c5ab
Remove redundant code
guowl3 Dec 6, 2023
159ca99
destroy thread pool after task finished.
guowl3 Dec 6, 2023
9072ad1
Merge branch 'dev/4.2.x' into tianke_supports_log_for_dlm
guowl3 Dec 6, 2023
d69ea06
check task parameters when update schedule task.
guowl3 Dec 6, 2023
a0770c4
init cluster name.
guowl3 Dec 6, 2023
e544d0d
fix NPE when variables are not exist.
guowl3 Dec 6, 2023
ea85399
Verify whether the data source is a cluster instance
guowl3 Dec 8, 2023
cfb0ae8
Supports configuring connection timeout.
guowl3 Dec 11, 2023
7af2756
Merge branch 'dev/4.2.x' into feat_423_bixin
guowl3 Dec 12, 2023
a16ceef
Merge branch 'tianke_fix_edit_dlm' into feat_423_bixin
guowl3 Dec 12, 2023
119a1af
Supports configuring sharding strategy.
guowl3 Dec 12, 2023
108be6e
Merge branch 'tianke_supports_log_for_dlm' into feat_423_bixin
guowl3 Dec 12, 2023
7b47409
modify default sharding strategy.
guowl3 Dec 12, 2023
25d76d9
Remove redundant code
guowl3 Dec 12, 2023
ec9aca0
Merge branch 'tianke_supports_log_for_dlm' into feat_423_bixin
guowl3 Dec 12, 2023
1e54daa
fix create dlm job failed in MySql mode.
guowl3 Dec 12, 2023
863bcfd
Merge branch 'tianke_fix_edit_dlm' into feat_423_bixin
guowl3 Dec 12, 2023
9f8ccae
Fix edit limit config and description not taking effect
guowl3 Dec 12, 2023
4344d25
Merge branch 'tianke_fix_edit_dlm' into feat_423_bixin
guowl3 Dec 12, 2023
e6d3afd
Merge branch 'tianke_fix_edit_dlm' into tianke_supports_log_for_dlm
guowl3 Dec 12, 2023
f8c26ec
fix interrupt task failed.
guowl3 Dec 12, 2023
64d11b8
check job status before update.
guowl3 Dec 12, 2023
6e24bab
Merge branch 'tianke_supports_log_for_dlm' into feat_423_bixin
guowl3 Dec 12, 2023
85f551a
fix restart job failed.
guowl3 Dec 12, 2023
bae2d3f
Merge branch 'tianke_supports_log_for_dlm' into feat_423_bixin
guowl3 Dec 12, 2023
ca79929
fix restart job failed.
guowl3 Dec 12, 2023
3135149
Merge branch 'tianke_supports_log_for_dlm' into feat_423_bixin
guowl3 Dec 12, 2023
8f04f5c
fix restart job failed.
guowl3 Dec 12, 2023
0104811
Merge branch 'tianke_supports_log_for_dlm' into feat_423_bixin
guowl3 Dec 12, 2023
ec268b2
fix restart job failed.
guowl3 Dec 12, 2023
499cfab
format
guowl3 Dec 12, 2023
9df8012
update submodule commit id
guowl3 Dec 12, 2023
44a0b9a
do not check cluster name when create job.
guowl3 Dec 12, 2023
504791c
support configuring scan batch size.
guowl3 Dec 13, 2023
3d86de7
Merge branch 'dev/4.2.x' into tianke_supports_log_for_dlm
guowl3 Dec 18, 2023
875d3ab
merge 4.2.x to current branch.
guowl3 Dec 18, 2023
fcc1bd6
Merge branch 'dev/4.2.x' into tianke_supports_log_for_dlm
guowl3 Dec 20, 2023
177438a
Merge branch 'dev/4.2.x' into feat_423_bixin
guowl3 Dec 20, 2023
0d3f1ad
Merge branch 'feat_423_bixin' into tianke_supports_log_for_dlm
guowl3 Dec 20, 2023
1a779f1
change default sharding strategy.
guowl3 Dec 20, 2023
094f654
code format
guowl3 Dec 20, 2023
e8c35dc
response comments
guowl3 Dec 20, 2023
b502f49
Merge branch 'dev/4.2.x' into tianke_supports_log_for_dlm
guowl3 Dec 20, 2023
5fcf0c1
fix sharding strategy configuration didn't work.
guowl3 Dec 20, 2023
e3cefa0
upgrade sdk version
guowl3 Dec 21, 2023
24a8070
fix ut
guowl3 Dec 21, 2023
4517c34
fix description of key
guowl3 Dec 21, 2023
abaf390
update default sharding strategy.
guowl3 Dec 21, 2023
2 changes: 1 addition & 1 deletion pom.xml
@@ -119,7 +119,7 @@
<mina.version>2.1.6</mina.version>

<!-- data-lifecycle-manager version -->
<data-lifecycle-manager.version>1.0.6</data-lifecycle-manager.version>
<data-lifecycle-manager.version>1.0.7</data-lifecycle-manager.version>

<!-- plugin version -->
<formatter-maven-plugin.version>2.11.0</formatter-maven-plugin.version>

@@ -16,6 +16,8 @@
package com.oceanbase.odc.metadb.schedule;

import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;

import org.junit.Test;
@@ -60,6 +62,17 @@ public void updateExecutor() {
Assert.equals(executor, byId.get().getExecutor());
}

@Test
public void listByScheduleIdAndStatus() {
taskRepository.deleteAll();
createScheduleTask();
List<TaskStatus> statuses = new LinkedList<>();
statuses.add(TaskStatus.RUNNING);
statuses.add(TaskStatus.PREPARING);
List<ScheduleTaskEntity> byJobNameAndStatus = taskRepository.findByJobNameAndStatusIn("1", statuses);
Assert.equals(byJobNameAndStatus.size(), 1);
}

private ScheduleTaskEntity createScheduleTask() {
ScheduleTaskEntity entity = new ScheduleTaskEntity();
entity.setStatus(TaskStatus.PREPARING);

@@ -42,6 +42,7 @@
import com.oceanbase.odc.service.schedule.model.ScheduleDetailResp;
import com.oceanbase.odc.service.schedule.model.ScheduleStatus;
import com.oceanbase.odc.service.schedule.model.ScheduleTaskResp;
import com.oceanbase.odc.service.task.model.OdcTaskLogLevel;

import io.swagger.annotations.ApiOperation;

@@ -140,4 +141,12 @@ public SuccessResponse<ScheduleTaskResp> rollbackTask(@PathVariable Long schedul
return Responses.single(scheduleService.rollbackTask(scheduleId, taskId));
}


@ApiOperation(value = "GetScheduleTaskLog", notes = "获取计划任务日志")
@RequestMapping(value = "/schedules/{scheduleId:[\\d]+}/tasks/{taskId:[\\d]+}/log", method = RequestMethod.GET)
public SuccessResponse<String> getScheduleTaskLog(@PathVariable Long scheduleId, @PathVariable Long taskId,
@RequestParam OdcTaskLogLevel logType) {
return Responses.single(scheduleService.getLog(scheduleId, taskId, logType));
}

}
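For illustration, here is a minimal client-side sketch of calling the new log endpoint. The /api/v2 prefix, host, port, ids, and the ALL level are assumptions (the class-level request mapping and the OdcTaskLogLevel constants are not shown in this diff), and authentication is omitted:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ScheduleTaskLogClientSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical deployment URL and ids; real values depend on the ODC instance.
        String url = "http://localhost:8989/api/v2/schedules/1/tasks/2/log?logType=ALL";
        HttpRequest request = HttpRequest.newBuilder().uri(URI.create(url)).GET().build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // The body is a SuccessResponse<String> JSON envelope wrapping the log text.
        System.out.println(response.body());
    }
}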
3 changes: 3 additions & 0 deletions server/odc-server/src/main/resources/data.sql
@@ -684,6 +684,9 @@ INSERT INTO config_system_configuration ( `key`, `value`, `description` ) VALUES
INSERT INTO config_system_configuration ( `key`, `value`, `description` ) VALUES( 'odc.task.dlm.single-task-thread-pool-size', '12', 'DLM 单个任务可用线程数' ) ON DUPLICATE KEY UPDATE `id` = `id`;
INSERT INTO config_system_configuration ( `key`, `value`, `description` ) VALUES( 'odc.task.dlm.thread-pool-size', '100', '单个 POD 中 DLM 任务线程池大小' ) ON DUPLICATE KEY UPDATE `id` = `id`;
INSERT INTO config_system_configuration ( `key`, `value`, `description` ) VALUES( 'odc.task.dlm.support-breakpoint-recovery', 'true', 'DLM 任务是否开启断点恢复' ) ON DUPLICATE KEY UPDATE `id` = `id`;
INSERT INTO config_system_configuration ( `key`, `value`, `description` ) VALUES( 'odc.task.dlm.task-connection-query-timeout-seconds', '180', 'DLM 任务 SQL 超时时间' ) ON DUPLICATE KEY UPDATE `id` = `id`;
INSERT INTO config_system_configuration ( `key`, `value`, `description` ) VALUES( 'odc.task.dlm.sharding-strategy', 'FIXED_LENGTH', 'DLM 分片策略,默认值 FIXED_LENGTH 表示均匀分片,适合小规格实例。使用 MATCH 策略时将出现少量慢 SQL,整体性能会有较大提升,适合大规格实例。' ) ON DUPLICATE KEY UPDATE `id` = `id`;
INSERT INTO config_system_configuration ( `key`, `value`, `description` ) VALUES( 'odc.task.dlm.default-scan-batch-size', '10000', 'DLM 分片大小,默认值 10000 表示分片 SQL 每次会扫描 10000 个主键' ) ON DUPLICATE KEY UPDATE `id` = `id`;

INSERT INTO config_system_configuration ( `key`, `value`, `description` ) VALUES( 'odc.task.datatransfer.use-server-prep-stmts', 'true', '导入导出是否开启 ps 协议,默认为开启' ) ON DUPLICATE KEY UPDATE `id` = `id`;
INSERT INTO config_system_configuration ( `key`, `value`, `description` ) VALUES( 'odc.task.datatransfer.cursor-fetch-size', '20', '导出时游标的 fetch size,默认为 20,最大值为 1000' ) ON DUPLICATE KEY UPDATE `id` = `id`;
50 changes: 48 additions & 2 deletions server/odc-server/src/main/resources/log4j2.xml
@@ -488,6 +488,51 @@
</Routes>
</Routing>

<Routing name="ScheduleTaskInfoRoutingAppender">
<Routes pattern="${ctx:scheduleTaskId}">
<Route>
<RollingFile name="InfoRolling-${ctx:scheduleTaskId}"
fileName="${LOG_DIRECTORY}/scheduleTask/${ctx:jobName}-${ctx:jobGroup}/${ctx:scheduleTaskId}/log.all"
filePattern="${LOG_DIRECTORY}/scheduleTask/${ctx:jobName}-${ctx:jobGroup}/${ctx:scheduleTaskId}/${date:yyyy-MM}/log-%d{yyyy-MM-dd}-%i.all.gz">
<PatternLayout>
<pattern>%d{yyyy-MM-dd HH:mm:ss} [%p] %m%n</pattern>
</PatternLayout>
<Filters>
<!-- TRACE < DEBUG < INFO < WARN < ERROR < FATAL -->
<ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
</Filters>
<Policies>
<TimeBasedTriggeringPolicy interval="6" modulate="true"/>
<SizeBasedTriggeringPolicy size="256 MB"/>
</Policies>
</RollingFile>
</Route>
</Routes>
</Routing>

<Routing name="ScheduleTaskWarnRoutingAppender">
<Routes pattern="${ctx:scheduleTaskId}">
<Route>
<RollingFile name="WarnRolling-${ctx:scheduleTaskId}"
fileName="${LOG_DIRECTORY}/scheduleTask/${ctx:jobName}-${ctx:jobGroup}/${ctx:scheduleTaskId}/log.warn"
filePattern="${LOG_DIRECTORY}/scheduleTask/${ctx:jobName}-${ctx:jobGroup}/${ctx:scheduleTaskId}/${date:yyyy-MM}/log-%d{yyyy-MM-dd}-%i.warn.gz">
<PatternLayout>
<pattern>%d{yyyy-MM-dd HH:mm:ss} [%p] %m%n</pattern>
</PatternLayout>
<Filters>
<!-- TRACE < DEBUG < INFO < WARN < ERROR < FATAL -->
<ThresholdFilter level="ERROR" onMatch="DENY" onMismatch="NEUTRAL"/>
<ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY"/>
</Filters>
<Policies>
<TimeBasedTriggeringPolicy interval="6" modulate="true"/>
<SizeBasedTriggeringPolicy size="256 MB"/>
</Policies>
</RollingFile>
</Route>
</Routes>
</Routing>


</Appenders>
<Loggers>
@@ -588,8 +633,9 @@
<AppenderRef ref="WarnRoutingOnlineSchemaChangeAppender"/>
</Logger>

<Logger name="com.oceanbase.tools.migrator" level="INFO">
<AppenderRef ref="OdcFileAppender"/>
<Logger name="com.oceanbase.tools.migrator" level="INFO" additivity="true">
<AppenderRef ref="ScheduleTaskInfoRoutingAppender"/>
<AppenderRef ref="ScheduleTaskWarnRoutingAppender"/>
</Logger>

<!-- ODC程序日志输出,输出级别 INFO -->
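The ${ctx:jobName}, ${ctx:jobGroup}, and ${ctx:scheduleTaskId} lookups in the two new routing appenders resolve against the Log4j2 ThreadContext, so those keys must be populated on the task thread before com.oceanbase.tools.migrator logs anything. A minimal sketch of that wiring, assuming surrounding task code that is not part of this diff (only the key names come from the config above):

import org.apache.logging.log4j.ThreadContext;

public final class ScheduleTaskLogContextSketch {

    // Call before the schedule task runs so routed files land under
    // ${LOG_DIRECTORY}/scheduleTask/<jobName>-<jobGroup>/<scheduleTaskId>/.
    public static void init(String jobName, String jobGroup, long scheduleTaskId) {
        ThreadContext.put("jobName", jobName);
        ThreadContext.put("jobGroup", jobGroup);
        ThreadContext.put("scheduleTaskId", String.valueOf(scheduleTaskId));
    }

    // Call after the task finishes to avoid leaking context onto pooled threads.
    public static void clear() {
        ThreadContext.remove("jobName");
        ThreadContext.remove("jobGroup");
        ThreadContext.remove("scheduleTaskId");
    }
}

The before/after hooks added to the Quartz job at the bottom of this diff are a natural place for this setup and teardown, though their bodies are not shown here.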

@@ -15,6 +15,8 @@
*/
package com.oceanbase.odc.metadb.schedule;

import java.util.List;

import javax.transaction.Transactional;

import org.springframework.data.jpa.repository.JpaRepository;
@@ -35,6 +37,8 @@
public interface ScheduleTaskRepository extends JpaRepository<ScheduleTaskEntity, Long>,
JpaSpecificationExecutor<ScheduleTaskEntity> {

List<ScheduleTaskEntity> findByJobNameAndStatusIn(String jobName, List<TaskStatus> statuses);

@Transactional
@Modifying
@Query("update ScheduleTaskEntity st set st.status = ?2 where st.id = ?1")
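No @Query annotation is needed for the new finder: Spring Data JPA derives the statement from the method name. For illustration only, a hand-written equivalent of the derived query (JPQL reconstructed from the method signature, assuming the entity fields are jobName and status):

@Query("select st from ScheduleTaskEntity st where st.jobName = ?1 and st.status in ?2")
List<ScheduleTaskEntity> findByJobNameAndStatusIn(String jobName, List<TaskStatus> statuses);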

@@ -21,6 +21,7 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.oceanbase.tools.migrator.common.enums.ShardingStrategy;
import com.oceanbase.tools.migrator.core.IJobStore;

import lombok.extern.slf4j.Slf4j;
@@ -44,13 +45,25 @@ public class DLMConfiguration {
@Value("${odc.task.dlm.single-task-thread-pool-size:15}")
private int singleTaskThreadPoolSize;

@Value("${odc.task.dlm.task-connection-query-timeout-seconds:180}")
private int taskConnectionQueryTimeout;

@Value("${odc.task.dlm.sharding-strategy:FIXED_LENGTH}")
private ShardingStrategy shardingStrategy;

@Value("${odc.task.dlm.default-scan-batch-size:10000}")
private int defaultScanBatchSize;

@Bean
public JobMetaFactory jobMetaFactory(IJobStore jobStore) {
JobMetaFactory jobMetaFactory = new JobMetaFactory();
jobMetaFactory.setJobStore(jobStore);
jobMetaFactory.setExecutorService(Executors.newFixedThreadPool(dlmThreadPoolSize));
jobMetaFactory.setSingleTaskThreadPoolSize(singleTaskThreadPoolSize);
jobMetaFactory.setReadWriteRatio(readWriteRatio);
jobMetaFactory.setTaskConnectionQueryTimeout(taskConnectionQueryTimeout);
jobMetaFactory.setDefaultShardingStrategy(shardingStrategy);
jobMetaFactory.setDefaultScanBatchSize(defaultScanBatchSize);
return jobMetaFactory;
}


@@ -55,7 +55,7 @@ public static DataSourceInfo build(ConnectionConfig connectionConfig) {
.setFullUserName(OBConsoleDataSourceFactory.getUsername(connectionConfig));
dataSourceInfo.setDbType(DataBaseType.OCEANBASEV10.name());
dataSourceInfo.setSysUser(connectionConfig.getSysTenantUsername());
dataSourceInfo.setUserLocalProxy(false);
dataSourceInfo.setClusterName(connectionConfig.getClusterName());
if (StringUtils.isNotEmpty(connectionConfig.getSysTenantPassword())) {
try {
dataSourceInfo.setSysPassword(EncryptUtils.encode(connectionConfig.getSysTenantPassword()));

@@ -23,6 +23,7 @@
import com.oceanbase.tools.migrator.common.dto.HistoryJob;
import com.oceanbase.tools.migrator.common.enums.JobStatus;
import com.oceanbase.tools.migrator.common.enums.JobType;
import com.oceanbase.tools.migrator.common.enums.ShardingStrategy;
import com.oceanbase.tools.migrator.core.AbstractJobMetaFactory;
import com.oceanbase.tools.migrator.core.JobReq;
import com.oceanbase.tools.migrator.core.meta.ClusterMeta;
@@ -41,8 +42,14 @@ public class JobMetaFactory extends AbstractJobMetaFactory {

private final SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
private int singleTaskThreadPoolSize;

private int taskConnectionQueryTimeout;
private double readWriteRatio;

private int defaultScanBatchSize;

private ShardingStrategy defaultShardingStrategy;

public JobMeta create(DlmTask parameters) throws Exception {
HistoryJob historyJob = new HistoryJob();
historyJob.setId(parameters.getId());
@@ -58,18 +65,23 @@ public JobMeta create(DlmTask parameters) throws Exception {
LogicTableConfig logicTableConfig = parameters.getLogicTableConfig();
logicTableConfig.setReaderTaskCount((int) (singleTaskThreadPoolSize * readWriteRatio / (1 + readWriteRatio)));
logicTableConfig.setWriterTaskCount(singleTaskThreadPoolSize - logicTableConfig.getReaderTaskCount());
logicTableConfig.setGeneratorBatchSize(defaultScanBatchSize);
DataSourceInfo sourceInfo = DataSourceInfoBuilder.build(parameters.getSourceDs());
DataSourceInfo targetInfo = DataSourceInfoBuilder.build(parameters.getTargetDs());
sourceInfo.setConnectionCount(2 * (logicTableConfig.getReaderTaskCount()
+ parameters.getLogicTableConfig().getWriterTaskCount()));
targetInfo.setConnectionCount(2 * (logicTableConfig.getReaderTaskCount()
+ parameters.getLogicTableConfig().getWriterTaskCount()));
sourceInfo.setQueryTimeout(taskConnectionQueryTimeout);
targetInfo.setQueryTimeout(taskConnectionQueryTimeout);
log.info("Begin to create dlm job,params={}", logicTableConfig);
// ClusterMeta and TenantMeta used to calculate min limit size.
JobReq req =
new JobReq(historyJob, parameters.getLogicTableConfig(), sourceInfo, targetInfo, new ClusterMeta(),
new ClusterMeta(), new TenantMeta(), new TenantMeta());
return super.create(req);
JobMeta jobMeta = super.create(req);
jobMeta.setShardingStrategy(defaultShardingStrategy);
return jobMeta;
}

public void setReadWriteRatio(double readWriteRatio) {
@@ -80,4 +92,16 @@ public void setSingleTaskThreadPoolSize(int singleTaskThreadPoolSize) {
this.singleTaskThreadPoolSize = singleTaskThreadPoolSize;
}

public void setTaskConnectionQueryTimeout(int taskConnectionQueryTimeout) {
this.taskConnectionQueryTimeout = taskConnectionQueryTimeout;
}

public void setDefaultShardingStrategy(ShardingStrategy defaultShardingStrategy) {
this.defaultShardingStrategy = defaultShardingStrategy;
}

public void setDefaultScanBatchSize(int defaultScanBatchSize) {
this.defaultScanBatchSize = defaultScanBatchSize;
}

}
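To make the reader/writer split in create() concrete, a standalone sketch of the arithmetic with the default single-task pool size of 15 and an assumed read/write ratio of 0.5 (the real ratio is injected by DLMConfiguration and may differ):

public class ThreadSplitSketch {
    public static void main(String[] args) {
        int singleTaskThreadPoolSize = 15; // default from DLMConfiguration
        double readWriteRatio = 0.5;       // assumed value for illustration
        int readerTaskCount = (int) (singleTaskThreadPoolSize * readWriteRatio / (1 + readWriteRatio));
        int writerTaskCount = singleTaskThreadPoolSize - readerTaskCount;
        int connectionCount = 2 * (readerTaskCount + writerTaskCount);
        // Prints: readers=5, writers=10, connections per data source=30
        System.out.printf("readers=%d, writers=%d, connections per data source=%d%n",
                readerTaskCount, writerTaskCount, connectionCount);
    }
}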

@@ -19,13 +19,16 @@
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.collections4.CollectionUtils;

import com.oceanbase.odc.service.dlm.model.OffsetConfig;

/**
@@ -58,6 +61,9 @@ private static String replaceVariables(String condition, Map<String, String> var
}

private static Map<String, String> getVariablesMap(List<OffsetConfig> variables) {
if (CollectionUtils.isEmpty(variables)) {
return Collections.emptyMap();
}
Map<String, String> map = new HashMap<>();
variables.forEach(obj -> {
if (map.containsKey(obj.getName())) {

@@ -50,10 +50,13 @@ public void execute(JobExecutionContext context) throws JobExecutionException {
log.info("Start to run new job,jobKey={}", context.getJobDetail().getKey());
try {
odcJob = getOdcJob(context);
odcJob.before(context);
run(context);
} catch (Exception e) {
log.warn("Job execute failed,job key={},fire time={}.",
context.getJobDetail().getKey(), context.getFireTime(), e);
} finally {
odcJob.after(context);
}

log.info("Job done,jobKey={}", context.getJobDetail().getKey());