Skip to content

Commit

Permalink
Fixed problem part of #4313
Browse files Browse the repository at this point in the history
  • Loading branch information
lvca committed Jun 10, 2015
1 parent 3dc8271 commit 945a5fc
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 29 deletions.
Expand Up @@ -19,6 +19,12 @@
*/
package com.orientechnologies.orient.core.db;

import java.io.Closeable;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import com.orientechnologies.orient.core.cache.OLocalRecordCache;
import com.orientechnologies.orient.core.command.OCommandRequest;
import com.orientechnologies.orient.core.config.OContextConfiguration;
Expand All @@ -41,12 +47,6 @@
import com.orientechnologies.orient.core.util.OBackupable;
import com.orientechnologies.orient.core.version.ORecordVersion;

import java.io.Closeable;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
* Generic Database interface. Represents the lower level of the Database providing raw API to access to the raw records.<br>
* Limits:
Expand Down Expand Up @@ -104,7 +104,7 @@ enum ATTRIBUTES {
* Activate current database instance on current thread. Call this method before using the database if you switch between multiple
* databas instances on the same thread or if you pass them across threads.
*/
void activateOnCurrentThread();
ODatabase activateOnCurrentThread();

/**
* Returns true if the current database instance is active on current thread, otherwise false.
Expand Down
Expand Up @@ -19,15 +19,6 @@
*/
package com.orientechnologies.orient.core.db;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;

import com.orientechnologies.orient.core.Orient;
import com.orientechnologies.orient.core.cache.OLocalRecordCache;
import com.orientechnologies.orient.core.command.OCommandOutputListener;
Expand All @@ -41,6 +32,15 @@
import com.orientechnologies.orient.core.storage.ORecordMetadata;
import com.orientechnologies.orient.core.storage.OStorage;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;

@SuppressWarnings("unchecked")
public abstract class ODatabaseWrapperAbstract<DB extends ODatabaseInternal, T> implements ODatabaseInternal<T> {
protected DB underlying;
Expand All @@ -64,8 +64,8 @@ public <THISDB extends ODatabase> THISDB open(final OToken iToken) {
}

@Override
public void activateOnCurrentThread() {
underlying.activateOnCurrentThread();
public ODatabase activateOnCurrentThread() {
return underlying.activateOnCurrentThread();
}

@Override
Expand Down
Expand Up @@ -2747,8 +2747,9 @@ public void setCurrentDatabaseInThreadLocal() {
* Activates current database instance on current thread.
*/
@Override
public void activateOnCurrentThread() {
public ODatabaseDocumentTx activateOnCurrentThread() {
ODatabaseRecordThreadLocal.INSTANCE.set(this);
return this;
}

@Override
Expand Down
Expand Up @@ -19,6 +19,11 @@
*/
package com.orientechnologies.orient.server.hazelcast;

import java.io.Serializable;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.IMap;
Expand Down Expand Up @@ -49,11 +54,6 @@
import com.orientechnologies.orient.server.distributed.task.OTxTask;
import com.orientechnologies.orient.server.distributed.task.OUpdateRecordTask;

import java.io.Serializable;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* Hazelcast implementation of distributed peer. There is one instance per database. Each node creates own instance to talk with
* each others.
Expand Down
Expand Up @@ -787,8 +787,7 @@ protected void shutdownConnection() throws IOException {
return;
}

OLogManager.instance().error(this, "Authentication error of remote client %s:%d: shutdown is aborted.",
channel.socket.getInetAddress(), channel.socket.getPort());
OLogManager.instance().error(this, "Authentication error of remote client %s:%d: shutdown is aborted.", channel.socket.getInetAddress(), channel.socket.getPort());

sendErrorOrDropConnection(clientTxId, new OSecurityAccessException("Invalid user/password to shutdown the server"));
}
Expand Down Expand Up @@ -1464,8 +1463,7 @@ protected void readRecord() throws IOException {
}

} else {
final ORecord record = connection.database.load(rid, fetchPlanString, ignoreCache, loadTombstones,
OStorage.LOCKING_STRATEGY.NONE);
final ORecord record = connection.database.load(rid, fetchPlanString, ignoreCache, loadTombstones, OStorage.LOCKING_STRATEGY.NONE);

beginResponse();
try {
Expand Down Expand Up @@ -1617,8 +1615,10 @@ protected void beginResponse() {
protected void endResponse() throws IOException {
// resetting transaction state. Commands are stateless and connection should be cleared
// otherwise reused connection (connections pool) may lead to unpredicted errors
if (connection != null && connection.database != null && connection.database.getTransaction() != null)
if (connection != null && connection.database != null && connection.database.activateOnCurrentThread().getTransaction() != null) {
connection.database.activateOnCurrentThread();
connection.database.getTransaction().rollback();
}
channel.flush();
channel.releaseWriteLock();
}
Expand Down

0 comments on commit 945a5fc

Please sign in to comment.