Skip to content

Commit

Permalink
实现OnlineDDL
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Jun 15, 2016
1 parent 97f7c1f commit 28e7297
Show file tree
Hide file tree
Showing 22 changed files with 950 additions and 126 deletions.
2 changes: 2 additions & 0 deletions lealone-common/src/main/java/org/lealone/sql/SQLParser.java
Expand Up @@ -25,4 +25,6 @@ public interface SQLParser {


ParsedStatement parse(String sql); ParsedStatement parse(String sql);


Object parseColumnForTable(String columnSql);

} }
107 changes: 105 additions & 2 deletions lealone-db/src/main/java/org/lealone/db/Database.java
Expand Up @@ -11,7 +11,9 @@
import java.lang.Thread.UncaughtExceptionHandler; import java.lang.Thread.UncaughtExceptionHandler;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
Expand Down Expand Up @@ -54,6 +56,7 @@
import org.lealone.db.table.IndexColumn; import org.lealone.db.table.IndexColumn;
import org.lealone.db.table.MetaTable; import org.lealone.db.table.MetaTable;
import org.lealone.db.table.Table; import org.lealone.db.table.Table;
import org.lealone.db.table.TableAlterHistoryRecord;
import org.lealone.db.table.TableView; import org.lealone.db.table.TableView;
import org.lealone.db.util.SourceCompiler; import org.lealone.db.util.SourceCompiler;
import org.lealone.db.value.CaseInsensitiveMap; import org.lealone.db.value.CaseInsensitiveMap;
Expand Down Expand Up @@ -348,6 +351,7 @@ public synchronized void init(ConnectionInfo ci) {
initTraceSystem(ci); initTraceSystem(ci);
openDatabase(); openDatabase();
addShutdownHook(); addShutdownHook();
initDbObjectVersionTable();
} }


private void initTraceSystem(ConnectionInfo ci) { private void initTraceSystem(ConnectionInfo ci) {
Expand Down Expand Up @@ -1490,7 +1494,6 @@ public synchronized void removeSchemaObject(ServerSession session, SchemaObject
removeDatabaseObject(session, comment); removeDatabaseObject(session, comment);
} }
obj.getSchema().remove(obj); obj.getSchema().remove(obj);
int id = obj.getId();
if (!starting) { if (!starting) {
Table t = getDependentTable(obj, null); Table t = getDependentTable(obj, null);
if (t != null) { if (t != null) {
Expand All @@ -1499,7 +1502,7 @@ public synchronized void removeSchemaObject(ServerSession session, SchemaObject
} }
obj.removeChildrenAndResources(session); obj.removeChildrenAndResources(session);
} }
removeMeta(session, id); removeMeta(session, obj.getId());
} }


