[PDI-15635] - Changed implementations of collections, which are used in a multithreaded environment, in order to get proper synchronization and better performance.
ArtsiomYurhevich committed Oct 4, 2016
1 parent d2df232 commit a4802f7
Showing 5 changed files with 109 additions and 28 deletions.
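
In short, the commit replaces the registry's unsynchronized lazy singleton and its HashMap / LinkedBlockingDeque collections with an eagerly created instance backed by ConcurrentHashMap and ConcurrentLinkedQueue, and narrows the exposed type from Deque to Queue at the call sites. A minimal sketch of the resulting pattern, with simplified names (illustrative only; the exact committed code is in the diffs below):

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the thread-safety pattern this commit adopts (names simplified).
public class RegistrySketch {
  // Eager, final singleton: safe to publish without synchronization. The previous
  // unsynchronized "if ( registry == null )" lazy init could construct the registry
  // more than once under concurrent first calls.
  private static final RegistrySketch INSTANCE = new RegistrySketch();

  // Concurrent collections replace HashMap / Collections.synchronizedMap /
  // LinkedBlockingDeque, so readers and writers no longer serialize on one lock.
  private final Map<String, Queue<Object>> snapshotLists =
    new ConcurrentHashMap<String, Queue<Object>>();

  private RegistrySketch() {
  }

  public static RegistrySketch getInstance() {
    return INSTANCE;
  }
}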
4 changes: 2 additions & 2 deletions core/src/org/pentaho/di/core/logging/LogChannel.java
@@ -23,7 +23,7 @@
package org.pentaho.di.core.logging;

import java.util.Date;
import java.util.Deque;
import java.util.Queue;
import java.util.Map;

import org.pentaho.di.core.Const;
@@ -295,7 +295,7 @@ public void snap( MetricsInterface metric, String subject, long... value ) {
String key = MetricsSnapshot.getKey( metric, subject );
Map<String, MetricsSnapshotInterface> metricsMap = null;
MetricsSnapshotInterface snapshot = null;
Deque<MetricsSnapshotInterface> metricsList = null;
Queue<MetricsSnapshotInterface> metricsList = null;
switch ( metric.getType() ) {
case MAX:
// Calculate and store the maximum value for this metric
33 changes: 15 additions & 18 deletions core/src/org/pentaho/di/core/logging/MetricsRegistry.java
@@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2013 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
@@ -22,11 +22,11 @@

package org.pentaho.di.core.logging;

import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;

import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.pentaho.di.core.metrics.MetricsSnapshotInterface;

@@ -37,21 +37,18 @@
*
*/
public class MetricsRegistry {
private static MetricsRegistry registry;
private static MetricsRegistry registry = new MetricsRegistry();

private Map<String, Map<String, MetricsSnapshotInterface>> snapshotMaps;
private Map<String, Deque<MetricsSnapshotInterface>> snapshotLists;
private Map<String, Queue<MetricsSnapshotInterface>> snapshotLists;

public static MetricsRegistry getInstance() {
if ( registry == null ) {
registry = new MetricsRegistry();
}
return registry;
}

private MetricsRegistry() {
snapshotMaps = new HashMap<String, Map<String, MetricsSnapshotInterface>>();
snapshotLists = new HashMap<String, Deque<MetricsSnapshotInterface>>();
snapshotMaps = new ConcurrentHashMap<String, Map<String, MetricsSnapshotInterface>>();
snapshotLists = new ConcurrentHashMap<String, Queue<MetricsSnapshotInterface>>();
}

public void addSnapshot( LogChannelInterface logChannel, MetricsSnapshotInterface snapshot ) {
@@ -60,7 +57,7 @@ public void addSnapshot( LogChannelInterface logChannel, MetricsSnapshotInterfac
switch ( metric.getType() ) {
case START:
case STOP:
Deque<MetricsSnapshotInterface> list = getSnapshotList( channelId );
Queue<MetricsSnapshotInterface> list = getSnapshotList( channelId );
list.add( snapshot );

break;
@@ -77,7 +74,7 @@ public void addSnapshot( LogChannelInterface logChannel, MetricsSnapshotInterfac
}
}

public Map<String, Deque<MetricsSnapshotInterface>> getSnapshotLists() {
public Map<String, Queue<MetricsSnapshotInterface>> getSnapshotLists() {
return snapshotLists;
}

@@ -92,10 +89,10 @@ public Map<String, Map<String, MetricsSnapshotInterface>> getSnapshotMaps() {
* The log channel to use.
* @return an existing or a new metrics snapshot list.
*/
public Deque<MetricsSnapshotInterface> getSnapshotList( String logChannelId ) {
Deque<MetricsSnapshotInterface> list = snapshotLists.get( logChannelId );
public Queue<MetricsSnapshotInterface> getSnapshotList( String logChannelId ) {
Queue<MetricsSnapshotInterface> list = snapshotLists.get( logChannelId );
if ( list == null ) {
list = new LinkedBlockingDeque<MetricsSnapshotInterface>();
list = new ConcurrentLinkedQueue<MetricsSnapshotInterface>();
snapshotLists.put( logChannelId, list );
}
return list;
@@ -112,7 +109,7 @@ public Deque<MetricsSnapshotInterface> getSnapshotList( String logChannelId ) {
public Map<String, MetricsSnapshotInterface> getSnapshotMap( String logChannelId ) {
Map<String, MetricsSnapshotInterface> map = snapshotMaps.get( logChannelId );
if ( map == null ) {
map = Collections.synchronizedMap( new HashMap<String, MetricsSnapshotInterface>() );
map = new ConcurrentHashMap<String, MetricsSnapshotInterface>();
snapshotMaps.put( logChannelId, map );
}
return map;
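
Note that getSnapshotList and getSnapshotMap above keep a check-then-put shape: ConcurrentHashMap makes the map itself safe to share, but two threads that both see null can still each create a collection, and the later put replaces the earlier one. A fully atomic variant would use putIfAbsent; the sketch below only illustrates that alternative (it is not part of this commit, and it assumes snapshotLists is declared as java.util.concurrent.ConcurrentMap, or that Java 8's Map.putIfAbsent is available):

// Illustrative alternative only: atomic lazy creation of a per-channel queue.
public Queue<MetricsSnapshotInterface> getSnapshotList( String logChannelId ) {
  Queue<MetricsSnapshotInterface> created = new ConcurrentLinkedQueue<MetricsSnapshotInterface>();
  // putIfAbsent returns the queue already mapped, or null if "created" was installed.
  Queue<MetricsSnapshotInterface> existing = snapshotLists.putIfAbsent( logChannelId, created );
  return existing != null ? existing : created;
}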
8 changes: 4 additions & 4 deletions core/src/org/pentaho/di/core/metrics/MetricsUtil.java
@@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2013 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
@@ -23,7 +23,7 @@
package org.pentaho.di.core.metrics;

import java.util.ArrayList;
import java.util.Deque;
import java.util.Queue;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -49,7 +49,7 @@ public class MetricsUtil {
public static List<MetricsDuration> getDuration( String logChannelId, Metrics metric ) {
List<MetricsDuration> durations = new ArrayList<MetricsDuration>();

Deque<MetricsSnapshotInterface> metrics = MetricsRegistry.getInstance().getSnapshotList( logChannelId );
Queue<MetricsSnapshotInterface> metrics = MetricsRegistry.getInstance().getSnapshotList( logChannelId );
MetricsSnapshotInterface start = null;

Iterator<MetricsSnapshotInterface> iterator = metrics.iterator();
@@ -117,7 +117,7 @@ public static List<MetricsDuration> getDurations( String logChannelId ) {
Map<String, MetricsSnapshotInterface> last = new HashMap<String, MetricsSnapshotInterface>();
Map<String, MetricsDuration> map = new HashMap<String, MetricsDuration>();

Deque<MetricsSnapshotInterface> metrics = MetricsRegistry.getInstance().getSnapshotList( logChannelId );
Queue<MetricsSnapshotInterface> metrics = MetricsRegistry.getInstance().getSnapshotList( logChannelId );

Iterator<MetricsSnapshotInterface> iterator = metrics.iterator();
while ( iterator.hasNext() ) {
84 changes: 84 additions & 0 deletions core/test-src/org/pentaho/di/core/logging/MetricsRegistryTest.java
@@ -0,0 +1,84 @@
/*******************************************************************************
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package org.pentaho.di.core.logging;

import static org.junit.Assert.assertTrue;

import org.junit.Before;
import org.junit.Test;


import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MetricsRegistryTest {
private MetricsRegistry metricsRegistry;
private List<String> logIds;
private int threadCount = 100;
private int logChannelIdCount = 20;
private CountDownLatch countDownLatch = null;

@Before
public void setUp() {
metricsRegistry = MetricsRegistry.getInstance();
logIds = new ArrayList<>( logChannelIdCount );
for ( int i = 1; i <= logChannelIdCount; i++ ) {
logIds.add( "logChannelId_" + i );
}
countDownLatch = new CountDownLatch( 1 );
}


@Test( timeout = 2000 )
public void testConcurrencySnap() throws Exception {
ExecutorService service = Executors.newFixedThreadPool( threadCount );
for ( int i = 0; i < threadCount; i++ ) {
service.submit( new ConcurrentPutIfAbsent( logIds.get( i % 20 ) ) );
}
countDownLatch.countDown();
service.shutdown();
while ( !service.isTerminated() ) {
Thread.currentThread().sleep( 1 );
}
int expectedQueueCount = logChannelIdCount > threadCount ? threadCount : logChannelIdCount;
assertTrue( expectedQueueCount == metricsRegistry.getSnapshotLists().size() );
}

private class ConcurrentPutIfAbsent implements Callable<Queue> {
private String id;
ConcurrentPutIfAbsent( String id ) {
this.id = id;
}
@Override
public Queue call() throws Exception {
countDownLatch.await();
return metricsRegistry.getSnapshotList( id );
}

}

}
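
The test gates all worker threads on a single CountDownLatch so the twenty channel ids are requested concurrently, then spins on isTerminated() until the pool drains. An equivalent wait without polling, using the standard ExecutorService.awaitTermination (a sketch, not part of the committed test):

service.shutdown();
// Block until every submitted task has finished, up to the test's 2-second budget.
if ( !service.awaitTermination( 2, java.util.concurrent.TimeUnit.SECONDS ) ) {
  service.shutdownNow();
}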
8 changes: 4 additions & 4 deletions engine/src/org/pentaho/di/trans/Trans.java
@@ -32,7 +32,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.Deque;
import java.util.Queue;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
@@ -2510,7 +2510,7 @@ protected synchronized void writeMetricsInformation() throws KettleException {

List<String> logChannelIds = LoggingRegistry.getInstance().getLogChannelChildren( getLogChannelId() );
for ( String logChannelId : logChannelIds ) {
Deque<MetricsSnapshotInterface> snapshotList =
Queue<MetricsSnapshotInterface> snapshotList =
MetricsRegistry.getInstance().getSnapshotLists().get( logChannelId );
if ( snapshotList != null ) {
Iterator<MetricsSnapshotInterface> iterator = snapshotList.iterator();
@@ -3541,7 +3541,7 @@ public void setRunning( boolean running ) {
* @throws KettleException
* the kettle exception
*/
public static final TransSplitter executeClustered( final TransMeta transMeta,
public static TransSplitter executeClustered( final TransMeta transMeta,
final TransExecutionConfiguration executionConfiguration ) throws KettleException {
if ( Utils.isEmpty( transMeta.getName() ) ) {
throw new KettleException( "The transformation needs a name to uniquely identify it by on the remote server." );
@@ -3570,7 +3570,7 @@ public static final TransSplitter executeClustered( final TransMeta transMeta,
* the kettle exception
* @see org.pentaho.di.ui.spoon.delegates.SpoonTransformationDelegate
*/
public static final void executeClustered( final TransSplitter transSplitter,
public static void executeClustered( final TransSplitter transSplitter,
final TransExecutionConfiguration executionConfiguration ) throws KettleException {
try {
// Send the transformations to the servers...
