Skip to content

Commit

Permalink
lastKey(_rowid_)的递增不在StandardPrimaryIndex层控制,转到StorageMap层,手动指定_rowid…
Browse files Browse the repository at this point in the history
…_时需要检查重复,不指定_rowid_时总是从lastKey开始递增
  • Loading branch information
codefollower committed Apr 1, 2017
1 parent 6de04e6 commit aa4d3bb
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 37 deletions.
Expand Up @@ -29,6 +29,7 @@
import org.lealone.common.util.DataUtils; import org.lealone.common.util.DataUtils;
import org.lealone.storage.Storage; import org.lealone.storage.Storage;
import org.lealone.storage.StorageMap; import org.lealone.storage.StorageMap;
import org.lealone.storage.StorageMapBase;
import org.lealone.storage.StorageMapCursor; import org.lealone.storage.StorageMapCursor;
import org.lealone.storage.type.DataType; import org.lealone.storage.type.DataType;


Expand Down Expand Up @@ -78,6 +79,9 @@ public V get(K key) {


@Override @Override
public V put(K key, V value) { public V put(K key, V value) {
if (map instanceof StorageMapBase) {
((StorageMapBase<K, V>) map).setLastKey(key);
}
return (V) buffer.put(key, value); return (V) buffer.put(key, value);
} }


Expand Down
Expand Up @@ -13,7 +13,6 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;


import org.lealone.aose.config.ConfigDescriptor; import org.lealone.aose.config.ConfigDescriptor;
import org.lealone.aose.gms.Gossiper; import org.lealone.aose.gms.Gossiper;
Expand Down Expand Up @@ -82,9 +81,6 @@ public BTreeMap<K, V> openMap() {
*/ */
protected volatile BTreePage root; protected volatile BTreePage root;


// TODO 考虑是否要使用总是递增的数字
protected final AtomicLong lastKey = new AtomicLong(0);

@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected BTreeMap(String name, DataType keyType, DataType valueType, Map<String, Object> config, protected BTreeMap(String name, DataType keyType, DataType valueType, Map<String, Object> config,
AOStorage aoStorage) { AOStorage aoStorage) {
Expand All @@ -99,9 +95,10 @@ protected BTreeMap(String name, DataType keyType, DataType valueType, Map<String


storage = new BTreeStorage((BTreeMap<Object, Object>) this); storage = new BTreeStorage((BTreeMap<Object, Object>) this);


if (storage.lastChunk != null) if (storage.lastChunk != null) {
root = storage.readPage(storage.lastChunk.rootPagePos); root = storage.readPage(storage.lastChunk.rootPagePos);
else { setLastKey(lastKey());
} else {
root = BTreePage.createEmpty(this); root = BTreePage.createEmpty(this);
if (isShardingMode) { if (isShardingMode) {
String initReplicationEndpoints = (String) config.get("initReplicationEndpoints"); String initReplicationEndpoints = (String) config.get("initReplicationEndpoints");
Expand All @@ -115,11 +112,6 @@ protected BTreeMap(String name, DataType keyType, DataType valueType, Map<String
} }
} }
} }

K lastKey = lastKey();
if (lastKey != null && lastKey instanceof ValueLong) {
this.lastKey.set(((ValueLong) lastKey).getLong());
}
} }


@Override @Override
Expand Down Expand Up @@ -223,6 +215,7 @@ protected Object put(BTreePage p, Object key, Object value) {
if (index < 0) { if (index < 0) {
index = -index - 1; index = -index - 1;
p.insertLeaf(index, key, value); p.insertLeaf(index, key, value);
setLastKey(key);
return null; return null;
} }
return p.setValue(index, value); return p.setValue(index, value);
Expand Down
Expand Up @@ -17,7 +17,10 @@
*/ */
package org.lealone.storage; package org.lealone.storage;


import java.util.concurrent.atomic.AtomicLong;

import org.lealone.common.util.DataUtils; import org.lealone.common.util.DataUtils;
import org.lealone.db.value.ValueLong;
import org.lealone.storage.type.DataType; import org.lealone.storage.type.DataType;
import org.lealone.storage.type.ObjectDataType; import org.lealone.storage.type.ObjectDataType;


Expand All @@ -26,6 +29,8 @@ public abstract class StorageMapBase<K, V> implements StorageMap<K, V> {
protected final String name; protected final String name;
protected final DataType keyType; protected final DataType keyType;
protected final DataType valueType; protected final DataType valueType;
// TODO 考虑是否要使用总是递增的数字
protected final AtomicLong lastKey = new AtomicLong(0);


protected StorageMapBase(String name, DataType keyType, DataType valueType) { protected StorageMapBase(String name, DataType keyType, DataType valueType) {
DataUtils.checkArgument(name != null, "The name may not be null"); DataUtils.checkArgument(name != null, "The name may not be null");
Expand Down Expand Up @@ -55,4 +60,21 @@ public DataType getValueType() {
return valueType; return valueType;
} }


// 如果新key比lastKey大就更新lastKey
// 允许多线程并发更新
public void setLastKey(Object key) {
if (key instanceof ValueLong) {
long k = ((ValueLong) key).getLong();
while (true) {
long old = lastKey.get();
if (k > old) {
if (lastKey.compareAndSet(old, k))
break;
} else {
break;
}
}
}
}

} }
Expand Up @@ -22,7 +22,6 @@
import java.nio.channels.WritableByteChannel; import java.nio.channels.WritableByteChannel;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;


import org.lealone.db.value.ValueLong; import org.lealone.db.value.ValueLong;
import org.lealone.storage.Storage; import org.lealone.storage.Storage;
Expand Down Expand Up @@ -77,11 +76,13 @@ public V get(K key) {


@Override @Override
public V put(K key, V value) { public V put(K key, V value) {
setLastKey(key);
return skipListMap.put(key, value); return skipListMap.put(key, value);
} }


@Override @Override
public V putIfAbsent(K key, V value) { public V putIfAbsent(K key, V value) {
setLastKey(key);
return skipListMap.putIfAbsent(key, value); return skipListMap.putIfAbsent(key, value);
} }


Expand Down Expand Up @@ -228,8 +229,6 @@ public Storage getStorage() {
return memoryStorage; return memoryStorage;
} }


private final AtomicLong lastKey = new AtomicLong(0);

@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public K append(V value) { public K append(V value) {
Expand Down
Expand Up @@ -11,7 +11,6 @@
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicLong;


import org.lealone.api.ErrorCode; import org.lealone.api.ErrorCode;
import org.lealone.common.exceptions.DbException; import org.lealone.common.exceptions.DbException;
Expand Down Expand Up @@ -50,7 +49,6 @@ public class StandardPrimaryIndex extends IndexBase {
private final StandardTable table; private final StandardTable table;
private final String mapName; private final String mapName;
private final TransactionMap<Value, VersionedValue> dataMap; private final TransactionMap<Value, VersionedValue> dataMap;
private final AtomicLong lastKey = new AtomicLong(0);
private int mainIndexColumn = -1; private int mainIndexColumn = -1;


public StandardPrimaryIndex(ServerSession session, StandardTable table) { public StandardPrimaryIndex(ServerSession session, StandardTable table) {
Expand Down Expand Up @@ -81,9 +79,6 @@ public StandardPrimaryIndex(ServerSession session, StandardTable table) {
dataMap = transactionEngine.beginTransaction(false, isShardingMode).openMap(mapName, table.getMapType(), dataMap = transactionEngine.beginTransaction(false, isShardingMode).openMap(mapName, table.getMapType(),
keyType, vvType, storage, isShardingMode, initReplicationEndpoints); keyType, vvType, storage, isShardingMode, initReplicationEndpoints);
transactionEngine.addTransactionMap(dataMap); transactionEngine.addTransactionMap(dataMap);

Value k = dataMap.lastKey();
lastKey.set(k == null ? 0 : k.getLong());
} }


@Override @Override
Expand Down Expand Up @@ -125,7 +120,6 @@ public void add(ServerSession session, Row row) {
boolean checkDuplicateKey = true; boolean checkDuplicateKey = true;
if (mainIndexColumn == -1) { if (mainIndexColumn == -1) {
if (row.getKey() == 0) { if (row.getKey() == 0) {
row.setKey(lastKey.incrementAndGet());
checkDuplicateKey = false; checkDuplicateKey = false;
} }
} else { } else {
Expand Down Expand Up @@ -167,23 +161,9 @@ public void add(ServerSession session, Row row) {
throw DbException.get(ErrorCode.CONCURRENT_UPDATE_1, e, table.getName()); throw DbException.get(ErrorCode.CONCURRENT_UPDATE_1, e, table.getName());
} }
} else { } else {
if (!session.isLocal() && session.isShardingMode()) { key = map.append(value);
key = map.append(value); row.setKey(key.getLong());
row.setKey(key.getLong());
} else {
key = ValueLong.get(row.getKey());
try {
map.put(key, value);
} catch (IllegalStateException e) {
throw DbException.get(ErrorCode.CONCURRENT_UPDATE_1, e, table.getName());
}
}
}
// because it's possible to directly update the key using the _rowid_ syntax
if (row.getKey() > lastKey.get()) {
lastKey.set(row.getKey());
} }

session.setLastRow(row); session.setLastRow(row);
session.setLastIndex(this); session.setLastIndex(this);
} }
Expand Down
Expand Up @@ -553,6 +553,8 @@ public K append(V value) { // 追加新记录时不会产生事务冲突
TransactionalValue newValue = new TransactionalValue(transaction, value); TransactionalValue newValue = new TransactionalValue(transaction, value);
K key = map.append(newValue); K key = map.append(newValue);
String mapName = getName(); String mapName = getName();
// 记事务log和append新值都是更新内存中的相应数据结构,所以不必把log调用放在append前面
// 放在前面的话调用log方法时就不知道key是什么,当事务要rollback时就不知道如何修改map的内存数据
transaction.log(mapName, key, null, newValue); transaction.log(mapName, key, null, newValue);
transaction.lastKey = key; transaction.lastKey = key;
transaction.lastValue = newValue; transaction.lastValue = newValue;
Expand Down
Expand Up @@ -47,6 +47,16 @@ void insert() throws Exception {
stmt.executeUpdate("insert into PseudoColumnTest(f1, f2, f3) values(5,2,3)"); stmt.executeUpdate("insert into PseudoColumnTest(f1, f2, f3) values(5,2,3)");
stmt.executeUpdate("insert into PseudoColumnTest(f1, f2, f3) values(3,2,3)"); stmt.executeUpdate("insert into PseudoColumnTest(f1, f2, f3) values(3,2,3)");
stmt.executeUpdate("insert into PseudoColumnTest(f1, f2, f3) values(8,2,3)"); stmt.executeUpdate("insert into PseudoColumnTest(f1, f2, f3) values(8,2,3)");

stmt.executeUpdate("drop table IF EXISTS PseudoColumnTest2");
stmt.executeUpdate("create table IF NOT EXISTS PseudoColumnTest2(f1 int, f2 int, f3 int)");
// 手动指定_rowid_为2
stmt.executeUpdate("insert into PseudoColumnTest2(_rowid_, f1, f2, f3) values(2,8,2,3)");
// 自动生成的_rowid_从3开始
stmt.executeUpdate("insert into PseudoColumnTest2(f1, f2, f3) values(8,2,3)");

sql = "SELECT count(*) FROM PseudoColumnTest2 WHERE _rowid_=3";
assertEquals(1, getIntValue(1, true));
} }


void select() throws Exception { void select() throws Exception {
Expand Down
Expand Up @@ -92,7 +92,7 @@ void select() throws Exception {
} }


void testMultiThread(String dbName) throws Exception { void testMultiThread(String dbName) throws Exception {
// 启动两个新事务更新同一行,可以用来测试Replication冲突的场景 // 启动两个线程,可以用来测试_rowid_递增的场景
Thread t1 = new Thread(new MultiThreadShardingCrudTest(dbName)); Thread t1 = new Thread(new MultiThreadShardingCrudTest(dbName));
Thread t2 = new Thread(new MultiThreadShardingCrudTest(dbName)); Thread t2 = new Thread(new MultiThreadShardingCrudTest(dbName));
t1.start(); t1.start();
Expand Down

0 comments on commit aa4d3bb

Please sign in to comment.