Skip to content

Commit

Permalink
Truncate the counts store file before writing new values
Browse files Browse the repository at this point in the history
This should fix a problem during counts store rotation when the counts
store file is partially written but it cannot be recognized as
corrupted.  Hence it will be loaded on the next recovery and lead to
inconsistencies.
  • Loading branch information
davidegrohmann committed Aug 10, 2015
1 parent 3a6902f commit 12494da
Show file tree
Hide file tree
Showing 14 changed files with 246 additions and 68 deletions.
Expand Up @@ -195,4 +195,10 @@ public synchronized <K extends ThirdPartyFileSystem> K getOrCreateThirdPartyFile
} }
return clazz.cast( fileSystem ); return clazz.cast( fileSystem );
} }

@Override
public void truncate( File path, long size ) throws IOException
{
FileUtils.truncateFile( path, size );
}
} }
Expand Up @@ -33,47 +33,49 @@
public interface FileSystemAbstraction public interface FileSystemAbstraction
{ {
StoreChannel open( File fileName, String mode ) throws IOException; StoreChannel open( File fileName, String mode ) throws IOException;

OutputStream openAsOutputStream( File fileName, boolean append ) throws IOException; OutputStream openAsOutputStream( File fileName, boolean append ) throws IOException;

InputStream openAsInputStream( File fileName ) throws IOException; InputStream openAsInputStream( File fileName ) throws IOException;

Reader openAsReader( File fileName, String encoding ) throws IOException; Reader openAsReader( File fileName, String encoding ) throws IOException;

Writer openAsWriter( File fileName, String encoding, boolean append ) throws IOException; Writer openAsWriter( File fileName, String encoding, boolean append ) throws IOException;

FileLock tryLock( File fileName, StoreChannel channel ) throws IOException; FileLock tryLock( File fileName, StoreChannel channel ) throws IOException;

StoreChannel create( File fileName ) throws IOException; StoreChannel create( File fileName ) throws IOException;

boolean fileExists( File fileName ); boolean fileExists( File fileName );

boolean mkdir( File fileName ); boolean mkdir( File fileName );

void mkdirs( File fileName ) throws IOException; void mkdirs( File fileName ) throws IOException;

long getFileSize( File fileName ); long getFileSize( File fileName );


boolean deleteFile( File fileName ); boolean deleteFile( File fileName );

void deleteRecursively( File directory ) throws IOException; void deleteRecursively( File directory ) throws IOException;

boolean renameFile( File from, File to ) throws IOException; boolean renameFile( File from, File to ) throws IOException;

File[] listFiles( File directory ); File[] listFiles( File directory );


File[] listFiles( File directory, FilenameFilter filter ); File[] listFiles( File directory, FilenameFilter filter );


boolean isDirectory( File file ); boolean isDirectory( File file );

void moveToDirectory( File file, File toDirectory ) throws IOException; void moveToDirectory( File file, File toDirectory ) throws IOException;

void copyFile( File from, File to ) throws IOException; void copyFile( File from, File to ) throws IOException;

void copyRecursively( File fromDirectory, File toDirectory ) throws IOException; void copyRecursively( File fromDirectory, File toDirectory ) throws IOException;


<K extends ThirdPartyFileSystem> K getOrCreateThirdPartyFileSystem( Class<K> clazz, Function<Class<K>, K> creator ); <K extends ThirdPartyFileSystem> K getOrCreateThirdPartyFileSystem( Class<K> clazz, Function<Class<K>, K> creator );


void truncate( File path, long size ) throws IOException;

interface ThirdPartyFileSystem interface ThirdPartyFileSystem
{ {
void close(); void close();
Expand Down
Expand Up @@ -203,6 +203,14 @@ public synchronized <K extends ThirdPartyFileSystem> K getOrCreateThirdPartyFile
return (K) fileSystem; return (K) fileSystem;
} }


@Override
public void truncate( File path, long size ) throws IOException
{
adversary.injectFailure( FileNotFoundException.class, IOException.class, IllegalArgumentException.class,
SecurityException.class, NullPointerException.class );
delegate.truncate( path, size );
}

private <K extends ThirdPartyFileSystem> ThirdPartyFileSystem adversarialProxy( private <K extends ThirdPartyFileSystem> ThirdPartyFileSystem adversarialProxy(
final ThirdPartyFileSystem fileSystem, final ThirdPartyFileSystem fileSystem,
Class<K> clazz ) Class<K> clazz )
Expand Down
Expand Up @@ -72,6 +72,12 @@ public <K extends FileSystemAbstraction.ThirdPartyFileSystem> K getOrCreateThird
return delegate.getOrCreateThirdPartyFileSystem( clazz, creator ); return delegate.getOrCreateThirdPartyFileSystem( clazz, creator );
} }


@Override
public void truncate( File path, long size ) throws IOException
{
delegate.truncate( path, size );
}

