diff --git a/core/src/org/pentaho/di/core/logging/LogChannel.java b/core/src/org/pentaho/di/core/logging/LogChannel.java index 6426318bff1d..6a2e784f5bcf 100644 --- a/core/src/org/pentaho/di/core/logging/LogChannel.java +++ b/core/src/org/pentaho/di/core/logging/LogChannel.java @@ -23,7 +23,7 @@ package org.pentaho.di.core.logging; import java.util.Date; -import java.util.Deque; +import java.util.Queue; import java.util.Map; import org.pentaho.di.core.Const; @@ -295,7 +295,7 @@ public void snap( MetricsInterface metric, String subject, long... value ) { String key = MetricsSnapshot.getKey( metric, subject ); Map metricsMap = null; MetricsSnapshotInterface snapshot = null; - Deque metricsList = null; + Queue metricsList = null; switch ( metric.getType() ) { case MAX: // Calculate and store the maximum value for this metric diff --git a/core/src/org/pentaho/di/core/logging/MetricsRegistry.java b/core/src/org/pentaho/di/core/logging/MetricsRegistry.java index d94708dc28a8..ca9a6caa3a2b 100644 --- a/core/src/org/pentaho/di/core/logging/MetricsRegistry.java +++ b/core/src/org/pentaho/di/core/logging/MetricsRegistry.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2013 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com * ******************************************************************************* * @@ -22,11 +22,11 @@ package org.pentaho.di.core.logging; -import java.util.Collections; -import java.util.Deque; -import java.util.HashMap; + import java.util.Map; -import java.util.concurrent.LinkedBlockingDeque; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import org.pentaho.di.core.metrics.MetricsSnapshotInterface; @@ -37,21 +37,18 @@ * */ public class MetricsRegistry { - private static MetricsRegistry registry; + private static MetricsRegistry registry = new MetricsRegistry(); private Map> snapshotMaps; - private Map> snapshotLists; + private Map> snapshotLists; public static MetricsRegistry getInstance() { - if ( registry == null ) { - registry = new MetricsRegistry(); - } return registry; } private MetricsRegistry() { - snapshotMaps = new HashMap>(); - snapshotLists = new HashMap>(); + snapshotMaps = new ConcurrentHashMap>(); + snapshotLists = new ConcurrentHashMap>(); } public void addSnapshot( LogChannelInterface logChannel, MetricsSnapshotInterface snapshot ) { @@ -60,7 +57,7 @@ public void addSnapshot( LogChannelInterface logChannel, MetricsSnapshotInterfac switch ( metric.getType() ) { case START: case STOP: - Deque list = getSnapshotList( channelId ); + Queue list = getSnapshotList( channelId ); list.add( snapshot ); break; @@ -77,7 +74,7 @@ public void addSnapshot( LogChannelInterface logChannel, MetricsSnapshotInterfac } } - public Map> getSnapshotLists() { + public Map> getSnapshotLists() { return snapshotLists; } @@ -92,10 +89,10 @@ public Map> getSnapshotMaps() { * The log channel to use. * @return an existing or a new metrics snapshot list. */ - public Deque getSnapshotList( String logChannelId ) { - Deque list = snapshotLists.get( logChannelId ); + public Queue getSnapshotList( String logChannelId ) { + Queue list = snapshotLists.get( logChannelId ); if ( list == null ) { - list = new LinkedBlockingDeque(); + list = new ConcurrentLinkedQueue(); snapshotLists.put( logChannelId, list ); } return list; @@ -112,7 +109,7 @@ public Deque getSnapshotList( String logChannelId ) { public Map getSnapshotMap( String logChannelId ) { Map map = snapshotMaps.get( logChannelId ); if ( map == null ) { - map = Collections.synchronizedMap( new HashMap() ); + map = new ConcurrentHashMap(); snapshotMaps.put( logChannelId, map ); } return map; diff --git a/core/src/org/pentaho/di/core/metrics/MetricsUtil.java b/core/src/org/pentaho/di/core/metrics/MetricsUtil.java index da7a7b4a08e2..884912f5fa95 100644 --- a/core/src/org/pentaho/di/core/metrics/MetricsUtil.java +++ b/core/src/org/pentaho/di/core/metrics/MetricsUtil.java @@ -2,7 +2,7 @@ * * Pentaho Data Integration * - * Copyright (C) 2002-2013 by Pentaho : http://www.pentaho.com + * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com * ******************************************************************************* * @@ -23,7 +23,7 @@ package org.pentaho.di.core.metrics; import java.util.ArrayList; -import java.util.Deque; +import java.util.Queue; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -49,7 +49,7 @@ public class MetricsUtil { public static List getDuration( String logChannelId, Metrics metric ) { List durations = new ArrayList(); - Deque metrics = MetricsRegistry.getInstance().getSnapshotList( logChannelId ); + Queue metrics = MetricsRegistry.getInstance().getSnapshotList( logChannelId ); MetricsSnapshotInterface start = null; Iterator iterator = metrics.iterator(); @@ -117,7 +117,7 @@ public static List getDurations( String logChannelId ) { Map last = new HashMap(); Map map = new HashMap(); - Deque metrics = MetricsRegistry.getInstance().getSnapshotList( logChannelId ); + Queue metrics = MetricsRegistry.getInstance().getSnapshotList( logChannelId ); Iterator iterator = metrics.iterator(); while ( iterator.hasNext() ) { diff --git a/core/test-src/org/pentaho/di/core/logging/MetricsRegistryTest.java b/core/test-src/org/pentaho/di/core/logging/MetricsRegistryTest.java new file mode 100644 index 000000000000..ee0f7048353c --- /dev/null +++ b/core/test-src/org/pentaho/di/core/logging/MetricsRegistryTest.java @@ -0,0 +1,84 @@ +/******************************************************************************* + * + * Pentaho Data Integration + * + * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com + * + ******************************************************************************* + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ +package org.pentaho.di.core.logging; + +import static org.junit.Assert.assertTrue; + +import org.junit.Before; +import org.junit.Test; + + +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class MetricsRegistryTest { + private MetricsRegistry metricsRegistry; + private List logIds; + private int threadCount = 100; + private int logChannelIdCount = 20; + private CountDownLatch countDownLatch = null; + + @Before + public void setUp() { + metricsRegistry = MetricsRegistry.getInstance(); + logIds = new ArrayList<>( logChannelIdCount ); + for ( int i = 1; i <= logChannelIdCount; i++ ) { + logIds.add( "logChannelId_" + i ); + } + countDownLatch = new CountDownLatch( 1 ); + } + + + @Test( timeout = 2000 ) + public void testConcurrencySnap() throws Exception { + ExecutorService service = Executors.newFixedThreadPool( threadCount ); + for ( int i = 0; i < threadCount; i++ ) { + service.submit( new ConcurrentPutIfAbsent( logIds.get( i % 20 ) ) ); + } + countDownLatch.countDown(); + service.shutdown(); + while ( !service.isTerminated() ) { + Thread.currentThread().sleep( 1 ); + } + int expectedQueueCount = logChannelIdCount > threadCount ? threadCount : logChannelIdCount; + assertTrue( expectedQueueCount == metricsRegistry.getSnapshotLists().size() ); + } + + private class ConcurrentPutIfAbsent implements Callable { + private String id; + ConcurrentPutIfAbsent( String id ) { + this.id = id; + } + @Override + public Queue call() throws Exception { + countDownLatch.await(); + return metricsRegistry.getSnapshotList( id ); + } + + } + +} diff --git a/engine/src/org/pentaho/di/trans/Trans.java b/engine/src/org/pentaho/di/trans/Trans.java index 1e5d0522a0c5..119b8a746292 100644 --- a/engine/src/org/pentaho/di/trans/Trans.java +++ b/engine/src/org/pentaho/di/trans/Trans.java @@ -32,7 +32,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.Date; -import java.util.Deque; +import java.util.Queue; import java.util.HashMap; import java.util.Hashtable; import java.util.Iterator; @@ -2510,7 +2510,7 @@ protected synchronized void writeMetricsInformation() throws KettleException { List logChannelIds = LoggingRegistry.getInstance().getLogChannelChildren( getLogChannelId() ); for ( String logChannelId : logChannelIds ) { - Deque snapshotList = + Queue snapshotList = MetricsRegistry.getInstance().getSnapshotLists().get( logChannelId ); if ( snapshotList != null ) { Iterator iterator = snapshotList.iterator(); @@ -3541,7 +3541,7 @@ public void setRunning( boolean running ) { * @throws KettleException * the kettle exception */ - public static final TransSplitter executeClustered( final TransMeta transMeta, + public static TransSplitter executeClustered( final TransMeta transMeta, final TransExecutionConfiguration executionConfiguration ) throws KettleException { if ( Utils.isEmpty( transMeta.getName() ) ) { throw new KettleException( "The transformation needs a name to uniquely identify it by on the remote server." ); @@ -3570,7 +3570,7 @@ public static final TransSplitter executeClustered( final TransMeta transMeta, * the kettle exception * @see org.pentaho.di.ui.spoon.delegates.SpoonTransformationDelegate */ - public static final void executeClustered( final TransSplitter transSplitter, + public static void executeClustered( final TransSplitter transSplitter, final TransExecutionConfiguration executionConfiguration ) throws KettleException { try { // Send the transformations to the servers...