Skip to content

Commit

Permalink
Extracted FullStoreChangeStream from LabelScanProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint committed Jan 12, 2017
1 parent 2a4863f commit 331d4c5
Show file tree
Hide file tree
Showing 10 changed files with 113 additions and 69 deletions.
@@ -0,0 +1,67 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.api.scan;

import org.apache.commons.lang3.ArrayUtils;

import java.io.IOException;
import java.util.function.Supplier;

import org.neo4j.helpers.collection.Visitor;
import org.neo4j.kernel.api.labelscan.LabelScanWriter;
import org.neo4j.kernel.api.labelscan.NodeLabelUpdate;
import org.neo4j.kernel.impl.api.index.IndexStoreView;
import org.neo4j.kernel.impl.api.index.StoreScan;

import static org.neo4j.function.Predicates.ALWAYS_TRUE_INT;

/**
* {@link FullStoreChangeStream} using a {@link IndexStoreView} to get its data.
*/
public class FullLabelStream implements FullStoreChangeStream, Visitor<NodeLabelUpdate,IOException>
{
private final Supplier<IndexStoreView> lazyIndexStoreView;
private LabelScanWriter writer;
private long count;

public FullLabelStream( Supplier<IndexStoreView> lazyIndexStoreView )
{
this.lazyIndexStoreView = lazyIndexStoreView;
}

@Override
public long applyTo( LabelScanWriter writer ) throws IOException
{
// Keep the write for using it in visit
this.writer = writer;
IndexStoreView view = lazyIndexStoreView.get();
StoreScan<IOException> scan = view.visitNodes( ArrayUtils.EMPTY_INT_ARRAY, ALWAYS_TRUE_INT, null, this );
scan.run();
return count;
}

@Override
public boolean visit( NodeLabelUpdate update ) throws IOException
{
writer.write( update );
count++;
return false;
}
}
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.api.scan;

import java.io.IOException;

import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.api.labelscan.LabelScanWriter;

/**
* Stream of changes used to rebuild a {@link LabelScanStore} from scratch.
*/
public interface FullStoreChangeStream
{
FullStoreChangeStream EMPTY = writer -> 0;

long applyTo( LabelScanWriter writer ) throws IOException;
}
Expand Up @@ -19,26 +19,16 @@
*/
package org.neo4j.kernel.impl.api.scan;

import org.apache.commons.lang3.ArrayUtils;

import java.io.File;
import java.io.IOException;
import java.util.function.Supplier;

import org.neo4j.helpers.collection.Visitor;
import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.api.labelscan.LabelScanWriter;
import org.neo4j.kernel.api.labelscan.NodeLabelUpdate;
import org.neo4j.kernel.extension.KernelExtensionFactory;
import org.neo4j.kernel.extension.KernelExtensions;
import org.neo4j.kernel.impl.api.index.IndexStoreView;
import org.neo4j.kernel.impl.api.index.StoreScan;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;

import static org.neo4j.function.Predicates.ALWAYS_TRUE_INT;