public void addPersistentMetaInfo(MetaTable mt, ArrayList<Row> rows) { public void addPersistentMetaInfo(MetaTable mt, ArrayList<Row> rows) {
Expand Down Expand Up @@ -2330,4 +2333,104 @@ public int[] getHostIds() {
hostIds = new int[0]; hostIds = new int[0];
return hostIds; return hostIds;
} }

private java.sql.PreparedStatement psGetVersion;
private java.sql.PreparedStatement psUpdateVersion;

private java.sql.PreparedStatement psAddTableAlterHistoryRecord;
private java.sql.PreparedStatement psGetTableAlterHistoryRecord;
private java.sql.PreparedStatement psDeleteTableAlterHistoryRecord;

private void initDbObjectVersionTable() {
try {
Connection conn = LealoneDatabase.getInstance().getInternalConnection();
Statement stmt = conn.createStatement();
stmt.execute("CREATE TABLE IF NOT EXISTS db_object_version (id int PRIMARY KEY, version int)");
stmt.execute("CREATE TABLE IF NOT EXISTS table_alter_history"
+ " (id int, version int, alter_type int, columns varchar, PRIMARY KEY(id, version))");
stmt.close();
psGetVersion = conn.prepareStatement("select version from db_object_version where id = ?");
psUpdateVersion = conn.prepareStatement("update db_object_version set version = ? where id = ?");

psAddTableAlterHistoryRecord = conn.prepareStatement("insert into table_alter_history values(?,?,?,?)");
psGetTableAlterHistoryRecord = conn
.prepareStatement("select id, version, alter_type, columns from table_alter_history "
+ "where id = ? and version between ? and ?");
psDeleteTableAlterHistoryRecord = conn.prepareStatement("delete from table_alter_history where id = ?");
} catch (SQLException e) {
throw DbException.convert(e);
}
}

public synchronized void addTableAlterHistoryRecord(int id, int version, int alterType, String columns) {
try {
psAddTableAlterHistoryRecord.setInt(1, id);
psAddTableAlterHistoryRecord.setInt(2, version);
psAddTableAlterHistoryRecord.setInt(3, alterType);
psAddTableAlterHistoryRecord.setString(4, columns);
psAddTableAlterHistoryRecord.executeUpdate();
} catch (SQLException e) {
throw DbException.convert(e);
}
}

public synchronized void deleteTableAlterHistoryRecord(int id) {
try {
psDeleteTableAlterHistoryRecord.setInt(1, id);
psDeleteTableAlterHistoryRecord.executeUpdate();
} catch (SQLException e) {
throw DbException.convert(e);
}
}

public synchronized ArrayList<TableAlterHistoryRecord> getTableAlterHistoryRecord(int id, int versionMin,
int versionMax) {
ArrayList<TableAlterHistoryRecord> records = new ArrayList<>();
if (psGetTableAlterHistoryRecord == null)
return records;
try {
psGetTableAlterHistoryRecord.setInt(1, id);
psGetTableAlterHistoryRecord.setInt(2, versionMin);
psGetTableAlterHistoryRecord.setInt(3, versionMax);
ResultSet rs = psGetTableAlterHistoryRecord.executeQuery();
while (rs.next()) {
records.add(new TableAlterHistoryRecord(rs.getInt(1), rs.getInt(2), rs.getInt(3), rs.getString(4)));
}
return records;
} catch (SQLException e) {
throw DbException.convert(e);
}
}

public synchronized void updateVersion(int id, int version) {
if (psUpdateVersion == null)
return;
try {
psUpdateVersion.setInt(1, version);
psUpdateVersion.setInt(2, id);
psUpdateVersion.executeUpdate();
} catch (SQLException e) {
throw DbException.convert(e);
}
}

public synchronized int getVersion(int id) {
if (psGetVersion == null)
return -1;
int version;
try {
psGetVersion.setInt(1, id);
ResultSet rs = psGetVersion.executeQuery();
if (rs.next()) {
version = rs.getInt(1);
} else {
version = 1;
updateVersion(id, version);
}
rs.close();
} catch (SQLException e) {
throw DbException.convert(e);
}
return version;
}
} }
Expand Up @@ -5,6 +5,7 @@
*/ */
package org.lealone.db.index; package org.lealone.db.index;


import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
Expand All @@ -22,6 +23,7 @@
import org.lealone.db.table.Column; import org.lealone.db.table.Column;
import org.lealone.db.table.IndexColumn; import org.lealone.db.table.IndexColumn;
import org.lealone.db.table.StandardTable; import org.lealone.db.table.StandardTable;
import org.lealone.db.table.TableAlterHistoryRecord;
import org.lealone.db.table.TableFilter; import org.lealone.db.table.TableFilter;
import org.lealone.db.value.Value; import org.lealone.db.value.Value;
import org.lealone.db.value.ValueArray; import org.lealone.db.value.ValueArray;
Expand All @@ -46,7 +48,7 @@ public class StandardPrimaryIndex extends IndexBase {


private final StandardTable table; private final StandardTable table;
private final String mapName; private final String mapName;
private final TransactionMap<Value, Value> dataMap; private final TransactionMap<Value, VersionedValue> dataMap;
private long lastKey; private long lastKey;
private int mainIndexColumn = -1; private int mainIndexColumn = -1;


Expand All @@ -61,12 +63,13 @@ public StandardPrimaryIndex(ServerSession session, StandardTable table) {
} }
ValueDataType keyType = new ValueDataType(null, null, null); ValueDataType keyType = new ValueDataType(null, null, null);
ValueDataType valueType = new ValueDataType(database, database.getCompareMode(), sortTypes); ValueDataType valueType = new ValueDataType(database, database.getCompareMode(), sortTypes);
VersionedValueType vvType = new VersionedValueType(valueType);


Storage storage = database.getStorage(table.getStorageEngine()); Storage storage = database.getStorage(table.getStorageEngine());
TransactionEngine transactionEngine = database.getTransactionEngine(); TransactionEngine transactionEngine = database.getTransactionEngine();
boolean isShardingMode = session.isShardingMode(); boolean isShardingMode = session.isShardingMode();
dataMap = transactionEngine.beginTransaction(false, isShardingMode).openMap(mapName, table.getMapType(), dataMap = transactionEngine.beginTransaction(false, isShardingMode).openMap(mapName, table.getMapType(),
keyType, valueType, storage, isShardingMode); keyType, vvType, storage, isShardingMode);


transactionEngine.addTransactionMap(dataMap); transactionEngine.addTransactionMap(dataMap);


