Skip to content

Commit

Permalink
Add progress output to neo4j-admin dump.
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisvest committed Apr 15, 2019
1 parent 7ff59e5 commit 1b58fec
Show file tree
Hide file tree
Showing 7 changed files with 263 additions and 15 deletions.
Expand Up @@ -69,6 +69,6 @@ public AdminCommandSection commandSection()
@Nonnull @Nonnull
public AdminCommand create( Path homeDir, Path configDir, OutsideWorld outsideWorld ) public AdminCommand create( Path homeDir, Path configDir, OutsideWorld outsideWorld )
{ {
return new DumpCommand( homeDir, configDir, new Dumper() ); return new DumpCommand( homeDir, configDir, new Dumper( outsideWorld.errorStream() ) );
} }
} }
100 changes: 91 additions & 9 deletions community/dbms/src/main/java/org/neo4j/dbms/archive/Dumper.java
Expand Up @@ -26,13 +26,21 @@
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.StandardOpenOption; import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate; import java.util.function.Predicate;


import org.neo4j.commandline.Util; import org.neo4j.commandline.Util;
import org.neo4j.function.ThrowingAction; import org.neo4j.function.ThrowingAction;
import org.neo4j.util.VisibleForTesting;


import static org.neo4j.dbms.archive.Utils.checkWritableDirectory; import static org.neo4j.dbms.archive.Utils.checkWritableDirectory;
import static org.neo4j.dbms.archive.Utils.copy; import static org.neo4j.dbms.archive.Utils.copy;
Expand All @@ -46,17 +54,66 @@


