Skip to content

Commit

Permalink
Start and stop store watcher when replace database store
Browse files Browse the repository at this point in the history
Store often can be replaced in clustering environment when store get copied from another machine
and then original database store get replaced.
If we will nto gonna stop watcher we will get useless notifications that store files where deleted.
  • Loading branch information
MishaDemianenko committed Feb 24, 2017
1 parent fddd16e commit 4c894a1
Show file tree
Hide file tree
Showing 13 changed files with 393 additions and 44 deletions.
@@ -0,0 +1,114 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.io.fs.watcher;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.neo4j.io.IOUtils;
import org.neo4j.io.fs.watcher.resource.WatchedResource;

/**
* File system delegate that will remember all the files that it was asked to watch
* and will register them in real delegate during {@link #startWatching()} call.
* When delegate will be stopped all registered resources will be closed and delegate delegate will be stopped.
*
* Described pattern allows to perform repeatable startWatching/stopWatching cycle for pre-configured set of files.
*/
public class RestartableFileSystemWatcher implements FileWatcher
{
private FileWatcher delegate;
private Set<File> filesToWatch = Collections.newSetFromMap( new ConcurrentHashMap<>() );
private Set<WatchedResource> watchedResources = Collections.newSetFromMap( new ConcurrentHashMap<>() );

public RestartableFileSystemWatcher( FileWatcher delegate )
{
this.delegate = delegate;
}

@Override
public WatchedResource watch( File file ) throws IOException
{
filesToWatch.add( file );
return WatchedResource.EMPTY;
}

@Override
public void addFileWatchEventListener( FileWatchEventListener listener )
{
delegate.addFileWatchEventListener( listener );
}

@Override
public void removeFileWatchEventListener( FileWatchEventListener listener )
{
delegate.removeFileWatchEventListener( listener );
}

@Override
public void stopWatching()
{
try
{
IOUtils.closeAll( watchedResources );
watchedResources.clear();
}
catch ( IOException e )
{
throw new UncheckedIOException( e );
}
finally
{
delegate.stopWatching();
}
}

@Override
public void startWatching() throws InterruptedException
{
for ( File fileToWatch : filesToWatch )
{
watchFile( fileToWatch );
}
delegate.startWatching();
}

@Override
public void close() throws IOException
{
delegate.close();
}

private void watchFile( File fileToWatch )
{
try
{
watchedResources.add( delegate.watch( fileToWatch ) );
}
catch ( IOException e )
{
throw new UncheckedIOException( e );
}
}
}
@@ -0,0 +1,92 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.io.fs.watcher;

import org.junit.Test;

import java.io.File;
import java.io.IOException;

import org.neo4j.io.fs.watcher.resource.WatchedResource;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class RestartableFileSystemWatcherTest
{

private FileWatcher delegate = mock( FileWatcher.class );
private RestartableFileSystemWatcher watcher = new RestartableFileSystemWatcher( delegate );

@Test
public void delegateListenersCallToRealWatcher()
{
FileWatchEventListener listener = mock( FileWatchEventListener.class );

watcher.addFileWatchEventListener( listener );
verify( delegate ).addFileWatchEventListener( listener );

watcher.removeFileWatchEventListener( listener );
verify( delegate ).removeFileWatchEventListener( listener );
}

@Test
public void closeDelegateOnClose() throws IOException
{
watcher.close();
verify( delegate ).close();
}

@Test
public void startStopFileWatchingCycle() throws IOException, InterruptedException
{
File file1 = mock( File.class );
File file2 = mock( File.class );
WatchedResource resource1 = mock( WatchedResource.class );
WatchedResource resource2 = mock( WatchedResource.class );
watcher.watch( file1 );
watcher.watch( file2 );

when( delegate.watch( file1 ) ).thenReturn( resource1 );
when( delegate.watch( file2 ) ).thenReturn( resource2 );

int invocations = 100;
for ( int i = 0; i < invocations; i++ )
{
startStopWatching();
}

verify( delegate, times( invocations ) ).watch( file1 );
verify( delegate, times( invocations ) ).watch( file2 );
verify( delegate, times( invocations ) ).startWatching();
verify( delegate, times( invocations ) ).stopWatching();

verify( resource1, times( invocations ) ).close();
verify( resource2, times( invocations ) ).close();
}

private void startStopWatching() throws InterruptedException
{
watcher.startWatching();
watcher.stopWatching();
}
}
Expand Up @@ -21,20 +21,18 @@

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.graphdb.security.URLAccessRule;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.FileSystemLifecycleAdapter;
import org.neo4j.io.fs.watcher.FileWatcher;
import org.neo4j.io.fs.watcher.RestartableFileSystemWatcher;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.extension.KernelExtensionFactory;
import org.neo4j.kernel.extension.KernelExtensions;
import org.neo4j.kernel.extension.UnsatisfiedDependencyStrategies;
import org.neo4j.kernel.impl.api.LogRotationMonitor;
Expand All @@ -51,7 +49,8 @@
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.impl.util.Neo4jJobScheduler;
import org.neo4j.kernel.impl.util.watcher.DefaultFileDeletionEventListener;
import org.neo4j.kernel.impl.util.watcher.WatcherLifecycleAdapterFactory;
import org.neo4j.kernel.impl.util.watcher.FileSystemWatcherService;
import org.neo4j.kernel.impl.util.watcher.FileSystemWatcherServiceFactory;
import org.neo4j.kernel.info.DiagnosticsManager;
import org.neo4j.kernel.info.JvmChecker;
import org.neo4j.kernel.info.JvmMetadataRepository;
Expand Down Expand Up @@ -117,6 +116,8 @@ public class PlatformModule

