Skip to content

Commit

Permalink
ISPN-9110 StoreMigrator SingleFileStore support added
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanemerson authored and tristantarrant committed May 9, 2018
1 parent 30ba4c0 commit 117ee12
Show file tree
Hide file tree
Showing 16 changed files with 258 additions and 49 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -41,6 +41,7 @@ ObjectStore
*.idx
# Persisted metadata
*.dat
!tools/src/test/resources/singlefilestore/reader-test.dat
# OS X generated file
.DS_Store
# Checkstlye in Eclipse
Expand Down
Expand Up @@ -4,6 +4,7 @@

import java.util.Properties;

import org.infinispan.tools.store.migrator.file.SingleFileStoreReader;
import org.infinispan.tools.store.migrator.jdbc.JdbcStoreReader;
import org.infinispan.tools.store.migrator.rocksdb.RocksDBReader;

Expand All @@ -19,6 +20,8 @@ static StoreIterator get(Properties properties) {
case LEVELDB:
case ROCKSDB:
return new RocksDBReader(props);
case SINGLE_FILE_STORE:
return new SingleFileStoreReader(props);
}
return null;
}
Expand Down
Expand Up @@ -9,5 +9,6 @@ public enum StoreType {
JDBC_MIXED,
JDBC_STRING,
LEVELDB,
ROCKSDB
ROCKSDB,
SINGLE_FILE_STORE
}
Expand Up @@ -11,6 +11,7 @@
import org.infinispan.commons.CacheConfigurationException;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.PersistenceConfigurationBuilder;
import org.infinispan.configuration.cache.SingleFileStoreConfigurationBuilder;
import org.infinispan.configuration.cache.StoreConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfiguration;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
Expand Down Expand Up @@ -71,6 +72,9 @@ private static StoreConfigurationBuilder getInitializedStoreBuilder(StorePropert
if (compressionType != null)
builder.compressionType(CompressionType.valueOf(compressionType.toUpperCase()));
return builder;
case SINGLE_FILE_STORE:
props.required(LOCATION);
return new SingleFileStoreConfigurationBuilder(persistenceBuilder).location(props.get(LOCATION));
default:
throw new CacheConfigurationException(String.format("Unknown store type '%s'", storeType));
}
Expand Down
@@ -0,0 +1,125 @@
package org.infinispan.tools.store.migrator.file;

import static org.infinispan.tools.store.migrator.Element.CACHE_NAME;
import static org.infinispan.tools.store.migrator.Element.LOCATION;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Iterator;
import java.util.NoSuchElementException;

import org.infinispan.commons.CacheException;
import org.infinispan.commons.io.ByteBufferImpl;
import org.infinispan.commons.marshall.StreamingMarshaller;
import org.infinispan.marshall.core.MarshalledEntry;
import org.infinispan.marshall.core.MarshalledEntryImpl;
import org.infinispan.tools.store.migrator.Element;
import org.infinispan.tools.store.migrator.StoreIterator;
import org.infinispan.tools.store.migrator.StoreProperties;
import org.infinispan.tools.store.migrator.marshaller.SerializationConfigUtil;

