Skip to content

Commit

Permalink
enhancing LiveQuery callback interface + fix to db.copy()
Browse files Browse the repository at this point in the history
  • Loading branch information
luigidellaquila authored and tglman committed Sep 7, 2015
1 parent 2281b3e commit 9598113
Show file tree
Hide file tree
Showing 17 changed files with 477 additions and 146 deletions.
Expand Up @@ -19,22 +19,6 @@
*/ */
package com.orientechnologies.orient.client.remote; package com.orientechnologies.orient.client.remote;


import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

import javax.naming.NamingException;
import javax.naming.directory.Attribute;
import javax.naming.directory.Attributes;
import javax.naming.directory.DirContext;
import javax.naming.directory.InitialDirContext;

import com.orientechnologies.common.concur.lock.OModificationOperationProhibitedException; import com.orientechnologies.common.concur.lock.OModificationOperationProhibitedException;
import com.orientechnologies.common.exception.OException; import com.orientechnologies.common.exception.OException;
import com.orientechnologies.common.io.OIOException; import com.orientechnologies.common.io.OIOException;
Expand All @@ -58,11 +42,7 @@
import com.orientechnologies.orient.core.db.record.ORecordOperation; import com.orientechnologies.orient.core.db.record.ORecordOperation;
import com.orientechnologies.orient.core.db.record.ridbag.sbtree.OBonsaiCollectionPointer; import com.orientechnologies.orient.core.db.record.ridbag.sbtree.OBonsaiCollectionPointer;
import com.orientechnologies.orient.core.db.record.ridbag.sbtree.OSBTreeCollectionManager; import com.orientechnologies.orient.core.db.record.ridbag.sbtree.OSBTreeCollectionManager;
import com.orientechnologies.orient.core.exception.OCommandExecutionException; import com.orientechnologies.orient.core.exception.*;
import com.orientechnologies.orient.core.exception.ODatabaseException;
import com.orientechnologies.orient.core.exception.ORecordNotFoundException;
import com.orientechnologies.orient.core.exception.OStorageException;
import com.orientechnologies.orient.core.exception.OTransactionException;
import com.orientechnologies.orient.core.id.ORID; import com.orientechnologies.orient.core.id.ORID;
import com.orientechnologies.orient.core.id.ORecordId; import com.orientechnologies.orient.core.id.ORecordId;
import com.orientechnologies.orient.core.record.ORecord; import com.orientechnologies.orient.core.record.ORecord;
Expand All @@ -74,14 +54,7 @@
import com.orientechnologies.orient.core.serialization.serializer.stream.OStreamSerializerAnyStreamable; import com.orientechnologies.orient.core.serialization.serializer.stream.OStreamSerializerAnyStreamable;
import com.orientechnologies.orient.core.sql.query.OLiveQuery; import com.orientechnologies.orient.core.sql.query.OLiveQuery;
import com.orientechnologies.orient.core.sql.query.OLiveResultListener; import com.orientechnologies.orient.core.sql.query.OLiveResultListener;
import com.orientechnologies.orient.core.storage.OCluster; import com.orientechnologies.orient.core.storage.*;
import com.orientechnologies.orient.core.storage.OPhysicalPosition;
import com.orientechnologies.orient.core.storage.ORawBuffer;
import com.orientechnologies.orient.core.storage.ORecordCallback;
import com.orientechnologies.orient.core.storage.ORecordMetadata;
import com.orientechnologies.orient.core.storage.OStorageAbstract;
import com.orientechnologies.orient.core.storage.OStorageOperationResult;
import com.orientechnologies.orient.core.storage.OStorageProxy;
import com.orientechnologies.orient.core.storage.impl.local.paginated.ORecordSerializationContext; import com.orientechnologies.orient.core.storage.impl.local.paginated.ORecordSerializationContext;
import com.orientechnologies.orient.core.tx.OTransaction; import com.orientechnologies.orient.core.tx.OTransaction;
import com.orientechnologies.orient.core.tx.OTransactionAbstract; import com.orientechnologies.orient.core.tx.OTransactionAbstract;
Expand All @@ -91,6 +64,17 @@
import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryProtocol; import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryProtocol;
import com.orientechnologies.orient.enterprise.channel.binary.ORemoteServerEventListener; import com.orientechnologies.orient.enterprise.channel.binary.ORemoteServerEventListener;


import javax.naming.NamingException;
import javax.naming.directory.Attribute;
import javax.naming.directory.Attributes;
import javax.naming.directory.DirContext;
import javax.naming.directory.InitialDirContext;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.*;
import java.util.concurrent.*;

