Skip to content

Commit

Permalink
优化事务实现,直接使用MVMap读写事务状态表不通过JDBC接口
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Jan 9, 2015
1 parent dfc1b31 commit 12d1473
Show file tree
Hide file tree
Showing 9 changed files with 360 additions and 123 deletions.
Expand Up @@ -206,7 +206,7 @@ public void init(String... args) {
Driver.load();

DatabaseEngine.init(baseDir);
TransactionManager.init();
TransactionManager.init(baseDir);
}

@Override
Expand Down
Expand Up @@ -28,7 +28,7 @@ public class TransactionTest extends TestBase {

@Test
public void run() throws Exception {
//create();
create();
// insert();
// select();

Expand Down
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lealone.transaction;

import org.lealone.engine.Session;

public class LocalTransaction extends TransactionBase {
public LocalTransaction(Session session) {
super(session);
}
}
Expand Up @@ -17,16 +17,11 @@
*/
package org.lealone.transaction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.atomic.AtomicLong;

import org.lealone.engine.Constants;
import org.lealone.engine.SystemDatabase;
import org.lealone.message.DbException;
import org.lealone.util.JdbcUtils;
import org.lealone.mvstore.MVMap;
import org.lealone.mvstore.MVStore;

public class TimestampServiceTable {

Expand All @@ -36,71 +31,94 @@ private TimestampServiceTable() {
private static final long TIMESTAMP_BATCH = Long.valueOf(System.getProperty(Constants.PROJECT_NAME_PREFIX
+ "transaction.timestamp.batch", "100000"));

private static PreparedStatement updateLastMaxTimestamp;
private static PreparedStatement getLastMaxTimestamp;
// private static PreparedStatement updateLastMaxTimestamp;
// private static PreparedStatement getLastMaxTimestamp;

private static long first;
private static long last;
private static final AtomicLong last = new AtomicLong();
private static long maxTimestamp;

public static synchronized void init() {
if (getLastMaxTimestamp != null)
private static MVMap<String, Long> map;

// public static synchronized void init() {
// if (getLastMaxTimestamp != null)
// return;
//
// createTableIfNotExists();
//
// first = last = maxTimestamp = getLastMaxTimestamp();
// addBatch();
// }
//
// private static void createTableIfNotExists() {
// ResultSet rs = null;
// Statement stmt = null;
// Connection conn = SystemDatabase.getConnection();
// try {
// stmt = conn.createStatement();
// stmt.execute("CREATE TABLE IF NOT EXISTS timestamp_service_table" //
// + "(last_max_timestamp BIGINT PRIMARY KEY)");
//
// updateLastMaxTimestamp = conn.prepareStatement("UPDATE timestamp_service_table SET last_max_timestamp = ?");
// getLastMaxTimestamp = conn.prepareStatement("SELECT last_max_timestamp FROM timestamp_service_table");
//
// rs = getLastMaxTimestamp.executeQuery();
// if (!rs.next()) {
// stmt.executeUpdate("INSERT INTO timestamp_service_table VALUES(0)");
// }
// } catch (SQLException e) {
// throw DbException.convert(e);
// } finally {
// JdbcUtils.closeSilently(rs);
// JdbcUtils.closeSilently(stmt);
// }
// }

// private static void updateLastMaxTimestamp0(long lastMaxTimestamp) {
// try {
// updateLastMaxTimestamp.setLong(1, lastMaxTimestamp);
// updateLastMaxTimestamp.executeUpdate();
// } catch (SQLException e) {
// throw DbException.convert(e);
// }
// }

// private static long getLastMaxTimestamp0() {
// long lastMaxTimestamp = 0;
// ResultSet rs = null;
// try {
// rs = getLastMaxTimestamp.executeQuery();
// if (rs.next()) {
// lastMaxTimestamp = rs.getLong(1);
// }
// } catch (SQLException e) {
// throw DbException.convert(e);
// } finally {
// JdbcUtils.closeSilently(rs);
// }
//
// return lastMaxTimestamp;
// }
public static synchronized void init(MVStore store) {
if (map != null)
return;

createTableIfNotExists();
map = store.openMap("timestampServiceTable", new MVMap.Builder<String, Long>());

first = last = maxTimestamp = getLastMaxTimestamp();
first = maxTimestamp = getLastMaxTimestamp();
last.set(first);
addBatch();
}

private static void createTableIfNotExists() {
ResultSet rs = null;
Statement stmt = null;
Connection conn = SystemDatabase.getConnection();
try {
stmt = conn.createStatement();
stmt.execute("CREATE TABLE IF NOT EXISTS timestamp_service_table" //
+ "(last_max_timestamp BIGINT PRIMARY KEY)");

updateLastMaxTimestamp = conn.prepareStatement("UPDATE timestamp_service_table SET last_max_timestamp = ?");
getLastMaxTimestamp = conn.prepareStatement("SELECT last_max_timestamp FROM timestamp_service_table");

rs = getLastMaxTimestamp.executeQuery();
if (!rs.next()) {
stmt.executeUpdate("INSERT INTO timestamp_service_table VALUES(0)");
}
} catch (SQLException e) {
throw DbException.convert(e);
} finally {
JdbcUtils.closeSilently(rs);
JdbcUtils.closeSilently(stmt);
}
}

private static void updateLastMaxTimestamp(long lastMaxTimestamp) {
try {
updateLastMaxTimestamp.setLong(1, lastMaxTimestamp);
updateLastMaxTimestamp.executeUpdate();
} catch (SQLException e) {
throw DbException.convert(e);
}
map.put("1", lastMaxTimestamp);
}

private static long getLastMaxTimestamp() {
long lastMaxTimestamp = 0;
ResultSet rs = null;
try {
rs = getLastMaxTimestamp.executeQuery();
if (rs.next()) {
lastMaxTimestamp = rs.getLong(1);
}
} catch (SQLException e) {
throw DbException.convert(e);
} finally {
JdbcUtils.closeSilently(rs);
}

return lastMaxTimestamp;
Long lastMaxTimestamp = map.get("1");
if (lastMaxTimestamp == null)
return 0;
return lastMaxTimestamp.longValue();
}

private static void addBatch() {
Expand All @@ -109,37 +127,56 @@ private static void addBatch() {
}

public synchronized static void reset() {
first = last = maxTimestamp = 0;
first = maxTimestamp = 0;
last.set(first);
updateLastMaxTimestamp(0);
addBatch();
}

//事务用奇数版本号
public synchronized static long nextOdd() {
if (last >= maxTimestamp)
addBatch();
public static long nextOdd() {
if (last.get() >= maxTimestamp) {
synchronized (TimestampServiceTable.class) {
addBatch();
}
}

long oldLast;
long last;
long delta;
if (last % 2 == 0)
delta = 1;
else
delta = 2;

last += delta;
do {
oldLast = TimestampServiceTable.last.get();
last = oldLast;
if (last % 2 == 0)
delta = 1;
else
delta = 2;

last += delta;
} while (!TimestampServiceTable.last.compareAndSet(oldLast, last));
return last;
}

//非事务用偶数版本号
public synchronized static long nextEven() {
if (last >= maxTimestamp)
addBatch();
public static long nextEven() {
if (last.get() >= maxTimestamp) {
synchronized (TimestampServiceTable.class) {
addBatch();
}
}

long oldLast;
long last;
long delta;
if (last % 2 == 0)
delta = 2;
else
delta = 1;
last += delta;
do {
oldLast = TimestampServiceTable.last.get();
last = oldLast;
if (last % 2 == 0)
delta = 2;
else
delta = 1;
last += delta;
} while (!TimestampServiceTable.last.compareAndSet(oldLast, last));
return last;
}

Expand All @@ -148,6 +185,6 @@ public static long first() {
}

public static String toS() {
return "TimestampService(first: " + first + ", last: " + last + ", max: " + maxTimestamp + ")";
return "TimestampServiceTable(first: " + first + ", last: " + last + ", max: " + maxTimestamp + ")";
}
}

0 comments on commit 12d1473

Please sign in to comment.