public class SingleFileStoreReader implements StoreIterator {

private final FileChannel channel;
private final StreamingMarshaller marshaller;

public SingleFileStoreReader(StoreProperties props) {
props.required(Element.LOCATION);
String location = props.get(LOCATION) + props.get(CACHE_NAME) + ".dat";
File file = new File(location);
if (!file.exists() || file.isDirectory())
throw new CacheException(String.format("Unable to read SingleFileStore at '%s'", location));

try {
channel = new RandomAccessFile(file, "rw").getChannel();
} catch (FileNotFoundException e) {
throw new CacheException(e);
}
this.marshaller = SerializationConfigUtil.getMarshaller(props);
}

@Override
public void close() throws Exception {
channel.close();
}

@Override
public Iterator<MarshalledEntry> iterator() {
return new SingleFileIterator();
}

class SingleFileIterator implements Iterator<MarshalledEntry> {

// CONSTANTS taken from the SingleFileStore impl we do not expose and reference
// these variables as if the current impl changes then it will break the iterator
private static final int KEY_POS = 4 + 4 + 4 + 4 + 8;
int filePos = 4;

@Override
public boolean hasNext() {
// return if end of file is reached
ByteBuffer buf = readFileEntry();
return buf.remaining() <= 0;
}

@Override
public MarshalledEntry next() {
for (;;) {
// read next entry using same logic as SingleFileStore#rebuildIndex
ByteBuffer buf = readFileEntry();
if (buf.remaining() > 0)
throw new NoSuchElementException();
buf.flip();
// initialize FileEntry from buffer
int entrySize = buf.getInt();
int keyLen = buf.getInt();
int dataLen = buf.getInt();
int metadataLen = buf.getInt();

// get expiryTime but ignore
buf.getLong();

// sanity check
if (entrySize < KEY_POS + keyLen + dataLen + metadataLen)
throw new CacheException(String.format("Failed to read entries from file. Error at offset %d", filePos));

if (keyLen > 0) {
try {
// load the key from file
if (buf.capacity() < keyLen)
buf = ByteBuffer.allocate(keyLen);

buf.clear().limit(keyLen);
byte[] data = new byte[keyLen + dataLen + metadataLen];
channel.read(ByteBuffer.wrap(data), filePos + KEY_POS);
filePos += entrySize;

org.infinispan.commons.io.ByteBuffer keyBb = new ByteBufferImpl(data, 0, keyLen);
org.infinispan.commons.io.ByteBuffer valueBb = new ByteBufferImpl(data, keyLen, dataLen);
return new MarshalledEntryImpl<>(keyBb, valueBb, (org.infinispan.commons.io.ByteBuffer) null, marshaller);
} catch (IOException e) {
throw new CacheException(String.format("Unable to read file entry at offset %d", filePos), e);
}
} else {
filePos += entrySize;
}
}
}

ByteBuffer readFileEntry() {
final ByteBuffer buf = ByteBuffer.allocate(KEY_POS);
// read FileEntry fields from file (size, keyLen etc.)
buf.clear().limit(KEY_POS);
try {
channel.read(buf, filePos);
} catch (IOException e) {
throw new CacheException(e);
}
return buf;
}
}
}
6 changes: 6 additions & 0 deletions tools/src/main/resources/migrator.properties
Expand Up @@ -33,6 +33,9 @@ source.key_to_string_mapper=org.infinispan.persistence.keymappers.DefaultTwoWayK
#source.location=source/Infinispan-LevelDBStore
#source.compression=SNAPPY

#source.type=SINGLE_FILE_STORE
#source.location=source/sfs

target.type=STRING
target.cache_name=target
target.dialect=POSTGRES
Expand All @@ -55,3 +58,6 @@ target.key_to_string_mapper=org.infinispan.persistence.keymappers.DefaultTwoWayK
#target.cache_name=target
#target.location=target/Infinispan-LevelDBStore
#target.compression=NONE

#target.type=SINGLE_FILE_STORE
#target.location=target/sfs
@@ -0,0 +1,64 @@
package org.infinispan.tools.store.migrator;

import static org.infinispan.tools.store.migrator.Element.CACHE_NAME;
import static org.infinispan.tools.store.migrator.Element.EXTERNALIZERS;
import static org.infinispan.tools.store.migrator.Element.MARSHALLER;
import static org.infinispan.tools.store.migrator.Element.SOURCE;
import static org.infinispan.tools.store.migrator.Element.TARGET;
import static org.infinispan.tools.store.migrator.Element.TYPE;
import static org.infinispan.tools.store.migrator.TestUtil.propKey;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;

import java.util.Properties;

import org.infinispan.Cache;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.global.GlobalConfiguration;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.tools.store.migrator.marshaller.MarshallerType;
import org.testng.annotations.Test;