public class Dumper public class Dumper
{ {
private final List<ArchiveOperation> operations;
private final ProgressPrinter progressPrinter;

@VisibleForTesting
Dumper()
{
operations = new ArrayList<>();
progressPrinter = new ProgressPrinter( null );
}

public Dumper( PrintStream output )
{
operations = new ArrayList<>();
progressPrinter = new ProgressPrinter( output );
}

public void dump( Path dbPath, Path transactionalLogsPath, Path archive, CompressionFormat format, Predicate<Path> exclude ) public void dump( Path dbPath, Path transactionalLogsPath, Path archive, CompressionFormat format, Predicate<Path> exclude )
throws IOException throws IOException
{ {
checkWritableDirectory( archive.getParent() ); checkWritableDirectory( archive.getParent() );
try ( ArchiveOutputStream stream = openArchiveOut( archive, format ) ) try ( ArchiveOutputStream stream = openArchiveOut( archive, format ) )
{ {
operations.clear();

visitPath( dbPath, exclude, stream ); visitPath( dbPath, exclude, stream );
if ( !Util.isSameOrChildPath( dbPath, transactionalLogsPath ) ) if ( !Util.isSameOrChildPath( dbPath, transactionalLogsPath ) )
{ {
visitPath( transactionalLogsPath, exclude, stream ); visitPath( transactionalLogsPath, exclude, stream );
} }

progressPrinter.reset();
for ( ArchiveOperation operation : operations )
{
progressPrinter.maxBytes += operation.size;
progressPrinter.maxFiles += operation.isFile ? 1 : 0;
}

ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture<?> timerFuture = timer.scheduleAtFixedRate( progressPrinter::printOnNextUpdate, 0, 100, TimeUnit.MILLISECONDS );
try
{
for ( ArchiveOperation operation : operations )
{
operation.addToArchive();
}
}
finally
{
timerFuture.cancel( false );
timer.shutdown();
try
{
timer.awaitTermination( 10, TimeUnit.SECONDS );
}
catch ( InterruptedException ignore )
{
}
}
progressPrinter.done();
progressPrinter.printProgress();
} }
} }


Expand Down Expand Up @@ -97,22 +154,47 @@ private void dumpDirectory( Path root, ArchiveOutputStream stream, Path dir ) th
private void withEntry( ThrowingAction<IOException> operation, Path root, ArchiveOutputStream stream, Path file ) private void withEntry( ThrowingAction<IOException> operation, Path root, ArchiveOutputStream stream, Path file )
throws IOException throws IOException
{ {
ArchiveEntry entry = createEntry( file, root, stream ); operations.add( new ArchiveOperation( operation, root, stream, file ) );
stream.putArchiveEntry( entry );
operation.apply();
stream.closeArchiveEntry();
} }


private ArchiveEntry createEntry( Path file, Path root, ArchiveOutputStream archive ) throws IOException private void writeFile( Path file, ArchiveOutputStream archiveStream ) throws IOException
{ {
return archive.createArchiveEntry( file.toFile(), "./" + root.relativize( file ).toString() ); try ( InputStream in = Files.newInputStream( file ) )
{
copy( in, archiveStream, progressPrinter );
}
} }


private void writeFile( Path file, ArchiveOutputStream archiveStream ) throws IOException private static class ArchiveOperation
{ {
try ( InputStream in = Files.newInputStream( file ) ) final ThrowingAction<IOException> operation;
final long size;
final boolean isFile;
final Path root;
final ArchiveOutputStream stream;
final Path file;

private ArchiveOperation( ThrowingAction<IOException> operation, Path root, ArchiveOutputStream stream, Path file ) throws IOException
{
this.operation = operation;
this.isFile = Files.isRegularFile( file );
this.size = isFile ? Files.size( file ) : 0;
this.root = root;
this.stream = stream;
this.file = file;
}

void addToArchive() throws IOException
{
ArchiveEntry entry = createEntry( file, root, stream );
stream.putArchiveEntry( entry );
operation.apply();
stream.closeArchiveEntry();
}

private ArchiveEntry createEntry( Path file, Path root, ArchiveOutputStream archive ) throws IOException
{ {
copy( in, archiveStream ); return archive.createArchiveEntry( file.toFile(), "./" + root.relativize( file ).toString() );
} }
} }
} }
Expand Up @@ -112,7 +112,7 @@ private void loadEntry( Path destination, ArchiveInputStream stream, ArchiveEntr
{ {
try ( OutputStream output = Files.newOutputStream( file ) ) try ( OutputStream output = Files.newOutputStream( file ) )
{ {
Utils.copy( stream, output ); Utils.copy( stream, output, new ProgressPrinter( null ) );
} }
} }
} }
Expand Down
@@ -0,0 +1,96 @@
/*
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.dbms.archive;

import java.io.PrintStream;
import java.util.concurrent.atomic.AtomicBoolean;

import org.neo4j.io.ByteUnit;

class ProgressPrinter
{
private final AtomicBoolean printUpdate;
private final PrintStream output;
long maxBytes;
long maxFiles;
long currentBytes;
long currentFiles;
boolean done;

ProgressPrinter( PrintStream output )
{
this.output = output;
printUpdate = new AtomicBoolean();
}

void printOnNextUpdate()
{
printUpdate.set( true );
}

void reset()
{
maxBytes = 0;
maxFiles = 0;
currentBytes = 0;
currentFiles = 0;
}

void beginFile()
{
currentFiles++;
}

void addBytes( long n )
{
currentBytes += n;
if ( printUpdate.get() )
{
printProgress();
printUpdate.set( false );
}
}

void endFile()
{
printProgress();
}

void done()
{
done = true;
}

void printProgress()
{
if ( output != null )
{
if ( done )
{
output.println( "\rDone: " + currentFiles + " files, " + ByteUnit.bytesToString( currentBytes ) + " processed." );
}
else
{
double progress = (currentBytes / (double) maxBytes) * 100;
output.print( "\rFiles: " + currentFiles + '/' + maxFiles + ", data: " + String.format( "%4.1f%%", progress ));
}
}
}
}
Expand Up @@ -53,13 +53,16 @@ public static void checkWritableDirectory( Path directory ) throws FileSystemExc
} }
} }


public static void copy( InputStream in, OutputStream out ) throws IOException public static void copy( InputStream in, OutputStream out, ProgressPrinter progressPrinter ) throws IOException
{ {
progressPrinter.beginFile();
final byte[] buffer = new byte[8192]; final byte[] buffer = new byte[8192];
int n; int n;
while ( -1 != (n = in.read( buffer )) ) while ( -1 != (n = in.read( buffer )) )
{ {
out.write( buffer, 0, n ); out.write( buffer, 0, n );
progressPrinter.addBytes( n );
} }
progressPrinter.endFile();
} }
} }
Expand Up @@ -363,9 +363,8 @@ void shouldPrintNiceHelp() throws Exception
"%n" + "%n" +
"options:%n" + "options:%n" +
" --database=<name> Name of database. [default:" + GraphDatabaseSettings.DEFAULT_DATABASE_NAME + "]%n" + " --database=<name> Name of database. [default:" + GraphDatabaseSettings.DEFAULT_DATABASE_NAME + "]%n" +
" --zstd=<true|false> Use Zstandard compression (the dump file will not be%n" + " --zstd=<true|false> Use Zstandard compression (might not be loadable by%n" +
" loadable by older versions of Neo4j).%n" + " older versions of Neo4j). [default:false]%n" +
" [default:false]%n" +
" --to=<destination-path> Destination (file or folder) of database dump.%n" ), " --to=<destination-path> Destination (file or folder) of database dump.%n" ),
baos.toString() ); baos.toString() );
} }
Expand Down
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.dbms.archive;

import org.junit.jupiter.api.Test;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

import static org.junit.jupiter.api.Assertions.assertEquals;

class ProgressPrinterTest
{
@Test
void progressOutput()
{
ByteArrayOutputStream bout = new ByteArrayOutputStream();
PrintStream printStream = new PrintStream( bout );
ProgressPrinter progressPrinter = new ProgressPrinter( printStream );
progressPrinter.maxBytes = 1000;
progressPrinter.maxFiles = 10;

progressPrinter.beginFile();
progressPrinter.addBytes( 5 );
progressPrinter.endFile();
progressPrinter.beginFile();
progressPrinter.addBytes( 50 );
progressPrinter.addBytes( 50 );
progressPrinter.printOnNextUpdate();
progressPrinter.addBytes( 100 );
progressPrinter.endFile();
progressPrinter.beginFile();
progressPrinter.printOnNextUpdate();
progressPrinter.addBytes( 100 );
progressPrinter.endFile();
progressPrinter.done();
progressPrinter.printProgress();

printStream.flush();
String output = bout.toString();
assertEquals( output,
"\rFiles: 1/10, data: 0.5%" +
"\rFiles: 2/10, data: 20.5%" +
"\rFiles: 2/10, data: 20.5%" +
"\rFiles: 3/10, data: 30.5%" +
"\rFiles: 3/10, data: 30.5%" +
"\rDone: 3 files, 305B processed." +
System.lineSeparator()
);
}
}

0 comments on commit 1b58fec

Please sign in to comment.