public final StoreCopyCheckPointMutex storeCopyCheckPointMutex;

public final FileSystemWatcherService watcherService;

public PlatformModule( File providedStoreDir, Map<String,String> params, DatabaseInfo databaseInfo,
GraphDatabaseFacadeFactory.Dependencies externalDependencies, GraphDatabaseFacade graphDatabaseFacade )
{
Expand Down Expand Up @@ -176,8 +177,9 @@ public PlatformModule( File providedStoreDir, Config config, DatabaseInfo databa
CheckPointerMonitor.class, tracers.checkPointTracer, CheckPointerMonitor.NULL ) );

FileWatcher fileWatcher = createFileWatcher();
dependencies.satisfyDependencies( fileWatcher );
life.add( WatcherLifecycleAdapterFactory.createLifecycleAdapter( jobScheduler, fileWatcher ) );
watcherService = FileSystemWatcherServiceFactory.createFileSystemWatcherService( jobScheduler, fileWatcher );
dependencies.satisfyDependencies( watcherService );
life.add( watcherService );

pageCache = dependencies.satisfyDependency( createPageCache( fileSystem, config, logging, tracers ) );
life.add( new PageCacheLifecycle( pageCache ) );
Expand Down Expand Up @@ -212,7 +214,7 @@ protected FileWatcher createFileWatcher()
{
try
{
FileWatcher watcher = fileSystem.fileWatcher();
RestartableFileSystemWatcher watcher = new RestartableFileSystemWatcher( fileSystem.fileWatcher() );
watcher.addFileWatchEventListener( new DefaultFileDeletionEventListener( logging ) );
watcher.watch( storeDir );
// register to watch store dir parent folder to see when store dir removed
Expand Down Expand Up @@ -324,22 +326,4 @@ protected TransactionStats createTransactionStats()
return new TransactionStats();
}

private Iterable<Class<?>> getSettingsClasses( Iterable<Class<?>> settingsClasses,
Iterable<KernelExtensionFactory<?>> kernelExtensions )
{
List<Class<?>> totalSettingsClasses = Iterables.asList( settingsClasses );

// Get the list of settings classes for extensions
for ( KernelExtensionFactory<?> kernelExtension : kernelExtensions )
{
Class<?> settingsClass = kernelExtension.getSettingsClass();
if ( settingsClass != null )
{
totalSettingsClasses.add( settingsClass );
}
}

return totalSettingsClasses;
}

}
@@ -0,0 +1,97 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.util.watcher;

import java.util.concurrent.ThreadFactory;

import org.neo4j.io.fs.watcher.FileWatcher;
import org.neo4j.kernel.impl.util.JobScheduler;

/**
* Factory used for construction of proper adaptor for available {@link FileWatcher}.
* In case if silent matcher is used dummy adapter will be used, otherwise will use default wrapper that will bind
* monitoring cycles to corresponding lifecycle phases.
*/
public class DefaultFileSystemWatcherService implements FileSystemWatcherService
{
private final JobScheduler jobScheduler;
private final FileWatcher fileWatcher;
private final FileSystemEventWatcher eventWatcher;
private ThreadFactory fileWatchers;
private Thread watcher;

DefaultFileSystemWatcherService( JobScheduler jobScheduler, FileWatcher fileWatcher )
{
this.jobScheduler = jobScheduler;
this.fileWatcher = fileWatcher;
this.eventWatcher = new FileSystemEventWatcher();
}

@Override
public void init() throws Throwable
{
fileWatchers = jobScheduler.threadFactory( JobScheduler.Groups.fileWatch );
}

@Override
public void start() throws Throwable
{
watcher = fileWatchers.newThread( eventWatcher );
watcher.start();
}

@Override
public void stop() throws Throwable
{
eventWatcher.stopWatching();
if ( watcher != null )
{
watcher.interrupt();
watcher.join();
watcher = null;
}
}

@Override
public void shutdown() throws Throwable
{
fileWatcher.close();
}

private class FileSystemEventWatcher implements Runnable
{
@Override
public void run()
{
try
{
fileWatcher.startWatching();
}
catch ( InterruptedException ignored )
{
}
}

void stopWatching()
{
fileWatcher.stopWatching();
}
}
}

0 comments on commit 4c894a1

Please sign in to comment.