@Test(testName = "org.infinispan.tools.store.migrator.AbstractReaderTest", groups = "functional")
public abstract class AbstractReaderTest {

private static final String TEST_CACHE_NAME = "reader-test";

abstract public Configuration getTargetCacheConfig();

protected void configureStoreProperties(Properties properties, Element type) {
MarshallerType marshallerType = type == SOURCE ? MarshallerType.LEGACY : MarshallerType.CURRENT;
properties.put(propKey(type, CACHE_NAME), TEST_CACHE_NAME);
properties.put(propKey(type, MARSHALLER, TYPE), marshallerType.toString());
properties.put(propKey(type, MARSHALLER, EXTERNALIZERS), "256:" + TestUtil.TestObjectExternalizer.class.getName());
}

@Test
public void readerCompatibilityTest() throws Exception {
Properties properties = new Properties();
configureStoreProperties(properties, SOURCE);
configureStoreProperties(properties, TARGET);
// Read from the legacy LevelDB store and populate the new RocksDBStore using latest marshaller
new StoreMigrator(properties).run();

GlobalConfiguration globalConfig = new GlobalConfigurationBuilder()
.serialization().addAdvancedExternalizer(256, new TestUtil.TestObjectExternalizer())
.build();

Configuration config = getTargetCacheConfig();

// Create a new cache instance, with the required externalizers, to ensure that the new RocksDbStore can be
// loaded and contains all of the expected values.
EmbeddedCacheManager manager = new DefaultCacheManager(globalConfig, config);
Cache cache = manager.getCache(TEST_CACHE_NAME);
for (String key : TestUtil.TEST_MAP.keySet()) {
Object stored = cache.get(key);
assertNotNull(String.format("Key=%s", key), stored);
Object expected = TestUtil.TEST_MAP.get(key);
assertNotNull(String.format("Key=%s", key), stored);
assertEquals(expected, stored);
}
}
}
@@ -0,0 +1,38 @@
package org.infinispan.tools.store.migrator.file;

import static org.infinispan.tools.store.migrator.Element.LOCATION;
import static org.infinispan.tools.store.migrator.Element.SOURCE;
import static org.infinispan.tools.store.migrator.Element.TYPE;
import static org.infinispan.tools.store.migrator.TestUtil.propKey;

import java.util.Properties;

import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.SingleFileStoreConfigurationBuilder;
import org.infinispan.tools.store.migrator.AbstractReaderTest;
import org.infinispan.tools.store.migrator.Element;
import org.infinispan.tools.store.migrator.StoreType;
import org.testng.annotations.Test;