@Override @Override
public boolean renameFile( File from, File to ) throws IOException public boolean renameFile( File from, File to ) throws IOException
{ {
Expand Down
Expand Up @@ -569,6 +569,17 @@ public synchronized <K extends ThirdPartyFileSystem> K getOrCreateThirdPartyFile
return clazz.cast( fileSystem ); return clazz.cast( fileSystem );
} }


@Override
public void truncate( File file, long size ) throws IOException
{
EphemeralFileData data = files.get( file );
if ( data == null )
{
throw new FileNotFoundException( "File " + file + " not found" );
}
data.truncate( size );
}

@SuppressWarnings( "serial" ) @SuppressWarnings( "serial" )
private static class FileStillOpenException extends Exception private static class FileStillOpenException extends Exception
{ {
Expand Down
Expand Up @@ -51,7 +51,7 @@ public StoreChannel open( File fileName, String mode ) throws IOException
{ {
return new LimitedFileChannel( inner.open( fileName, mode ), this ); return new LimitedFileChannel( inner.open( fileName, mode ), this );
} }

@Override @Override
public OutputStream openAsOutputStream( File fileName, boolean append ) throws IOException public OutputStream openAsOutputStream( File fileName, boolean append ) throws IOException
{ {
Expand All @@ -69,7 +69,7 @@ public Reader openAsReader( File fileName, String encoding ) throws IOException
{ {
return new InputStreamReader( openAsInputStream( fileName ), encoding ); return new InputStreamReader( openAsInputStream( fileName ), encoding );
} }

@Override @Override
public Writer openAsWriter( File fileName, String encoding, boolean append ) throws IOException public Writer openAsWriter( File fileName, String encoding, boolean append ) throws IOException
{ {
Expand Down Expand Up @@ -106,7 +106,7 @@ public boolean deleteFile( File fileName )
{ {
return inner.deleteFile( fileName ); return inner.deleteFile( fileName );
} }

@Override @Override
public void deleteRecursively( File directory ) throws IOException public void deleteRecursively( File directory ) throws IOException
{ {
Expand All @@ -118,7 +118,7 @@ public boolean mkdir( File fileName )
{ {
return inner.mkdir( fileName ); return inner.mkdir( fileName );
} }

@Override @Override
public void mkdirs( File fileName ) throws IOException public void mkdirs( File fileName ) throws IOException
{ {
Expand Down Expand Up @@ -157,25 +157,25 @@ public File[] listFiles( File directory, FilenameFilter filter )
{ {
return inner.listFiles( directory, filter ); return inner.listFiles( directory, filter );
} }

@Override @Override
public boolean isDirectory( File file ) public boolean isDirectory( File file )
{ {
return inner.isDirectory( file ); return inner.isDirectory( file );
} }

@Override @Override
public void moveToDirectory( File file, File toDirectory ) throws IOException public void moveToDirectory( File file, File toDirectory ) throws IOException
{ {
inner.moveToDirectory( file, toDirectory ); inner.moveToDirectory( file, toDirectory );
} }

@Override @Override
public void copyFile( File from, File to ) throws IOException public void copyFile( File from, File to ) throws IOException
{ {
inner.copyFile( from, to ); inner.copyFile( from, to );
} }

@Override @Override
public void copyRecursively( File fromDirectory, File toDirectory ) throws IOException public void copyRecursively( File fromDirectory, File toDirectory ) throws IOException
{ {
Expand All @@ -188,4 +188,10 @@ public <K extends ThirdPartyFileSystem> K getOrCreateThirdPartyFileSystem(
{ {
return inner.getOrCreateThirdPartyFileSystem( clazz, creator ); return inner.getOrCreateThirdPartyFileSystem( clazz, creator );
} }

@Override
public void truncate( File path, long size ) throws IOException
{
inner.truncate( path, size );
}
} }
Expand Up @@ -146,6 +146,12 @@ private KeyValueStoreFile create(
"Invalid sizes: keySize=%d, valueSize=%d, format maxSize=%d", "Invalid sizes: keySize=%d, valueSize=%d, format maxSize=%d",
keySize, valueSize, maxSize ) ); keySize, valueSize, maxSize ) );
} }

if ( fs.fileExists( path ) )
{
fs.truncate( path, 0 );
}

BigEndianByteArrayBuffer key = new BigEndianByteArrayBuffer( new byte[keySize] ); BigEndianByteArrayBuffer key = new BigEndianByteArrayBuffer( new byte[keySize] );
BigEndianByteArrayBuffer value = new BigEndianByteArrayBuffer( new byte[valueSize] ); BigEndianByteArrayBuffer value = new BigEndianByteArrayBuffer( new byte[valueSize] );
writeFormatSpecifier( value ); writeFormatSpecifier( value );
Expand Down
Expand Up @@ -265,11 +265,7 @@ static abstract class Writer


abstract void close() throws IOException; abstract void close() throws IOException;


void writeTrailer( String trailer ) throws IOException abstract void writeTrailer( String trailer ) throws IOException;
{
write( UTF8.encode( trailer ) );
}

static Writer create( FileSystemAbstraction fs, PageCache pages, File path, int pageSize ) throws IOException static Writer create( FileSystemAbstraction fs, PageCache pages, File path, int pageSize ) throws IOException
{ {
if ( pages == null ) if ( pages == null )
Expand Down Expand Up @@ -302,6 +298,12 @@ void write( byte[] data ) throws IOException
out.write( data ); out.write( data );
} }


@Override
void writeTrailer( String trailer ) throws IOException
{
write( UTF8.encode( trailer ) );
}

@Override @Override
KeyValueStoreFile open( Metadata metadata, int keySize, int valueSize ) throws IOException KeyValueStoreFile open( Metadata metadata, int keySize, int valueSize ) throws IOException
{ {
Expand Down
Expand Up @@ -186,4 +186,9 @@ public <K extends ThirdPartyFileSystem> K getOrCreateThirdPartyFileSystem( Class
return inner.getOrCreateThirdPartyFileSystem( clazz, creator ); return inner.getOrCreateThirdPartyFileSystem( clazz, creator );
} }


@Override
public void truncate( File path, long size ) throws IOException
{
inner.truncate( path, size );
}
} }
Expand Up @@ -44,7 +44,7 @@ public void run()
{ {
} }
}; };

public static Runnable callCounter( final AtomicInteger count ) public static Runnable callCounter( final AtomicInteger count )
{ {
return new Runnable() return new Runnable()
Expand All @@ -56,7 +56,7 @@ public void run()
} }
}; };
} }

