Skip to content

Commit

Permalink
Multithreaded Config Repo update
Browse files Browse the repository at this point in the history
  • Loading branch information
ibnc committed Dec 13, 2018
1 parent cb0348b commit 0a15ede
Show file tree
Hide file tree
Showing 12 changed files with 138 additions and 67 deletions.
@@ -0,0 +1,25 @@
/*
* Copyright 2018 ThoughtWorks, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.thoughtworks.go.server.materials;

import com.thoughtworks.go.domain.materials.Material;

public class ConfigMaterialUpdateCompletedMessage extends MaterialUpdateCompletedMessage {
public ConfigMaterialUpdateCompletedMessage(Material material, long trackingId) {
super(material, trackingId);
}
}
Expand Up @@ -27,17 +27,14 @@
import com.thoughtworks.go.server.service.materials.MaterialPoller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.File;

/**
* Updates configuration from repositories.
*/
@Service
public class ConfigMaterialUpdater implements GoMessageListener<MaterialUpdateCompletedMessage> {
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigMaterialUpdater.class);
public class ConfigMaterialUpdateListener implements GoMessageListener<MaterialUpdateCompletedMessage> {
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigMaterialUpdateListener.class);

private GoRepoConfigDataSource repoConfigDataSource;
private MaterialRepository materialRepository;
Expand All @@ -47,23 +44,20 @@ public class ConfigMaterialUpdater implements GoMessageListener<MaterialUpdateCo
private MaterialService materialService;
private SubprocessExecutionContext subprocessExecutionContext;

@Autowired
public ConfigMaterialUpdater(GoRepoConfigDataSource repoConfigDataSource,
MaterialRepository materialRepository,
MaterialChecker materialChecker,
ConfigMaterialUpdateCompletedTopic configCompletedTopic,
MaterialUpdateCompletedTopic topic,
MaterialService materialService,
SubprocessExecutionContext subprocessExecutionContext) {
public ConfigMaterialUpdateListener(GoRepoConfigDataSource repoConfigDataSource,
MaterialRepository materialRepository,
MaterialChecker materialChecker,
ConfigMaterialUpdateCompletedTopic configCompletedTopic,
MaterialUpdateCompletedTopic topic,
MaterialService materialService,
SubprocessExecutionContext subprocessExecutionContext) {
this.repoConfigDataSource = repoConfigDataSource;
this.materialChecker = materialChecker;
this.materialRepository = materialRepository;
this.configCompleted = configCompletedTopic;
this.topic = topic;
this.materialService = materialService;
this.subprocessExecutionContext = subprocessExecutionContext;

this.configCompleted.addListener(this);
}

@Override
Expand Down Expand Up @@ -94,9 +88,9 @@ public void onMessage(MaterialUpdateCompletedMessage message) {
}
}
} finally {
// always post the original message further
// always post this message further
// this will remove material from inProgress in MUS
topic.post(message);
topic.post(new ConfigMaterialUpdateCompletedMessage(message.getMaterial(), message.trackingId()));
}
}