@Test(testName = "tools.store.migrator.file.SingleFileStoreReaderTest", groups = "functional")
public class SingleFileStoreReaderTest extends AbstractReaderTest {

private static final String SOURCE_DIR = "target/test-classes/singlefilestore/";
private static final String TARGET_DIR = SOURCE_DIR + "/target-sfs/";

@Override
public Configuration getTargetCacheConfig() {
return new ConfigurationBuilder().persistence()
.addStore(SingleFileStoreConfigurationBuilder.class).location(TARGET_DIR)
.preload(true).ignoreModifications(true)
.build();
}

@Override
protected void configureStoreProperties(Properties properties, Element type) {
super.configureStoreProperties(properties, type);
properties.put(propKey(type, TYPE), StoreType.SINGLE_FILE_STORE.toString());
properties.put(propKey(type, LOCATION), type == SOURCE ? SOURCE_DIR : TARGET_DIR);
}
}
Expand Up @@ -60,9 +60,12 @@ public class ByteOutputGenerator {
public static void main(String[] args) throws Exception {
GlobalConfiguration globalConfig = new GlobalConfigurationBuilder()
.serialization().addAdvancedExternalizer(new TestObjectExternalizer()).build();
Configuration config = new ConfigurationBuilder().persistence()
.addStore(LevelDBStoreConfigurationBuilder.class)
.build();
PersistenceConfigurationBuilder pb = new ConfigurationBuilder().persistence();
pb.addStore(LevelDBStoreConfigurationBuilder.class);
pb.addStore(SingleFileStoreConfigurationBuilder.class);
Configuration config = pb.build();
EmbeddedCacheManager manager = new DefaultCacheManager(globalConfig, config);
ComponentRegistry registry = manager.getCache().getAdvancedCache().getComponentRegistry();
StreamingMarshaller marshaller = registry.getCacheMarshaller();
Expand Down
@@ -1,73 +1,37 @@
package org.infinispan.tools.store.migrator.rocksdb;

import static org.infinispan.tools.store.migrator.Element.CACHE_NAME;
import static org.infinispan.tools.store.migrator.Element.EXTERNALIZERS;
import static org.infinispan.tools.store.migrator.Element.LOCATION;
import static org.infinispan.tools.store.migrator.Element.MARSHALLER;
import static org.infinispan.tools.store.migrator.Element.SOURCE;
import static org.infinispan.tools.store.migrator.Element.TARGET;
import static org.infinispan.tools.store.migrator.Element.TYPE;
import static org.infinispan.tools.store.migrator.TestUtil.propKey;

import java.util.Properties;

import org.infinispan.Cache;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfiguration;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.persistence.rocksdb.configuration.RocksDBStoreConfigurationBuilder;
import org.infinispan.tools.store.migrator.AbstractReaderTest;
import org.infinispan.tools.store.migrator.Element;
import org.infinispan.tools.store.migrator.StoreMigrator;
import org.infinispan.tools.store.migrator.StoreType;
import org.infinispan.tools.store.migrator.TestUtil;
import org.infinispan.tools.store.migrator.marshaller.MarshallerType;
import org.testng.annotations.Test;

@Test(testName = "tools.store.migrator.RocksDBReaderTest", groups = "functional")
public class RocksDBReaderTest {
@Test(testName = "tools.store.migrator.rocksdb.RocksDBReaderTest", groups = "functional")
public class RocksDBReaderTest extends AbstractReaderTest {

private static final String TEST_CACHE_NAME = "leveldbstore";
private static final String SOURCE_DIR = "target/test-classes/";
private static final String SOURCE_DIR = "target/test-classes/leveldbstore/";
private static final String TARGET_DIR = SOURCE_DIR + "/rocksdbstore/";

public void testLevelDbCompatibility() throws Exception {
Properties properties = new Properties();
configureStoreProperties(properties, SOURCE);
configureStoreProperties(properties, TARGET);
// Read from the legacy LevelDB store and populate the new RocksDBStore using latest marshaller
new StoreMigrator(properties).run();

GlobalConfiguration globalConfig = new GlobalConfigurationBuilder()
.serialization().addAdvancedExternalizer(256, new TestUtil.TestObjectExternalizer())
.build();

Configuration config = new ConfigurationBuilder().persistence()
public Configuration getTargetCacheConfig() {
return new ConfigurationBuilder().persistence()
.addStore(RocksDBStoreConfigurationBuilder.class).location(TARGET_DIR).expiredLocation(TARGET_DIR + "-expired-")
.preload(true).ignoreModifications(true)
.build();

// Create a new cache instance, with the required externalizers, to ensure that the new RocksDbStore can be
// loaded and contains all of the expected values.
EmbeddedCacheManager manager = new DefaultCacheManager(globalConfig, config);
Cache cache = manager.getCache(TEST_CACHE_NAME);
for (String key : TestUtil.TEST_MAP.keySet()) {
Object stored = cache.get(key);
assert stored != null;
Object expected = TestUtil.TEST_MAP.get(key);
assert expected != null;
assert expected.equals(stored);
}
}

private void configureStoreProperties(Properties properties, Element type) {
MarshallerType marshallerType = type == SOURCE ? MarshallerType.LEGACY : MarshallerType.CURRENT;
@Override
protected void configureStoreProperties(Properties properties, Element type) {
super.configureStoreProperties(properties, type);
properties.put(propKey(type, TYPE), StoreType.ROCKSDB.toString());
properties.put(propKey(type, CACHE_NAME), TEST_CACHE_NAME);
properties.put(propKey(type, LOCATION), type == SOURCE ? SOURCE_DIR : TARGET_DIR);
properties.put(propKey(type, MARSHALLER, TYPE), marshallerType.toString());
properties.put(propKey(type, MARSHALLER, EXTERNALIZERS), "256:" + TestUtil.TestObjectExternalizer.class.getName());
}
}
Binary file removed tools/src/test/resources/leveldbstore/000003.log
Binary file not shown.
File renamed without changes.
File renamed without changes.
Binary file not shown.

0 comments on commit 117ee12

Please sign in to comment.