Skip to content

Commit

Permalink
Sharding模式下用独立的MVCCShardingTransactionMap
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Mar 27, 2017
1 parent ff18dab commit fbae72f
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 13 deletions.
Expand Up @@ -25,6 +25,7 @@
import org.lealone.common.exceptions.DbException;
import org.lealone.common.util.DataUtils;
import org.lealone.db.Session;
import org.lealone.mvcc.MVCCTransactionMap.MVCCShardingTransactionMap;
import org.lealone.mvcc.log.RedoLogValue;
import org.lealone.storage.Storage;
import org.lealone.storage.StorageMap;
Expand Down Expand Up @@ -170,7 +171,10 @@ public <K, V> MVCCTransactionMap<K, V> openMap(String name, String mapType, Data
}

protected <K, V> MVCCTransactionMap<K, V> createTransactionMap(StorageMap<K, TransactionalValue> map) {
return new MVCCTransactionMap<>(this, map);
if (isShardingMode())
return new MVCCShardingTransactionMap<>(this, map);
else
return new MVCCTransactionMap<>(this, map);
}

@Override
Expand Down
45 changes: 33 additions & 12 deletions lealone-mvcc/src/main/java/org/lealone/mvcc/MVCCTransactionMap.java
Expand Up @@ -14,6 +14,7 @@

import org.lealone.common.util.DataUtils;
import org.lealone.db.ConnectionInfo;
import org.lealone.db.Session;
import org.lealone.mvcc.MVCCTransaction.LogRecord;
import org.lealone.replication.Replication;
import org.lealone.storage.Storage;
Expand All @@ -35,8 +36,34 @@
*/
public class MVCCTransactionMap<K, V> implements TransactionMap<K, V> {

public static class MVCCShardingTransactionMap<K, V> extends MVCCTransactionMap<K, V> {

private final Replication replication;
private final Session session;
private final DataType valueType;

public MVCCShardingTransactionMap(MVCCTransaction transaction, StorageMap<K, TransactionalValue> map) {
super(transaction, map);
replication = (Replication) map;
session = transaction.getSession();
valueType = getValueType();
ConnectionInfo.setInternalSession(session);
}

@SuppressWarnings("unchecked")
@Override
public V get(K key) {
return (V) replication.get(key, session);
}

@SuppressWarnings("unchecked")
@Override
public V put(K key, V value) {
return (V) replication.put(key, value, valueType, session);
}
}

private final MVCCTransaction transaction;
private final boolean isShardingMode;

/**
* The map used for writing (the latest version).
Expand All @@ -49,9 +76,6 @@ public class MVCCTransactionMap<K, V> implements TransactionMap<K, V> {
public MVCCTransactionMap(MVCCTransaction transaction, StorageMap<K, TransactionalValue> map) {
this.transaction = transaction;
this.map = map;
isShardingMode = transaction.isShardingMode();
if (isShardingMode)
ConnectionInfo.setInternalSession(transaction.getSession());
}

@Override
Expand All @@ -78,9 +102,6 @@ public DataType getValueType() {
@SuppressWarnings("unchecked")
@Override
public V get(K key) {
if (isShardingMode && (map instanceof Replication)) {
return (V) ((Replication) map).get(key, transaction.getSession());
}
TransactionalValue data = map.get(key);
data = getValue(key, data);
return data == null ? null : (V) data.value;
Expand Down Expand Up @@ -150,12 +171,8 @@ protected TransactionalValue getValue(K key, TransactionalValue data, long tid)
* @return the old value
* @throws IllegalStateException if a lock timeout occurs
*/
@SuppressWarnings("unchecked")
@Override
public V put(K key, V value) {
if (isShardingMode && (map instanceof Replication)) {
return (V) ((Replication) map).put(key, value, getValueType(), transaction.getSession());
}
DataUtils.checkArgument(value != null, "The value may not be null");
return set(key, value);
}
Expand Down Expand Up @@ -532,7 +549,11 @@ public long rawSize() {

@Override
public MVCCTransactionMap<K, V> getInstance(Transaction transaction) {
return new MVCCTransactionMap<K, V>((MVCCTransaction) transaction, map);
MVCCTransaction t = (MVCCTransaction) transaction;
if (t.isShardingMode())
return new MVCCShardingTransactionMap<>(t, map);
else
return new MVCCTransactionMap<>(t, map);
}

@Override
Expand Down

0 comments on commit fbae72f

Please sign in to comment.