Skip to content

Commit

Permalink
storage remote refactor for centralize connection reopen
Browse files Browse the repository at this point in the history
  • Loading branch information
tglman committed Dec 22, 2015
1 parent eec10c1 commit a9fa986
Show file tree
Hide file tree
Showing 4 changed files with 461 additions and 526 deletions.
Expand Up @@ -32,6 +32,7 @@
import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryAsynchClient; import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryAsynchClient;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryProtocol; import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryProtocol;


import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
Expand Down Expand Up @@ -71,14 +72,14 @@ public OBonsaiCollectionPointer getCollectionPointer() {


@Override @Override
public V get(K key) { public V get(K key) {
OStorageRemote storage = (OStorageRemote) ODatabaseRecordThreadLocal.INSTANCE.get().getStorage().getUnderlying(); final OStorageRemote storage = (OStorageRemote) ODatabaseRecordThreadLocal.INSTANCE.get().getStorage().getUnderlying();


final byte[] keyStream = new byte[keySerializer.getObjectSize(key)]; final byte[] keyStream = new byte[keySerializer.getObjectSize(key)];
keySerializer.serialize(key, keyStream, 0); keySerializer.serialize(key, keyStream, 0);
OChannelBinaryAsynchClient client = null; return storage.networkOperation(new OStorageRemoteOperation<V>() {
while (true) { @Override
try { public V execute(final OChannelBinaryAsynchClient client) throws IOException {
client = storage.beginRequest(OChannelBinaryProtocol.REQUEST_SBTREE_BONSAI_GET); storage.beginRequest(client,OChannelBinaryProtocol.REQUEST_SBTREE_BONSAI_GET);
OCollectionNetworkSerializer.INSTANCE.writeCollectionPointer(client, getCollectionPointer()); OCollectionNetworkSerializer.INSTANCE.writeCollectionPointer(client, getCollectionPointer());
client.writeBytes(keyStream); client.writeBytes(keyStream);


Expand All @@ -94,12 +95,10 @@ public V get(K key) {


final byte serializerId = OByteSerializer.INSTANCE.deserializeLiteral(stream, 0); final byte serializerId = OByteSerializer.INSTANCE.deserializeLiteral(stream, 0);
final OBinarySerializer<V> serializer = (OBinarySerializer<V>) OBinarySerializerFactory.getInstance().getObjectSerializer( final OBinarySerializer<V> serializer = (OBinarySerializer<V>) OBinarySerializerFactory.getInstance().getObjectSerializer(
serializerId); serializerId);
return serializer.deserialize(stream, OByteSerializer.BYTE_SIZE); return serializer.deserialize(stream, OByteSerializer.BYTE_SIZE);
} catch (Exception e) {
storage.handleException(client, "Cannot get by key from sb-tree bonsai", e);
} }
} },"Cannot get by key from sb-tree bonsai");
} }


@Override @Override
Expand Down Expand Up @@ -166,23 +165,24 @@ private boolean pushEntriesToListener(RangeResultListener<K, V> listener, List<M
return more; return more;
} }


private List<Map.Entry<K, V>> fetchEntriesMajor(K key, boolean inclusive) { private List<Map.Entry<K, V>> fetchEntriesMajor(final K key,final boolean inclusive) {
byte[] keyStream = new byte[keySerializer.getObjectSize(key)]; final byte[] keyStream = new byte[keySerializer.getObjectSize(key)];
keySerializer.serialize(key, keyStream, 0); keySerializer.serialize(key, keyStream, 0);

final OStorageRemote storage = (OStorageRemote) ODatabaseRecordThreadLocal.INSTANCE.get().getStorage().getUnderlying();
OStorageRemote storage = (OStorageRemote) ODatabaseRecordThreadLocal.INSTANCE.get().getStorage().getUnderlying(); return storage.networkOperation(new OStorageRemoteOperation<List<Map.Entry<K, V>>>() {
OChannelBinaryAsynchClient client = null; @Override
while (true) { public List<Map.Entry<K, V>> execute(final OChannelBinaryAsynchClient client) throws IOException {
try { try {
client = storage.beginRequest(OChannelBinaryProtocol.REQUEST_SBTREE_BONSAI_GET_ENTRIES_MAJOR); storage.beginRequest(client, OChannelBinaryProtocol.REQUEST_SBTREE_BONSAI_GET_ENTRIES_MAJOR);
OCollectionNetworkSerializer.INSTANCE.writeCollectionPointer(client, getCollectionPointer()); OCollectionNetworkSerializer.INSTANCE.writeCollectionPointer(client, getCollectionPointer());
client.writeBytes(keyStream); client.writeBytes(keyStream);
client.writeBoolean(inclusive); client.writeBoolean(inclusive);


if (client.getSrvProtocolVersion() >= 21) if (client.getSrvProtocolVersion() >= 21)
client.writeInt(128); client.writeInt(128);

}finally {
storage.endRequest(client); storage.endRequest(client);
}
List<Map.Entry<K, V>> list = null; List<Map.Entry<K, V>> list = null;
try { try {
storage.beginResponse(client); storage.beginResponse(client);
Expand All @@ -203,11 +203,8 @@ private List<Map.Entry<K, V>> fetchEntriesMajor(K key, boolean inclusive) {
} }


return list; return list;

} catch (Exception e) {
storage.handleException(client, "Cannot get first key from sb-tree bonsai", e);
} }
} },"Cannot get first key from sb-tree bonsai");
} }


@Override @Override
Expand All @@ -218,12 +215,11 @@ public Collection<V> getValuesBetween(K keyFrom, boolean fromInclusive, K keyTo,


@Override @Override
public K firstKey() { public K firstKey() {
OStorageRemote storage = (OStorageRemote) ODatabaseRecordThreadLocal.INSTANCE.get().getStorage().getUnderlying(); final OStorageRemote storage = (OStorageRemote) ODatabaseRecordThreadLocal.INSTANCE.get().getStorage().getUnderlying();

return storage.networkOperation(new OStorageRemoteOperation<K>() {
OChannelBinaryAsynchClient client = null; @Override
while (true) { public K execute(final OChannelBinaryAsynchClient client) throws IOException {
try { storage.beginRequest(client,OChannelBinaryProtocol.REQUEST_SBTREE_BONSAI_FIRST_KEY);
client = storage.beginRequest(OChannelBinaryProtocol.REQUEST_SBTREE_BONSAI_FIRST_KEY);
OCollectionNetworkSerializer.INSTANCE.writeCollectionPointer(client, getCollectionPointer()); OCollectionNetworkSerializer.INSTANCE.writeCollectionPointer(client, getCollectionPointer());
storage.endRequest(client); storage.endRequest(client);
byte[] stream; byte[] stream;
Expand All @@ -236,12 +232,10 @@ public K firstKey() {


final byte serializerId = OByteSerializer.INSTANCE.deserializeLiteral(stream, 0); final byte serializerId = OByteSerializer.INSTANCE.deserializeLiteral(stream, 0);
final OBinarySerializer<K> serializer = (OBinarySerializer<K>) OBinarySerializerFactory.getInstance().getObjectSerializer( final OBinarySerializer<K> serializer = (OBinarySerializer<K>) OBinarySerializerFactory.getInstance().getObjectSerializer(
serializerId); serializerId);
return serializer.deserialize(stream, OByteSerializer.BYTE_SIZE); return serializer.deserialize(stream, OByteSerializer.BYTE_SIZE);
} catch (Exception e) {
storage.handleException(client, "Cannot get first key from sb-tree bonsai", e);
} }
} },"Cannot get first key from sb-tree bonsai");
} }


@Override @Override
Expand All @@ -256,34 +250,33 @@ public void loadEntriesBetween(K keyFrom, boolean fromInclusive, K keyTo, boolea
} }


@Override @Override
public int getRealBagSize(Map<K, OSBTreeRidBag.Change> changes) { public int getRealBagSize(final Map<K, OSBTreeRidBag.Change> changes) {
OStorageRemote storage = (OStorageRemote) ODatabaseRecordThreadLocal.INSTANCE.get().getStorage().getUnderlying(); final OStorageRemote storage = (OStorageRemote) ODatabaseRecordThreadLocal.INSTANCE.get().getStorage().getUnderlying();
OChannelBinaryAsynchClient client = null; return storage.networkOperation(new OStorageRemoteOperation<Integer>() {
while (true) { @Override
try { public Integer execute(OChannelBinaryAsynchClient client) throws IOException {
client = storage.beginRequest(OChannelBinaryProtocol.REQUEST_RIDBAG_GET_SIZE); try {
OCollectionNetworkSerializer.INSTANCE.writeCollectionPointer(client, getCollectionPointer()); storage.beginRequest(client, OChannelBinaryProtocol.REQUEST_RIDBAG_GET_SIZE);

OCollectionNetworkSerializer.INSTANCE.writeCollectionPointer(client, getCollectionPointer());
final OSBTreeRidBag.ChangeSerializationHelper changeSerializer = OSBTreeRidBag.ChangeSerializationHelper.INSTANCE;
final byte[] stream = new byte[OIntegerSerializer.INT_SIZE + changeSerializer.getChangesSerializedSize(changes.size())];
changeSerializer.serializeChanges(changes, keySerializer, stream, 0);


client.writeBytes(stream); final OSBTreeRidBag.ChangeSerializationHelper changeSerializer = OSBTreeRidBag.ChangeSerializationHelper.INSTANCE;
final byte[] stream = new byte[OIntegerSerializer.INT_SIZE + changeSerializer.getChangesSerializedSize(changes.size())];
changeSerializer.serializeChanges(changes, keySerializer, stream, 0);


storage.endRequest(client); client.writeBytes(stream);
} finally {
storage.endRequest(client);
}
int result; int result;
try { try {
storage.beginResponse(client); storage.beginResponse(client);
result = client.readInt(); result = client.readInt();
} finally { } finally {
storage.endResponse(client); storage.endResponse(client);
} }

return result; return result;
} catch (Exception e) {
storage.handleException(client, "Cannot get by real bag size sb-tree bonsai", e);
} }
} }, "Cannot get by real bag size sb-tree bonsai");
} }


@Override @Override
Expand Down
Expand Up @@ -36,6 +36,7 @@
import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryAsynchClient; import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryAsynchClient;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryProtocol; import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryProtocol;


import java.io.IOException;
import java.lang.ref.WeakReference; import java.lang.ref.WeakReference;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
Expand Down Expand Up @@ -80,15 +81,18 @@ public void onStartup() {
} }


@Override @Override
protected OSBTreeBonsaiRemote<OIdentifiable, Integer> createTree(int clusterId) { protected OSBTreeBonsaiRemote<OIdentifiable, Integer> createTree(final int clusterId) {
if (remoteCreationAllowed) { if (remoteCreationAllowed) {
OStorageRemote storage = (OStorageRemote) ODatabaseRecordThreadLocal.INSTANCE.get().getStorage().getUnderlying(); final OStorageRemote storage = (OStorageRemote) ODatabaseRecordThreadLocal.INSTANCE.get().getStorage().getUnderlying();
OChannelBinaryAsynchClient client = null; return storage.networkOperation(new OStorageRemoteOperation<OSBTreeBonsaiRemote<OIdentifiable, Integer>>() {
while (true) { @Override
try { public OSBTreeBonsaiRemote<OIdentifiable, Integer> execute(final OChannelBinaryAsynchClient client) throws IOException {
client = storage.beginRequest(OChannelBinaryProtocol.REQUEST_CREATE_SBTREE_BONSAI); try {
client.writeInt(clusterId); storage.beginRequest(client, OChannelBinaryProtocol.REQUEST_CREATE_SBTREE_BONSAI);
storage.endRequest(client); client.writeInt(clusterId);
}finally {
storage.endRequest(client);
}
OBonsaiCollectionPointer pointer; OBonsaiCollectionPointer pointer;
try { try {
storage.beginResponse(client); storage.beginResponse(client);
Expand All @@ -101,10 +105,8 @@ protected OSBTreeBonsaiRemote<OIdentifiable, Integer> createTree(int clusterId)
OBinarySerializer<Integer> valueSerializer = OIntegerSerializer.INSTANCE; OBinarySerializer<Integer> valueSerializer = OIntegerSerializer.INSTANCE;


return new OSBTreeBonsaiRemote<OIdentifiable, Integer>(pointer, keySerializer, valueSerializer); return new OSBTreeBonsaiRemote<OIdentifiable, Integer>(pointer, keySerializer, valueSerializer);
} catch (Exception e2) {
storage.handleException(client, "Cannot create sb-tree bonsai", e2);
} }
} },"Cannot create sb-tree bonsai");
} else { } else {
throw new UnsupportedOperationException("Creation of SB-Tree from remote storage is not allowed"); throw new UnsupportedOperationException("Creation of SB-Tree from remote storage is not allowed");
} }
Expand Down

0 comments on commit a9fa986

Please sign in to comment.