Skip to content

Commit

Permalink
Move state logic into SpatialKnownIndex
Browse files Browse the repository at this point in the history
  • Loading branch information
Lojjs authored and OliviaYtterbrink committed Feb 14, 2018
1 parent e818de7 commit 6854b86
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 103 deletions.
Expand Up @@ -117,35 +117,60 @@ public SpatialKnownIndex( IndexDirectoryStructure directoryStructure, Coordinate
state = State.NONE;
}

public void initialize( IndexDescriptor descriptor, IndexSamplingConfig samplingConfig )
public void initIfNeeded( IndexDescriptor descriptor, IndexSamplingConfig samplingConfig )
{
assert state == State.NONE;
layout = layout( descriptor );
treeKey = layout.newKey();
treeValue = layout.newValue();
schemaIndex = new NativeSchemaIndex<>( pageCache, fs, indexFile, layout, monitor, descriptor, indexId );
if ( uniqueSampler( descriptor ) )
if ( state == State.NONE )
{
uniqueSampler = new UniqueIndexSampler();
initialize( descriptor, samplingConfig );
}
else
}

public void createIfNeeded () throws IOException
{
if ( state == State.INIT )
{
generalSampler = new DefaultNonUniqueIndexSampler( samplingConfig.sampleSizeLimit() );
// First add to sub-index, make sure to create
create();
}
state = State.INIT;
}

public void online() throws IOException
public void takeOnline( IndexDescriptor descriptor, IndexSamplingConfig samplingConfig ) throws IOException
{
assert state == State.POPULATED || state == State.INIT;
singleUpdater = new NativeSchemaIndexUpdater<>( treeKey, treeValue );
schemaIndex.instantiateTree( recoveryCleanupWorkCollector, NO_HEADER_WRITER );
state = State.ONLINE;
initIfNeeded( descriptor, samplingConfig );
if ( state == State.INIT || state == State.POPULATED )
{
online();
}
if ( state != State.ONLINE )
{
throw new IllegalStateException( "" );
}
}

public State getState()
public IndexUpdater updateWithCreate(IndexDescriptor descriptor, IndexSamplingConfig samplingConfig, boolean populating ) throws IOException
{
return state;
if ( populating )
{
if ( state == State.NONE )
{
// sub-index didn't exist, create in populating mode
initialize( descriptor, samplingConfig );
create();
}
return newPopulatingUpdater();
}
else
{
if ( state == State.NONE )
{
// sub-index didn't exist, create and make it online
initialize( descriptor, samplingConfig );
create();
close( true );
online();
}
return newUpdater();
}
}

public synchronized void drop() throws IOException
Expand All @@ -163,17 +188,6 @@ public synchronized void drop() throws IOException
}
}

public synchronized void create() throws IOException
{
assert state == State.INIT;
// extra check here???
schemaIndex.gbpTreeFileUtil.deleteFileIfPresent( indexFile ); /// TODO <- warning
schemaIndex.instantiateTree( RecoveryCleanupWorkCollector.IMMEDIATE, new NativeSchemaIndexHeaderWriter( BYTE_POPULATING ) );
instantiateWriter();
workSync = new WorkSync<>( new IndexUpdateApply<>( treeKey, treeValue, singleTreeWriter, new ConflictDetectingValueMerger<>() ) );
state = State.POPULATING;
}

public void close() throws IOException
{
schemaIndex.closeTree();
Expand Down Expand Up @@ -343,6 +357,72 @@ public IndexReader newReader( IndexSamplingConfig samplingConfig, IndexDescripto
return new SpatialSchemaIndexReader<>( schemaIndex.tree, layout, samplingConfig, descriptor );
}

public boolean indexExists()
{
return fs.fileExists( indexFile );
}

public String readPopulationFailure( IndexDescriptor descriptor ) throws IOException
{
NativeSchemaIndexHeaderReader headerReader = new NativeSchemaIndexHeaderReader();
GBPTree.readHeader( pageCache, indexFile, layout( descriptor ), headerReader );
return headerReader.failureMessage;
}

public InternalIndexState readState( IndexDescriptor descriptor ) throws IOException
{
NativeSchemaIndexHeaderReader headerReader = new NativeSchemaIndexHeaderReader();
GBPTree.readHeader( pageCache, indexFile, layout( descriptor ), headerReader );
switch ( headerReader.state )
{
case BYTE_FAILED:
return InternalIndexState.FAILED;
case BYTE_ONLINE:
return InternalIndexState.ONLINE;
case BYTE_POPULATING:
return InternalIndexState.POPULATING;
default:
throw new IllegalStateException( "Unexpected initial state byte value " + headerReader.state );
}
}

private synchronized void create() throws IOException
{
assert state == State.INIT;
// extra check here???
schemaIndex.gbpTreeFileUtil.deleteFileIfPresent( indexFile ); /// TODO <- warning
schemaIndex.instantiateTree( RecoveryCleanupWorkCollector.IMMEDIATE, new NativeSchemaIndexHeaderWriter( BYTE_POPULATING ) );
instantiateWriter();
workSync = new WorkSync<>( new IndexUpdateApply<>( treeKey, treeValue, singleTreeWriter, new ConflictDetectingValueMerger<>() ) );
state = State.POPULATING;
}

private void initialize( IndexDescriptor descriptor, IndexSamplingConfig samplingConfig )
{
assert state == State.NONE;
layout = layout( descriptor );
treeKey = layout.newKey();
treeValue = layout.newValue();
schemaIndex = new NativeSchemaIndex<>( pageCache, fs, indexFile, layout, monitor, descriptor, indexId );
if ( uniqueSampler( descriptor ) )
{
uniqueSampler = new UniqueIndexSampler();
}
else
{
generalSampler = new DefaultNonUniqueIndexSampler( samplingConfig.sampleSizeLimit() );
}
state = State.INIT;
}

private void online() throws IOException
{
assert state == State.POPULATED || state == State.INIT;
singleUpdater = new NativeSchemaIndexUpdater<>( treeKey, treeValue );
schemaIndex.instantiateTree( recoveryCleanupWorkCollector, NO_HEADER_WRITER );
state = State.ONLINE;
}

private void instantiateWriter() throws IOException
{
assert singleTreeWriter == null;
Expand Down Expand Up @@ -434,35 +514,6 @@ private static String indexFileName( long indexId )
return "index-" + indexId;
}

public boolean indexExists()
{
return fs.fileExists( indexFile );
}

public String readPopulationFailure( IndexDescriptor descriptor ) throws IOException
{
NativeSchemaIndexHeaderReader headerReader = new NativeSchemaIndexHeaderReader();
GBPTree.readHeader( pageCache, indexFile, layout( descriptor ), headerReader );
return headerReader.failureMessage;
}

public InternalIndexState readState( IndexDescriptor descriptor ) throws IOException
{
NativeSchemaIndexHeaderReader headerReader = new NativeSchemaIndexHeaderReader();
GBPTree.readHeader( pageCache, indexFile, layout( descriptor ), headerReader );
switch ( headerReader.state )
{
case BYTE_FAILED:
return InternalIndexState.FAILED;
case BYTE_ONLINE:
return InternalIndexState.ONLINE;
case BYTE_POPULATING:
return InternalIndexState.POPULATING;
default:
throw new IllegalStateException( "Unexpected initial state byte value " + headerReader.state );
}
}

private SpatialLayout layout( IndexDescriptor descriptor )
{
SpatialLayout layout;
Expand Down Expand Up @@ -490,7 +541,7 @@ private boolean uniqueSampler( IndexDescriptor descriptor )
}
}

