Skip to content

Commit

Permalink
避免创建两个AggregateFunction实例
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Jun 15, 2015
1 parent 8a37ef0 commit 5bee0ff
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 9 deletions.
12 changes: 6 additions & 6 deletions lealone-sql/src/main/java/org/lealone/command/Parser.java
Expand Up @@ -4877,12 +4877,12 @@ protected Table readTableOrView(String tableName) {
}
}
}
//TODO 在分布式环境下,如果先在一个JVM上执行create table,再执行insert这样的dml,
//或者执行create table和insert的是不同JVM,这时由于表的元数据未及时更新到执行insert的JVM,
//所以有可能出现此异常,因为不同JVM上的表元数据通过zookeeper异步更新,
//有可能执行create table的线程很快结束了,但是zookeeper还未通知,这时insert时就找不到表。
//对于这种情况,client重视即可解决,不过还有没有更好的办法呢?
//client在使用h2作为内存数据库对SQL预解析时也会碰到这样的情况。
// TODO 在分布式环境下,如果先在一个JVM上执行create table,再执行insert这样的dml,
// 或者执行create table和insert的是不同JVM,这时由于表的元数据未及时更新到执行insert的JVM,
// 所以有可能出现此异常,因为不同JVM上的表元数据通过zookeeper异步更新,
// 有可能执行create table的线程很快结束了,但是zookeeper还未通知,这时insert时就找不到表。
// 对于这种情况,client重视即可解决,不过还有没有更好的办法呢?
// client在使用h2作为内存数据库对SQL预解析时也会碰到这样的情况。
throw DbException.get(ErrorCode.TABLE_OR_VIEW_NOT_FOUND_1, tableName);
}

Expand Down
Expand Up @@ -30,6 +30,7 @@ public CreateAggregate(Session session) {
super(session);
}

@Override
public int update() {
session.commit(true);
session.getUser().checkAdmin();
Expand Down Expand Up @@ -66,6 +67,7 @@ public void setForce(boolean force) {
this.force = force;
}

@Override
public int getType() {
return CommandInterface.CREATE_AGGREGATE;
}
Expand Down
Expand Up @@ -26,6 +26,7 @@ public DropAggregate(Session session) {
super(session);
}

@Override
public int update() {
session.getUser().checkAdmin();
session.commit(true);
Expand All @@ -49,6 +50,7 @@ public void setIfExists(boolean ifExists) {
this.ifExists = ifExists;
}

@Override
public int getType() {
return CommandInterface.DROP_AGGREGATE;
}
Expand Down
Expand Up @@ -45,29 +45,35 @@ public AggregateFunction getInstance() {
}
}

@Override
public String getCreateSQLForCopy(Table table, String quotedName) {
throw DbException.throwInternalError();
}

@Override
public String getDropSQL() {
return "DROP AGGREGATE IF EXISTS " + getSQL();
}

@Override
public String getCreateSQL() {
return "CREATE FORCE AGGREGATE " + getSQL() + " FOR " + Parser.quoteIdentifier(className);
}

@Override
public int getType() {
return DbObject.AGGREGATE;
}

@Override
public synchronized void removeChildrenAndResources(Session session) {
database.removeMeta(session, getId());
className = null;
javaClass = null;
invalidate();
}

@Override
public void checkRename() {
throw DbException.getUnsupportedException("AGGREGATE");
}
Expand Down
Expand Up @@ -36,13 +36,15 @@ public class JavaAggregate extends Expression {
private int dataType;
private Connection userConnection;
private int lastGroupRowId;
private AggregateFunction aggregateFunction;

public JavaAggregate(UserAggregate userAggregate, Expression[] args, Select select) {
this.userAggregate = userAggregate;
this.args = args;
this.select = select;
}

@Override
public int getCost() {
int cost = 5;
for (Expression e : args) {
Expand All @@ -51,18 +53,22 @@ public int getCost() {
return cost;
}

@Override
public long getPrecision() {
return Integer.MAX_VALUE;
}

@Override
public int getDisplaySize() {
return Integer.MAX_VALUE;
}

@Override
public int getScale() {
return DataType.getDataType(dataType).defaultScale;
}

@Override
public String getSQL(boolean isDistributed) {
StatementBuilder buff = new StatementBuilder();
buff.append(Parser.quoteIdentifier(userAggregate.getName())).append('(');
Expand All @@ -73,10 +79,12 @@ public String getSQL(boolean isDistributed) {
return buff.append(')').toString();
}

@Override
public int getType() {
return dataType;
}

@Override
public boolean isEverything(ExpressionVisitor visitor) {
switch (visitor.getType()) {
case ExpressionVisitor.DETERMINISTIC:
Expand All @@ -98,12 +106,14 @@ public boolean isEverything(ExpressionVisitor visitor) {
return true;
}

@Override
public void mapColumns(ColumnResolver resolver, int level) {
for (Expression arg : args) {
arg.mapColumns(resolver, level);
}
}

@Override
public Expression optimize(Session session) {
userConnection = session.createConnection(false);
int len = args.length;
Expand All @@ -125,18 +135,22 @@ public Expression optimize(Session session) {
return this;
}

@Override
public void setEvaluatable(TableFilter tableFilter, boolean b) {
for (Expression e : args) {
e.setEvaluatable(tableFilter, b);
}
}

private AggregateFunction getInstance() throws SQLException {
AggregateFunction agg = userAggregate.getInstance();
agg.init(userConnection);
return agg;
if (aggregateFunction == null) {
aggregateFunction = userAggregate.getInstance();
aggregateFunction.init(userConnection);
}
return aggregateFunction;
}

@Override
public Value getValue(Session session) {
HashMap<Expression, Object> group = select.getCurrentGroup();
if (group == null) {
Expand All @@ -157,6 +171,7 @@ public Value getValue(Session session) {
}
}

@Override
public void updateAggregate(Session session) {
HashMap<Expression, Object> group = select.getCurrentGroup();
if (group == null) {
Expand Down

0 comments on commit 5bee0ff

Please sign in to comment.