Skip to content

Commit

Permalink
Code: Improve count optimize, fix #196
Browse files Browse the repository at this point in the history
  • Loading branch information
minborg committed Oct 17, 2016
1 parent 4376ec6 commit 27abcac
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 86 deletions.
Expand Up @@ -49,95 +49,104 @@
import java.util.stream.Stream;

import static com.speedment.common.invariant.NullUtil.requireNonNulls;
import com.speedment.common.logger.Logger;
import com.speedment.common.logger.LoggerManager;
import static com.speedment.runtime.config.util.DocumentDbUtil.isSame;
import static com.speedment.runtime.core.internal.db.AsynchronousQueryResultImpl.LOGGER_SELECT_NAME;
import java.util.List;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toMap;

/**
* Default implementation of the {@link SqlStreamSupplier}-interface.
*
* @author Emil Forslund
* @since 3.0.1
*
* @author Emil Forslund
* @since 3.0.1
*/
final class SqlStreamSupplierImpl<ENTITY> implements SqlStreamSupplier<ENTITY> {


private static final Logger LOGGER_SELECT = LoggerManager.getLogger(LOGGER_SELECT_NAME); // Hold an extra reference to this logger

private final SqlFunction<ResultSet, ENTITY> entityMapper;
private final Dbms dbms;
private final DbmsType dbmsType;
private final Map<ColumnIdentifier<ENTITY>, String> columnNameMap;
private final String sqlSelect;
private final String sqlSelectCount;
private final String sqlTableReference;

SqlStreamSupplierImpl(
TableIdentifier<ENTITY> tableId,
SqlFunction<ResultSet, ENTITY> entityMapper,
ProjectComponent projectComponent,
DbmsHandlerComponent dbmsHandlerComponent,
ManagerComponent managerComponent) {
TableIdentifier<ENTITY> tableId,
SqlFunction<ResultSet, ENTITY> entityMapper,
ProjectComponent projectComponent,
DbmsHandlerComponent dbmsHandlerComponent,
ManagerComponent managerComponent) {

requireNonNulls(tableId, projectComponent, dbmsHandlerComponent);

this.entityMapper = requireNonNull(entityMapper);

final Project project = projectComponent.getProject();
final Table table = DocumentDbUtil.referencedTable(project, tableId);
this.dbms = DocumentDbUtil.referencedDbms(project, tableId);
final Table table = DocumentDbUtil.referencedTable(project, tableId);

this.dbms = DocumentDbUtil.referencedDbms(project, tableId);
this.dbmsType = DatabaseUtil.dbmsTypeOf(dbmsHandlerComponent, dbms);

@SuppressWarnings("unchecked")
final Manager<ENTITY> manager = (Manager<ENTITY>) managerComponent.stream()
.filter(m -> tableId.equals(m.getTableIdentifier()))
.findAny().orElseThrow(() -> new SpeedmentException(
"Could not find any manager for table '" + tableId + "'."
));
"Could not find any manager for table '" + tableId + "'."
));

final DatabaseNamingConvention naming = dbmsType.getDatabaseNamingConvention();
final String sqlColumnList = table.columns()
.filter(Column::isEnabled)
.map(Column::getName)
.map(naming::encloseField)
.collect(joining(","));

this.sqlTableReference = naming.fullNameOf(table);
this.sqlSelect = "SELECT " + sqlColumnList + " FROM " + sqlTableReference;
this.sqlSelectCount = "SELECT COUNT(*) FROM " + sqlTableReference;

this.columnNameMap = manager.fields()
.filter(f -> f.findColumn(project)
.map(c -> c.getParent())
.map(t -> isSame(table, t.get()))
.orElse(false)
.map(c -> c.getParent())
.map(t -> isSame(table, t.get()))
.orElse(false)
)
.map(Field::identifier)
.collect(toMap(identity(), naming::fullNameOf));
}

@Override
public Stream<ENTITY> stream(ParallelStrategy parallelStrategy) {
final AsynchronousQueryResult<ENTITY> asynchronousQueryResult =
dbmsType.getOperationHandler().executeQueryAsync(
dbms,
sqlSelect,
Collections.emptyList(),
entityMapper,
final AsynchronousQueryResult<ENTITY> asynchronousQueryResult
= dbmsType.getOperationHandler().executeQueryAsync(
dbms,
sqlSelect,
Collections.emptyList(),
entityMapper,
parallelStrategy
);

final SqlStreamTerminator<ENTITY> terminator = new SqlStreamTerminator<>(
dbmsType,
sqlSelect,
this::sqlCount,
this::sqlColumnNamer,
dbmsType,
sqlSelect,
sqlSelectCount,
this::executeAndGetLong,
this::sqlColumnNamer,
asynchronousQueryResult
);
final Supplier<BaseStream<?, ?>> initialSupplier =
() -> asynchronousQueryResult.stream();

final Supplier<BaseStream<?, ?>> initialSupplier
= () -> asynchronousQueryResult.stream();

final Stream<ENTITY> result = new ReferenceStreamBuilder<>(
new PipelineImpl<>(initialSupplier),
new PipelineImpl<>(initialSupplier),
terminator
);

Expand All @@ -154,15 +163,20 @@ public <V extends Comparable<? super V>> Optional<ENTITY> findAny(HasComparableO
.findAny();
}

private long sqlCount() {
public String getSqlTableReference() {
return sqlTableReference;
}

public long executeAndGetLong(String sql, List<Object> values) {
LOGGER_SELECT.debug("%s, values:%s", sql, values);
return dbmsType.getOperationHandler().executeQuery(dbms,
"SELECT COUNT(*) FROM " + sqlTableReference,
Collections.emptyList(),
sql,
values,
rs -> rs.getLong(1)
).findAny().get();
}

private String sqlColumnNamer(Field<ENTITY> field) {
return columnNameMap.get(field.identifier());
}
}
}

0 comments on commit 27abcac

Please sign in to comment.