/** /**
* This object is bound to each remote ODatabase instances. * This object is bound to each remote ODatabase instances.
*/ */
Expand Down Expand Up @@ -405,8 +389,8 @@ public OStorageOperationResult<OPhysicalPosition> createRecord(final ORecordId i
ORecordVersion iRecordVersion, final byte iRecordType, int iMode, final ORecordCallback<Long> iCallback) { ORecordVersion iRecordVersion, final byte iRecordType, int iMode, final ORecordCallback<Long> iCallback) {


if (iMode == 1 && iCallback == null) if (iMode == 1 && iCallback == null)
// ASYNCHRONOUS MODE NO ANSWER
iMode = 2; iMode = 2;
// ASYNCHRONOUS MODE NO ANSWER


final OPhysicalPosition ppos = new OPhysicalPosition(iRecordType); final OPhysicalPosition ppos = new OPhysicalPosition(iRecordType);


Expand Down Expand Up @@ -2362,4 +2346,14 @@ public Object call() throws Exception {
} }
return false; return false;
} }

@Override public OStorageRemote copy() {
try {
openRemoteDatabase();
} catch (IOException e) {
e.printStackTrace();
}
return this;
}

} }
Expand Up @@ -19,16 +19,6 @@
*/ */
package com.orientechnologies.orient.client.remote; package com.orientechnologies.orient.client.remote;


import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

import com.orientechnologies.orient.core.Orient; import com.orientechnologies.orient.core.Orient;
import com.orientechnologies.orient.core.command.OCommandOutputListener; import com.orientechnologies.orient.core.command.OCommandOutputListener;
import com.orientechnologies.orient.core.command.OCommandRequestText; import com.orientechnologies.orient.core.command.OCommandRequestText;
Expand All @@ -40,20 +30,23 @@
import com.orientechnologies.orient.core.id.ORID; import com.orientechnologies.orient.core.id.ORID;
import com.orientechnologies.orient.core.id.ORecordId; import com.orientechnologies.orient.core.id.ORecordId;
import com.orientechnologies.orient.core.record.impl.ODocument; import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.core.storage.OCluster; import com.orientechnologies.orient.core.storage.*;
import com.orientechnologies.orient.core.storage.OPhysicalPosition;
import com.orientechnologies.orient.core.storage.ORawBuffer;
import com.orientechnologies.orient.core.storage.ORecordCallback;
import com.orientechnologies.orient.core.storage.ORecordMetadata;
import com.orientechnologies.orient.core.storage.OStorage;
import com.orientechnologies.orient.core.storage.OStorageOperationResult;
import com.orientechnologies.orient.core.storage.OStorageProxy;
import com.orientechnologies.orient.core.tx.OTransaction; import com.orientechnologies.orient.core.tx.OTransaction;
import com.orientechnologies.orient.core.version.ORecordVersion; import com.orientechnologies.orient.core.version.ORecordVersion;
import com.orientechnologies.orient.core.version.OVersionFactory; import com.orientechnologies.orient.core.version.OVersionFactory;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryAsynchClient; import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinaryAsynchClient;
import com.orientechnologies.orient.enterprise.channel.binary.ORemoteServerEventListener; import com.orientechnologies.orient.enterprise.channel.binary.ORemoteServerEventListener;


import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

/** /**
* Wrapper of OStorageRemote that maintains the sessionId. It's bound to the ODatabase and allow to use the shared OStorageRemote. * Wrapper of OStorageRemote that maintains the sessionId. It's bound to the ODatabase and allow to use the shared OStorageRemote.
*/ */
Expand Down Expand Up @@ -176,6 +169,19 @@ public int removeUser() {
} }
} }


@Override
public OStorageProxy copy() {
try {
OStorageRemoteThread a = new OStorageRemoteThread(delegate);
delegate.openRemoteDatabase();
a.popSession();
return a;
} catch (IOException e) {
e.printStackTrace();
}
return null;
}