public enum State
private enum State
{
NONE,
INIT,
Expand Down
Expand Up @@ -62,18 +62,7 @@ class SpatialFusionIndexAccessor implements IndexAccessor
this.indexFactory = indexFactory;
for ( SpatialKnownIndex index : indexMap.values() )
{
if ( index.getState() == SpatialKnownIndex.State.NONE )
{
index.initialize( descriptor, samplingConfig );
}
if ( index.getState() == SpatialKnownIndex.State.INIT || index.getState() == SpatialKnownIndex.State.POPULATED )
{
index.online();
}
if ( index.getState() != SpatialKnownIndex.State.ONLINE )
{
throw new IllegalStateException( "" );
}
index.takeOnline(descriptor, samplingConfig);
}
}

Expand Down
Expand Up @@ -88,11 +88,7 @@ public void add( Collection<? extends IndexEntryUpdate<?>> updates ) throws Inde
for ( CoordinateReferenceSystem crs : batchMap.keySet() )
{
SpatialKnownIndex index = getOrCreateInitializedIndex( crs );
if ( index.getState() == SpatialKnownIndex.State.INIT )
{
// First add to sub-index, make sure to create
index.create();
}
index.createIfNeeded();
index.add( batchMap.get( crs ) );
}
}
Expand Down Expand Up @@ -142,10 +138,7 @@ public void includeSample( IndexEntryUpdate<?> update )
private SpatialKnownIndex getOrCreateInitializedIndex( CoordinateReferenceSystem crs )
{
SpatialKnownIndex index = indexFactory.selectAndCreate( indexMap, indexId, crs );
if ( index.getState() == SpatialKnownIndex.State.NONE )
{
index.initialize( descriptor, samplingConfig );
}
index.initIfNeeded( descriptor, samplingConfig );
return index;
}

Expand Down
Expand Up @@ -113,28 +113,8 @@ private IndexUpdater selectUpdater( Value... values ) throws IOException
return updater;
}
SpatialKnownIndex index = indexFactory.selectAndCreate( indexMap, indexId, crs );
if ( populating )
{
if ( index.getState() == SpatialKnownIndex.State.NONE )
{
// sub-index didn't exist, create in populating mode
index.initialize( descriptor, samplingConfig );
index.create();
}
return remember( crs, index.newPopulatingUpdater() );
}
else
{
if ( index.getState() == SpatialKnownIndex.State.NONE )
{
// sub-index didn't exist, create and make it online
index.initialize( descriptor, samplingConfig );
index.create();
index.close( true );
index.online();
}
return remember( crs, index.newUpdater() );
}
IndexUpdater indexUpdater = index.updateWithCreate( descriptor, samplingConfig, populating );
return remember( crs, indexUpdater );
}

private IndexUpdater remember( CoordinateReferenceSystem crs, IndexUpdater indexUpdater )
Expand Down
Expand Up @@ -73,7 +73,6 @@ public void setup() throws Exception
for ( CoordinateReferenceSystem crs : asList( WGS84, Cartesian ) )
{
indexMap.put( crs, mock( SpatialKnownIndex.class ) );
when( indexMap.get( crs ).getState() ).thenReturn( SpatialKnownIndex.State.ONLINE );
}

fusionIndexAccessor = new SpatialFusionIndexAccessor( indexMap, 0, mock( IndexDescriptor.class ), null, indexFactory );
Expand Down

0 comments on commit 6854b86

Please sign in to comment.