Skip to content

Commit

Permalink
Merge 2.3 into 3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
MishaDemianenko committed Nov 5, 2015
2 parents 67364ea + 90e0f67 commit 1f5e0ed
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.neo4j.kernel.ha.com.slave.InvalidEpochExceptionHandler;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.LogProvider;

/**
Expand All @@ -47,12 +48,13 @@ public class PullerFactory
private final DependencyResolver dependencyResolver;
private final AvailabilityGuard availabilityGuard;
private final HighAvailabilityMemberStateMachine memberStateMachine;
private final Monitors monitors;

public PullerFactory( RequestContextFactory requestContextFactory, Master master,
LastUpdateTime lastUpdateTime, LogProvider logging, InstanceId serverId,
InvalidEpochExceptionHandler invalidEpochHandler, long pullInterval,
JobScheduler jobScheduler, DependencyResolver dependencyResolver, AvailabilityGuard availabilityGuard,
HighAvailabilityMemberStateMachine memberStateMachine )
HighAvailabilityMemberStateMachine memberStateMachine, Monitors monitors )
{

this.requestContextFactory = requestContextFactory;
Expand All @@ -66,12 +68,13 @@ public PullerFactory( RequestContextFactory requestContextFactory, Master master
this.dependencyResolver = dependencyResolver;
this.availabilityGuard = availabilityGuard;
this.memberStateMachine = memberStateMachine;
this.monitors = monitors;
}

public SlaveUpdatePuller createSlaveUpdatePuller()
{
return new SlaveUpdatePuller( requestContextFactory, master, lastUpdateTime, logging, serverId,
availabilityGuard, invalidEpochHandler );
availabilityGuard, invalidEpochHandler, monitors.newMonitor( SlaveUpdatePuller.Monitor.class ) );
}

public UpdatePullingTransactionObligationFulfiller createObligationFulfiller( UpdatePuller updatePuller )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@
*/
public class SlaveUpdatePuller extends LifecycleAdapter implements Runnable, UpdatePuller
{
public interface Monitor
{
void pulledUpdates( long lastAppliedTxId );
}

public static String UPDATE_PULLER_THREAD_PREFIX = "UpdatePuller@";

static final Condition NEXT_TICKET = new Condition()
Expand All @@ -134,21 +139,22 @@ public boolean evaluate( int currentTicket, int targetTicket )
private final InstanceId instanceId;
private final AvailabilityGuard availabilityGuard;
private InvalidEpochExceptionHandler invalidEpochHandler;
private final Monitor monitor;
private Thread me;

SlaveUpdatePuller( RequestContextFactory requestContextFactory, Master master, LastUpdateTime lastUpdateTime,
LogProvider logging, InstanceId instanceId, AvailabilityGuard availabilityGuard,
InvalidEpochExceptionHandler invalidEpochHandler )
InvalidEpochExceptionHandler invalidEpochHandler, Monitor monitor )
{
this.requestContextFactory = requestContextFactory;
this.master = master;
this.lastUpdateTime = lastUpdateTime;
this.instanceId = instanceId;
this.availabilityGuard = availabilityGuard;
this.invalidEpochHandler = invalidEpochHandler;
this.monitor = monitor;
this.logger = logging.getLog( getClass() );
this.cappedLogger = new CappedOperation<Pair<String,? extends Exception>>(
CappedOperation.count( 10 ) )
this.cappedLogger = new CappedOperation<Pair<String,? extends Exception>>( CappedOperation.count( 10 ) )
{
@Override
protected void triggered( Pair<String,? extends Exception> event )
Expand Down Expand Up @@ -304,6 +310,7 @@ private void doPullUpdates()
try ( Response<Void> ignored = master.pullUpdates( context ) )
{
// Updates would be applied as part of response processing
monitor.pulledUpdates( context.lastAppliedTransaction() );
}
}
catch ( InvalidEpochException e )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ public void handle()
PullerFactory pullerFactory = new PullerFactory( requestContextFactory, master, lastUpdateTime,
logging.getInternalLogProvider(), serverId, invalidEpochHandler,
config.get( HaSettings.pull_interval ), platformModule.jobScheduler,
dependencies, platformModule.availabilityGuard, memberStateMachine );
dependencies, platformModule.availabilityGuard, memberStateMachine, monitors );

dependencies.satisfyDependency( paxosLife.add( pullerFactory.createObligationFulfiller( updatePullerProxy ) ) );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.RETURNS_MOCKS;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import static org.neo4j.logging.AssertableLogProvider.inLog;

public class SlaveUpdatePullerTest
Expand All @@ -61,19 +61,21 @@ public class SlaveUpdatePullerTest
private final Config config = mock( Config.class );
private final AvailabilityGuard availabilityGuard = mock( AvailabilityGuard.class );
private final LastUpdateTime lastUpdateTime = mock( LastUpdateTime.class );
private final Master master = mock( Master.class );
private final Master master = mock( Master.class, RETURNS_MOCKS );
private final AssertableLogProvider logProvider = new AssertableLogProvider();
private final RequestContextFactory requestContextFactory = mock( RequestContextFactory.class );
private final InvalidEpochExceptionHandler invalidEpochHandler = mock( InvalidEpochExceptionHandler.class );
private final SlaveUpdatePuller.Monitor monitor = mock( SlaveUpdatePuller.Monitor.class );
private final SlaveUpdatePuller updatePuller = new SlaveUpdatePuller( requestContextFactory,
master, lastUpdateTime, logProvider, instanceId, availabilityGuard, invalidEpochHandler );
master, lastUpdateTime, logProvider, instanceId, availabilityGuard, invalidEpochHandler, monitor );

@Rule
public final CleanupRule cleanup = new CleanupRule();