public void close() { public void close() {
pushSession(); pushSession();
try { try {
Expand Down
Expand Up @@ -34,30 +34,12 @@
import com.orientechnologies.orient.core.config.OContextConfiguration; import com.orientechnologies.orient.core.config.OContextConfiguration;
import com.orientechnologies.orient.core.config.OGlobalConfiguration; import com.orientechnologies.orient.core.config.OGlobalConfiguration;
import com.orientechnologies.orient.core.conflict.ORecordConflictStrategy; import com.orientechnologies.orient.core.conflict.ORecordConflictStrategy;
import com.orientechnologies.orient.core.db.ODatabase; import com.orientechnologies.orient.core.db.*;
import com.orientechnologies.orient.core.db.ODatabaseDocumentInternal; import com.orientechnologies.orient.core.db.record.*;
import com.orientechnologies.orient.core.db.ODatabaseInternal;
import com.orientechnologies.orient.core.db.ODatabaseLifecycleListener;
import com.orientechnologies.orient.core.db.ODatabaseListener;
import com.orientechnologies.orient.core.db.ODatabaseRecordThreadLocal;
import com.orientechnologies.orient.core.db.OHookReplacedRecordThreadLocal;
import com.orientechnologies.orient.core.db.OScenarioThreadLocal;
import com.orientechnologies.orient.core.db.record.OClassTrigger;
import com.orientechnologies.orient.core.db.record.OCurrentStorageComponentsFactory;
import com.orientechnologies.orient.core.db.record.OIdentifiable;
import com.orientechnologies.orient.core.db.record.ORecordElement;
import com.orientechnologies.orient.core.db.record.ORecordOperation;
import com.orientechnologies.orient.core.db.record.ridbag.sbtree.ORidBagDeleteHook; import com.orientechnologies.orient.core.db.record.ridbag.sbtree.ORidBagDeleteHook;
import com.orientechnologies.orient.core.db.record.ridbag.sbtree.OSBTreeCollectionManager; import com.orientechnologies.orient.core.db.record.ridbag.sbtree.OSBTreeCollectionManager;
import com.orientechnologies.orient.core.dictionary.ODictionary; import com.orientechnologies.orient.core.dictionary.ODictionary;
import com.orientechnologies.orient.core.exception.OConcurrentModificationException; import com.orientechnologies.orient.core.exception.*;
import com.orientechnologies.orient.core.exception.ODatabaseException;
import com.orientechnologies.orient.core.exception.ORecordNotFoundException;
import com.orientechnologies.orient.core.exception.OSchemaException;
import com.orientechnologies.orient.core.exception.OSecurityAccessException;
import com.orientechnologies.orient.core.exception.OTransactionBlockedException;
import com.orientechnologies.orient.core.exception.OTransactionException;
import com.orientechnologies.orient.core.exception.OValidationException;
import com.orientechnologies.orient.core.fetch.OFetchHelper; import com.orientechnologies.orient.core.fetch.OFetchHelper;
import com.orientechnologies.orient.core.hook.ORecordHook; import com.orientechnologies.orient.core.hook.ORecordHook;
import com.orientechnologies.orient.core.id.ORID; import com.orientechnologies.orient.core.id.ORID;
Expand Down Expand Up @@ -88,13 +70,7 @@
import com.orientechnologies.orient.core.serialization.serializer.record.ORecordSerializerFactory; import com.orientechnologies.orient.core.serialization.serializer.record.ORecordSerializerFactory;
import com.orientechnologies.orient.core.serialization.serializer.record.string.ORecordSerializerSchemaAware2CSV; import com.orientechnologies.orient.core.serialization.serializer.record.string.ORecordSerializerSchemaAware2CSV;
import com.orientechnologies.orient.core.sql.parser.OStatement; import com.orientechnologies.orient.core.sql.parser.OStatement;
import com.orientechnologies.orient.core.storage.OPhysicalPosition; import com.orientechnologies.orient.core.storage.*;
import com.orientechnologies.orient.core.storage.ORawBuffer;
import com.orientechnologies.orient.core.storage.ORecordCallback;
import com.orientechnologies.orient.core.storage.ORecordMetadata;
import com.orientechnologies.orient.core.storage.OStorage;
import com.orientechnologies.orient.core.storage.OStorageOperationResult;
import com.orientechnologies.orient.core.storage.OStorageProxy;
import com.orientechnologies.orient.core.storage.impl.local.OAbstractPaginatedStorage; import com.orientechnologies.orient.core.storage.impl.local.OAbstractPaginatedStorage;
import com.orientechnologies.orient.core.storage.impl.local.OFreezableStorage; import com.orientechnologies.orient.core.storage.impl.local.OFreezableStorage;
import com.orientechnologies.orient.core.storage.impl.local.paginated.OLocalPaginatedStorage; import com.orientechnologies.orient.core.storage.impl.local.paginated.OLocalPaginatedStorage;
Expand Down Expand Up @@ -475,6 +451,7 @@ public void drop() {
} }


public ODatabaseDocumentTx copy() { public ODatabaseDocumentTx copy() {
ODatabaseDocumentInternal dbInThreadLocal = ODatabaseRecordThreadLocal.INSTANCE.getIfDefined();
if (this.isClosed()) if (this.isClosed())
throw new ODatabaseException("Cannot copy a closed db"); throw new ODatabaseException("Cannot copy a closed db");


Expand All @@ -487,16 +464,23 @@ public ODatabaseDocumentTx copy() {
db.metadata = new OMetadataDefault(db); db.metadata = new OMetadataDefault(db);


db.initialized = true; db.initialized = true;
db.storage = storage; if (storage instanceof OStorageProxy) {

db.storage = ((OStorageProxy) storage).copy();
if (storage instanceof OStorageProxy)
((OStorageProxy) db.storage).addUser(); ((OStorageProxy) db.storage).addUser();
} else {
db.storage = storage;
}


db.setStatus(STATUS.OPEN); db.setStatus(STATUS.OPEN);
db.activateOnCurrentThread();
db.metadata.load(); db.metadata.load();
// callOnOpenListeners();
// activateOnCurrentThread(); if (dbInThreadLocal != null) {
dbInThreadLocal.activateOnCurrentThread();
}else{
if(ODatabaseRecordThreadLocal.INSTANCE.isDefined()) {
ODatabaseRecordThreadLocal.INSTANCE.remove();
}
}
return db; return db;
} }


Expand Down

0 comments on commit 9598113

Please sign in to comment.