private final boolean fileExists; private final boolean fileExists;
private final IOException cannotCreateStoreDir; private final IOException cannotCreateStoreDir;
private final IOException cannotOpenLockFile; private final IOException cannotOpenLockFile;
Expand Down Expand Up @@ -86,7 +86,7 @@ public StoreChannel open( File fileName, String mode ) throws IOException


return emptyFileChannel; return emptyFileChannel;
} }

private final StoreChannel emptyFileChannel = new AbstractStoreChannel() private final StoreChannel emptyFileChannel = new AbstractStoreChannel()
{ {
@Override @Override
Expand Down Expand Up @@ -153,7 +153,7 @@ public void close() throws IOException
{ {
onClose.run(); onClose.run();
} }

private IOException unsupported() private IOException unsupported()
{ {
return new IOException( "Unsupported" ); return new IOException( "Unsupported" );
Expand All @@ -177,7 +177,7 @@ public Reader openAsReader( File fileName, String encoding ) throws IOException
{ {
throw new UnsupportedOperationException( "TODO" ); throw new UnsupportedOperationException( "TODO" );
} }

@Override @Override
public Writer openAsWriter( File fileName, String encoding, boolean append ) throws IOException public Writer openAsWriter( File fileName, String encoding, boolean append ) throws IOException
{ {
Expand All @@ -191,7 +191,7 @@ public FileLock tryLock( File fileName, StoreChannel channel ) throws IOExceptio
{ {
throw new IOException( "Unable to create lock file " + fileName ); throw new IOException( "Unable to create lock file " + fileName );
} }

return SYMBOLIC_FILE_LOCK; return SYMBOLIC_FILE_LOCK;
} }


Expand Down Expand Up @@ -262,7 +262,7 @@ public File[] listFiles( File directory, FilenameFilter filter )
{ {
return new File[0]; return new File[0];
} }

@Override @Override
public void moveToDirectory( File file, File toDirectory ) throws IOException public void moveToDirectory( File file, File toDirectory ) throws IOException
{ {
Expand All @@ -274,7 +274,7 @@ public void copyFile( File file, File toDirectory ) throws IOException
{ {
throw new UnsupportedOperationException( "TODO" ); throw new UnsupportedOperationException( "TODO" );
} }

@Override @Override
public void copyRecursively( File fromDirectory, File toDirectory ) throws IOException public void copyRecursively( File fromDirectory, File toDirectory ) throws IOException
{ {
Expand All @@ -288,6 +288,12 @@ public <K extends ThirdPartyFileSystem> K getOrCreateThirdPartyFileSystem( Class
throw new UnsupportedOperationException( "not implemented" ); throw new UnsupportedOperationException( "not implemented" );
} }


@Override
public void truncate( File path, long size ) throws IOException
{
throw new UnsupportedOperationException( "TODO" );
}

private static final FileLock SYMBOLIC_FILE_LOCK = new FileLock() private static final FileLock SYMBOLIC_FILE_LOCK = new FileLock()
{ {
@Override @Override
Expand Down

0 comments on commit 12494da

Please sign in to comment.