Expand Down Expand Up @@ -113,8 +116,8 @@ public void add(ServerSession session, Row row) {
row.setKey(++lastKey); row.setKey(++lastKey);
} }
} else { } else {
long c = row.getValue(mainIndexColumn).getLong(); long k = row.getValue(mainIndexColumn).getLong();
row.setKey(c); row.setKey(k);
} }


if (table.getContainsLargeObject()) { if (table.getContainsLargeObject()) {
Expand All @@ -130,9 +133,9 @@ public void add(ServerSession session, Row row) {
} }
} }


TransactionMap<Value, Value> map = getMap(session); TransactionMap<Value, VersionedValue> map = getMap(session);
Value key = ValueLong.get(row.getKey()); Value key = ValueLong.get(row.getKey());
Value old = map.get(key); VersionedValue old = map.get(key);
if (old != null) { if (old != null) {
String sql = "PRIMARY KEY ON " + table.getSQL(); String sql = "PRIMARY KEY ON " + table.getSQL();
if (mainIndexColumn >= 0 && mainIndexColumn < indexColumns.length) { if (mainIndexColumn >= 0 && mainIndexColumn < indexColumns.length) {
Expand All @@ -143,7 +146,8 @@ public void add(ServerSession session, Row row) {
throw e; throw e;
} }
try { try {
map.put(key, ValueArray.get(row.getValueList())); VersionedValue value = new VersionedValue(row.getVersion(), ValueArray.get(row.getValueList()));
map.put(key, value);
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
throw DbException.get(ErrorCode.CONCURRENT_UPDATE_1, e, table.getName()); throw DbException.get(ErrorCode.CONCURRENT_UPDATE_1, e, table.getName());
} }
Expand All @@ -160,9 +164,9 @@ public void remove(ServerSession session, Row row) {
} }
} }
} }
TransactionMap<Value, Value> map = getMap(session); TransactionMap<Value, VersionedValue> map = getMap(session);
try { try {
Value old = map.remove(ValueLong.get(row.getKey())); VersionedValue old = map.remove(ValueLong.get(row.getKey()));
if (old == null) { if (old == null) {
throw DbException.get(ErrorCode.ROW_NOT_FOUND_WHEN_DELETING_1, getSQL() + ": " + row.getKey()); throw DbException.get(ErrorCode.ROW_NOT_FOUND_WHEN_DELETING_1, getSQL() + ": " + row.getKey());
} }
Expand Down Expand Up @@ -198,15 +202,15 @@ public Cursor find(ServerSession session, SearchRow first, SearchRow last) {
max = v; max = v;
} }
} }
TransactionMap<Value, Value> map = getMap(session); TransactionMap<Value, VersionedValue> map = getMap(session);
return new StandardPrimaryIndexCursor(map.entryIterator(min), max); return new StandardPrimaryIndexCursor(session, table, this, map.entryIterator(min), max);
} }


