Skip to content

Commit

Permalink
Monitor store files deletions by external applications
Browse files Browse the repository at this point in the history
Use file system monitoring possibilities to provide notification in case of store files deletion
by third parties.
  • Loading branch information
MishaDemianenko committed Jan 9, 2017
1 parent 4cbe20e commit d33ed88
Show file tree
Hide file tree
Showing 39 changed files with 1,767 additions and 125 deletions.
Expand Up @@ -54,6 +54,7 @@
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.helpers.progress.ProgressMonitorFactory;
import org.neo4j.io.fs.watcher.FileWatcher;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.kernel.impl.store.format.RecordFormats;
Expand Down
Expand Up @@ -450,6 +450,7 @@ public static void doImport( PrintStream out, PrintStream err, File storeDir, Fi
LogService logService = life.add( StoreLogService.inLogsDirectory( fs, logsDir ) );

life.start();
//TODO: add file watcher here?
BatchImporter importer = new ParallelBatchImporter( storeDir,
fs,
configuration,
Expand Down
Expand Up @@ -34,12 +34,16 @@
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.file.CopyOption;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.WatchService;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

import org.neo4j.io.IOUtils;
import org.neo4j.io.fs.watcher.DefaultFileSystemWatcher;
import org.neo4j.io.fs.watcher.FileWatcher;

import static java.lang.String.format;

Expand All @@ -50,6 +54,13 @@ public class DefaultFileSystemAbstraction implements FileSystemAbstraction
{
static final String UNABLE_TO_CREATE_DIRECTORY_FORMAT = "Unable to create directory path [%s] for Neo4j store.";

@Override
public FileWatcher fileWatcher() throws IOException
{
WatchService watchService = FileSystems.getDefault().newWatchService();
return new DefaultFileSystemWatcher( watchService );
}

@Override
public StoreFileChannel open( File fileName, String mode ) throws IOException
{
Expand Down
Expand Up @@ -44,6 +44,8 @@
import java.util.stream.Stream;

import org.neo4j.io.IOUtils;
import org.neo4j.io.fs.watcher.DefaultFileSystemWatcher;
import org.neo4j.io.fs.watcher.FileWatcher;

import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;

Expand All @@ -61,6 +63,12 @@ public DelegateFileSystemAbstraction( FileSystem fs )
this.fs = fs;
}

@Override
public FileWatcher fileWatcher() throws IOException
{
return new DefaultFileSystemWatcher( fs.newWatchService() );
}

@Override
public StoreChannel open( File fileName, String mode ) throws IOException
{
Expand Down
Expand Up @@ -32,8 +32,19 @@
import java.util.function.Function;
import java.util.zip.ZipOutputStream;

import org.neo4j.io.fs.watcher.FileWatcher;

public interface FileSystemAbstraction extends Closeable
{

/**
* Create file watcher that provides possibilities to monitor directories on underlying file system
* abstraction
* @return specific file system abstract watcher
* @throws IOException in case exception occur during file watcher creation
*/
FileWatcher fileWatcher() throws IOException;

StoreChannel open( File fileName, String mode ) throws IOException;

OutputStream openAsOutputStream( File fileName, boolean append ) throws IOException;
Expand Down
@@ -0,0 +1,146 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.io.fs.watcher;

import org.apache.commons.lang3.StringUtils;

import java.io.File;
import java.io.IOException;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;

import org.neo4j.io.fs.watcher.event.FileWatchEventListener;
import org.neo4j.io.fs.watcher.resource.WatchedFile;
import org.neo4j.io.fs.watcher.resource.WatchedResource;

/**
* File watcher that monitors registered directories state using possibilities provided by {@link WatchService}.
*
* Safe to be used from multiple threads
*/
public class DefaultFileSystemWatcher implements FileWatcher
{
private final WatchService watchService;
private final List<FileWatchEventListener> listeners = new CopyOnWriteArrayList<>();
private volatile boolean watch;

public DefaultFileSystemWatcher( WatchService watchService )
{
this.watchService = watchService;
}

@Override
public WatchedResource watch( File file ) throws IOException
{
if ( !file.isDirectory() )
{
throw new IllegalArgumentException( "Only directories can be registered to be monitored." );
}
WatchKey watchKey = file.toPath()
.register( watchService, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY );
return new WatchedFile( watchKey );
}

@Override
public void startWatching() throws InterruptedException
{
watch = true;
while ( watch )
{
WatchKey key = watchService.take();
if ( key != null )
{
List<WatchEvent<?>> watchEvents = key.pollEvents();
for ( WatchEvent<?> watchEvent : watchEvents )
{
WatchEvent.Kind<?> kind = watchEvent.kind();
if ( StandardWatchEventKinds.ENTRY_MODIFY == kind )
{
notifyAboutModification( watchEvent );
}
if ( StandardWatchEventKinds.ENTRY_DELETE == kind )
{
notifyAboutDeletion( watchEvent );
}
}
key.reset();
}
}
}

@Override
public void stopWatching()
{
watch = false;
}

@Override
public void addFileWatchEventListener( FileWatchEventListener listener )
{
listeners.add( listener );
}

@Override
public void removeFileWatchEventListener( FileWatchEventListener listener )
{
listeners.remove( listener );
}

@Override
public void close() throws IOException
{
stopWatching();
watchService.close();
}

private void notifyAboutModification( WatchEvent<?> watchEvent )
{
String context = getContext( watchEvent );
if ( StringUtils.isNotEmpty( context ) )
{
for ( FileWatchEventListener listener : listeners )
{
listener.fileModified( context );
}
}
}

private void notifyAboutDeletion( WatchEvent<?> watchEvent )
{
String context = getContext( watchEvent );
if ( StringUtils.isNotEmpty( context ) )
{
for ( FileWatchEventListener listener : listeners )
{
listener.fileDeleted( context );
}
}
}

private String getContext( WatchEvent<?> watchEvent )
{
return Objects.toString( watchEvent.context(), StringUtils.EMPTY );
}
}
@@ -0,0 +1,76 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.io.fs.watcher;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.file.WatchService;

import org.neo4j.io.fs.watcher.event.FileWatchEventListener;
import org.neo4j.io.fs.watcher.resource.WatchedResource;

/**
* Watcher that allows receive notification about files modifications/removal for particular underlying file system.
*
* To be able to get notification users need to register resource they are interested in using
* {@link #watch(File)} method call and add by adding {@link FileWatchEventListener listner} to be able to receive
* status updates.
*
* @see WatchService
*/
public interface FileWatcher extends Closeable
{

FileWatcher SILENT_WATCHER = new SilentFileWatcher();

/**
* Register provided directory in list of resources that we would like to watch and receive status modification
* updates
* @param file directory to be monitored for updates
* @return
* @throws IOException
*/
WatchedResource watch( File file ) throws IOException;

/**
* Register listener to receive updates about registered resources.
* @param listener listener to register
*/
void addFileWatchEventListener( FileWatchEventListener listener );

/**
* Remove listener from a list of updates receivers.
* @param listener listener to remove
*/
void removeFileWatchEventListener( FileWatchEventListener listener );

/**
* Stop monitoring of registered directories
*/
void stopWatching();

/**
* Start monitoring of registered directories
* @throws InterruptedException when interrupted while waiting for update notification to come
*
*/
void startWatching() throws InterruptedException;
}
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.io.fs.watcher;

import java.io.File;
import java.io.IOException;

import org.neo4j.io.fs.watcher.event.FileWatchEventListener;
import org.neo4j.io.fs.watcher.resource.WatchedResource;

/**
* Silent file watcher implementation that do not perform any monitoring and can't observe any directories status or
* content update.
*/
public class SilentFileWatcher implements FileWatcher
{

@Override
public WatchedResource watch( File file ) throws IOException
{
return WatchedResource.EMPTY;
}

@Override
public void addFileWatchEventListener( FileWatchEventListener listener )
{
}

@Override
public void removeFileWatchEventListener( FileWatchEventListener listener )
{
}

@Override
public void stopWatching()
{
}

@Override
public void startWatching() throws InterruptedException
{
}

@Override
public void close() throws IOException
{
}
}

0 comments on commit d33ed88

Please sign in to comment.