/**
* Used by a {@link KernelExtensions} to provide access a {@link LabelScanStore} and prioritize against other.
*
Expand Down Expand Up @@ -86,56 +76,11 @@ public String toString()
return getClass().getSimpleName() + "[" + labelScanStore + ", prio:" + priority + "]";
}

public interface FullStoreChangeStream
{
long applyTo( LabelScanWriter writer ) throws IOException;
}

public static final FullStoreChangeStream EMPTY = writer -> 0;

public static FullStoreChangeStream fullStoreLabelUpdateStream( Supplier<IndexStoreView> lazyIndexStoreView )
{
// IndexStoreView provided as supplier because we only actually have that dependency available
// when it's time to rebuilt, not when we construct this object
return new FullLabelStream( lazyIndexStoreView );
}

public static long rebuild( LabelScanStore store, FullStoreChangeStream fullStoreStream ) throws IOException
{
try ( LabelScanWriter writer = store.newWriter() )
{
return fullStoreStream.applyTo( writer );
}
}

private static class FullLabelStream implements FullStoreChangeStream, Visitor<NodeLabelUpdate,IOException>
{
private final Supplier<IndexStoreView> lazyIndexStoreView;
private LabelScanWriter writer;
private long count;

public FullLabelStream( Supplier<IndexStoreView> lazyIndexStoreView )
{
this.lazyIndexStoreView = lazyIndexStoreView;
}

@Override
public long applyTo( LabelScanWriter writer ) throws IOException
{
// Keep the write for using it in visit
this.writer = writer;
IndexStoreView view = lazyIndexStoreView.get();
StoreScan<IOException> scan = view.visitNodes( ArrayUtils.EMPTY_INT_ARRAY, ALWAYS_TRUE_INT, null, this );
scan.run();
return count;
}

@Override
public boolean visit( NodeLabelUpdate update ) throws IOException
{
writer.write( update );
count++;
return false;
}
}
}
Expand Up @@ -28,8 +28,6 @@
import org.neo4j.kernel.impl.spi.KernelContext;
import org.neo4j.kernel.lifecycle.Lifecycle;

import static org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider.fullStoreLabelUpdateStream;

public class NativeLabelScanStoreExtension extends
KernelExtensionFactory<NativeLabelScanStoreExtension.Dependencies>
{
Expand All @@ -49,6 +47,6 @@ public NativeLabelScanStoreExtension()
public Lifecycle newInstance( KernelContext context, Dependencies dependencies ) throws Throwable
{
return new LabelScanStoreProvider( new NativeLabelScanStore( dependencies.pageCache(),
context.storeDir(), Long.SIZE, 0, fullStoreLabelUpdateStream( dependencies.indexStoreView() ) ), 0 );
context.storeDir(), Long.SIZE, 0, new FullLabelStream( dependencies.indexStoreView() ) ), 0 );
}
}
Expand Up @@ -32,8 +32,8 @@
import org.neo4j.kernel.api.labelscan.AllEntriesLabelScanReader;
import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.api.labelscan.LabelScanWriter;
import org.neo4j.kernel.impl.api.scan.FullStoreChangeStream;
import org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider;
import org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider.FullStoreChangeStream;
import org.neo4j.kernel.impl.store.UnderlyingStorageException;
import org.neo4j.storageengine.api.schema.LabelScanReader;

Expand Down
Expand Up @@ -51,7 +51,7 @@
import org.neo4j.kernel.api.labelscan.LabelScanWriter;
import org.neo4j.kernel.api.labelscan.NodeLabelRange;
import org.neo4j.kernel.api.labelscan.NodeLabelUpdate;
import org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider.FullStoreChangeStream;
import org.neo4j.kernel.impl.api.scan.FullStoreChangeStream;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.storageengine.api.schema.LabelScanReader;
import org.neo4j.test.rule.RandomRule;
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.api.labelscan.LabelScanWriter;
import org.neo4j.kernel.api.labelscan.NodeLabelUpdate;
import org.neo4j.kernel.impl.api.scan.FullStoreChangeStream;
import org.neo4j.kernel.lifecycle.LifeRule;
import org.neo4j.storageengine.api.schema.LabelScanReader;
import org.neo4j.test.rule.PageCacheRule;
Expand All @@ -43,7 +44,6 @@

import static org.neo4j.collection.primitive.PrimitiveLongCollections.asArray;
import static org.neo4j.kernel.api.labelscan.NodeLabelUpdate.labelChanges;
import static org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider.EMPTY;

public class NativeLabelScanStoreTest
{
Expand All @@ -67,7 +67,7 @@ public void before()
8 << random.nextInt( 4 ),
// a bit of random pageSize
Math.min( pageCache.pageSize(), 256 << random.nextInt( 5 ) ),
EMPTY ) );
FullStoreChangeStream.EMPTY ) );
}

@Test
Expand Down
Expand Up @@ -31,10 +31,9 @@
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.factory.OperationalMode;
import org.neo4j.kernel.impl.api.index.IndexStoreView;

import org.neo4j.kernel.impl.api.scan.FullLabelStream;
import org.neo4j.logging.LogProvider;

import static org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider.fullStoreLabelUpdateStream;
import static org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider.getStoreDirectory;

/**
Expand All @@ -47,7 +46,7 @@
public class LuceneLabelScanStoreBuilder
{
private final File storeDir;
private Supplier<IndexStoreView> storeViewSupplier;
private final Supplier<IndexStoreView> storeViewSupplier;
private final FileSystemAbstraction fileSystem;
private final Config config;
private final OperationalMode operationalMode;
Expand Down Expand Up @@ -83,7 +82,7 @@ public LabelScanStore build()
.withConfig( config )
.withOperationalMode( operationalMode )
.build();
labelScanStore = new LuceneLabelScanStore( index, fullStoreLabelUpdateStream( storeViewSupplier ),
labelScanStore = new LuceneLabelScanStore( index, new FullLabelStream( storeViewSupplier ),
logProvider, LabelScanStore.Monitor.EMPTY );

try
Expand Down
Expand Up @@ -29,8 +29,8 @@
import org.neo4j.kernel.api.labelscan.AllEntriesLabelScanReader;
import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.api.labelscan.LabelScanWriter;
import org.neo4j.kernel.impl.api.scan.FullStoreChangeStream;
import org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider;
import org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider.FullStoreChangeStream;
import org.neo4j.kernel.impl.store.UnderlyingStorageException;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.extension.KernelExtensionFactory;
import org.neo4j.kernel.impl.api.index.IndexStoreView;
import org.neo4j.kernel.impl.api.scan.FullLabelStream;
import org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacadeFactory;
import org.neo4j.kernel.impl.logging.LogService;
Expand All @@ -35,7 +36,6 @@

import static org.neo4j.kernel.api.impl.index.LuceneKernelExtensions.directoryFactory;
import static org.neo4j.kernel.api.labelscan.LabelScanStore.Monitor;
import static org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider.fullStoreLabelUpdateStream;

@Service.Implementation(KernelExtensionFactory.class)
public class LuceneLabelScanStoreExtension extends KernelExtensionFactory<LuceneLabelScanStoreExtension.Dependencies>
Expand Down Expand Up @@ -78,7 +78,7 @@ public LabelScanStoreProvider newInstance( KernelContext context, Dependencies d

LabelScanIndex index = getLuceneIndex( context, directoryFactory );
LuceneLabelScanStore scanStore = new LuceneLabelScanStore( index,
fullStoreLabelUpdateStream( dependencies.indexStoreView() ),
new FullLabelStream( dependencies.indexStoreView() ),
dependencies.getLogService().getInternalLogProvider(), monitor );

return new LabelScanStoreProvider( scanStore, priority );
Expand Down

0 comments on commit 331d4c5

Please sign in to comment.