diff --git a/src/backend/commons/cmdb-sdk-model/src/main/java/com/tencent/bk/job/common/cc/model/req/FindModuleHostRelationReq.java b/src/backend/commons/cmdb-sdk-model/src/main/java/com/tencent/bk/job/common/cc/model/req/FindModuleHostRelationReq.java index edadd4973c..04a08df5e8 100644 --- a/src/backend/commons/cmdb-sdk-model/src/main/java/com/tencent/bk/job/common/cc/model/req/FindModuleHostRelationReq.java +++ b/src/backend/commons/cmdb-sdk-model/src/main/java/com/tencent/bk/job/common/cc/model/req/FindModuleHostRelationReq.java @@ -43,11 +43,11 @@ public class FindModuleHostRelationReq extends EsbReq { private List moduleIdList; @JsonProperty("module_fields") - private List moduleFields = Arrays.asList("bk_module_id", "bk_set_id"); + private List moduleFields = Arrays.asList("bk_module_id", "bk_set_id", "last_time"); @JsonProperty("host_fields") private List hostFields = Arrays.asList("bk_host_id", "bk_host_innerip", "bk_host_innerip_v6", - "bk_agent_id", "bk_host_name", "bk_os_name", "bk_cloud_id", "bk_os_type", "bk_cloud_vendor"); + "bk_agent_id", "bk_host_name", "bk_os_name", "bk_cloud_id", "bk_os_type", "bk_cloud_vendor", "last_time"); private Page page; } diff --git a/src/backend/commons/cmdb-sdk-model/src/main/java/com/tencent/bk/job/common/cc/model/result/FindHostByModuleResult.java b/src/backend/commons/cmdb-sdk-model/src/main/java/com/tencent/bk/job/common/cc/model/result/FindHostByModuleResult.java deleted file mode 100644 index 243ace4493..0000000000 --- a/src/backend/commons/cmdb-sdk-model/src/main/java/com/tencent/bk/job/common/cc/model/result/FindHostByModuleResult.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. - * - * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. - * - * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. - * - * License for BK-JOB蓝鲸智云作业平台: - * -------------------------------------------------------------------- - * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated - * documentation files (the "Software"), to deal in the Software without restriction, including without limitation - * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and - * to permit persons to whom the Software is furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all copies or substantial portions of - * the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO - * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF - * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS - * IN THE SOFTWARE. - */ - -package com.tencent.bk.job.common.cc.model.result; - -import com.fasterxml.jackson.annotation.JsonProperty; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; - -import java.util.List; - -public class FindHostByModuleResult extends AbstractCcSearchResult { - @Getter - @Setter - @ToString - public static class HostWithTopoInfo { - private List biz; - private HostProp host; - private List module; - private List set; - } - - @Getter - @Setter - @ToString - public static class BizProp { - @JsonProperty("bk_biz_id") - private Long appId; - @JsonProperty("bk_biz_name") - private String appName; - @JsonProperty("bk_biz_maintainer") - private String appMaintainer; - } - - @Getter - @Setter - @ToString - public static class HostProp { - @JsonProperty("bk_host_id") - private Long hostId; - @JsonProperty("bk_host_innerip") - private String ip; - @JsonProperty("bk_host_name") - private String hostName; - @JsonProperty("bk_os_name") - private String os; - @JsonProperty("bk_cloud_id") - private List cloudAreaList; - } - - @Getter - @Setter - @ToString - public static class CloudAreaProp { - @JsonProperty("bk_inst_id") - private Long cloudId; - @JsonProperty("bk_inst_name") - private String cloudAreaName; - } - - @Getter - @Setter - @ToString - public static class ModuleProp { - @JsonProperty("bk_module_id") - private Long moduleId; - @JsonProperty("bk_set_id") - private Long setId; - @JsonProperty("bk_module_name") - private String moduleName; - @JsonProperty("bk_module_type") - private String moduleType; - } - - @Getter - @Setter - @ToString - public static class SetProp { - @JsonProperty("bk_biz_id") - private Long appId; - @JsonProperty("bk_set_id") - private Long setId; - @JsonProperty("bk_set_name") - private String setName; - } -} diff --git a/src/backend/commons/cmdb-sdk-model/src/main/java/com/tencent/bk/job/common/cc/model/result/FindModuleHostRelationResult.java b/src/backend/commons/cmdb-sdk-model/src/main/java/com/tencent/bk/job/common/cc/model/result/FindModuleHostRelationResult.java index a71f2e9364..566146866e 100644 --- a/src/backend/commons/cmdb-sdk-model/src/main/java/com/tencent/bk/job/common/cc/model/result/FindModuleHostRelationResult.java +++ b/src/backend/commons/cmdb-sdk-model/src/main/java/com/tencent/bk/job/common/cc/model/result/FindModuleHostRelationResult.java @@ -24,11 +24,7 @@ package com.tencent.bk.job.common.cc.model.result; -import com.fasterxml.jackson.annotation.JsonProperty; import lombok.Data; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; import java.util.List; @@ -39,47 +35,4 @@ public class FindModuleHostRelationResult { private List relation; - @Getter - @Setter - @ToString - public static class HostWithModules { - private HostProp host; - private List modules; - } - - @Getter - @Setter - @ToString - public static class HostProp { - @JsonProperty("bk_host_id") - private Long hostId; - @JsonProperty("bk_host_innerip") - private String ip; - @JsonProperty("bk_host_innerip_v6") - private String ipv6; - @JsonProperty("bk_agent_id") - private String agentId; - @JsonProperty("bk_host_name") - private String hostName; - @JsonProperty("bk_os_name") - private String osName; - @JsonProperty("bk_os_type") - private String osType; - @JsonProperty("bk_cloud_id") - private Long cloudAreaId = 0L; - @JsonProperty("bk_cloud_vendor") - private String cloudVendorId; - } - - @Getter - @Setter - @ToString - public static class ModuleProp { - @JsonProperty("bk_module_id") - private Long moduleId; - @JsonProperty("bk_set_id") - private Long setId; - @JsonProperty("bk_module_type") - private String moduleType; - } } diff --git a/src/backend/commons/cmdb-sdk-model/src/main/java/com/tencent/bk/job/common/cc/model/result/HostEventDetail.java b/src/backend/commons/cmdb-sdk-model/src/main/java/com/tencent/bk/job/common/cc/model/result/HostEventDetail.java index b4d2b5bbbf..6ab2a000ec 100644 --- a/src/backend/commons/cmdb-sdk-model/src/main/java/com/tencent/bk/job/common/cc/model/result/HostEventDetail.java +++ b/src/backend/commons/cmdb-sdk-model/src/main/java/com/tencent/bk/job/common/cc/model/result/HostEventDetail.java @@ -28,6 +28,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.tencent.bk.job.common.model.dto.ApplicationHostDTO; import com.tencent.bk.job.common.util.StringUtil; +import com.tencent.bk.job.common.util.TimeUtil; import lombok.Data; import java.util.List; @@ -62,6 +63,9 @@ public class HostEventDetail { @JsonProperty("bk_cloud_vendor") private String cloudVendorId; + @JsonProperty("last_time") + private String lastTime; + public static ApplicationHostDTO toHostInfoDTO(HostEventDetail eventDetail) { ApplicationHostDTO hostInfoDTO = new ApplicationHostDTO(); hostInfoDTO.setHostId(eventDetail.hostId); @@ -78,6 +82,7 @@ public static ApplicationHostDTO toHostInfoDTO(HostEventDetail eventDetail) { hostInfoDTO.setOsType(eventDetail.osType); hostInfoDTO.setCloudAreaId(Long.parseLong(eventDetail.getCloudId())); hostInfoDTO.setCloudVendorId(eventDetail.cloudVendorId); + hostInfoDTO.setLastTime(TimeUtil.parseZonedTime(eventDetail.getLastTime())); return hostInfoDTO; } } diff --git a/src/backend/commons/cmdb-sdk-model/src/main/java/com/tencent/bk/job/common/cc/model/result/HostProp.java b/src/backend/commons/cmdb-sdk-model/src/main/java/com/tencent/bk/job/common/cc/model/result/HostProp.java new file mode 100644 index 0000000000..394379c84e --- /dev/null +++ b/src/backend/commons/cmdb-sdk-model/src/main/java/com/tencent/bk/job/common/cc/model/result/HostProp.java @@ -0,0 +1,85 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.common.cc.model.result; + + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.tencent.bk.job.common.model.dto.ApplicationHostDTO; +import com.tencent.bk.job.common.util.StringUtil; +import com.tencent.bk.job.common.util.TimeUtil; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +import java.util.List; + +@Getter +@Setter +@ToString +public class HostProp { + @JsonProperty("bk_host_id") + private Long hostId; + @JsonProperty("bk_host_innerip") + private String ip; + @JsonProperty("bk_host_innerip_v6") + private String ipv6; + @JsonProperty("bk_agent_id") + private String agentId; + @JsonProperty("bk_host_name") + private String hostName; + @JsonProperty("bk_os_name") + private String osName; + @JsonProperty("bk_os_type") + private String osType; + @JsonProperty("bk_cloud_id") + private Long cloudAreaId = 0L; + @JsonProperty("bk_cloud_vendor") + private String cloudVendorId; + @JsonProperty("last_time") + private String lastTime; + + public ApplicationHostDTO toApplicationHostDTO() { + ApplicationHostDTO applicationHostDTO = new ApplicationHostDTO(); + applicationHostDTO.setHostId(hostId); + List ipList = StringUtil.strToList(ip, String.class, ","); + applicationHostDTO.setIpList(ipList); + if (ipList != null && !ipList.isEmpty()) { + applicationHostDTO.setIp(ipList.get(0)); + } + applicationHostDTO.setDisplayIp(ip); + applicationHostDTO.setIpv6(ipv6); + applicationHostDTO.setAgentId(agentId); + int hostNameMaxLength = 2000; + int osNameMaxLength = 512; + int osTypeNameMaxLength = 32; + applicationHostDTO.setHostName(StringUtil.substring(hostName, hostNameMaxLength)); + applicationHostDTO.setOsName(StringUtil.substring(osName, osNameMaxLength)); + applicationHostDTO.setOsType(StringUtil.substring(osType, osTypeNameMaxLength)); + applicationHostDTO.setCloudAreaId(cloudAreaId); + applicationHostDTO.setCloudVendorId(cloudVendorId); + applicationHostDTO.setLastTime(TimeUtil.parseZonedTime(lastTime)); + return applicationHostDTO; + } +} diff --git a/src/backend/commons/cmdb-sdk-model/src/main/java/com/tencent/bk/job/common/cc/model/result/HostRelationEventDetail.java b/src/backend/commons/cmdb-sdk-model/src/main/java/com/tencent/bk/job/common/cc/model/result/HostRelationEventDetail.java index 4a3d6577f3..460955967e 100644 --- a/src/backend/commons/cmdb-sdk-model/src/main/java/com/tencent/bk/job/common/cc/model/result/HostRelationEventDetail.java +++ b/src/backend/commons/cmdb-sdk-model/src/main/java/com/tencent/bk/job/common/cc/model/result/HostRelationEventDetail.java @@ -42,4 +42,7 @@ public class HostRelationEventDetail { @JsonProperty("bk_module_id") private Long moduleId; + + @JsonProperty("last_time") + private String lastTime; } diff --git a/src/backend/commons/cmdb-sdk-model/src/main/java/com/tencent/bk/job/common/cc/model/result/HostWithModules.java b/src/backend/commons/cmdb-sdk-model/src/main/java/com/tencent/bk/job/common/cc/model/result/HostWithModules.java new file mode 100644 index 0000000000..c0c5acd1cf --- /dev/null +++ b/src/backend/commons/cmdb-sdk-model/src/main/java/com/tencent/bk/job/common/cc/model/result/HostWithModules.java @@ -0,0 +1,40 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.common.cc.model.result; + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +import java.util.List; + + +@Getter +@Setter +@ToString +public class HostWithModules { + private HostProp host; + private List modules; +} diff --git a/src/backend/commons/cmdb-sdk-model/src/main/java/com/tencent/bk/job/common/cc/model/result/ModuleProp.java b/src/backend/commons/cmdb-sdk-model/src/main/java/com/tencent/bk/job/common/cc/model/result/ModuleProp.java new file mode 100644 index 0000000000..ae108111cd --- /dev/null +++ b/src/backend/commons/cmdb-sdk-model/src/main/java/com/tencent/bk/job/common/cc/model/result/ModuleProp.java @@ -0,0 +1,45 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.common.cc.model.result; + + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +@Getter +@Setter +@ToString +public class ModuleProp { + @JsonProperty("bk_module_id") + private Long moduleId; + @JsonProperty("bk_set_id") + private Long setId; + @JsonProperty("bk_module_type") + private String moduleType; + @JsonProperty("last_time") + private String lastTime; +} diff --git a/src/backend/commons/cmdb-sdk/src/main/java/com/tencent/bk/job/common/cc/sdk/BizCmdbClient.java b/src/backend/commons/cmdb-sdk/src/main/java/com/tencent/bk/job/common/cc/sdk/BizCmdbClient.java index 06a38800e9..899be91879 100644 --- a/src/backend/commons/cmdb-sdk/src/main/java/com/tencent/bk/job/common/cc/sdk/BizCmdbClient.java +++ b/src/backend/commons/cmdb-sdk/src/main/java/com/tencent/bk/job/common/cc/sdk/BizCmdbClient.java @@ -69,9 +69,12 @@ import com.tencent.bk.job.common.cc.model.result.GetBizInternalModuleResult; import com.tencent.bk.job.common.cc.model.result.HostBizRelationDTO; import com.tencent.bk.job.common.cc.model.result.HostEventDetail; +import com.tencent.bk.job.common.cc.model.result.HostProp; import com.tencent.bk.job.common.cc.model.result.HostRelationEventDetail; +import com.tencent.bk.job.common.cc.model.result.HostWithModules; import com.tencent.bk.job.common.cc.model.result.ListBizHostResult; import com.tencent.bk.job.common.cc.model.result.ListHostsWithoutBizResult; +import com.tencent.bk.job.common.cc.model.result.ModuleProp; import com.tencent.bk.job.common.cc.model.result.ResourceWatchResult; import com.tencent.bk.job.common.cc.model.result.SearchAppResult; import com.tencent.bk.job.common.cc.model.result.SearchCloudAreaResult; @@ -429,12 +432,13 @@ public InstanceTopologyDTO getBizInternalModuleFromCMDB(long bizId) { @Override public List getHosts(long bizId, List ccInstList) { - return getHostsByTopology(bizId, ccInstList); + List hostWithModuleList = getHostRelationsByTopology(bizId, ccInstList); + return convertToHostInfoDTOList(bizId, hostWithModuleList); } @Override - public List getHostsByTopology(long bizId, List ccInstList) { - StopWatch watch = new StopWatch("getHostsByTopology"); + public List getHostRelationsByTopology(long bizId, List ccInstList) { + StopWatch watch = new StopWatch("getHostRelationsByTopology"); watch.start("getCachedBizInstCompleteTopology"); InstanceTopologyDTO appCompleteTopology = getCachedBizInstCompleteTopology(bizId); watch.stop(); @@ -454,22 +458,33 @@ public List getHostsByTopology(long bizId, List applicationHostDTOList = findHostByModule(bizId, + watch.start("findHostRelationByModule"); + List hostWithModulesList = findHostRelationByModule(bizId, new ArrayList<>(moduleIdSet)); watch.stop(); if (watch.getTotalTimeMillis() > 1000L) { - log.warn("Get hosts by topo is slow, bizId: {}, ccInsts: {}, watchInfo: {}", bizId, ccInstList, + log.warn("Get hostRelations by topo is slow, bizId: {}, ccInsts: {}, watchInfo: {}", bizId, ccInstList, watch.prettyPrint()); } - return applicationHostDTOList; + hostWithModulesList = hostWithModulesList.stream().filter(hostWithModules -> { + boolean valid = hostWithModules.getHost() != null && hostWithModules.getHost().getHostId() != null; + if (!valid) { + log.warn("Ignore hostWithModules because of null host/hostId:{}", hostWithModules); + } + if (hostWithModules.getModules() == null) { + log.warn("Ignore hostWithModules because of null modules:{}", hostWithModules); + valid = false; + } + return valid; + }).collect(Collectors.toList()); + return hostWithModulesList; } @Override - public List findHostByModule(long bizId, List moduleIdList) { + public List findHostRelationByModule(long bizId, List moduleIdList) { //moduleId分批 - List resultList = new ArrayList<>(); + List resultList = new ArrayList<>(); int batchSize = 200; int start = 0; int end = start + batchSize; @@ -478,7 +493,6 @@ public List findHostByModule(long bizId, List moduleId do { List moduleIdSubList = moduleIdList.subList(start, end); if (moduleIdSubList.size() > 0) { - // 使用find_module_host_relation接口 resultList.addAll(findModuleHostRelationConcurrently(bizId, moduleIdSubList)); } start += batchSize; @@ -513,20 +527,19 @@ private FindModuleHostRelationReq genFindModuleHostRelationReq(long bizId, List< * @param moduleIdList 模块ID列表 * @return 主机列表 */ - private List findModuleHostRelationConcurrently(long bizId, List moduleIdList) { + private List findModuleHostRelationConcurrently(long bizId, + List moduleIdList) { if (moduleIdList == null || moduleIdList.isEmpty()) { return Collections.emptyList(); } - - List applicationHostDTOList; int start = 0; //已调优 int limit = 500; FindModuleHostRelationReq req = genFindModuleHostRelationReq(bizId, moduleIdList, start, limit); //先拉一次获取总数 FindModuleHostRelationResult pageData = getHostsByReq(req); - List hostWithModulesList = pageData.getRelation(); - LinkedBlockingQueue resultQueue = + List hostWithModulesList = pageData.getRelation(); + LinkedBlockingQueue resultQueue = new LinkedBlockingQueue<>(hostWithModulesList); // 如果该页未达到limit,说明是最后一页 if (pageData.getCount() <= limit) { @@ -563,19 +576,12 @@ private List findModuleHostRelationConcurrently(long bizId, log.warn("bizId {}:{} hosts in total, {} hosts indeed, CMDB interface params invalid", bizId, pageData.getCount(), resultQueue.size()); } - Long startTime = System.currentTimeMillis(); - applicationHostDTOList = convertToHostInfoDTOList(bizId, new ArrayList<>(resultQueue)); - Long endTime = System.currentTimeMillis(); - long timeConsuming = endTime - startTime; - if (timeConsuming >= 1000) { - log.info("convertToHostInfoDTOList time consuming:" + timeConsuming); - } - return applicationHostDTOList; + return new ArrayList<>(resultQueue); } private void fillAgentInfo( ApplicationHostDTO applicationHostDTO, - FindModuleHostRelationResult.HostProp host + HostProp host ) { String multiIp = host.getIp(); multiIp = multiIp.trim(); @@ -590,9 +596,9 @@ private void fillAgentInfo( private ApplicationHostDTO convertToHostInfoDTO( Long bizId, - FindModuleHostRelationResult.HostWithModules hostWithModules + HostWithModules hostWithModules ) { - FindModuleHostRelationResult.HostProp host = hostWithModules.getHost(); + HostProp host = hostWithModules.getHost(); String multiIp = host.getIp(); if (multiIp != null) { multiIp = multiIp.trim(); @@ -609,21 +615,21 @@ private ApplicationHostDTO convertToHostInfoDTO( applicationHostDTO.setHostId(host.getHostId()); applicationHostDTO.setCloudVendorId(host.getCloudVendorId()); fillAgentInfo(applicationHostDTO, host); - List modules = hostWithModules.getModules(); - for (FindModuleHostRelationResult.ModuleProp module : modules) { + List modules = hostWithModules.getModules(); + for (ModuleProp module : modules) { if (module == null || null == module.getModuleId()) { log.warn("invalid host:" + JsonUtils.toJson(applicationHostDTO)); } } - List validModules = + List validModules = hostWithModules.getModules().stream().filter(Objects::nonNull).collect(Collectors.toList()); applicationHostDTO.setModuleId( validModules.stream() - .map(FindModuleHostRelationResult.ModuleProp::getModuleId) + .map(ModuleProp::getModuleId) .collect(Collectors.toList())); applicationHostDTO.setSetId( validModules.stream() - .map(FindModuleHostRelationResult.ModuleProp::getSetId) + .map(ModuleProp::getSetId) .collect(Collectors.toList())); applicationHostDTO.setModuleType(validModules.stream().map(it -> { try { @@ -645,12 +651,12 @@ private ApplicationHostDTO convertToHostInfoDTO( private List convertToHostInfoDTOList( long bizId, - List hostWithModulesList + List hostWithModulesList ) { List applicationHostDTOList = new ArrayList<>(); Set ipSet = new HashSet<>(); - for (FindModuleHostRelationResult.HostWithModules hostWithModules : hostWithModulesList) { - FindModuleHostRelationResult.HostProp host = hostWithModules.getHost(); + for (HostWithModules hostWithModules : hostWithModulesList) { + HostProp host = hostWithModules.getHost(); if (host == null) { log.warn("host=null,hostWithTopoInfo={}", JsonUtils.toJson(hostWithModules)); continue; @@ -1317,7 +1323,7 @@ public ResourceWatchResult getHostEvents(Long startTime, String ResourceWatchReq req = makeBaseReqByWeb( ResourceWatchReq.class, null, defaultUin, defaultSupplierAccount); req.setFields(Arrays.asList("bk_host_id", "bk_host_innerip", "bk_host_innerip_v6", "bk_agent_id", - "bk_host_name", "bk_os_name", "bk_os_type", "bk_cloud_id", "bk_cloud_vendor")); + "bk_host_name", "bk_os_name", "bk_os_type", "bk_cloud_id", "bk_cloud_vendor", "last_time")); req.setResource("host"); req.setCursor(cursor); req.setStartTime(startTime); @@ -1331,7 +1337,7 @@ public ResourceWatchResult getHostEvents(Long startTime, String public ResourceWatchResult getHostRelationEvents(Long startTime, String cursor) { ResourceWatchReq req = makeBaseReqByWeb( ResourceWatchReq.class, null, defaultUin, defaultSupplierAccount); - req.setFields(Arrays.asList("bk_host_id", "bk_biz_id", "bk_set_id", "bk_module_id")); + req.setFields(Arrays.asList("bk_host_id", "bk_biz_id", "bk_set_id", "bk_module_id", "last_time")); req.setResource("host_relation"); req.setCursor(cursor); req.setStartTime(startTime); @@ -1358,11 +1364,11 @@ public ResourceWatchResult getAppEvents(Long startTime, String c class FindModuleHostRelationTask implements Runnable { //结果队列 - LinkedBlockingQueue resultQueue; + LinkedBlockingQueue resultQueue; FindModuleHostRelationReq req; String requestId; - FindModuleHostRelationTask(LinkedBlockingQueue resultQueue, + FindModuleHostRelationTask(LinkedBlockingQueue resultQueue, FindModuleHostRelationReq req, String requestId) { this.resultQueue = resultQueue; this.req = req; diff --git a/src/backend/commons/cmdb-sdk/src/main/java/com/tencent/bk/job/common/cc/sdk/IBizCmdbClient.java b/src/backend/commons/cmdb-sdk/src/main/java/com/tencent/bk/job/common/cc/sdk/IBizCmdbClient.java index d4acc09920..b97eeb5236 100644 --- a/src/backend/commons/cmdb-sdk/src/main/java/com/tencent/bk/job/common/cc/sdk/IBizCmdbClient.java +++ b/src/backend/commons/cmdb-sdk/src/main/java/com/tencent/bk/job/common/cc/sdk/IBizCmdbClient.java @@ -37,6 +37,7 @@ import com.tencent.bk.job.common.cc.model.result.HostBizRelationDTO; import com.tencent.bk.job.common.cc.model.result.HostEventDetail; import com.tencent.bk.job.common.cc.model.result.HostRelationEventDetail; +import com.tencent.bk.job.common.cc.model.result.HostWithModules; import com.tencent.bk.job.common.cc.model.result.ResourceWatchResult; import com.tencent.bk.job.common.model.dto.ApplicationDTO; import com.tencent.bk.job.common.model.dto.ApplicationHostDTO; @@ -83,22 +84,22 @@ public interface IBizCmdbClient { List getHosts(long bizId, List ccInstList); /** - * 根据topo实例获取hosts + * 根据topo实例获取主机及主机关系 * * @param bizId cmdb业务ID * @param ccInstList topo节点列表 * @return 主机列表 */ - List getHostsByTopology(long bizId, List ccInstList); + List getHostRelationsByTopology(long bizId, List ccInstList); /** - * 根据module获取hosts + * 根据module获取主机及主机关系 * * @param bizId cmdb业务ID * @param moduleIdList 模块ID列表 * @return 主机 */ - List findHostByModule(long bizId, List moduleIdList); + List findHostRelationByModule(long bizId, List moduleIdList); /** * 从CC获取所有业务信息 diff --git a/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/model/dto/ApplicationHostDTO.java b/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/model/dto/ApplicationHostDTO.java index 812fab6459..0b4f687a3f 100644 --- a/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/model/dto/ApplicationHostDTO.java +++ b/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/model/dto/ApplicationHostDTO.java @@ -132,6 +132,11 @@ public class ApplicationHostDTO { */ private String cloudVendorName; + /** + * CMDB中的上次修改时间 + */ + private Long lastTime; + /** * 集群ID */ diff --git a/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/model/dto/BasicHostDTO.java b/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/model/dto/BasicHostDTO.java index e7ed055536..949119c74a 100644 --- a/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/model/dto/BasicHostDTO.java +++ b/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/model/dto/BasicHostDTO.java @@ -45,8 +45,8 @@ public class BasicHostDTO { */ private Long hostId; /** - * 上次修改时间 + * CMDB数据的上次修改时间 */ - private Long lastModifyTime; + private Long lastTime; } diff --git a/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/util/TimeUtil.java b/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/util/TimeUtil.java index f1c13487dc..adf331570d 100644 --- a/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/util/TimeUtil.java +++ b/src/backend/commons/common/src/main/java/com/tencent/bk/job/common/util/TimeUtil.java @@ -24,21 +24,20 @@ package com.tencent.bk.job.common.util; +import lombok.extern.slf4j.Slf4j; + import java.text.DateFormat; import java.text.SimpleDateFormat; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.ZoneOffset; +import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.Calendar; import java.util.Date; -/** - * @Description - * @Date 2020/3/6 - * @Version 1.0 - */ +@Slf4j public class TimeUtil { public static final String DEFAULT_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss"; @@ -155,4 +154,14 @@ public static String formatTime(long timeMills, String format) { DateFormat formatter = new SimpleDateFormat(format); return formatter.format(date); } + + public static Long parseZonedTime(String zonedTimeStr) { + try { + ZonedDateTime zonedDateTime = ZonedDateTime.parse(zonedTimeStr, DateTimeFormatter.ISO_DATE_TIME); + return zonedDateTime.toInstant().toEpochMilli(); + } catch (Exception e) { + log.warn("Fail to parseZonedTime from: {}", zonedTimeStr); + return null; + } + } } diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/dao/ApplicationHostDAO.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/dao/ApplicationHostDAO.java index 644305a654..d5461b139b 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/dao/ApplicationHostDAO.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/dao/ApplicationHostDAO.java @@ -40,9 +40,6 @@ public interface ApplicationHostDAO { // 查询类操作 - - boolean existsHost(long bizId, String ip); - boolean existAppHostInfoByHostId(Long hostId); ApplicationHostDTO getHostById(Long hostId); @@ -65,7 +62,7 @@ public interface ApplicationHostDAO { List listHostInfoByBizId(long bizId); - List listBasicHostInfoByBizId(long bizId); + List listBasicHostInfo(Collection hostIds); List listAllHostInfo(Long start, Long limit); @@ -160,6 +157,13 @@ Long countHostInfoByMultiKeys(Collection bizIds, */ List listAllHostSimpleInfo(); + /** + * 查询全部主机,主机对象只有基础属性 + * + * @return 主机列表 + */ + List listAllBasicHost(); + /** * 批量更新主机状态 * @@ -170,27 +174,19 @@ Long countHostInfoByMultiKeys(Collection bizIds, int batchUpdateHostStatusByHostIds(int status, List hostIdList); // 新增、更新类操作 - int insertHostWithoutTopo(ApplicationHostDTO applicationHostDTO); - void insertOrUpdateHost(ApplicationHostDTO hostDTO); + int batchInsertHost(List applicationHostDTOList); - int batchInsertAppHostInfo(List applicationHostDTOList); + void updateHostAttrsBeforeLastTime(ApplicationHostDTO applicationHostDTO); - void updateHostAttrsById(ApplicationHostDTO applicationHostDTO); - - void updateBizHostInfoByHostId(Long bizId, ApplicationHostDTO applicationHostDTO); - - int updateBizHostInfoByHostId(Long bizId, ApplicationHostDTO applicationHostDTO, boolean updateTopo); - - int batchUpdateBizHostInfoByHostId(List applicationHostDTOList); + int batchUpdateHostsBeforeLastTime(List applicationHostDTOList); int syncHostTopo(Long hostId); // 删除类操作 - - int deleteBizHostInfoById(Long bizId, Long hostId); + int deleteHostBeforeLastTime(Long bizId, Long hostId, Long lastTime); /** * 根据传入的主机ID批量删除主机 @@ -226,6 +222,14 @@ Long countHostInfoByMultiKeys(Collection bizIds, */ int deleteBizHostInfoByBizId(long bizId); + /** + * 根据主机基础信息进行批量删除 + * + * @param basicHostList 主机基础信息列表 + * @return 成功删除的主机数量 + */ + int deleteByBasicHost(List basicHostList); + /** * 根据业务id统计主机状态数量 * diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/dao/HostTopoDAO.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/dao/HostTopoDAO.java index b9b1be7b32..98f0f89fe7 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/dao/HostTopoDAO.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/dao/HostTopoDAO.java @@ -30,26 +30,40 @@ import java.util.List; public interface HostTopoDAO { - void insertHostTopo(HostTopoDTO hostTopoDTO); + int insertHostTopo(HostTopoDTO hostTopoDTO); int batchInsertHostTopo(List hostTopoDTOList); void deleteHostTopoByHostId(Long appId, Long hostId); - void deleteHostTopo(Long hostId, Long appId, Long setId, Long moduleId); + int deleteHostTopoBeforeLastTime(Long hostId, Long appId, Long setId, Long moduleId, Long lastTime); int batchDeleteHostTopo(List hostIdList); + int batchDeleteWithLastTime(List hostTopoList); + int batchDeleteHostTopo(Long bizId, List hostIdList); + int batchUpdateBeforeLastTime(List hostTopoList); + int countHostTopo(Long bizId, Long hostId); List listHostTopoByHostId(Long hostId); + List listHostTopoByHostIds(Collection hostId); + List listHostTopoByModuleIds(Collection moduleIds); List listHostTopoByModuleIds(Collection moduleIds, Long start, Long limit); + /** + * 根据要排除的hostId查询其他拓扑 + * + * @param excludeHostIds 要排除的hostId集合 + * @return 拓扑列表 + */ + List listHostTopoByExcludeHostIds(Collection excludeHostIds); + /** * 根据CMDB业务IDs查询下属主机ID列表 * @@ -66,4 +80,12 @@ public interface HostTopoDAO { * @return 主机ID列表 */ List listHostIdByBizAndHostIds(Collection bizIds, Collection hostIds); + + /** + * 根据hostId查询所属模块Id + * + * @param hostId 主机ID + * @return 模块ID列表 + */ + List listModuleIdByHostId(Long hostId); } diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/dao/impl/ApplicationHostDAOImpl.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/dao/impl/ApplicationHostDAOImpl.java index 237131f7b5..dfb0f988de 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/dao/impl/ApplicationHostDAOImpl.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/dao/impl/ApplicationHostDAOImpl.java @@ -38,7 +38,6 @@ import com.tencent.bk.job.common.model.dto.ResourceScope; import com.tencent.bk.job.common.util.StringUtil; import com.tencent.bk.job.common.util.TagUtils; -import com.tencent.bk.job.common.util.json.JsonUtils; import com.tencent.bk.job.manage.common.TopologyHelper; import com.tencent.bk.job.manage.common.util.JooqDataTypeUtil; import com.tencent.bk.job.manage.dao.ApplicationDAO; @@ -122,6 +121,11 @@ public class ApplicationHostDAOImpl implements ApplicationHostDAO { TABLE.OS_TYPE }; + private static final TableField[] BASIC_FIELDS = { + TABLE.HOST_ID, + TABLE.LAST_TIME + }; + private final DSLContext context; private final ApplicationDAO applicationDAO; private final HostTopoDAO hostTopoDAO; @@ -229,8 +233,8 @@ public List listHostInfoByBizId(long bizId) { } @Override - public List listBasicHostInfoByBizId(long bizId) { - List conditions = buildBizIdCondition(bizId); + public List listBasicHostInfo(Collection hostIds) { + List conditions = buildHostIdsCondition(hostIds); return listBasicHostInfoByConditions(conditions); } @@ -249,17 +253,17 @@ private List listBasicHostInfoByConditions(Collection c } val query = context.select( TABLE.HOST_ID, - TABLE.LAST_MODIFY_TIME + TABLE.LAST_TIME ).from(TABLE) .where(conditions); - Result> records = query.fetch(); + Result> records = query.fetch(); List basicHostInfoList = new ArrayList<>(); if (CollectionUtils.isNotEmpty(records)) { records.forEach(record -> { BasicHostDTO basicHost = new BasicHostDTO( JooqDataTypeUtil.getLongFromULong(record.get(TABLE.HOST_ID)), - JooqDataTypeUtil.getLongFromULong(record.get(TABLE.LAST_MODIFY_TIME)) + record.get(TABLE.LAST_TIME) ); basicHostInfoList.add(basicHost); }); @@ -707,38 +711,13 @@ public PageData listHostInfoByPage(ApplicationHostDTO applic return hostInfoPageData; } - private List genHostTopoDTOList(ApplicationHostDTO applicationHostDTO) { - List setIdList = applicationHostDTO.getSetId(); - List moduleIdList = applicationHostDTO.getModuleId(); - if (setIdList == null || moduleIdList == null) { - return Collections.emptyList(); - } else if (setIdList.size() != moduleIdList.size()) { - throw new RuntimeException("setIdList.size()!=moduleIdList.size(),hostInfo=" + JsonUtils.toJson(applicationHostDTO)); - } else { - List hostTopoDTOList = new ArrayList<>(); - for (int i = 0; i < setIdList.size(); i++) { - hostTopoDTOList.add(new HostTopoDTO(applicationHostDTO.getHostId(), - applicationHostDTO.getBizId(), setIdList.get(i), moduleIdList.get(i))); - } - return hostTopoDTOList; - } - } - - @Transactional @Override public int insertHostWithoutTopo(ApplicationHostDTO applicationHostDTO) { - return insertOrUpdateHost(context, applicationHostDTO, false); - } - - @Transactional - @Override - public void insertOrUpdateHost(ApplicationHostDTO hostDTO) { - insertOrUpdateHost(context, hostDTO, true); + return insertOrUpdateHost(context, applicationHostDTO); } private int insertOrUpdateHost(DSLContext defaultContext, - ApplicationHostDTO applicationHostDTO, - Boolean insertTopo) { + ApplicationHostDTO applicationHostDTO) { int result; String finalSetIdsStr = applicationHostDTO.getSetIdsStr(); String finalModuleIdsStr = applicationHostDTO.getModuleIdsStr(); @@ -755,6 +734,7 @@ private int insertOrUpdateHost(DSLContext defaultContext, UByte gseAgentAlive = UByte.valueOf(applicationHostDTO.getGseAgentAlive() ? 1 : 0); String cloudIp = applicationHostDTO.getCloudIp(); String cloudVendor = applicationHostDTO.getCloudVendorId(); + Long lastTime = applicationHostDTO.getLastTime(); var query = defaultContext.insertInto(TABLE, TABLE.HOST_ID, TABLE.APP_ID, @@ -772,7 +752,8 @@ private int insertOrUpdateHost(DSLContext defaultContext, TABLE.IS_AGENT_ALIVE, TABLE.CLOUD_IP, TABLE.LAST_MODIFY_TIME, - TABLE.CLOUD_VENDOR_ID + TABLE.CLOUD_VENDOR_ID, + TABLE.LAST_TIME ).values( JooqDataTypeUtil.buildULong(applicationHostDTO.getHostId()), bizId, @@ -790,7 +771,8 @@ private int insertOrUpdateHost(DSLContext defaultContext, gseAgentAlive, cloudIp, JooqDataTypeUtil.buildULong(System.currentTimeMillis()), - cloudVendor + cloudVendor, + lastTime ); try { result = query.onDuplicateKeyUpdate() @@ -809,22 +791,17 @@ private int insertOrUpdateHost(DSLContext defaultContext, .set(TABLE.IS_AGENT_ALIVE, gseAgentAlive) .set(TABLE.CLOUD_IP, cloudIp) .set(TABLE.CLOUD_VENDOR_ID, cloudVendor) + .set(TABLE.LAST_TIME, lastTime) .execute(); } catch (Throwable t) { log.info("SQL=" + query.getSQL(ParamType.INLINED)); throw t; } - if (insertTopo) { - List hostTopoDTOList = genHostTopoDTOList(applicationHostDTO); - hostTopoDAO.deleteHostTopoByHostId(applicationHostDTO.getBizId(), applicationHostDTO.getHostId()); - int affectedNum = hostTopoDAO.batchInsertHostTopo(hostTopoDTOList); - log.debug("{} hostTopo inserted", affectedNum); - } return result; } @Override - public int batchInsertAppHostInfo(List applicationHostDTOList) { + public int batchInsertHost(List applicationHostDTOList) { int batchSize = 1000; int size = applicationHostDTOList.size(); int start = 0; @@ -851,7 +828,8 @@ public int batchInsertAppHostInfo(List applicationHostDTOLis TABLE.IS_AGENT_ALIVE, TABLE.CLOUD_IP, TABLE.LAST_MODIFY_TIME, - TABLE.CLOUD_VENDOR_ID + TABLE.CLOUD_VENDOR_ID, + TABLE.LAST_TIME ).values( (ULong) null, null, @@ -869,10 +847,10 @@ public int batchInsertAppHostInfo(List applicationHostDTOLis null, null, null, + null, null ); BatchBindStep batchQuery = context.batch(insertQuery); - List hostTopoDTOList = new ArrayList<>(); for (ApplicationHostDTO applicationHostDTO : subList) { batchQuery = batchQuery.bind( JooqDataTypeUtil.buildULong(applicationHostDTO.getHostId()), @@ -890,15 +868,14 @@ public int batchInsertAppHostInfo(List applicationHostDTOLis applicationHostDTO.getModuleTypeStr(), JooqDataTypeUtil.buildUByte(applicationHostDTO.getAgentStatusValue()), applicationHostDTO.getCloudIp(), - applicationHostDTO.getCloudVendorId() + applicationHostDTO.getCloudVendorId(), + applicationHostDTO.getLastTime() ); - hostTopoDTOList.addAll(genHostTopoDTOList(applicationHostDTO)); } int[] results = batchQuery.execute(); for (int result : results) { affectedNum += result; } - hostTopoDAO.batchInsertHostTopo(hostTopoDTOList); start += batchSize; } while (end < size); return affectedNum; @@ -918,7 +895,7 @@ public boolean existAppHostInfoByHostId(Long hostId) { } @Override - public void updateHostAttrsById(ApplicationHostDTO applicationHostDTO) { + public void updateHostAttrsBeforeLastTime(ApplicationHostDTO applicationHostDTO) { Long hostId = applicationHostDTO.getHostId(); if (hostId == null || hostId <= 0) { FormattingTuple msg = MessageFormatter.format( @@ -930,6 +907,7 @@ public void updateHostAttrsById(ApplicationHostDTO applicationHostDTO) { } List conditions = new ArrayList<>(); conditions.add(TABLE.HOST_ID.eq(ULong.valueOf(applicationHostDTO.getHostId()))); + conditions.add(TABLE.LAST_TIME.lessThan(applicationHostDTO.getLastTime())); val query = context.update(TABLE) .set(TABLE.CLOUD_AREA_ID, ULong.valueOf(applicationHostDTO.getCloudAreaId())) .set(TABLE.IP, applicationHostDTO.getIp()) @@ -943,6 +921,7 @@ public void updateHostAttrsById(ApplicationHostDTO applicationHostDTO) { .set(TABLE.IS_AGENT_ALIVE, UByte.valueOf(applicationHostDTO.getAgentStatusValue())) .set(TABLE.LAST_MODIFY_TIME, JooqDataTypeUtil.buildULong(System.currentTimeMillis())) .set(TABLE.CLOUD_VENDOR_ID, applicationHostDTO.getCloudVendorId()) + .set(TABLE.LAST_TIME, applicationHostDTO.getLastTime()) .where(conditions); try { query.execute(); @@ -952,43 +931,6 @@ public void updateHostAttrsById(ApplicationHostDTO applicationHostDTO) { } } - @Override - public void updateBizHostInfoByHostId(Long bizId, ApplicationHostDTO applicationHostDTO) { - updateBizHostInfoByHostId(bizId, applicationHostDTO, true); - } - - @Transactional - @Override - public int updateBizHostInfoByHostId(Long bizId, - ApplicationHostDTO applicationHostDTO, - boolean updateTopo) { - if (applicationHostDTO.getHostId() == -1L) { - return -1; - } - int affectedNum; - List conditions = new ArrayList<>(); - if (bizId != null) { - conditions.add(TABLE.APP_ID.eq(JooqDataTypeUtil.buildULong(bizId))); - } - Long hostId = applicationHostDTO.getHostId(); - if (hostId != null) { - conditions.add(TABLE.HOST_ID.eq(JooqDataTypeUtil.buildULong(applicationHostDTO.getHostId()))); - } - val query = buildQueryWithHostAndConditions(applicationHostDTO, conditions); - try { - affectedNum = query.execute(); - } catch (Throwable t) { - log.info("SQL=" + query.getSQL(ParamType.INLINED)); - throw t; - } - if (updateTopo) { - List hostTopoDTOList = genHostTopoDTOList(applicationHostDTO); - hostTopoDAO.deleteHostTopoByHostId(bizId, hostId); - hostTopoDAO.batchInsertHostTopo(hostTopoDTOList); - } - return affectedNum; - } - private Query buildQueryWithHostAndConditions(ApplicationHostDTO applicationHostDTO, List conditions) { return context.update(TABLE) .set(TABLE.APP_ID, JooqDataTypeUtil.buildULong(applicationHostDTO.getBizId())) @@ -1006,13 +948,14 @@ private Query buildQueryWithHostAndConditions(ApplicationHostDTO applicationHost .set(TABLE.MODULE_TYPE, applicationHostDTO.getModuleTypeStr()) .set(TABLE.IS_AGENT_ALIVE, JooqDataTypeUtil.buildUByte(applicationHostDTO.getAgentStatusValue())) .set(TABLE.CLOUD_VENDOR_ID, applicationHostDTO.getCloudVendorId()) + .set(TABLE.LAST_TIME, applicationHostDTO.getLastTime()) .where(conditions); } @Override - public int batchUpdateBizHostInfoByHostId(List applicationHostDTOList) { + public int batchUpdateHostsBeforeLastTime(List hostList) { int batchSize = 1000; - int size = applicationHostDTOList.size(); + int size = hostList.size(); int start = 0; int end; List queryList = new ArrayList<>(); @@ -1020,36 +963,26 @@ public int batchUpdateBizHostInfoByHostId(List applicationHo do { end = start + batchSize; end = Math.min(end, size); - List subList = applicationHostDTOList.subList(start, end); - List hostTopoDTOList = new ArrayList<>(); - for (ApplicationHostDTO applicationHostDTO : subList) { - if (applicationHostDTO.getHostId() == -1L) { - log.warn("Unexpected hostId==-1,hostInfo={}", applicationHostDTO); - continue; - } + List subList = hostList.subList(start, end); + for (ApplicationHostDTO host : subList) { List conditions = new ArrayList<>(); - conditions.add(TABLE.HOST_ID.eq(ULong.valueOf(applicationHostDTO.getHostId()))); - conditions.add(TABLE.APP_ID.eq(ULong.valueOf(applicationHostDTO.getBizId()))); - queryList.add(buildQueryWithHostAndConditions(applicationHostDTO, conditions)); - hostTopoDTOList.addAll(genHostTopoDTOList(applicationHostDTO)); + conditions.add(TABLE.HOST_ID.eq(ULong.valueOf(host.getHostId()))); + conditions.add(TABLE.LAST_TIME.lessThan(host.getLastTime())); + queryList.add(buildQueryWithHostAndConditions(host, conditions)); } int[] results = context.batch(queryList).execute(); queryList.clear(); for (int result : results) { affectedNum += result; } - // 更新hostTopo表数据 - hostTopoDAO.batchDeleteHostTopo( - new ArrayList<>(hostTopoDTOList.stream().map(HostTopoDTO::getHostId).collect(Collectors.toSet())) - ); - hostTopoDAO.batchInsertHostTopo(hostTopoDTOList); start += batchSize; } while (end < size); return affectedNum; } + @Transactional @Override - public int deleteBizHostInfoById(Long bizId, Long hostId) { + public int deleteHostBeforeLastTime(Long bizId, Long hostId, Long lastTime) { int affectedNum; List conditions = new ArrayList<>(); if (bizId != null) { @@ -1058,6 +991,9 @@ public int deleteBizHostInfoById(Long bizId, Long hostId) { if (hostId != null) { conditions.add(TABLE.HOST_ID.eq(JooqDataTypeUtil.buildULong(hostId))); } + if (lastTime != null) { + conditions.add(TABLE.LAST_TIME.lessOrEqual(lastTime)); + } affectedNum = context.deleteFrom(TABLE) .where(conditions) .execute(); @@ -1142,6 +1078,34 @@ public int deleteBizHostInfoByBizId(long bizId) { return batchDeleteBizHostInfoById(bizId, hostIds); } + @Override + public int deleteByBasicHost(List basicHostList) { + int affectedNum = 0; + int batchSize = 1000; + int size = basicHostList.size(); + int start = 0; + int end; + List queryList = new ArrayList<>(); + do { + end = start + batchSize; + end = Math.min(end, size); + List subList = basicHostList.subList(start, end); + for (BasicHostDTO basicHost : subList) { + queryList.add(context.deleteFrom(TABLE) + .where(TABLE.HOST_ID.eq(JooqDataTypeUtil.buildULong(basicHost.getHostId()))) + .and(TABLE.LAST_TIME.eq(basicHost.getLastTime())) + ); + } + int[] results = context.batch(queryList).execute(); + queryList.clear(); + for (int result : results) { + affectedNum += result; + } + start += batchSize; + } while (end < size); + return affectedNum; + } + @Override public List listAllHostSimpleInfo() { val query = context.select(SIMPLE_FIELDS) @@ -1154,6 +1118,18 @@ public List listAllHostSimpleInfo() { return hostInfoList; } + @Override + public List listAllBasicHost() { + val query = context.select(BASIC_FIELDS) + .from(TABLE); + Result records = query.fetch(); + List basicHostList = new ArrayList<>(); + if (CollectionUtils.isNotEmpty(records)) { + records.map(record -> basicHostList.add(extractBasicHost(record))); + } + return basicHostList; + } + @Override public int batchUpdateHostStatusByHostIds(int status, List hostIdList) { return context.update(TABLE) @@ -1162,11 +1138,6 @@ public int batchUpdateHostStatusByHostIds(int status, List hostIdList) { .execute(); } - @Override - public boolean existsHost(long bizId, String ip) { - return context.fetchExists(TABLE, TABLE.APP_ID.eq(ULong.valueOf(bizId)).and(TABLE.IP.eq(ip))); - } - @Override public long countHostsByBizIds(Collection bizIds) { List conditions = new ArrayList<>(); @@ -1207,11 +1178,28 @@ public int syncHostTopo(Long hostId) { hostInfoDTO.setSetId(setIds); hostInfoDTO.setModuleId(moduleIds); hostInfoDTO.setModuleType(moduleTypes); - return updateBizHostInfoByHostId(null, hostInfoDTO, false); + return updateHostTopoAttrsByHostId(hostInfoDTO); } return -1; } + private int updateHostTopoAttrsByHostId(ApplicationHostDTO host) { + List conditions = new ArrayList<>(); + conditions.add(TABLE.HOST_ID.eq(ULong.valueOf(host.getHostId()))); + val query = context.update(TABLE) + .set(TABLE.APP_ID, ULong.valueOf(host.getBizId())) + .set(TABLE.SET_IDS, host.getSetIdsStr()) + .set(TABLE.MODULE_IDS, host.getModuleIdsStr()) + .set(TABLE.MODULE_TYPE, host.getModuleTypeStr()) + .where(conditions); + try { + return query.execute(); + } catch (Throwable t) { + log.info("SQL=" + query.getSQL(ParamType.INLINED)); + throw t; + } + } + /** * 查询符合条件的主机数量 */ @@ -1223,6 +1211,12 @@ private long countHostByConditions(List conditions) { return context.selectCount().from(TABLE).where(conditions).fetchOne(0, Long.class); } + private List buildHostIdsCondition(Collection hostIds) { + List conditions = new ArrayList<>(); + conditions.add(TABLE.HOST_ID.in(hostIds)); + return conditions; + } + private List buildBizIdCondition(long bizId) { ApplicationDTO appInfo = applicationDAO.getAppByScope( new ResourceScope(ResourceScopeTypeEnum.BIZ, "" + bizId) @@ -1367,4 +1361,14 @@ public static HostSimpleDTO extractSimpleData(Record record) { return hostSimpleDTO; } + + public static BasicHostDTO extractBasicHost(Record record) { + if (record == null) { + return null; + } + BasicHostDTO basicHost = new BasicHostDTO(); + basicHost.setHostId(record.get(TABLE.HOST_ID).longValue()); + basicHost.setLastTime(record.get(TABLE.LAST_TIME)); + return basicHost; + } } diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/dao/impl/HostTopoDAOImpl.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/dao/impl/HostTopoDAOImpl.java index 2fb6ac1f87..56ee9d6892 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/dao/impl/HostTopoDAOImpl.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/dao/impl/HostTopoDAOImpl.java @@ -36,6 +36,7 @@ import org.jooq.DeleteConditionStep; import org.jooq.Query; import org.jooq.Result; +import org.jooq.UpdateConditionStep; import org.jooq.conf.ParamType; import org.jooq.generated.tables.HostTopo; import org.jooq.generated.tables.records.HostTopoRecord; @@ -63,23 +64,28 @@ public HostTopoDAOImpl(DSLContext dslContext) { } @Override - public void insertHostTopo(HostTopoDTO hostTopoDTO) { + public int insertHostTopo(HostTopoDTO hostTopoDTO) { val query = defaultContext.insertInto(defaultTable, defaultTable.HOST_ID, defaultTable.APP_ID, defaultTable.SET_ID, - defaultTable.MODULE_ID + defaultTable.MODULE_ID, + defaultTable.LAST_TIME ).values( ULong.valueOf(hostTopoDTO.getHostId()), ULong.valueOf(hostTopoDTO.getBizId()), hostTopoDTO.getSetId(), - hostTopoDTO.getModuleId() + hostTopoDTO.getModuleId(), + hostTopoDTO.getLastTime() ).onDuplicateKeyIgnore(); - query.execute(); + return query.execute(); } @Override public int batchInsertHostTopo(List hostTopoDTOList) { + if (CollectionUtils.isEmpty(hostTopoDTOList)) { + return 0; + } int batchSize = 1000; int size = hostTopoDTOList.size(); int start = 0; @@ -89,19 +95,17 @@ public int batchInsertHostTopo(List hostTopoDTOList) { end = start + batchSize; end = Math.min(end, size); List subList = hostTopoDTOList.subList(start, end); - if (subList.isEmpty()) { - // 避免插入空数据 - break; - } val insertQuery = defaultContext.insertInto(defaultTable, defaultTable.HOST_ID, defaultTable.APP_ID, defaultTable.SET_ID, - defaultTable.MODULE_ID + defaultTable.MODULE_ID, + defaultTable.LAST_TIME ).values( (ULong) null, null, null, + null, null ).onDuplicateKeyIgnore(); BatchBindStep batchQuery = defaultContext.batch(insertQuery); @@ -110,7 +114,8 @@ public int batchInsertHostTopo(List hostTopoDTOList) { ULong.valueOf(hostTopoDTO.getHostId()), hostTopoDTO.getBizId(), hostTopoDTO.getSetId(), - hostTopoDTO.getModuleId() + hostTopoDTO.getModuleId(), + hostTopoDTO.getLastTime() ); } int[] results = batchQuery.execute(); @@ -123,10 +128,10 @@ public int batchInsertHostTopo(List hostTopoDTOList) { } @Override - public void deleteHostTopoByHostId(Long appId, Long hostId) { + public void deleteHostTopoByHostId(Long bizId, Long hostId) { List conditions = new ArrayList<>(); - if (appId != null) { - conditions.add(defaultTable.APP_ID.eq(ULong.valueOf(appId))); + if (bizId != null) { + conditions.add(defaultTable.APP_ID.eq(ULong.valueOf(bizId))); } if (hostId != null) { conditions.add(defaultTable.HOST_ID.eq(ULong.valueOf(hostId))); @@ -137,12 +142,13 @@ public void deleteHostTopoByHostId(Long appId, Long hostId) { } @Override - public void deleteHostTopo(Long hostId, Long appId, Long setId, Long moduleId) { - defaultContext.deleteFrom(defaultTable) + public int deleteHostTopoBeforeLastTime(Long hostId, Long bizId, Long setId, Long moduleId, Long lastTime) { + return defaultContext.deleteFrom(defaultTable) .where(defaultTable.HOST_ID.eq(ULong.valueOf(hostId))) - .and(defaultTable.APP_ID.eq(ULong.valueOf(appId))) + .and(defaultTable.APP_ID.eq(ULong.valueOf(bizId))) .and(defaultTable.SET_ID.eq(setId)) .and(defaultTable.MODULE_ID.eq(moduleId)) + .and(defaultTable.LAST_TIME.lessOrEqual(lastTime)) .execute(); } @@ -183,11 +189,83 @@ public int batchDeleteHostTopo(Long bizId, List hostIdList) { return affectedNum; } + private List buildHostTopoMainFieldCondition(HostTopoDTO hostTopo) { + List conditions = new ArrayList<>(); + conditions.add(defaultTable.HOST_ID.eq(JooqDataTypeUtil.buildULong(hostTopo.getHostId()))); + conditions.add(defaultTable.APP_ID.eq(JooqDataTypeUtil.buildULong(hostTopo.getBizId()))); + conditions.add(defaultTable.SET_ID.eq(hostTopo.getSetId())); + conditions.add(defaultTable.MODULE_ID.eq(hostTopo.getModuleId())); + return conditions; + } + + @Override + public int batchUpdateBeforeLastTime(List hostTopoList) { + if (CollectionUtils.isEmpty(hostTopoList)) { + return 0; + } + int batchSize = 1000; + int size = hostTopoList.size(); + int start = 0; + int end; + List queryList = new ArrayList<>(); + int affectedNum = 0; + do { + end = Math.min(start + batchSize, size); + List subList = hostTopoList.subList(start, end); + for (HostTopoDTO hostTopo : subList) { + List conditions = buildHostTopoMainFieldCondition(hostTopo); + conditions.add(defaultTable.LAST_TIME.lessThan(hostTopo.getLastTime())); + UpdateConditionStep step = defaultContext.update(defaultTable) + .set(defaultTable.LAST_TIME, hostTopo.getLastTime()) + .where(conditions); + queryList.add(step); + } + int[] results = defaultContext.batch(queryList).execute(); + queryList.clear(); + for (int result : results) { + affectedNum += result; + } + start += batchSize; + } while (end < size); + return affectedNum; + } + @Override public int batchDeleteHostTopo(List hostIdList) { return batchDeleteHostTopo(null, hostIdList); } + @Override + public int batchDeleteWithLastTime(List hostTopoList) { + if (CollectionUtils.isEmpty(hostTopoList)) { + return 0; + } + int batchSize = 1000; + int size = hostTopoList.size(); + int start = 0; + int end; + List queryList = new ArrayList<>(); + int affectedNum = 0; + do { + end = Math.min(start + batchSize, size); + List subList = hostTopoList.subList(start, end); + for (HostTopoDTO hostTopo : subList) { + List conditions = buildHostTopoMainFieldCondition(hostTopo); + conditions.add(defaultTable.LAST_TIME.eq(hostTopo.getLastTime())); + DeleteConditionStep step = defaultContext.deleteFrom(defaultTable) + .where(conditions); + queryList.add(step); + } + int[] results = defaultContext.batch(queryList).execute(); + queryList.clear(); + for (int result : results) { + affectedNum += result; + } + start += batchSize; + } while (end < size); + return affectedNum; + } + private List listHostTopoByConditions(Collection conditions) { return listHostTopoByConditions(conditions, null, null); } @@ -244,6 +322,17 @@ public List listHostTopoByHostId(Long hostId) { return listHostTopoByConditions(conditions); } + @Override + public List listHostTopoByHostIds(Collection hostIds) { + List conditions = new ArrayList<>(); + conditions.add(defaultTable.HOST_ID.in( + hostIds.stream() + .map(JooqDataTypeUtil::buildULong) + .collect(Collectors.toList()) + )); + return listHostTopoByConditions(conditions); + } + @Override public List listHostTopoByModuleIds(Collection moduleIds) { return listHostTopoByModuleIds(moduleIds, null, null); @@ -257,6 +346,13 @@ public List listHostTopoByModuleIds(Collection moduleIds, Lon return listHostTopoByConditions(conditions, start, limit); } + @Override + public List listHostTopoByExcludeHostIds(Collection excludeHostIds) { + List conditions = new ArrayList<>(); + conditions.add(defaultTable.HOST_ID.notIn(excludeHostIds)); + return listHostTopoByConditions(conditions, null, null); + } + private List listHostIdByConditions(Collection conditions) { val query = defaultContext.select( defaultTable.HOST_ID @@ -293,12 +389,22 @@ public List listHostIdByBizAndHostIds(Collection bizIds, Collection< return listHostIdByConditions(conditions); } + @Override + public List listModuleIdByHostId(Long hostId) { + val query = defaultContext.select( + defaultTable.MODULE_ID + ).from(defaultTable) + .where(defaultTable.HOST_ID.eq(JooqDataTypeUtil.buildULong(hostId))); + return query.fetch().map(record -> record.get(defaultTable.MODULE_ID, Long.class)); + } + private HostTopoDTO convertRecordToDto(HostTopoRecord record) { return new HostTopoDTO( record.getHostId().longValue(), record.getAppId().longValue(), record.getSetId(), - record.getModuleId() + record.getModuleId(), + record.getLastTime() ); } } diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/manager/host/HostCache.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/manager/host/HostCache.java index 3a0b137f7d..af515bb330 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/manager/host/HostCache.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/manager/host/HostCache.java @@ -32,9 +32,13 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; +import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -96,6 +100,25 @@ public void deleteHost(ApplicationHostDTO applicationHostDTO) { redisTemplate.delete(hostIdKey); } + /** + * 批量删除缓存中的主机 + * + * @param hosts 主机集合 + */ + public void batchDeleteHost(Collection hosts) { + if (CollectionUtils.isEmpty(hosts)) { + return; + } + Set hostIpKeys = new HashSet<>(); + Set hostIdKeys = new HashSet<>(); + for (ApplicationHostDTO host : hosts) { + hostIpKeys.add(buildHostIpKey(host)); + hostIdKeys.add(buildHostIdKey(host)); + } + redisTemplate.delete(hostIpKeys); + redisTemplate.delete(hostIdKeys); + } + /** * 更新缓存中的主机 * diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/model/dto/HostTopoDTO.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/model/dto/HostTopoDTO.java index 19fd32c435..7acde54dd1 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/model/dto/HostTopoDTO.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/model/dto/HostTopoDTO.java @@ -25,6 +25,7 @@ package com.tencent.bk.job.manage.model.dto; import com.tencent.bk.job.common.cc.model.result.HostRelationEventDetail; +import com.tencent.bk.job.common.util.TimeUtil; import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; @@ -54,9 +55,18 @@ public class HostTopoDTO { * 模块ID */ private Long moduleId; + /** + * CMDB中的数据最后修改时间 + */ + private Long lastTime; public static HostTopoDTO fromHostRelationEvent(HostRelationEventDetail eventDetail) { - return new HostTopoDTO(eventDetail.getHostId(), eventDetail.getBizId(), eventDetail.getSetId(), - eventDetail.getModuleId()); + return new HostTopoDTO( + eventDetail.getHostId(), + eventDetail.getBizId(), + eventDetail.getSetId(), + eventDetail.getModuleId(), + TimeUtil.parseZonedTime(eventDetail.getLastTime()) + ); } } diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/host/HostService.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/host/HostService.java index c0a2356b61..561419ddf3 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/host/HostService.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/host/HostService.java @@ -29,6 +29,7 @@ import com.tencent.bk.job.common.model.PageData; import com.tencent.bk.job.common.model.dto.AppResourceScope; import com.tencent.bk.job.common.model.dto.ApplicationHostDTO; +import com.tencent.bk.job.common.model.dto.BasicHostDTO; import com.tencent.bk.job.common.model.dto.DynamicGroupWithHost; import com.tencent.bk.job.common.model.dto.HostDTO; import com.tencent.bk.job.common.model.dto.HostSimpleDTO; @@ -48,36 +49,38 @@ * 主机、topo相关服务 */ public interface HostService { - boolean existHost(long bizId, String ip); List getHostsByAppId(Long appId); /** - * 新增业务下的主机 + * 批量新增主机 * - * @param bizId 业务ID * @param insertList 主机信息 - * @return 新增失败的主机ID + * @return 成功插入的主机数量 */ - List insertHostsToBiz(Long bizId, List insertList); + int batchInsertHosts(List insertList); /** - * 更新业务下的主机 + * 批量更新主机(只更新时间戳比当前数据旧的) * - * @param bizId 业务ID * @param hostInfoList 主机信息 - * @return 更新失败的主机ID + * @return 成功更新的主机数量 */ - List updateHostsInBiz(Long bizId, List hostInfoList); + int batchUpdateHostsBeforeLastTime(List hostInfoList); /** - * 将主机从业务下移除但不删除 + * 创建或更新主机(仅更新时间戳在当前数据之前的数据) * - * @param bizId 业务ID - * @param hostIdList 主机ID列表 - * @return 移除失败的主机ID + * @param hostInfoDTO 主机信息 */ - List removeHostsFromBiz(Long bizId, List hostIdList); + void createOrUpdateHostBeforeLastTime(ApplicationHostDTO hostInfoDTO); + + /** + * 删除主机(仅删除时间戳在当前数据之前的数据) + * + * @param hostInfoDTO 主机信息 + */ + void deleteHostBeforeLastTime(ApplicationHostDTO hostInfoDTO); long countHostsByOsType(String osType); @@ -242,4 +245,18 @@ List listHostByAppTopologyNodes(String username, * @return 更新成功的条数 */ int updateHostsStatus(List simpleHostList); + + /** + * 查询所有主机基础信息 + * + * @return 主机基础信息 + */ + List listAllBasicHost(); + + /** + * 根据主机基础信息批量删除主机 + * + * @return 成功删除的主机数量 + */ + int deleteByBasicHost(List basicHostList); } diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/host/impl/HostServiceImpl.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/host/impl/HostServiceImpl.java index 3ae730bbcf..f2501bf030 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/host/impl/HostServiceImpl.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/host/impl/HostServiceImpl.java @@ -45,6 +45,7 @@ import com.tencent.bk.job.common.model.dto.AppResourceScope; import com.tencent.bk.job.common.model.dto.ApplicationDTO; import com.tencent.bk.job.common.model.dto.ApplicationHostDTO; +import com.tencent.bk.job.common.model.dto.BasicHostDTO; import com.tencent.bk.job.common.model.dto.DynamicGroupWithHost; import com.tencent.bk.job.common.model.dto.HostDTO; import com.tencent.bk.job.common.model.dto.HostSimpleDTO; @@ -77,9 +78,9 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; -import org.jooq.exception.DataAccessException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.StopWatch; import java.util.ArrayList; @@ -134,11 +135,6 @@ public HostServiceImpl(ApplicationHostDAO applicationHostDAO, this.i18nService = i18nService; } - @Override - public boolean existHost(long bizId, String ip) { - return applicationHostDAO.existsHost(bizId, ip); - } - @Override public List getHostsByAppId(Long appId) { ApplicationDTO applicationDTO = applicationService.getAppByAppId(appId); @@ -150,119 +146,48 @@ public List getHostsByAppId(Long appId) { } } - private boolean insertOrUpdateOneAppHost(Long bizId, ApplicationHostDTO infoDTO) { - try { - applicationHostDAO.insertOrUpdateHost(infoDTO); - hostCache.addOrUpdateHost(infoDTO); - } catch (Throwable t) { - log.error(String.format("insertHost fail:bizId=%d,hostInfo=%s", bizId, infoDTO), t); - return false; - } - return true; - } - @Override - public List insertHostsToBiz(Long bizId, List insertList) { - StopWatch watch = new StopWatch(); - // 插入主机 - watch.start("insertAppHostInfo"); - List insertFailHostIds = new ArrayList<>(); - boolean batchInserted = false; - try { - //尝试批量插入 - if (!insertList.isEmpty()) { - int affectedNum = applicationHostDAO.batchInsertAppHostInfo(insertList); - log.info("{} hosts inserted", affectedNum); - insertList.forEach(hostCache::addOrUpdateHost); - } - batchInserted = true; - } catch (Throwable throwable) { - if (throwable instanceof DataAccessException) { - String errorMessage = throwable.getMessage(); - if (errorMessage.contains("Duplicate entry") && errorMessage.contains("PRIMARY")) { - log.info("Fail to batchInsertAppHostInfo, try to insert one by one"); - } else { - log.warn("Fail to batchInsertAppHostInfo, try to insert one by one.", throwable); - } - } else { - log.warn("Fail to batchInsertAppHostInfo, try to insert one by one..", throwable); - } - //批量插入失败,尝试逐条插入 - for (ApplicationHostDTO infoDTO : insertList) { - if (!insertOrUpdateOneAppHost(bizId, infoDTO)) { - insertFailHostIds.add(infoDTO.getHostId()); - } - } + public int batchInsertHosts(List insertList) { + if (CollectionUtils.isEmpty(insertList)) { + return 0; } + StopWatch watch = new StopWatch(); + watch.start("batchInsertHost"); + // 批量插入主机 + int affectedNum = applicationHostDAO.batchInsertHost(insertList); + log.info("{} hosts inserted", affectedNum); + insertList.forEach(hostCache::addOrUpdateHost); watch.stop(); - if (!batchInserted) { - watch.start("log insertAppHostInfo"); - if (!insertFailHostIds.isEmpty()) { - log.warn(String.format("appId=%s,insertFailHostIds.size=%d,insertFailHostIds=%s", - bizId, insertFailHostIds.size(), String.join(",", - insertFailHostIds.stream().map(Object::toString).collect(Collectors.toSet())))); - } - watch.stop(); - } - log.debug("Performance:insertHostsToApp:appId={},{}", bizId, watch.prettyPrint()); - return insertFailHostIds; + log.debug("Performance:insertHosts:{}", watch.prettyPrint()); + return affectedNum; } @Override - public List updateHostsInBiz(Long bizId, List hostInfoList) { - StopWatch watch = new StopWatch(); - watch.start("updateAppHostInfo"); - // 更新主机 - long updateCount = 0L; - List updateHostIds = new ArrayList<>(); - long errorCount = 0L; - List errorHostIds = new ArrayList<>(); - boolean batchUpdated = false; - try { - // 尝试批量更新 - if (!hostInfoList.isEmpty()) { - int affectedNum = applicationHostDAO.batchUpdateBizHostInfoByHostId(hostInfoList); - log.info("{} hosts updated", affectedNum); - hostInfoList.forEach(hostCache::addOrUpdateHost); - } - batchUpdated = true; - } catch (Throwable throwable) { - if (throwable instanceof DataAccessException) { - String errorMessage = throwable.getMessage(); - if (errorMessage.contains("Duplicate entry") && errorMessage.contains("PRIMARY")) { - log.info("Fail to batchUpdateAppHostInfoByHostId, try to update one by one"); - } else { - log.warn("Fail to batchUpdateAppHostInfoByHostId, try to update one by one.", throwable); - } - } else { - log.warn("Fail to batchUpdateAppHostInfoByHostId, try to update one by one..", throwable); - } - // 批量更新失败,尝试逐条更新 - for (ApplicationHostDTO hostInfoDTO : hostInfoList) { - try { - if (!applicationHostDAO.existAppHostInfoByHostId(hostInfoDTO.getHostId())) { - applicationHostDAO.updateBizHostInfoByHostId(hostInfoDTO.getBizId(), hostInfoDTO); - hostCache.addOrUpdateHost(hostInfoDTO); - updateCount += 1; - updateHostIds.add(hostInfoDTO.getHostId()); - } - } catch (Throwable t) { - log.error(String.format("updateHost fail:appId=%d,hostInfo=%s", bizId, hostInfoDTO), t); - errorCount += 1; - errorHostIds.add(hostInfoDTO.getHostId()); - } - } + public int batchUpdateHostsBeforeLastTime(List hostInfoList) { + if (CollectionUtils.isEmpty(hostInfoList)) { + return 0; } + StopWatch watch = new StopWatch(); + watch.start("batchUpdateHostsBeforeLastTime to DB"); + // 批量更新主机 + int affectedNum = applicationHostDAO.batchUpdateHostsBeforeLastTime(hostInfoList); + log.info("try to update {} hosts, {} updated", hostInfoList.size(), affectedNum); watch.stop(); - if (!batchUpdated) { - watch.start("log updateAppHostInfo"); - log.info("Update host of appId={},errorCount={}," + - "updateCount={},errorHostIds={},updateHostIds={}", - bizId, errorCount, updateCount, errorHostIds, updateHostIds); - watch.stop(); + watch.start("listHostInfoByHostIds"); + List hostIds = hostInfoList.stream().map(ApplicationHostDTO::getHostId).collect(Collectors.toList()); + hostInfoList = applicationHostDAO.listHostInfoByHostIds(hostIds); + watch.stop(); + watch.start("updateHostToCache"); + hostInfoList.forEach(hostCache::addOrUpdateHost); + watch.stop(); + if (watch.getTotalTimeMillis() < 10_000L) { + log.debug("Performance:batchUpdateHostsBeforeLastTime:{}", watch.prettyPrint()); + } else if (watch.getTotalTimeMillis() < 60_000L) { + log.info("Performance:batchUpdateHostsBeforeLastTime:{}", watch.prettyPrint()); + } else { + log.warn("Performance:batchUpdateHostsBeforeLastTime:{}", watch.prettyPrint()); } - log.debug("Performance:updateHostsInApp:appId={},{}", bizId, watch.prettyPrint()); - return errorHostIds; + return affectedNum; } @Override @@ -305,20 +230,79 @@ public int updateHostsStatus(List simpleHostList) { return updateCount; } + @Transactional @Override - public List removeHostsFromBiz(Long bizId, List hostIdList) { - StopWatch watch = new StopWatch(); - watch.start("deleteHostTopoOfBiz"); - List deleteFailHostIds = new ArrayList<>(); - // 删除业务与主机的关系 - hostTopoDAO.batchDeleteHostTopo(bizId, hostIdList); - watch.stop(); - watch.start("syncHostTopo"); - // 同步主机关系到host表 - hostIdList.forEach(applicationHostDAO::syncHostTopo); - watch.stop(); - log.debug("Performance:removeHostsFromBiz:bizId={},{}", bizId, watch.prettyPrint()); - return deleteFailHostIds; + public void createOrUpdateHostBeforeLastTime(ApplicationHostDTO hostInfoDTO) { + try { + if (applicationHostDAO.existAppHostInfoByHostId(hostInfoDTO.getHostId())) { + // 只更新事件中的主机属性与agent状态 + applicationHostDAO.updateHostAttrsBeforeLastTime(hostInfoDTO); + } else { + hostInfoDTO.setBizId(JobConstants.PUBLIC_APP_ID); + int affectedNum = applicationHostDAO.insertHostWithoutTopo(hostInfoDTO); + log.info("insert host: id={}, affectedNum={}", hostInfoDTO.getHostId(), affectedNum); + } + } catch (Throwable t) { + log.error("handle host event fail", t); + } finally { + // 从拓扑表向主机表同步拓扑数据 + int affectedNum = applicationHostDAO.syncHostTopo(hostInfoDTO.getHostId()); + log.info("hostTopo synced: hostId={}, affectedNum={}", hostInfoDTO.getHostId(), affectedNum); + } + // 更新缓存 + updateHostCache(hostInfoDTO); + } + + @Override + public void deleteHostBeforeLastTime(ApplicationHostDTO hostInfoDTO) { + int affectedRowNum = applicationHostDAO.deleteHostBeforeLastTime( + null, + hostInfoDTO.getHostId(), + hostInfoDTO.getLastTime() + ); + log.info( + "{} host deleted, id={} ,ip={}", + affectedRowNum, + hostInfoDTO.getHostId(), + hostInfoDTO.getIp() + ); + if (affectedRowNum > 0) { + hostCache.deleteHost(hostInfoDTO); + } + } + + private void updateHostCache(ApplicationHostDTO hostInfoDTO) { + hostInfoDTO = applicationHostDAO.getHostById(hostInfoDTO.getHostId()); + if (hostInfoDTO.getBizId() != null && hostInfoDTO.getBizId() > 0) { + // 只更新常规业务的主机到缓存 + if (applicationService.existBiz(hostInfoDTO.getBizId())) { + hostCache.addOrUpdateHost(hostInfoDTO); + log.info("host cache updated: hostId:{}", hostInfoDTO.getHostId()); + } + } + } + + @Override + public List listAllBasicHost() { + return applicationHostDAO.listAllBasicHost(); + } + + @Override + public int deleteByBasicHost(List basicHostList) { + if (CollectionUtils.isEmpty(basicHostList)) { + return 0; + } + List hostIdList = basicHostList.stream().map(BasicHostDTO::getHostId).collect(Collectors.toList()); + // 先查出主机信息用于更新缓存 + List hostList = applicationHostDAO.listHostInfoByHostIds(hostIdList); + // 从DB删除 + int deletedNum = applicationHostDAO.deleteByBasicHost(basicHostList); + // 删除缓存 + hostCache.batchDeleteHost(hostList); + hostList = applicationHostDAO.listHostInfoByHostIds(hostIdList); + // 未成功从DB删除的主机重新加入缓存 + hostCache.addOrUpdateHosts(hostList); + return deletedNum; } @Override @@ -699,7 +683,8 @@ public void fillHostInfo(Long bizId, CcTopologyNodeVO topologyTree, boolean upda for (int i = 0; i < hostInfoVOList.size(); i++) { ApplicationHostDTO host = dbHosts.get(i); HostInfoVO hostInfoVO = hostInfoVOList.get(i); - host.getModuleId().forEach(moduleId -> { + List moduleIdList = hostTopoDAO.listModuleIdByHostId(host.getHostId()); + moduleIdList.forEach(moduleId -> { CcTopologyNodeVO moduleNode = map.get(moduleId); if (moduleNode == null) { log.warn("cannot find moduleNode in topoTree, cache may expire, ignore this moduleNode"); diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/AppHostsUpdateHelper.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/AppHostsUpdateHelper.java index 896bf5b5f7..f3d9f6aeef 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/AppHostsUpdateHelper.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/AppHostsUpdateHelper.java @@ -112,7 +112,7 @@ public void startToUpdateAppHosts(Long appId) { heartBeatThreadMap.put(Thread.currentThread().getName(), appHostUpdateRedisKeyHeartBeatThread); } - public void endToUpdateBizHosts(Long appId) { + public void endToUpdateBizHosts() { String key = Thread.currentThread().getName(); RedisKeyHeartBeatThread heartBeatThread = heartBeatThreadMap.get(key); if (heartBeatThread != null) { diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/EventsHandler.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/EventsHandler.java index 2f1966470a..e5464313d2 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/EventsHandler.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/EventsHandler.java @@ -45,7 +45,6 @@ public abstract class EventsHandler extends Thread { private final CmdbEventSampler cmdbEventSampler; protected boolean enabled = true; BlockingQueue> queue; - Long bizId = null; public EventsHandler(BlockingQueue> queue, Tracer tracer, @@ -55,17 +54,11 @@ public EventsHandler(BlockingQueue> queue, this.cmdbEventSampler = cmdbEventSampler; } - public Long getBizId() { - return bizId; - } - - public void commitEvent(Long bizId, ResourceEvent event) { + public void commitEvent(ResourceEvent event) { try { boolean result = this.queue.add(event); if (!result) { log.warn("Fail to commitEvent:{}", event); - } else { - this.bizId = bizId; } } catch (Exception e) { log.warn("Fail to commitEvent:" + event, e); @@ -118,10 +111,6 @@ public void run() { log.warn("queue.take interrupted", e); } catch (Throwable t) { log.warn("Fail to handleOneEvent:" + event, t); - } finally { - if (queue.size() == 0) { - this.bizId = null; - } } } } diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostEventHandler.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostEventHandler.java index f7f0fa0102..cfa97ea223 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostEventHandler.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostEventHandler.java @@ -27,16 +27,13 @@ import com.tencent.bk.job.common.cc.model.req.ResourceWatchReq; import com.tencent.bk.job.common.cc.model.result.HostEventDetail; import com.tencent.bk.job.common.cc.model.result.ResourceEvent; -import com.tencent.bk.job.common.constant.JobConstants; import com.tencent.bk.job.common.gse.service.AgentStateClient; import com.tencent.bk.job.common.gse.v2.model.resp.AgentState; import com.tencent.bk.job.common.model.dto.ApplicationHostDTO; import com.tencent.bk.job.common.util.json.JsonUtils; -import com.tencent.bk.job.manage.dao.ApplicationHostDAO; -import com.tencent.bk.job.manage.manager.host.HostCache; import com.tencent.bk.job.manage.metrics.CmdbEventSampler; import com.tencent.bk.job.manage.metrics.MetricsConstants; -import com.tencent.bk.job.manage.service.ApplicationService; +import com.tencent.bk.job.manage.service.host.HostService; import io.micrometer.core.instrument.Tags; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -47,23 +44,17 @@ @Slf4j public class HostEventHandler extends EventsHandler { - private final ApplicationService applicationService; - private final ApplicationHostDAO applicationHostDAO; + private final HostService hostService; private final AgentStateClient agentStateClient; - private final HostCache hostCache; HostEventHandler(Tracer tracer, CmdbEventSampler cmdbEventSampler, BlockingQueue> queue, - ApplicationService applicationService, - ApplicationHostDAO applicationHostDAO, - AgentStateClient agentStateClient, - HostCache hostCache) { + HostService hostService, + AgentStateClient agentStateClient) { super(queue, tracer, cmdbEventSampler); - this.applicationService = applicationService; - this.applicationHostDAO = applicationHostDAO; + this.hostService = hostService; this.agentStateClient = agentStateClient; - this.hostCache = hostCache; } @Override @@ -86,10 +77,21 @@ private void handleOneEventRelatedToApp(ResourceEvent event) { handleOneEventIndeed(event); } + private void setDefaultLastTimeForHostIfNeed(ResourceEvent event, ApplicationHostDTO hostInfoDTO) { + if (hostInfoDTO.getLastTime() == null || hostInfoDTO.getLastTime() < 0) { + log.warn( + "HostEvent lastTime is invalid({}), use event create time({}) to update host", + hostInfoDTO.getLastTime(), + event.getCreateTime() + ); + hostInfoDTO.setLastTime(event.getCreateTime()); + } + } private void handleOneEventIndeed(ResourceEvent event) { String eventType = event.getEventType(); ApplicationHostDTO hostInfoDTO = HostEventDetail.toHostInfoDTO(event.getDetail()); + setDefaultLastTimeForHostIfNeed(event, hostInfoDTO); switch (eventType) { case ResourceWatchReq.EVENT_TYPE_CREATE: case ResourceWatchReq.EVENT_TYPE_UPDATE: @@ -100,13 +102,11 @@ private void handleOneEventIndeed(ResourceEvent event) { } // 找出Agent有效的IP,并设置Agent状态 updateIpAndAgentStatus(hostInfoDTO); - // 更新DB中的主机数据 - createOrUpdateHostInDB(hostInfoDTO); - // 更新缓存中的主机数据 - updateHostCache(hostInfoDTO); + // 更新DB与缓存中的主机数据 + hostService.createOrUpdateHostBeforeLastTime(hostInfoDTO); break; case ResourceWatchReq.EVENT_TYPE_DELETE: - handleHostDelete(hostInfoDTO); + hostService.deleteHostBeforeLastTime(hostInfoDTO); break; default: break; @@ -122,44 +122,4 @@ private void updateIpAndAgentStatus(ApplicationHostDTO hostInfoDTO) { } } - private void createOrUpdateHostInDB(ApplicationHostDTO hostInfoDTO) { - try { - if (applicationHostDAO.existAppHostInfoByHostId(hostInfoDTO.getHostId())) { - // 只更新事件中的主机属性与agent状态 - applicationHostDAO.updateHostAttrsById(hostInfoDTO); - } else { - hostInfoDTO.setBizId(JobConstants.PUBLIC_APP_ID); - int affectedNum = applicationHostDAO.insertHostWithoutTopo(hostInfoDTO); - log.info("insert host: id={}, affectedNum={}", hostInfoDTO.getHostId(), affectedNum); - } - } catch (Throwable t) { - log.error("handle host event fail", t); - } finally { - // 从拓扑表向主机表同步拓扑数据 - int affectedNum = applicationHostDAO.syncHostTopo(hostInfoDTO.getHostId()); - log.info("hostTopo synced: hostId={}, affectedNum={}", hostInfoDTO.getHostId(), affectedNum); - } - } - - private void updateHostCache(ApplicationHostDTO hostInfoDTO) { - hostInfoDTO = applicationHostDAO.getHostById(hostInfoDTO.getHostId()); - if (hostInfoDTO.getBizId() != null && hostInfoDTO.getBizId() > 0) { - // 只更新常规业务的主机到缓存 - if (applicationService.existBiz(hostInfoDTO.getBizId())) { - hostCache.addOrUpdateHost(hostInfoDTO); - log.info("host cache updated: hostId:{}", hostInfoDTO.getHostId()); - } - } - } - - private void handleHostDelete(ApplicationHostDTO hostInfoDTO) { - int affectedRowNum = applicationHostDAO.deleteBizHostInfoById(null, hostInfoDTO.getHostId()); - log.info( - "{} host deleted, id={} ,ip={}", - affectedRowNum, - hostInfoDTO.getHostId(), - hostInfoDTO.getIp() - ); - hostCache.deleteHost(hostInfoDTO); - } } diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostEventWatcher.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostEventWatcher.java index 22da40c6ee..b28aa0277e 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostEventWatcher.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostEventWatcher.java @@ -29,13 +29,10 @@ import com.tencent.bk.job.common.cc.model.result.ResourceWatchResult; import com.tencent.bk.job.common.cc.sdk.BizCmdbClient; import com.tencent.bk.job.common.gse.service.AgentStateClient; -import com.tencent.bk.job.common.model.dto.ApplicationHostDTO; import com.tencent.bk.job.manage.config.JobManageConfig; -import com.tencent.bk.job.manage.dao.ApplicationHostDAO; -import com.tencent.bk.job.manage.manager.host.HostCache; import com.tencent.bk.job.manage.metrics.CmdbEventSampler; import com.tencent.bk.job.manage.metrics.MetricsConstants; -import com.tencent.bk.job.manage.service.ApplicationService; +import com.tencent.bk.job.manage.service.host.HostService; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tags; import lombok.extern.slf4j.Slf4j; @@ -60,10 +57,8 @@ public class HostEventWatcher extends AbstractCmdbResourceEventWatcher redisTemplate, Tracer tracer, CmdbEventSampler cmdbEventSampler, BizCmdbClient bizCmdbClient, - ApplicationService applicationService, - ApplicationHostDAO applicationHostDAO, + HostService hostService, AgentStateClient agentStateClient, - HostCache hostCache, JobManageConfig jobManageConfig) { super("host", redisTemplate, tracer, cmdbEventSampler); this.tracer = tracer; this.cmdbEventSampler = cmdbEventSampler; this.bizCmdbClient = bizCmdbClient; - this.applicationService = applicationService; - this.applicationHostDAO = applicationHostDAO; + this.hostService = hostService; this.agentStateClient = agentStateClient; - this.hostCache = hostCache; this.eventsHandlerNum = jobManageConfig.getHostEventHandlerNum(); } @@ -163,10 +154,8 @@ private HostEventHandler buildHostEventHandler(BlockingQueue event) { - ApplicationHostDTO hostInfoDTO = HostEventDetail.toHostInfoDTO(event.getDetail()); - Long hostId = hostInfoDTO.getHostId(); - ApplicationHostDTO oldHostInfoDTO = applicationHostDAO.getHostById(hostId); - HostEventHandler eventsHandler = chooseHandler(hostId); - eventsHandler.commitEvent(oldHostInfoDTO == null ? null : oldHostInfoDTO.getBizId(), event); + HostEventHandler eventsHandler = chooseHandler(event.getDetail().getHostId()); + eventsHandler.commitEvent(event); } } diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostRelationEventHandler.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostRelationEventHandler.java index bfa709100f..d6dac22671 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostRelationEventHandler.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostRelationEventHandler.java @@ -27,6 +27,7 @@ import com.tencent.bk.job.common.cc.model.req.ResourceWatchReq; import com.tencent.bk.job.common.cc.model.result.HostRelationEventDetail; import com.tencent.bk.job.common.cc.model.result.ResourceEvent; +import com.tencent.bk.job.common.constant.JobConstants; import com.tencent.bk.job.common.model.dto.ApplicationHostDTO; import com.tencent.bk.job.common.util.json.JsonUtils; import com.tencent.bk.job.manage.dao.ApplicationHostDAO; @@ -96,30 +97,56 @@ private void handleOneEvent(ResourceEvent event) { } } + private void setDefaultLastTimeForHostTopoIfNeed(ResourceEvent event, + HostTopoDTO hostTopoDTO) { + if (hostTopoDTO.getLastTime() == null || hostTopoDTO.getLastTime() < 0) { + hostTopoDTO.setLastTime(event.getCreateTime()); + } + } + private void handleOneEventIndeed(ResourceEvent event) { String eventType = event.getEventType(); HostTopoDTO hostTopoDTO = HostTopoDTO.fromHostRelationEvent(event.getDetail()); + setDefaultLastTimeForHostTopoIfNeed(event, hostTopoDTO); switch (eventType) { case ResourceWatchReq.EVENT_TYPE_CREATE: // 插入拓扑数据 - hostTopoDAO.insertHostTopo(hostTopoDTO); + int insertedHostTopoNum = hostTopoDAO.insertHostTopo(hostTopoDTO); // 同步拓扑数据至主机表冗余字段 - updateTopoToHost(hostTopoDTO); + int affectedHostNum = updateTopoToHost(hostTopoDTO); // 更新主机缓存 - updateHostCacheWhenRelCreated(hostTopoDTO); + boolean cacheUpdated = updateOrDeleteHostCache(hostTopoDTO); + log.info( + "create event handle result: insertedHostTopoNum={}, affectedHostNum={}, cacheUpdated={}", + insertedHostTopoNum, + affectedHostNum, + cacheUpdated + ); break; case ResourceWatchReq.EVENT_TYPE_DELETE: // 删除拓扑数据 - hostTopoDAO.deleteHostTopo( + int deletedHostTopoNum = hostTopoDAO.deleteHostTopoBeforeLastTime( hostTopoDTO.getHostId(), hostTopoDTO.getBizId(), hostTopoDTO.getSetId(), - hostTopoDTO.getModuleId() + hostTopoDTO.getModuleId(), + hostTopoDTO.getLastTime() ); - // 同步拓扑数据至主机表冗余字段 - updateTopoToHost(hostTopoDTO); - // 更新主机缓存 - updateHostCacheWhenRelationDeleted(hostTopoDTO); + if (deletedHostTopoNum > 0) { + // 同步拓扑数据至主机表冗余字段 + int deleteEventAffectedHostNum = updateTopoToHost(hostTopoDTO); + // 更新主机缓存 + boolean deleteEventCacheUpdated = updateOrDeleteHostCache(hostTopoDTO); + log.info( + "delete event handle result: deletedHostTopoNum={}, deleteEventAffectedHostNum={}," + + " deleteEventCacheUpdated={}", + deletedHostTopoNum, + deleteEventAffectedHostNum, + deleteEventCacheUpdated + ); + } else { + log.warn("no hostTopo deleted, delete event may expire for long time"); + } break; default: break; @@ -130,57 +157,45 @@ private void handleOneEventIndeed(ResourceEvent event) * 将主机拓扑表中的拓扑数据同步至主机表 * * @param hostTopoDTO 主机拓扑信息 + * @return 受影响的主机数量 */ - private void updateTopoToHost(HostTopoDTO hostTopoDTO) { + private int updateTopoToHost(HostTopoDTO hostTopoDTO) { // 若主机存在需将拓扑信息同步至主机信息冗余字段 - int affectedNum = applicationHostDAO.syncHostTopo(hostTopoDTO.getHostId()); - if (affectedNum > 0) { - log.info("host topo synced: affectedNum={}", affectedNum); - } else if (affectedNum == 0) { + int affectedHostNum = applicationHostDAO.syncHostTopo(hostTopoDTO.getHostId()); + if (affectedHostNum > 0) { + log.info("host topo synced: affectedHostNum={}", affectedHostNum); + } else if (affectedHostNum == 0) { log.info("no host topo synced"); } else { log.warn("cannot find hostInfo by hostId:{}, wait for host event or sync", hostTopoDTO.getHostId()); } + return affectedHostNum; } /** - * 当主机关系被创建时,更新缓存中的主机信息 + * 更新或删除缓存中的主机信息 * * @param hostTopoDTO 主机拓扑信息 + * @return 是否执行了更新/删除缓存动作 */ - private void updateHostCacheWhenRelCreated(HostTopoDTO hostTopoDTO) { - ApplicationHostDTO host = applicationHostDAO.getHostById(hostTopoDTO.getHostId()); - if (host != null && applicationService.existBiz(host.getBizId())) { - hostCache.addOrUpdateHost(host); - log.info("host cached updated: hostId={}", host.getHostId()); - } - } - - /** - * 当主机关系被删除时,更新缓存中的主机信息 - * - * @param hostTopoDTO 主机拓扑信息 - */ - private void updateHostCacheWhenRelationDeleted(HostTopoDTO hostTopoDTO) { + private boolean updateOrDeleteHostCache(HostTopoDTO hostTopoDTO) { ApplicationHostDTO host = applicationHostDAO.getHostById(hostTopoDTO.getHostId()); if (host == null) { - return; + log.info("host already deleted by others: hostId={}, ignore", hostTopoDTO.getHostId()); + return false; } - int curAppRelationCount = hostTopoDAO.countHostTopo(hostTopoDTO.getBizId(), hostTopoDTO.getHostId()); - int hostRelationCount = hostTopoDAO.countHostTopo(null, hostTopoDTO.getHostId()); - if (curAppRelationCount != 0) { - return; - } - if (hostRelationCount == 0) { - // 主机被移除 + if (host.getBizId() == JobConstants.PUBLIC_APP_ID) { hostCache.deleteHost(host); log.info("host cached deleted: hostId={}", host.getHostId()); + return true; + } + if (applicationService.existBiz(host.getBizId())) { + hostCache.addOrUpdateHost(host); + log.info("host cached updated: hostId={}", host.getHostId()); } else { - // 主机被转移到其他业务下 - if (applicationService.existBiz(host.getBizId())) { - hostCache.addOrUpdateHost(host); - log.info("host cached updated: hostId={}", host.getHostId()); - } + hostCache.deleteHost(host); + log.info("host biz({}) not exist, host cached deleted: hostId={}", host.getBizId(), host.getHostId()); } + return true; } } diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostRelationEventWatcher.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostRelationEventWatcher.java index 3be788e2ff..3769c0f41a 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostRelationEventWatcher.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostRelationEventWatcher.java @@ -155,8 +155,6 @@ public void setWatchFlag(boolean value) { } private void dispatchEventToHandler(ResourceEvent event) { - HostTopoDTO hostTopoDTO = HostTopoDTO.fromHostRelationEvent(event.getDetail()); - Long appId = hostTopoDTO.getBizId(); - eventsHandler.commitEvent(appId, event); + eventsHandler.commitEvent(event); } } diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostSyncService.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostSyncService.java index 62b62ea2e3..57ad7d6077 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostSyncService.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostSyncService.java @@ -25,24 +25,32 @@ package com.tencent.bk.job.manage.service.impl.sync; import com.tencent.bk.job.common.cc.model.CcInstanceDTO; -import com.tencent.bk.job.common.cc.sdk.CmdbClientFactory; -import com.tencent.bk.job.common.cc.sdk.IBizCmdbClient; +import com.tencent.bk.job.common.cc.model.result.HostProp; +import com.tencent.bk.job.common.cc.model.result.HostWithModules; +import com.tencent.bk.job.common.cc.model.result.ModuleProp; +import com.tencent.bk.job.common.cc.sdk.BizCmdbClient; import com.tencent.bk.job.common.constant.CcNodeTypeEnum; import com.tencent.bk.job.common.model.dto.ApplicationDTO; import com.tencent.bk.job.common.model.dto.ApplicationHostDTO; import com.tencent.bk.job.common.model.dto.BasicHostDTO; import com.tencent.bk.job.common.util.StringUtil; +import com.tencent.bk.job.common.util.TimeUtil; import com.tencent.bk.job.manage.dao.ApplicationHostDAO; +import com.tencent.bk.job.manage.dao.HostTopoDAO; +import com.tencent.bk.job.manage.model.dto.HostTopoDTO; import com.tencent.bk.job.manage.service.host.HostService; -import com.tencent.bk.job.manage.service.impl.agent.AgentStatusService; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Triple; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.util.Pair; import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; import org.springframework.util.StopWatch; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -57,237 +65,603 @@ public class HostSyncService { private final AppHostsUpdateHelper appHostsUpdateHelper; private final ApplicationHostDAO applicationHostDAO; + private final HostTopoDAO hostTopoDAO; private final HostService hostService; - private final AgentStatusService agentStatusService; + private final BizCmdbClient bizCmdbClient; @Autowired public HostSyncService(AppHostsUpdateHelper appHostsUpdateHelper, ApplicationHostDAO applicationHostDAO, + HostTopoDAO hostTopoDAO, HostService hostService, - AgentStatusService agentStatusService) { + BizCmdbClient bizCmdbClient) { this.appHostsUpdateHelper = appHostsUpdateHelper; this.applicationHostDAO = applicationHostDAO; + this.hostTopoDAO = hostTopoDAO; this.hostService = hostService; - this.agentStatusService = agentStatusService; + this.bizCmdbClient = bizCmdbClient; } - private List getHostsByAppInfo(IBizCmdbClient bizCmdbClient, ApplicationDTO applicationDTO) { + private List getHostRelationsFromCmdb(ApplicationDTO bizApp) { List ccInstanceDTOList = new ArrayList<>(); - ccInstanceDTOList.add(new CcInstanceDTO(CcNodeTypeEnum.BIZ.getType(), applicationDTO.getBizIdIfBizApp())); - List hosts = bizCmdbClient.getHosts(applicationDTO.getBizIdIfBizApp(), - ccInstanceDTOList); - // 获取Agent状态 - agentStatusService.fillRealTimeAgentStatus(hosts); - return hosts; + ccInstanceDTOList.add(new CcInstanceDTO(CcNodeTypeEnum.BIZ.getType(), bizApp.getBizIdIfBizApp())); + return bizCmdbClient.getHostRelationsByTopology( + bizApp.getBizIdIfBizApp(), + ccInstanceDTOList + ); } - private List computeInsertList( + private List computeInsertHostList( Long bizId, - Set localBizHostIds, - List applicationHostDTOList + Set localHostIds, + List hostPropList, + long cmdbHostsFetchTimeMills ) { - StopWatch watch = new StopWatch(); - List insertList = - applicationHostDTOList.stream().filter(ApplicationHostDTO -> - !localBizHostIds.contains(ApplicationHostDTO.getHostId())).collect(Collectors.toList()); - watch.start("log insertList"); - log.info(String.format("bizId=%s,insertHostIds=%s", bizId, String.join(",", - insertList.stream().map(ApplicationHostDTO::getHostId).map(Object::toString) - .collect(Collectors.toSet())))); - watch.stop(); - if (watch.getTotalTimeMillis() > 1000) { - log.warn("Write log too slow, {}", watch.prettyPrint()); - } - return insertList; + List insertHostIdList = new ArrayList<>(); + List insertHostList = hostPropList.stream() + .filter(hostProp -> !localHostIds.contains(hostProp.getHostId())) + .map(hostProp -> { + ApplicationHostDTO hostDTO = hostProp.toApplicationHostDTO(); + Long lastTime = hostDTO.getLastTime(); + if (lastTime == null || lastTime < 0) { + hostDTO.setLastTime(cmdbHostsFetchTimeMills); + log.warn( + "cmdbHostLastTime({}) is invalid, use cmdbHostsFetchTimeMills({}) for insert, host={}", + lastTime, + cmdbHostsFetchTimeMills, + hostDTO + ); + } + hostDTO.setBizId(bizId); + insertHostIdList.add(hostProp.getHostId()); + return hostDTO; + }).collect(Collectors.toList()); + log.info( + "bizId={}, insertHostIdList.size={}, insertHostIdList={}", + bizId, + insertHostIdList.size(), + insertHostIdList + ); + return insertHostList; } - private List computeUpdateList( + private List computeUpdateHostList( Long bizId, - Set localBizHostIds, - List localBizHosts, - List applicationHostDTOList, + Set localHostIds, + List localHosts, + List hostPropList, long cmdbHostsFetchTimeMills ) { + Map localHostsIdMap = new HashMap<>(); + for (BasicHostDTO localHost : localHosts) { + localHostsIdMap.put(localHost.getHostId(), localHost); + } + List updateHostIdList = new ArrayList<>(); + List updateHostList = hostPropList.stream().filter(hostProp -> + needToUpdate(hostProp, localHostIds, localHostsIdMap, cmdbHostsFetchTimeMills) + ).map(hostProp -> { + ApplicationHostDTO hostDTO = hostProp.toApplicationHostDTO(); + Long lastTime = hostDTO.getLastTime(); + if (lastTime == null || lastTime < 0) { + hostDTO.setLastTime(cmdbHostsFetchTimeMills); + log.warn( + "cmdbHostLastTime({}) is invalid, use cmdbHostsFetchTimeMills({}) for update, host={}", + lastTime, + cmdbHostsFetchTimeMills, + hostDTO + ); + } + hostDTO.setBizId(bizId); + updateHostIdList.add(hostProp.getHostId()); + return hostDTO; + }).collect(Collectors.toList()); + log.info( + "bizId={}, updateHostIdList.size={}, updateHostIdList={}", + bizId, + updateHostIdList.size(), + updateHostIdList + ); + return updateHostList; + } + + /** + * 判断主机信息是否需要更新 + * + * @param hostProp 从CMDB获取的主机信息 + * @param localHostIds 本地主机ID列表 + * @param localHostsIdMap 本地主机ID与主机信息映射表 + * @param cmdbHostsFetchTimeMills CMDB数据获取时间 + * @return 本地库中主机信息是否需要更新 + */ + private boolean needToUpdate(HostProp hostProp, + Set localHostIds, + Map localHostsIdMap, + long cmdbHostsFetchTimeMills) { + if (!localHostIds.contains(hostProp.getHostId())) { + return false; + } + Long lastTime = TimeUtil.parseZonedTime(hostProp.getLastTime()); + if (lastTime == null || lastTime < 0) { + lastTime = cmdbHostsFetchTimeMills; + log.warn( + "cmdbHostLastTime({}) is invalid, use cmdbHostsFetchTimeMills({}) to check whether update, host={}", + hostProp.getLastTime(), + cmdbHostsFetchTimeMills, + hostProp + ); + } + BasicHostDTO localHost = localHostsIdMap.get(hostProp.getHostId()); + if (localHost.getLastTime() >= lastTime) { + log.debug( + "local host(hostId={}, lastTime={}) is not older than target host last time({}), ignore update", + localHost.getHostId(), + localHost.getLastTime(), + cmdbHostsFetchTimeMills + ); + return false; + } else { + log.info( + "local host(hostId={}, lastTime={}) is older than target host last time({}), need to update", + localHost.getHostId(), + localHost.getLastTime(), + cmdbHostsFetchTimeMills + ); + return true; + } + } + + /** + * 使用从CMDB获取的单个业务下的主机、主机关系去更新DB中的对应数据 + * + * @param bizId CMDB业务ID + * @param hostWithModulesList 主机与模块信息 + * @param cmdbHostsFetchTimeMills CMDB主机信息获取时间 + * @return CMDB主机ID集合 + */ + private Set refreshBizHostAndRelations(Long bizId, + List hostWithModulesList, + long cmdbHostsFetchTimeMills) { StopWatch watch = new StopWatch(); - Map localBizHostsIdMap = new HashMap<>(); - for (BasicHostDTO localBizHost : localBizHosts) { - localBizHostsIdMap.put(localBizHost.getHostId(), localBizHost); + // CMDB数据拆分 + Set cmdbHostIds = new HashSet<>(); + Set cmdbBasicHosts = new HashSet<>(); + List hostPropList = new ArrayList<>(); + for (HostWithModules hostWithModules : hostWithModulesList) { + HostProp hostProp = hostWithModules.getHost(); + if (hostProp != null) { + hostPropList.add(hostProp); + cmdbHostIds.add(hostProp.getHostId()); + BasicHostDTO cmdbBasicHost = new BasicHostDTO( + hostProp.getHostId(), + TimeUtil.parseZonedTime(hostProp.getLastTime()) + ); + cmdbBasicHosts.add(cmdbBasicHost); + } + } + // 本地数据:主机 + List localHostList = applicationHostDAO.listBasicHostInfo(cmdbHostIds); + Set localHostIds = localHostList.stream().map(BasicHostDTO::getHostId).collect(Collectors.toSet()); + + logCmdbHostIds(watch, bizId, cmdbHostIds); + logLocalHostIds(watch, bizId, localHostIds); + + // 本地数据:主机关系 + List localHostTopoList = hostTopoDAO.listHostTopoByHostIds(cmdbHostIds); + + // 对比CMDB数据与本地数据找出要新增的主机关系 + List insertHostTopoList = computeInsertHostTopoList( + bizId, + localHostTopoList, + hostWithModulesList, + cmdbHostsFetchTimeMills + ); + + // 对比CMDB数据与本地数据找出要更新的主机关系 + List updateHostTopoList = computeUpdateHostTopoList( + bizId, + localHostTopoList, + hostWithModulesList, + cmdbHostsFetchTimeMills + ); + + // 对比CMDB数据与本地数据找出要删除的主机关系 + List deleteHostTopoList = computeDeleteHostTopoList( + bizId, + localHostTopoList, + hostWithModulesList, + cmdbHostsFetchTimeMills + ); + + // 刷新主机关系数据 + Triple refreshHostTopoResult = refreshHostTopos( + insertHostTopoList, + updateHostTopoList, + deleteHostTopoList, + watch + ); + int insertedHostTopoNum = refreshHostTopoResult.getLeft(); + int updatedHostTopoNum = refreshHostTopoResult.getMiddle(); + int deletedHostTopoNum = refreshHostTopoResult.getRight(); + + // 对比CMDB数据与本地数据找出要新增的主机 + List insertHostList = computeInsertHostList( + bizId, + localHostIds, + hostPropList, + cmdbHostsFetchTimeMills + ); + + // 对比CMDB数据与本地数据找出要更新的主机 + List updateHostList = computeUpdateHostList( + bizId, + localHostIds, + localHostList, + hostPropList, + cmdbHostsFetchTimeMills + ); + + // 刷新主机数据 + Pair refreshHostResult = refreshHosts(insertHostList, updateHostList, watch); + int insertedHostNum = refreshHostResult.getFirst(); + int updatedHostNum = refreshHostResult.getSecond(); + + logRefreshResult( + bizId, + insertedHostTopoNum, + updatedHostTopoNum, + deletedHostTopoNum, + insertedHostNum, + updatedHostNum, + watch + ); + return cmdbBasicHosts; + } + + /** + * 根据本地主机拓扑数据与从CMDB获取的主机拓扑数据计算需要新增的拓扑数据 + * + * @param bizId 业务ID + * @param localHostTopoList 本地主机拓扑数据 + * @param hostWithModulesList 从CMDB获取的主机拓扑数据 + * @return 需要新增的主机拓扑数据 + */ + private List computeInsertHostTopoList(Long bizId, + List localHostTopoList, + List hostWithModulesList, + long cmdbHostsFetchTimeMills) { + List insertHostTopoList = new ArrayList<>(); + Set localTopoKeys = new HashSet<>(); + for (HostTopoDTO hostTopoDTO : localHostTopoList) { + localTopoKeys.add(buildTopoKey(hostTopoDTO)); } - List updateList = - applicationHostDTOList.stream().filter(ApplicationHostDTO -> { - if (!localBizHostIds.contains(ApplicationHostDTO.getHostId())) { - return false; + for (HostWithModules hostWithModules : hostWithModulesList) { + HostProp host = hostWithModules.getHost(); + List modules = hostWithModules.getModules(); + if (CollectionUtils.isEmpty(modules)) { + continue; + } + for (ModuleProp module : modules) { + if (module == null) { + continue; } - BasicHostDTO localHost = localBizHostsIdMap.get(ApplicationHostDTO.getHostId()); - if (localHost.getLastModifyTime() > cmdbHostsFetchTimeMills) { - log.info( - "local host(hostId={}) is newer({}) than target host fetch time({}), ignore update", - localHost.getHostId(), - localHost.getLastModifyTime(), - cmdbHostsFetchTimeMills + String topoKey = buildTopoKey(bizId, host, module); + if (localTopoKeys.contains(topoKey)) { + continue; + } + HostTopoDTO hostTopo = buildHostTopo(bizId, host, module); + Long lastTime = hostTopo.getLastTime(); + if (lastTime == null || lastTime < 0) { + hostTopo.setLastTime(cmdbHostsFetchTimeMills); + log.warn( + "cmdbHostTopoLastTime({}) is invalid, use cmdbHostsFetchTimeMills({}) for insert, hostTopo={}", + lastTime, + cmdbHostsFetchTimeMills, + hostTopo ); - return false; } - return true; - }).collect(Collectors.toList()); - watch.start("log updateList"); - Set updateHostIds = updateList.stream().map(ApplicationHostDTO::getHostId).collect(Collectors.toSet()); - log.info("bizId={},updateHostIds.size={},updateHostIds={}", + insertHostTopoList.add(hostTopo); + } + } + log.info( + "bizId={}, insertHostTopoList.size={}, insertHostTopoList={}", bizId, - updateHostIds.size(), - StringUtil.concatCollection(updateHostIds) + insertHostTopoList.size(), + insertHostTopoList ); - watch.stop(); - if (watch.getTotalTimeMillis() > 1000) { - log.warn("Write log too slow, {}", watch.prettyPrint()); - } - return updateList; + return insertHostTopoList; } - private List computeDeleteIdList( - Long bizId, - Set ccBizHostIds, - List localBizHosts, - long cmdbHostsFetchTimeMills - ) { - StopWatch watch = new StopWatch(); - List deleteIdList = - localBizHosts.stream().filter(localHost -> { - if (ccBizHostIds.contains(localHost.getHostId())) { - return false; + /** + * 根据本地主机拓扑数据与从CMDB获取的主机拓扑数据计算需要更新的拓扑数据 + * + * @param bizId 业务ID + * @param localHostTopoList 本地主机拓扑数据 + * @param hostWithModulesList 从CMDB获取的主机拓扑数据 + * @param cmdbHostsFetchTimeMills CMDB主机信息获取时间 + * @return 需要更新的主机拓扑数据 + */ + private List computeUpdateHostTopoList(Long bizId, + List localHostTopoList, + List hostWithModulesList, + long cmdbHostsFetchTimeMills) { + List updateHostTopoList = new ArrayList<>(); + Map localTopoMap = new HashMap<>(); + for (HostTopoDTO hostTopoDTO : localHostTopoList) { + localTopoMap.put(buildTopoKey(hostTopoDTO), hostTopoDTO); + } + for (HostWithModules hostWithModules : hostWithModulesList) { + HostProp host = hostWithModules.getHost(); + List modules = hostWithModules.getModules(); + if (CollectionUtils.isEmpty(modules)) { + continue; + } + for (ModuleProp module : modules) { + if (module == null) { + continue; + } + String topoKey = buildTopoKey(bizId, host, module); + if (!localTopoMap.containsKey(topoKey)) { + continue; + } + HostTopoDTO localHostTopo = localTopoMap.get(topoKey); + HostTopoDTO cmdbHostTopo = buildHostTopo(bizId, host, module); + Long cmdbHostTopoLastTime = cmdbHostTopo.getLastTime(); + if (cmdbHostTopoLastTime == null || cmdbHostTopoLastTime < 0) { + cmdbHostTopoLastTime = cmdbHostsFetchTimeMills; + cmdbHostTopo.setLastTime(cmdbHostTopoLastTime); + log.warn( + "cmdbHostTopoLastTime({}) is invalid, use cmdbHostsFetchTimeMills({}) for update, " + + "cmdbHostTopo={}", + cmdbHostTopoLastTime, + cmdbHostsFetchTimeMills, + cmdbHostTopo + ); } - if (localHost.getLastModifyTime() > cmdbHostsFetchTimeMills) { + if (localHostTopo.getLastTime() < cmdbHostTopoLastTime) { + updateHostTopoList.add(cmdbHostTopo); log.info( - "local host(hostId={}) is newer({}) than target host fetch time({}), do not delete", - localHost.getHostId(), - localHost.getLastModifyTime(), - cmdbHostsFetchTimeMills + "local hostTopo({}) is older than cmdb hostTopo time({}), need to update", + localHostTopo, + cmdbHostTopoLastTime + ); + } else { + log.debug( + "local hostTopo({}) is not older than cmdb hostTopo time({}), do not update", + localHostTopo, + cmdbHostTopoLastTime ); - return false; } - return true; - }).map(BasicHostDTO::getHostId).collect(Collectors.toList()); - watch.start("log deleteIdList"); + } + } + logUpdateHostTopoList(bizId, updateHostTopoList); + return updateHostTopoList; + } + + private void logUpdateHostTopoList(Long bizId, List updateHostTopoList) { + if (log.isDebugEnabled()) { + log.debug( + "bizId={}, updateHostTopoList.size={}, updateHostTopoList={}", + bizId, + updateHostTopoList.size(), + updateHostTopoList + ); + return; + } + if (updateHostTopoList.size() < 200) { + log.info( + "bizId={}, updateHostTopoList.size={}, updateHostTopoList={}", + bizId, + updateHostTopoList.size(), + updateHostTopoList + ); + return; + } log.info( - "bizId={},deleteHostIds.size={},deleteHostIds={}", + "bizId={}, updateHostTopoList.size={}", bizId, - deleteIdList.size(), - StringUtil.concatCollection(deleteIdList) + updateHostTopoList.size() ); - watch.stop(); - if (watch.getTotalTimeMillis() > 1000) { - log.warn("Write log too slow, {}", watch.prettyPrint()); + } + + /** + * 根据本地主机拓扑数据与从CMDB获取的主机拓扑数据计算需要删除的拓扑数据 + * + * @param bizId 业务ID + * @param localHostTopoList 本地主机拓扑数据 + * @param hostWithModulesList 从CMDB获取的主机拓扑数据 + * @param cmdbHostsFetchTimeMills CMDB主机信息获取时间 + * @return 需要更新的主机拓扑数据 + */ + private List computeDeleteHostTopoList(Long bizId, + List localHostTopoList, + List hostWithModulesList, + long cmdbHostsFetchTimeMills) { + List deleteHostTopoList = new ArrayList<>(); + Set cmdbHostTopoKeys = new HashSet<>(); + + for (HostWithModules hostWithModules : hostWithModulesList) { + HostProp host = hostWithModules.getHost(); + List modules = hostWithModules.getModules(); + if (CollectionUtils.isEmpty(modules)) { + continue; + } + for (ModuleProp module : modules) { + if (module == null) { + continue; + } + String topoKey = buildTopoKey(bizId, host, module); + cmdbHostTopoKeys.add(topoKey); + } + } + for (HostTopoDTO localHostTopo : localHostTopoList) { + String topoKey = buildTopoKey(localHostTopo); + if (cmdbHostTopoKeys.contains(topoKey)) { + continue; + } + if (localHostTopo.getLastTime() < cmdbHostsFetchTimeMills) { + deleteHostTopoList.add(localHostTopo); + log.info( + "local hostTopo({}) is older than cmdb hostTopo fetch time({}), need to delete", + localHostTopo, + cmdbHostsFetchTimeMills + ); + } else { + log.info( + "local hostTopo({}) is not older than cmdb hostTopo fetch time({}), do not delete", + localHostTopo, + cmdbHostsFetchTimeMills + ); + } } - return deleteIdList; + log.info( + "bizId={}, deleteHostTopoList.size={}, deleteHostTopoList={}", + bizId, + deleteHostTopoList.size(), + deleteHostTopoList + ); + return deleteHostTopoList; } - private void refreshBizHosts(Long bizId, - List applicationHostDTOList, - long cmdbHostsFetchTimeMills) { - StopWatch watch = new StopWatch(); - //找出要删除的/更新的/新增的分别处理 - //对比库中数据与接口数据 - watch.start("listHostInfoByBizId"); - List localBizHosts = applicationHostDAO.listBasicHostInfoByBizId(bizId); + private Triple refreshHostTopos(List insertHostTopoList, + List updateHostTopoList, + List deleteHostTopoList, + StopWatch watch) { + watch.start("batchDeleteWithLastTime"); + int deletedNum = hostTopoDAO.batchDeleteWithLastTime(deleteHostTopoList); watch.stop(); - watch.start("mapTo ccBizHostIds"); - Set ccBizHostIds = - applicationHostDTOList.stream().map(ApplicationHostDTO::getHostId).collect(Collectors.toSet()); + watch.start("batchInsertHostTopo"); + int insertedNum = hostTopoDAO.batchInsertHostTopo(insertHostTopoList); watch.stop(); - watch.start("mapTo localBizHostIds"); - Set localBizHostIds = localBizHosts.stream().map(BasicHostDTO::getHostId).collect(Collectors.toSet()); + watch.start("batchUpdateBeforeLastTime"); + int updatedNum = hostTopoDAO.batchUpdateBeforeLastTime(updateHostTopoList); watch.stop(); - watch.start("log ccBizHostIds"); - log.info( - "bizId={}, ccBizHostIds.size={}, ccBizHostIds={}", + return Triple.of(insertedNum, updatedNum, deletedNum); + } + + private String buildTopoKey(HostTopoDTO hostTopoDTO) { + String delimiter = "_"; + return hostTopoDTO.getHostId() + + delimiter + + hostTopoDTO.getBizId() + + delimiter + + hostTopoDTO.getSetId() + + delimiter + + hostTopoDTO.getModuleId(); + } + + private String buildTopoKey(Long bizId, HostProp host, ModuleProp moduleProp) { + String delimiter = "_"; + return host.getHostId() + + delimiter + + bizId + + delimiter + + moduleProp.getSetId() + + delimiter + + moduleProp.getModuleId(); + } + + private HostTopoDTO buildHostTopo(Long bizId, HostProp host, ModuleProp moduleProp) { + return new HostTopoDTO( + host.getHostId(), bizId, - ccBizHostIds.size(), - StringUtil.concatCollection(ccBizHostIds) + moduleProp.getSetId(), + moduleProp.getModuleId(), + TimeUtil.parseZonedTime(moduleProp.getLastTime()) ); + } + + private Pair refreshHosts(List insertHostList, + List updateHostList, + StopWatch watch) { + watch.start("batchInsertHosts"); + // 新增主机 + int insertNum = hostService.batchInsertHosts(insertHostList); + watch.stop(); + watch.start("batchUpdateHostsBeforeLastTime"); + // 更新主机 + int updateNum = hostService.batchUpdateHostsBeforeLastTime(updateHostList); watch.stop(); - watch.start("log localBizHostIds"); + return Pair.of(insertNum, updateNum); + } + + private void logRefreshResult(Long bizId, + int insertedHostTopoNum, + int updatedHostTopoNum, + int deletedHostTopoNum, + int insertedHostNum, + int updatedHostNum, + StopWatch watch) { + if (watch.getTotalTimeMillis() > 300_000) { + log.warn("Performance:refreshBizHostAndRelations: bizId={}, {}", bizId, watch.prettyPrint()); + } else if (watch.getTotalTimeMillis() > 50_000) { + log.info("Performance:refreshBizHostAndRelations: bizId={}, {}", bizId, watch.prettyPrint()); + } else { + log.debug("Performance:refreshBizHostAndRelations: bizId={}, {}", bizId, watch.prettyPrint()); + } log.info( - "bizId={}, localBizHostIds.size={}, localBizHostIds={}", + "RefreshBizHostAndRelationsStatistics:bizId={}, insertedHostTopoNum={}, " + + "updatedHostTopoNum={}, deletedHostTopoNum={}, insertedHostNum={}, updatedHostNum={}", bizId, - localBizHostIds.size(), - StringUtil.concatCollection(localBizHostIds) + insertedHostTopoNum, + updatedHostTopoNum, + deletedHostTopoNum, + insertedHostNum, + updatedHostNum ); - watch.stop(); - watch.start("compute insertList"); - List insertList = computeInsertList(bizId, localBizHostIds, applicationHostDTOList); - watch.stop(); - watch.start("compute updateList"); - List updateList = computeUpdateList( + } + + private void logCmdbHostIds(StopWatch watch, Long bizId, Collection cmdbHostIds) { + watch.start("logCmdbHostIds"); + log.info( + "bizId={}, cmdbHostIds.size={}, cmdbHostIds={}", bizId, - localBizHostIds, - localBizHosts, - applicationHostDTOList, - cmdbHostsFetchTimeMills + cmdbHostIds.size(), + StringUtil.concatCollection(cmdbHostIds) ); watch.stop(); - watch.start("compute deleteIdList"); - List deleteIdList = computeDeleteIdList( + } + + private void logLocalHostIds(StopWatch watch, Long bizId, Collection localHostIds) { + watch.start("logLocalHostIds"); + log.info( + "bizId={}, localHostIds.size={}, localHostIds={}", bizId, - ccBizHostIds, - localBizHosts, - cmdbHostsFetchTimeMills + localHostIds.size(), + StringUtil.concatCollection(localHostIds) ); watch.stop(); - watch.start("deleteHostsFromBiz"); - // 记录一次业务主机同步过程中所有更新失败的主机ID - // 需要从业务下移除的主机 - List removeFailHostIds = hostService.removeHostsFromBiz(bizId, deleteIdList); - watch.stop(); - watch.start("insertHostsToBiz"); - // 需要新增的主机 - List insertFailHostIds = hostService.insertHostsToBiz(bizId, insertList); - watch.stop(); - watch.start("updateHostsInBiz"); - // 需要更新的主机 - List updateFailHostIds = hostService.updateHostsInBiz(bizId, updateList); - watch.stop(); - int failNum = insertFailHostIds.size() + updateFailHostIds.size() + removeFailHostIds.size(); - if (watch.getTotalTimeMillis() > 300_000) { - log.warn("Performance:refreshBizHosts: bizId={}, {}", bizId, watch.prettyPrint()); - } else if (watch.getTotalTimeMillis() > 50_000) { - log.info("Performance:refreshBizHosts: bizId={}, {}", bizId, watch.prettyPrint()); - } else { - log.debug("Performance:refreshBizHosts: bizId={}, {}", bizId, watch.prettyPrint()); - } - if (failNum > 0) { - log.warn( - "FailedHostsStatistics:bizId={}, insertFailHostIds={}, updateFailHostIds={}, removeFailHostIds={}", - bizId, - insertFailHostIds, - updateFailHostIds, - removeFailHostIds - ); - } } - private Pair syncBizHostsIndeed(ApplicationDTO bizApp) { + private Triple, Long, Long> syncBizHostsIndeed(ApplicationDTO bizApp) { Long bizId = Long.valueOf(bizApp.getScope().getId()); long cmdbInterfaceTimeConsuming = 0L; long writeToDBTimeConsuming = 0L; - IBizCmdbClient bizCmdbClient = CmdbClientFactory.getCmdbClient(); StopWatch bizHostsWatch = new StopWatch(); - bizHostsWatch.start("getHostsByAppInfo from CMDB"); + bizHostsWatch.start("getHostRelationsFromCmdb"); long startTime = System.currentTimeMillis(); log.info("begin to syncBizHosts:bizId={}", bizId); long cmdbHostsFetchTimeMills = System.currentTimeMillis(); - List hostsFromCmdb = getHostsByAppInfo(bizCmdbClient, bizApp); + List hostRelationsFromCmdb = getHostRelationsFromCmdb(bizApp); cmdbInterfaceTimeConsuming += (System.currentTimeMillis() - startTime); bizHostsWatch.stop(); - bizHostsWatch.start("updateHosts to local DB"); + bizHostsWatch.start("refreshBizHostAndRelations to local DB"); startTime = System.currentTimeMillis(); - refreshBizHosts(bizId, hostsFromCmdb, cmdbHostsFetchTimeMills); + Set cmdbBasicHosts = refreshBizHostAndRelations( + bizId, + hostRelationsFromCmdb, + cmdbHostsFetchTimeMills + ); writeToDBTimeConsuming += (System.currentTimeMillis() - startTime); bizHostsWatch.stop(); - log.info("Performance:syncBizHosts:bizId={},{}", bizId, bizHostsWatch); - return Pair.of(cmdbInterfaceTimeConsuming, writeToDBTimeConsuming); + if (bizHostsWatch.getTotalTimeMillis() < 600_000L) { + log.info("Performance:syncBizHostAndRelations:bizId={},{}", bizId, bizHostsWatch); + } else { + log.warn("SLOW:Performance:syncBizHostAndRelations:bizId={},{}", bizId, bizHostsWatch); + } + return Triple.of(cmdbBasicHosts, cmdbInterfaceTimeConsuming, writeToDBTimeConsuming); } - public Pair syncBizHostsAtOnce(ApplicationDTO bizApp) { + public Triple, Long, Long> syncBizHostsAtOnce(ApplicationDTO bizApp) { Long bizId = Long.valueOf(bizApp.getScope().getId()); try { appHostsUpdateHelper.waitAndStartBizHostsUpdating(bizId); @@ -296,8 +670,103 @@ public Pair syncBizHostsAtOnce(ApplicationDTO bizApp) { log.error("Fail to syncBizHosts of bizId " + bizId, t); return null; } finally { - appHostsUpdateHelper.endToUpdateBizHosts(bizId); + appHostsUpdateHelper.endToUpdateBizHosts(); + } + } + + public void clearHostNotInCmdb(Set cmdbBasicHosts, long cmdbHostsFetchTimeMills) { + // 删除主机拓扑 + List deleteHostTopoList = computeDeleteHostTopoList(cmdbBasicHosts, cmdbHostsFetchTimeMills); + int deletedHostTopoNum = 0; + if (!CollectionUtils.isEmpty(deleteHostTopoList)) { + for (HostTopoDTO hostTopoDTO : deleteHostTopoList) { + deletedHostTopoNum += hostTopoDAO.deleteHostTopoBeforeLastTime( + hostTopoDTO.getHostId(), + hostTopoDTO.getBizId(), + hostTopoDTO.getSetId(), + hostTopoDTO.getModuleId(), + cmdbHostsFetchTimeMills + ); + } + log.info( + "deleteHostTopoList.size={}, deletedHostTopoNum={}, deleteHostTopoList={}", + deleteHostTopoList.size(), + deletedHostTopoNum, + deleteHostTopoList + ); + } + + // 删除主机 + List deleteHostList = computeDeleteHostList(cmdbBasicHosts, cmdbHostsFetchTimeMills); + if (!CollectionUtils.isEmpty(deleteHostList)) { + int deletedHostNum = hostService.deleteByBasicHost(deleteHostList); + log.info( + "deleteHostList.size={}, deletedHostNum={}, deleteHostIdList={}", + deleteHostList.size(), + deletedHostNum, + deleteHostList.stream().map(BasicHostDTO::getHostId).collect(Collectors.toList()) + ); + } + } + + private List computeDeleteHostTopoList( + Set cmdbBasicHosts, + long cmdbHostsFetchTimeMills + ) { + Set cmdbHostIds = cmdbBasicHosts.stream().map(BasicHostDTO::getHostId).collect(Collectors.toSet()); + List hostTopoList = hostTopoDAO.listHostTopoByExcludeHostIds(cmdbHostIds); + return hostTopoList.stream().filter(hostTopoDTO -> { + if (hostTopoDTO.getLastTime() > cmdbHostsFetchTimeMills) { + log.info( + "local hostTopo({}) is not older than cmdb hosts fetch time({}), do not delete", + hostTopoDTO, + cmdbHostsFetchTimeMills + ); + return false; + } + log.info( + "local hostTopo({}) is equal or older than cmdb hosts fetch time({}), need to delete", + hostTopoDTO, + cmdbHostsFetchTimeMills + ); + return true; + }).collect(Collectors.toList()); + } + + private List computeDeleteHostList( + Set cmdbBasicHosts, + long cmdbHostsFetchTimeMills + ) { + List localBasicHosts = hostService.listAllBasicHost(); + Map cmdbBasicHostMap = new HashMap<>(); + for (BasicHostDTO cmdbBasicHost : cmdbBasicHosts) { + cmdbBasicHostMap.put(cmdbBasicHost.getHostId(), cmdbBasicHost); + } + List deleteHostList = new ArrayList<>(); + for (BasicHostDTO localBasicHost : localBasicHosts) { + if (cmdbBasicHostMap.containsKey(localBasicHost.getHostId())) { + continue; + } + // 本地主机时间戳比从CMDB获取的主机要新,不删除 + if (localBasicHost.getLastTime() > cmdbHostsFetchTimeMills) { + log.info( + "local host(hostId={}, lastTime={}) is not older than cmdb hosts fetch time({}), do not delete", + localBasicHost.getHostId(), + localBasicHost.getLastTime(), + cmdbHostsFetchTimeMills + ); + continue; + } + log.info( + "local host(hostId={}, lastTime={}) is equal or older than cmdb hosts fetch time({}), need to " + + "delete", + localBasicHost.getHostId(), + localBasicHost.getLastTime(), + cmdbHostsFetchTimeMills + ); + deleteHostList.add(localBasicHost); } + return deleteHostList; } } diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/SyncServiceImpl.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/SyncServiceImpl.java index 8d853d113e..fc18d98d7d 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/SyncServiceImpl.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/SyncServiceImpl.java @@ -26,6 +26,7 @@ import com.tencent.bk.job.common.constant.ResourceScopeTypeEnum; import com.tencent.bk.job.common.model.dto.ApplicationDTO; +import com.tencent.bk.job.common.model.dto.BasicHostDTO; import com.tencent.bk.job.common.model.dto.ResourceScope; import com.tencent.bk.job.common.redis.util.LockUtils; import com.tencent.bk.job.common.redis.util.RedisKeyHeartBeatThread; @@ -34,29 +35,30 @@ import com.tencent.bk.job.manage.config.JobManageConfig; import com.tencent.bk.job.manage.dao.ApplicationDAO; import com.tencent.bk.job.manage.manager.app.ApplicationCache; -import com.tencent.bk.job.manage.service.ApplicationService; import com.tencent.bk.job.manage.service.SyncService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Triple; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.DependsOn; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.util.Pair; import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; import org.springframework.util.StopWatch; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +@SuppressWarnings("FieldCanBeLocal") @Slf4j @Service @DependsOn("redisLockConfig") @@ -69,7 +71,6 @@ public class SyncServiceImpl implements SyncService { private static final String REDIS_KEY_LAST_FINISH_TIME_SYNC_HOST = "last-finish-time-sync-host"; private static final String REDIS_KEY_LAST_FINISH_TIME_SYNC_AGENT_STATUS = "last-finish-time-sync-agent-status"; private static final String machineIp = IpUtils.getFirstMachineIP(); - private static final int MAX_RETRY_COUNT = 3; static { List keyList = Arrays.asList(REDIS_KEY_SYNC_APP_JOB_LOCK, REDIS_KEY_SYNC_HOST_JOB_LOCK, @@ -85,7 +86,6 @@ public class SyncServiceImpl implements SyncService { } private final ApplicationDAO applicationDAO; - private final ApplicationService applicationService; private final ThreadPoolExecutor syncAppExecutor; private final ThreadPoolExecutor syncHostExecutor; private final JobManageConfig jobManageConfig; @@ -93,7 +93,6 @@ public class SyncServiceImpl implements SyncService { private final String REDIS_KEY_SYNC_APP_JOB_RUNNING_MACHINE = "sync-app-job-running-machine"; private final String REDIS_KEY_SYNC_HOST_JOB_RUNNING_MACHINE = "sync-host-job-running-machine"; private final String REDIS_KEY_SYNC_AGENT_STATUS_JOB_RUNNING_MACHINE = "sync-agent-status-job-running-machine"; - private final BlockingQueue> appHostFailQueue = new LinkedBlockingDeque<>(); private volatile boolean enableSyncApp; private volatile boolean enableSyncHost; private volatile boolean enableSyncAgentStatus; @@ -114,7 +113,6 @@ public SyncServiceImpl(BizSyncService bizSyncService, HostSyncService hostSyncService, AgentStatusSyncService agentStatusSyncService, ApplicationDAO applicationDAO, - ApplicationService applicationService, JobManageConfig jobManageConfig, RedisTemplate redisTemplate, ApplicationCache applicationCache, @@ -126,7 +124,6 @@ public SyncServiceImpl(BizSyncService bizSyncService, @Qualifier("syncAppExecutor") ThreadPoolExecutor syncAppExecutor, @Qualifier("syncHostExecutor") ThreadPoolExecutor syncHostExecutor) { this.applicationDAO = applicationDAO; - this.applicationService = applicationService; this.jobManageConfig = jobManageConfig; this.redisTemplate = redisTemplate; this.enableSyncApp = jobManageConfig.isEnableSyncApp(); @@ -252,7 +249,7 @@ public Long syncApp() { return 1L; } - private Future> arrangeSyncBizHostsTask(ApplicationDTO bizApp) { + private Future, Long, Long>> arrangeSyncBizHostsTask(ApplicationDTO bizApp) { return syncHostExecutor.submit(() -> hostSyncService.syncBizHostsAtOnce(bizApp) ); @@ -306,31 +303,57 @@ public Long syncHost() { localApps.stream().filter(ApplicationDTO::isBiz).collect(Collectors.toList()); long cmdbInterfaceTimeConsuming = 0L; long writeToDBTimeConsuming = 0L; - List>>> appFutureList = new ArrayList<>(); + List, Long, Long>>>> bizAppFutureList = + new ArrayList<>(); + Set allBizCmdbBasicHosts = new HashSet<>(); + int failedBizNum = 0; + long cmdbHostsFetchTimeMills = System.currentTimeMillis(); for (ApplicationDTO bizApp : localBizApps) { - Future> future = arrangeSyncBizHostsTask(bizApp); - appFutureList.add(Pair.of(bizApp, future)); + Future, Long, Long>> future = arrangeSyncBizHostsTask(bizApp); + bizAppFutureList.add(Pair.of(bizApp, future)); } - for (Pair>> appFuture : appFutureList) { - ApplicationDTO applicationDTO = appFuture.getFirst(); - Future> future = appFuture.getSecond(); + for (Pair, Long, Long>>> bizAppFuture : + bizAppFutureList) { + ApplicationDTO bizApp = bizAppFuture.getFirst(); + Future, Long, Long>> future = bizAppFuture.getSecond(); try { - Pair timeConsumingPair = future.get(30, TimeUnit.MINUTES); - cmdbInterfaceTimeConsuming += timeConsumingPair.getFirst(); - writeToDBTimeConsuming += timeConsumingPair.getSecond(); + Triple, Long, Long> timeConsumingPair = future.get(30, TimeUnit.MINUTES); + Set cmdbBasicHosts = timeConsumingPair.getLeft(); + if (!CollectionUtils.isEmpty(cmdbBasicHosts)) { + allBizCmdbBasicHosts.addAll(cmdbBasicHosts); + } + cmdbInterfaceTimeConsuming += timeConsumingPair.getMiddle(); + writeToDBTimeConsuming += timeConsumingPair.getRight(); } catch (Throwable t) { - appHostFailQueue.add(Pair.of(applicationDTO, MAX_RETRY_COUNT)); - log.error("syncHost of app fail:appId=" + applicationDTO.getId(), t); + log.error("syncHost of biz fail:bizId=" + bizApp.getBizIdIfBizApp(), t); + failedBizNum += 1; } } - log.info( - Thread.currentThread().getName() + - ":Finished:sync host from cc," + - "cmdbInterfaceTimeConsuming={}ms,writeToDBTimeConsuming={}ms,rate={}", - cmdbInterfaceTimeConsuming, - writeToDBTimeConsuming, - cmdbInterfaceTimeConsuming / (0. + writeToDBTimeConsuming) - ); + if (failedBizNum == 0) { + // 删除CMDB中不存在的主机 + hostSyncService.clearHostNotInCmdb(allBizCmdbBasicHosts, cmdbHostsFetchTimeMills); + log.info( + Thread.currentThread().getName() + + ":Finished:sync host from cc, bizNum={}, failedBizNum={}, " + + "cmdbInterfaceTimeConsuming={}ms,writeToDBTimeConsuming={}ms,rate={}", + localBizApps.size(), + failedBizNum, + cmdbInterfaceTimeConsuming, + writeToDBTimeConsuming, + cmdbInterfaceTimeConsuming / (0. + writeToDBTimeConsuming) + ); + } else { + log.warn( + Thread.currentThread().getName() + + ":Finished:sync host from cc, bizNum={}, failedBizNum={}, " + + "cmdbInterfaceTimeConsuming={}ms,writeToDBTimeConsuming={}ms,rate={}", + localBizApps.size(), + failedBizNum, + cmdbInterfaceTimeConsuming, + writeToDBTimeConsuming, + cmdbInterfaceTimeConsuming / (0. + writeToDBTimeConsuming) + ); + } // 将最后同步时间写入Redis redisTemplate.opsForValue().set(REDIS_KEY_LAST_FINISH_TIME_SYNC_HOST, "" + System.currentTimeMillis()); @@ -421,11 +444,17 @@ public Boolean syncBizHosts(Long bizId) { ApplicationDTO applicationDTO = applicationDAO.getAppByScope( new ResourceScope(ResourceScopeTypeEnum.BIZ, bizId.toString()) ); - Pair pair = hostSyncService.syncBizHostsAtOnce(applicationDTO); - Long cmdbInterfaceTimeConsuming = pair.getFirst(); - Long writeToDBTimeConsuming = pair.getSecond(); - log.info("syncBizHosts:cmdbInterfaceTimeConsuming={},writeToDBTimeConsuming={}", cmdbInterfaceTimeConsuming, - writeToDBTimeConsuming); + Triple, Long, Long> triple = hostSyncService.syncBizHostsAtOnce(applicationDTO); + Set cmdbBasicHosts = triple.getLeft(); + Long cmdbInterfaceTimeConsuming = triple.getMiddle(); + Long writeToDBTimeConsuming = triple.getRight(); + log.info( + "syncBizHosts:cmdbInterfaceTimeConsuming={},writeToDBTimeConsuming={}, {} hosts, cmdbHostIds={}", + cmdbInterfaceTimeConsuming, + writeToDBTimeConsuming, + cmdbBasicHosts.size(), + cmdbBasicHosts.stream().map(BasicHostDTO::getHostId).collect(Collectors.toList()) + ); return true; } diff --git a/support-files/sql/job-manage/0031_job_manage_20230618-1000_V3.7.4_mysql.sql b/support-files/sql/job-manage/0031_job_manage_20230618-1000_V3.7.4_mysql.sql new file mode 100644 index 0000000000..b7c389cc07 --- /dev/null +++ b/support-files/sql/job-manage/0031_job_manage_20230618-1000_V3.7.4_mysql.sql @@ -0,0 +1,39 @@ +USE job_manage; + +SET NAMES utf8mb4; + +DROP PROCEDURE IF EXISTS job_schema_update; + +DELIMITER + +CREATE PROCEDURE job_schema_update() +BEGIN + + DECLARE db VARCHAR(100); + SET AUTOCOMMIT = 0; + SELECT DATABASE() INTO db; + + -- Update `host` schema + IF NOT EXISTS(SELECT 1 + FROM information_schema.columns + WHERE TABLE_SCHEMA = db + AND TABLE_NAME = 'host' + AND COLUMN_NAME = 'last_time') THEN + ALTER TABLE host ADD COLUMN last_time BIGINT(20) NOT NULL DEFAULT 0 COMMENT 'CMDB中的数据最后修改时间'; + END IF; + + -- Update `host_topo` schema + IF NOT EXISTS(SELECT 1 + FROM information_schema.columns + WHERE TABLE_SCHEMA = db + AND TABLE_NAME = 'host_topo' + AND COLUMN_NAME = 'last_time') THEN + ALTER TABLE host_topo ADD COLUMN last_time BIGINT(20) NOT NULL DEFAULT 0 COMMENT 'CMDB中的数据最后修改时间' AFTER `app_id`; + END IF; + +COMMIT; +END +DELIMITER ; +CALL job_schema_update(); + +DROP PROCEDURE IF EXISTS job_schema_update;