Expand Down
Expand Up @@ -24,7 +24,7 @@
* @understands messages about required material updates
*/
@Component
public class ConfigMaterialUpdateQueue extends GoMessageQueue<MaterialUpdateMessage> {
public class ConfigMaterialUpdateQueue extends GoMessageQueue<MaterialUpdateCompletedMessage> {
@Autowired
public ConfigMaterialUpdateQueue(MessagingService messaging) {
super(messaging, "config-material-update-required");
Expand Down
Expand Up @@ -16,13 +16,16 @@

package com.thoughtworks.go.server.materials;

import com.thoughtworks.go.config.GoRepoConfigDataSource;
import com.thoughtworks.go.config.materials.SubprocessExecutionContext;
import com.thoughtworks.go.server.cronjob.GoDiskSpaceMonitor;
import com.thoughtworks.go.server.messaging.GoMessageQueue;
import com.thoughtworks.go.server.messaging.GoMessageTopic;
import com.thoughtworks.go.server.perf.MDUPerformanceLogger;
import com.thoughtworks.go.server.persistence.MaterialRepository;
import com.thoughtworks.go.server.service.DrainModeService;
import com.thoughtworks.go.server.service.MaterialExpansionService;
import com.thoughtworks.go.server.service.MaterialService;
import com.thoughtworks.go.server.service.support.DaemonThreadStatsCollector;
import com.thoughtworks.go.server.transaction.TransactionTemplate;
import com.thoughtworks.go.serverhealth.ServerHealthService;
Expand All @@ -40,6 +43,10 @@ public class MaterialUpdateListenerFactory {
private DependencyMaterialUpdateQueue dependencyMaterialQueue;
private final DaemonThreadStatsCollector daemonThreadStatsCollector;
private DrainModeService drainModeService;
private GoRepoConfigDataSource repoConfigDataSource;
private MaterialChecker materialChecker;
private MaterialService materialService;
private SubprocessExecutionContext subprocessExecutionContext;
private SystemEnvironment systemEnvironment;
private final ServerHealthService serverHealthService;
private final GoDiskSpaceMonitor diskSpaceMonitor;
Expand Down Expand Up @@ -69,7 +76,11 @@ public MaterialUpdateListenerFactory(MaterialUpdateCompletedTopic topic,
MDUPerformanceLogger mduPerformanceLogger,
DependencyMaterialUpdateQueue dependencyMaterialQueue,
DaemonThreadStatsCollector daemonThreadStatsCollector,
DrainModeService drainModeService) {
DrainModeService drainModeService,
GoRepoConfigDataSource repoConfigDataSource,
MaterialChecker materialChecker,
MaterialService materialService,
SubprocessExecutionContext subprocessExecutionContext) {
this.topic = topic;
this.configTopic = configTopic;
this.queue = queue;
Expand All @@ -88,6 +99,10 @@ public MaterialUpdateListenerFactory(MaterialUpdateCompletedTopic topic,
this.dependencyMaterialQueue = dependencyMaterialQueue;
this.daemonThreadStatsCollector = daemonThreadStatsCollector;
this.drainModeService = drainModeService;
this.repoConfigDataSource = repoConfigDataSource;
this.materialChecker = materialChecker;
this.materialService = materialService;
this.subprocessExecutionContext = subprocessExecutionContext;
}

public void init(){
Expand All @@ -100,7 +115,7 @@ public void init(){
}

for (int i = 0; i < numberOfConfigListeners; i++) {
createWorker(this.configQueue, this.configTopic);
configQueue.addListener(new ConfigMaterialUpdateListener(repoConfigDataSource, materialRepository, materialChecker, configTopic, topic, materialService, subprocessExecutionContext));
}

for (int i = 0; i < numberOfDependencyMaterialCheckListeners; i++) {
Expand Down
Expand Up @@ -173,7 +173,7 @@ public boolean updateMaterial(Material material) {
LOGGER.debug("[Material Update] Starting update of material {}", material);
try {
long trackingId = mduPerformanceLogger.materialSentToUpdateQueue(material);
queueFor(material).post(new MaterialUpdateMessage(material, trackingId));
postMaterialToCorrespondingQueue(material, trackingId);

return true;
} catch (RuntimeException e) {
Expand Down Expand Up @@ -204,6 +204,11 @@ public void onMessage(MaterialUpdateCompletedMessage message) {
return;
}

if (!(message instanceof ConfigMaterialUpdateCompletedMessage) && isConfigMaterial(message.getMaterial())) {
configUpdateQueue.post(message);
return;
}

try {
LOGGER.debug("[Material Update] Material update completed for material {}", message.getMaterial());

Expand Down Expand Up @@ -257,12 +262,12 @@ private Long getMaterialUpdateInActiveTimeoutInMillis() {
return systemEnvironment.get(SystemEnvironment.MATERIAL_UPDATE_INACTIVE_TIMEOUT) * 60 * 1000L;
}

private GoMessageQueue<MaterialUpdateMessage> queueFor(Material material) {
if (isConfigMaterial(material)) {
return configUpdateQueue;
private void postMaterialToCorrespondingQueue(Material material, long trackingId) {
if (material instanceof DependencyMaterial) {
dependencyMaterialUpdateQueue.post(new MaterialUpdateMessage(material, trackingId));
} else {
updateQueue.post(new MaterialUpdateMessage(material, trackingId));
}

return (material instanceof DependencyMaterial) ? dependencyMaterialUpdateQueue : updateQueue;
}

ProcessManager getProcessManager() {
Expand Down
Expand Up @@ -355,6 +355,7 @@ public void onMaterialUpdate(MaterialUpdateCompletedMessage message) {
format("Material update failed for material '%s' because: %s", material.getDisplayName(), failureReason));
failed = true;
} else if (this.configMaterial != null &&
message instanceof ConfigMaterialUpdateCompletedMessage &&
material.isSameFlyweight(this.configMaterial)) {
// Then we have just updated configuration material.
// A chance to refresh our config instance.
Expand Down Expand Up @@ -391,6 +392,9 @@ public void onMaterialUpdate(MaterialUpdateCompletedMessage message) {
this.pipelineConfig = newPipelineConfig;
}
}
} else if (this.configMaterial != null &&
material.isSameFlyweight(this.configMaterial)) {
return;
}

pendingMaterials.remove(material.getFingerprint());
Expand Down
Expand Up @@ -36,13 +36,13 @@
import static com.thoughtworks.go.domain.materials.Modification.modifications;
import static org.mockito.Mockito.*;

public class ConfigMaterialUpdaterTest {
public class ConfigMaterialUpdateListenerTest {
private GoRepoConfigDataSource repoConfigDataSource;
private MaterialRepository materialRepository;
private MaterialChecker materialChecker;
private ConfigMaterialUpdateCompletedTopic configCompleted;
private MaterialUpdateCompletedTopic topic;
private ConfigMaterialUpdater configUpdater;
private ConfigMaterialUpdateListener configUpdater;
private MaterialService materialService;

private Material material;
Expand Down Expand Up @@ -71,27 +71,21 @@ public void SetUp()

when(materialRepository.findLatestModification(material)).thenReturn(mods);

configUpdater = new ConfigMaterialUpdater(
configUpdater = new ConfigMaterialUpdateListener(
repoConfigDataSource,materialRepository,materialChecker,
configCompleted,topic,materialService,new TestSubprocessExecutionContext());
}
private MaterialRevisions revisions(Material material, Modification modification) {
return new MaterialRevisions(new MaterialRevision(material, modifications(modification)));
}

@Test
public void shouldSubscribeToMaterialUpdateCompletedMessages()
{
verify(configCompleted,times(1)).addListener(configUpdater);
}

@Test
public void shouldPostMaterialUpdateCompletedMessagesFurther()
{
MaterialUpdateSuccessfulMessage message = new MaterialUpdateSuccessfulMessage(material, 123);
this.configUpdater.onMessage(message);

verify(topic,times(1)).post(message);
verify(topic,times(1)).post(new ConfigMaterialUpdateCompletedMessage(material, 123));
}

@Test
Expand All @@ -110,7 +104,7 @@ public void shouldCallGoRepoConfigDataSourceWhenMaterialUpdateSuccessfulMessage(
this.configUpdater.onMessage(message);

verify(repoConfigDataSource,times(1)).onCheckoutComplete(material.config(),folder,"1");
verify(topic,times(1)).post(message);
verify(topic,times(1)).post(new ConfigMaterialUpdateCompletedMessage(material, 123));
}
@Test
public void shouldNotCallGoRepoConfigDataSourceWhenMaterialUpdateFailedMessage()
Expand All @@ -119,7 +113,7 @@ public void shouldNotCallGoRepoConfigDataSourceWhenMaterialUpdateFailedMessage()
this.configUpdater.onMessage(message);

verify(repoConfigDataSource,times(0)).onCheckoutComplete(material.config(),folder,"1");
verify(topic,times(1)).post(message);
verify(topic,times(1)).post(new ConfigMaterialUpdateCompletedMessage(material, 123));
}

@Test
Expand All @@ -133,7 +127,7 @@ public void shouldNotCallGoRepoConfigDataSourceWhenNoChanges()

verify(repoConfigDataSource,times(0)).onCheckoutComplete(material.config(),folder,"1");
// but pass message further anyway
verify(topic,times(1)).post(message);
verify(topic,times(1)).post(new ConfigMaterialUpdateCompletedMessage(material, 123));
}
@Test
public void shouldCallGoRepoConfigDataSourceWhenNewRevision()
Expand All @@ -149,6 +143,6 @@ public void shouldCallGoRepoConfigDataSourceWhenNewRevision()
this.configUpdater.onMessage(message);

verify(repoConfigDataSource,times(1)).onCheckoutComplete(material.config(),folder,"2");
verify(topic,times(1)).post(message);
verify(topic,times(1)).post(new ConfigMaterialUpdateCompletedMessage(material, 123));
}
}
Expand Up @@ -16,12 +16,15 @@

package com.thoughtworks.go.server.materials;

import com.thoughtworks.go.config.GoRepoConfigDataSource;
import com.thoughtworks.go.config.materials.SubprocessExecutionContext;
import com.thoughtworks.go.server.cronjob.GoDiskSpaceMonitor;
import com.thoughtworks.go.server.messaging.GoMessageListener;
import com.thoughtworks.go.server.perf.MDUPerformanceLogger;
import com.thoughtworks.go.server.persistence.MaterialRepository;
import com.thoughtworks.go.server.service.DrainModeService;
import com.thoughtworks.go.server.service.MaterialExpansionService;
import com.thoughtworks.go.server.service.MaterialService;
import com.thoughtworks.go.server.transaction.TransactionTemplate;
import com.thoughtworks.go.serverhealth.ServerHealthService;
import com.thoughtworks.go.util.SystemEnvironment;
Expand Down Expand Up @@ -56,6 +59,10 @@ public class MaterialUpdateListenerFactoryTest {
@Mock private MDUPerformanceLogger mduPerformanceLogger;
@Mock private DependencyMaterialUpdateQueue dependencyMaterialQueue;
@Mock private DrainModeService drainModeService;
@Mock private GoRepoConfigDataSource repoConfigDataSource;
@Mock private SubprocessExecutionContext subprocessExecutionContext;
@Mock private MaterialChecker materialChecker;
@Mock private MaterialService materialService;

@Before
public void setUp() throws Exception {
Expand All @@ -70,7 +77,8 @@ public void shouldCreateCompetingConsumersForSuppliedQueue() throws Exception {
materialRepository, systemEnvironment, healthService, diskSpaceMonitor,
transactionTemplate, dependencyMaterialUpdater, scmMaterialUpdater,
packageMaterialUpdater, pluggableSCMMaterialUpdater, materialExpansionService, mduPerformanceLogger,
dependencyMaterialQueue, null, drainModeService);
dependencyMaterialQueue, null, drainModeService,
repoConfigDataSource, materialChecker, materialService, subprocessExecutionContext);
factory.init();

verify(queue, new Times(NUMBER_OF_CONSUMERS)).addListener(any(GoMessageListener.class));
Expand All @@ -84,7 +92,8 @@ public void shouldCreateCompetingConsumersForSuppliedConfigQueue() throws Except
materialRepository, systemEnvironment, healthService, diskSpaceMonitor,
transactionTemplate, dependencyMaterialUpdater, scmMaterialUpdater,
packageMaterialUpdater, pluggableSCMMaterialUpdater, materialExpansionService, mduPerformanceLogger,
dependencyMaterialQueue, null, drainModeService);
dependencyMaterialQueue, null, drainModeService,
repoConfigDataSource, materialChecker, materialService, subprocessExecutionContext);
factory.init();

verify(configQueue, new Times(NUMBER_OF_CONFIG_CONSUMERS)).addListener(any(GoMessageListener.class));
Expand All @@ -100,7 +109,8 @@ public void shouldCreateCompetingConsumersForSuppliedDependencyMaterialQueue() t
materialRepository, systemEnvironment, healthService, diskSpaceMonitor,
transactionTemplate, dependencyMaterialUpdater, scmMaterialUpdater,
packageMaterialUpdater, pluggableSCMMaterialUpdater, materialExpansionService, mduPerformanceLogger,
dependencyMaterialQueue, null, drainModeService);
dependencyMaterialQueue, null, drainModeService,
repoConfigDataSource, materialChecker, materialService, subprocessExecutionContext);
factory.init();

verify(dependencyMaterialQueue, new Times(noOfDependencyMaterialCheckListeners)).addListener(any(GoMessageListener.class));
Expand Down

0 comments on commit 0a15ede

Please sign in to comment.