@Before
public void setup() throws Throwable
{
when( requestContextFactory.newRequestContext() ).thenReturn( new RequestContext( 42, 42, 42, 42, 42 ) );
when( config.get( HaSettings.pull_interval ) ).thenReturn( 1000l );
when( config.get( ClusterSettings.server_id ) ).thenReturn( instanceId );
when( availabilityGuard.isAvailable( anyLong() ) ).thenReturn( true );
Expand All @@ -91,6 +93,7 @@ public void shouldStopPullingAfterStop() throws Throwable
verify( lastUpdateTime, times( 1 ) ).setLastUpdateTime( anyLong() );
verify( availabilityGuard, times( 1 ) ).isAvailable( anyLong() );
verify( master, times( 1 ) ).pullUpdates( Matchers.<RequestContext>any() );
verify( monitor, times( 1 ) ).pulledUpdates( anyLong() );

// WHEN
updatePuller.shutdown();
Expand All @@ -110,6 +113,7 @@ public void keepPullingUpdatesOnConsecutiveCalls() throws Throwable
verify( lastUpdateTime, times( 1 ) ).setLastUpdateTime( anyLong() );
verify( availabilityGuard, times( 1 ) ).isAvailable( anyLong() );
verify( master, times( 1 ) ).pullUpdates( Matchers.<RequestContext>any() );
verify( monitor, times( 1 ) ).pulledUpdates( anyLong() );

// WHEN
updatePuller.pullUpdates();
Expand All @@ -118,6 +122,7 @@ public void keepPullingUpdatesOnConsecutiveCalls() throws Throwable
verify( lastUpdateTime, times( 2 ) ).setLastUpdateTime( anyLong() );
verify( availabilityGuard, times( 2 ) ).isAvailable( anyLong() );
verify( master, times( 2 ) ).pullUpdates( Matchers.<RequestContext>any() );
verify( monitor, times( 2 ) ).pulledUpdates( anyLong() );
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ public enum CsvFile
+ "complete." )
public static Setting<Boolean> neoLogRotationEnabled = setting(
"metrics.neo4j.logrotation.enabled", Settings.BOOLEAN, neoEnabled );
@Description( "Enable reporting metrics about HA cluster info." )
public static Setting<Boolean> neoClusterEnabled = setting(
"metrics.neo4j.cluster.enabled", Settings.BOOLEAN, neoEnabled );

@Description( "Enable reporting metrics about the duration of garbage collections" )
public static Setting<Boolean> jvmGcEnabled = setting( "metrics.jvm.gc.enabled", Settings.BOOLEAN, neoEnabled );
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.metrics.source;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.ha.SlaveUpdatePuller;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.metrics.MetricsSettings;

import static com.codahale.metrics.MetricRegistry.name;

public class ClusterMetrics extends LifecycleAdapter
{
private static final String NAME_PREFIX = "neo4j.cluster";
private static final String SLAVE_PULL_UPDATES = name( NAME_PREFIX, "slave_pull_updates" );
private static final String SLAVE_PULL_UPDATE_UP_TO_TX = name( NAME_PREFIX, "slave_pull_update_up_to_tx" );

private final Config config;
private final Monitors monitors;
private final MetricRegistry registry;
private final SlaveUpdatePullerMonitor monitor = new SlaveUpdatePullerMonitor();

public ClusterMetrics( Config config, Monitors monitors, MetricRegistry registry )
{
this.config = config;
this.monitors = monitors;
this.registry = registry;
}

@Override
public void start() throws Throwable
{
if ( config.get( MetricsSettings.neoClusterEnabled ) )
{
monitors.addMonitorListener( monitor );

registry.register( SLAVE_PULL_UPDATES, new Gauge<Long>()
{
@Override
public Long getValue()
{
return monitor.events.get();
}
} );

registry.register( SLAVE_PULL_UPDATE_UP_TO_TX, new Gauge<Long>()
{
@Override
public Long getValue()
{
return monitor.lastAppliedTxId;
}
} );
}
}

@Override
public void stop() throws IOException
{
if ( config.get( MetricsSettings.neoClusterEnabled ) )
{
registry.remove( SLAVE_PULL_UPDATES );
registry.remove( SLAVE_PULL_UPDATE_UP_TO_TX );

monitors.removeMonitorListener( monitor );
}
}

private static class SlaveUpdatePullerMonitor implements SlaveUpdatePuller.Monitor
{
private AtomicLong events = new AtomicLong();
private volatile long lastAppliedTxId;

@Override
public void pulledUpdates( long lastAppliedTxId )
{
events.incrementAndGet();
this.lastAppliedTxId = lastAppliedTxId;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public Lifecycle newInstance()
final DBMetrics dbMetrics = new DBMetrics( registry, config,
transactionCounters, pageCacheCounters, checkPointerMonitor, logRotationMonitor, idGeneratorFactory );
final NetworkMetrics networkMetrics = new NetworkMetrics( config, monitors, registry );
final ClusterMetrics clusterMetrics = new ClusterMetrics( config, monitors, registry );
final JvmMetrics jvmMetrics = new JvmMetrics( config, registry );
final CypherMetrics cypherMetrics = new CypherMetrics( config, monitors, registry );
return new LifecycleAdapter()
Expand All @@ -75,6 +76,7 @@ public void start() throws Throwable
{
dbMetrics.start();
networkMetrics.start();
clusterMetrics.start();
jvmMetrics.start();
cypherMetrics.start();
}
Expand All @@ -84,6 +86,7 @@ public void stop() throws IOException
{
dbMetrics.stop();
networkMetrics.stop();
clusterMetrics.stop();
jvmMetrics.stop();
cypherMetrics.stop();
}
Expand Down

0 comments on commit 1f5e0ed

Please sign in to comment.