@Override @Override
public Row getRow(ServerSession session, long key) { public Row getRow(ServerSession session, long key) {
TransactionMap<Value, Value> map = getMap(session); TransactionMap<Value, VersionedValue> map = getMap(session);
Value v = map.get(ValueLong.get(key)); VersionedValue v = map.get(ValueLong.get(key));
ValueArray array = (ValueArray) v; ValueArray array = v.value;
Row row = new Row(array.getList(), 0); Row row = new Row(array.getList(), 0);
row.setKey(key); row.setKey(key);
return row; return row;
Expand All @@ -230,15 +234,15 @@ public int getColumnIndex(Column col) {


@Override @Override
public void remove(ServerSession session) { public void remove(ServerSession session) {
TransactionMap<Value, Value> map = getMap(session); TransactionMap<Value, VersionedValue> map = getMap(session);
if (!map.isClosed()) { if (!map.isClosed()) {
map.remove(); map.remove();
} }
} }


@Override @Override
public void truncate(ServerSession session) { public void truncate(ServerSession session) {
TransactionMap<Value, Value> map = getMap(session); TransactionMap<Value, VersionedValue> map = getMap(session);
if (table.getContainsLargeObject()) { if (table.getContainsLargeObject()) {
database.getLobStorage().removeAllForTable(table.getId()); database.getLobStorage().removeAllForTable(table.getId());
} }
Expand All @@ -252,15 +256,16 @@ public boolean canGetFirstOrLast() {


@Override @Override
public Cursor findFirstOrLast(ServerSession session, boolean first) { public Cursor findFirstOrLast(ServerSession session, boolean first) {
TransactionMap<Value, Value> map = getMap(session); TransactionMap<Value, VersionedValue> map = getMap(session);
ValueLong v = (ValueLong) (first ? map.firstKey() : map.lastKey()); ValueLong v = (ValueLong) (first ? map.firstKey() : map.lastKey());
if (v == null) { if (v == null) {
return new StandardPrimaryIndexCursor(Collections.<Entry<Value, Value>> emptyList().iterator(), null); return new StandardPrimaryIndexCursor(session, table, this, Collections
.<Entry<Value, VersionedValue>> emptyList().iterator(), null);
} }
Value value = map.get(v); VersionedValue value = map.get(v);
Entry<Value, Value> e = new DataUtils.MapEntry<Value, Value>(v, value); Entry<Value, VersionedValue> e = new DataUtils.MapEntry<Value, VersionedValue>(v, value);
List<Entry<Value, Value>> list = Arrays.asList(e); List<Entry<Value, VersionedValue>> list = Arrays.asList(e);
StandardPrimaryIndexCursor c = new StandardPrimaryIndexCursor(list.iterator(), v); StandardPrimaryIndexCursor c = new StandardPrimaryIndexCursor(session, table, this, list.iterator(), v);
c.next(); c.next();
return c; return c;
} }
Expand All @@ -272,7 +277,7 @@ public boolean needRebuild() {


@Override @Override
public long getRowCount(ServerSession session) { public long getRowCount(ServerSession session) {
TransactionMap<Value, Value> map = getMap(session); TransactionMap<Value, VersionedValue> map = getMap(session);
return map.sizeAsLong(); return map.sizeAsLong();
} }


Expand Down Expand Up @@ -330,8 +335,8 @@ ValueLong getKey(SearchRow row, ValueLong ifEmpty, ValueLong ifNull) {
* @return the cursor * @return the cursor
*/ */
Cursor find(ServerSession session, ValueLong first, ValueLong last) { Cursor find(ServerSession session, ValueLong first, ValueLong last) {
TransactionMap<Value, Value> map = getMap(session); TransactionMap<Value, VersionedValue> map = getMap(session);
return new StandardPrimaryIndexCursor(map.entryIterator(first), last); return new StandardPrimaryIndexCursor(session, table, this, map.entryIterator(first), last);
} }


@Override @Override
Expand All @@ -345,7 +350,7 @@ public boolean isRowIdIndex() {
* @param session the session * @param session the session
* @return the map * @return the map
*/ */
TransactionMap<Value, Value> getMap(ServerSession session) { TransactionMap<Value, VersionedValue> getMap(ServerSession session) {
if (session == null) { if (session == null) {
return dataMap; return dataMap;
} }
Expand All @@ -366,12 +371,19 @@ boolean isInMemory() {
*/ */
private static class StandardPrimaryIndexCursor implements Cursor { private static class StandardPrimaryIndexCursor implements Cursor {


private final Iterator<Entry<Value, Value>> it; private final ServerSession session;
private final StandardTable table;
private final StandardPrimaryIndex index;
private final Iterator<Entry<Value, VersionedValue>> it;
private final ValueLong last; private final ValueLong last;
private Entry<Value, Value> current; private Entry<Value, VersionedValue> current;
private Row row; private Row row;


public StandardPrimaryIndexCursor(Iterator<Entry<Value, Value>> it, ValueLong last) { public StandardPrimaryIndexCursor(ServerSession session, StandardTable table, StandardPrimaryIndex index,
Iterator<Entry<Value, VersionedValue>> it, ValueLong last) {
this.session = session;
this.table = table;
this.index = index;
this.it = it; this.it = it;
this.last = last; this.last = last;
} }
Expand All @@ -380,9 +392,28 @@ public StandardPrimaryIndexCursor(Iterator<Entry<Value, Value>> it, ValueLong la
public Row get() { public Row get() {
if (row == null) { if (row == null) {
if (current != null) { if (current != null) {
ValueArray array = (ValueArray) current.getValue(); VersionedValue value = current.getValue();
row = new Row(array.getList(), 0); Value[] data = value.value.getList();
int version = value.vertion;
row = new Row(data, 0);
row.setKey(current.getKey().getLong()); row.setKey(current.getKey().getLong());
row.setVersion(version);

if (table.getVersion() != version) {
ArrayList<TableAlterHistoryRecord> records = table.getDatabase().getTableAlterHistoryRecord(
table.getId(), version, table.getVersion());
Value[] newValues = data;
for (TableAlterHistoryRecord record : records) {
newValues = record.redo(session, newValues);
}
if (newValues != data) {
index.remove(session, row);
row = new Row(newValues, 0);
row.setKey(current.getKey().getLong());
row.setVersion(table.getVersion());
index.add(session, row);
}
}
} }
} }
return row; return row;
Expand Down

0 comments on commit 28e7297

Please sign in to comment.