Skip to content

Commit

Permalink
Change queries to ones that encourage quickly transforming results so…
Browse files Browse the repository at this point in the history
… that connections can be closed

 Allow a "streaming" version of queries to use the old style of streaming rows directly
  • Loading branch information
rchodava committed Oct 5, 2016
1 parent 3563d82 commit 0d29261
Show file tree
Hide file tree
Showing 17 changed files with 102 additions and 46 deletions.
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP-java6</artifactId>
<artifactId>HikariCP</artifactId>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Func1;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;

/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
Expand Down Expand Up @@ -113,13 +115,13 @@ public void migrate() {
}

@Override
public Observable<Row> query(String sql) {
return getDatabase().select(sql).get(resultSet -> new RowImpl(resultSet));
public ResultBuilder query(String sql) {
return new Results(getDatabase().select(sql).get(resultSet -> new RowImpl(resultSet)));
}

@Override
public Observable<Row> query(String sql, Object... parameters) {
return getDatabase().select(sql).parameters(parameters).get(resultSet -> new RowImpl(resultSet));
public ResultBuilder query(String sql, Object... parameters) {
return new Results(getDatabase().select(sql).parameters(parameters).get(resultSet -> new RowImpl(resultSet)));
}

@Override
Expand Down Expand Up @@ -186,4 +188,27 @@ public void close() {
wrapped.close();
}
}

private static class Results implements ResultBuilder {
private final Observable<Row> results;

public Results(Observable<Row> results) {
this.results = results;
}

@Override
public <T> Observable<List<T>> getAs(Func1<Row, T> transformer) {
return stream().map(transformer).toList();
}

@Override
public <T> Observable<T> getFirstAs(Func1<Row, T> transformer) {
return stream().map(transformer).first();
}

@Override
public Observable<Row> stream() {
return results;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package foundation.stack.datamill;
package foundation.stack.datamill.db;

/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package foundation.stack.datamill.db;

import foundation.stack.datamill.LimitBuilder;
import foundation.stack.datamill.reflection.Outline;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package foundation.stack.datamill.db;

import rx.Observable;

/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
public interface QueryRunner {
Observable<Row> query(String sql);
Observable<Row> query(String sql, Object... parameters);
ResultBuilder query(String sql);
ResultBuilder query(String sql, Object... parameters);
UpdateQueryExecution update(String sql, Object... parameters);
}
15 changes: 15 additions & 0 deletions core/src/main/java/foundation/stack/datamill/db/ResultBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package foundation.stack.datamill.db;

import rx.Observable;
import rx.functions.Func1;

import java.util.List;

/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
public interface ResultBuilder {
<T> Observable<List<T>> getAs(Func1<Row, T> transformer);
<T> Observable<T> getFirstAs(Func1<Row, T> transformer);
Observable<Row> stream();
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package foundation.stack.datamill.db;

import foundation.stack.datamill.reflection.Outline;
import rx.Observable;

/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
public interface SelectBuilder {
SelectWhereBuilder<Observable<Row>> from(String table);
SelectWhereBuilder<Observable<Row>> from(Outline<?> outline);
SelectWhereBuilder<ResultBuilder> from(String table);
SelectWhereBuilder<ResultBuilder> from(Outline<?> outline);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package foundation.stack.datamill.db;

import foundation.stack.datamill.LimitBuilder;
import foundation.stack.datamill.reflection.Member;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package foundation.stack.datamill.db;

import foundation.stack.datamill.LimitBuilder;

import java.util.Map;
import java.util.function.Function;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package foundation.stack.datamill.db;

import foundation.stack.datamill.LimitBuilder;
import foundation.stack.datamill.reflection.Outline;
import rx.functions.Func1;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package foundation.stack.datamill.db.impl;

import com.google.common.base.Joiner;
import foundation.stack.datamill.LimitBuilder;
import foundation.stack.datamill.db.LimitBuilder;
import foundation.stack.datamill.db.*;
import foundation.stack.datamill.reflection.Member;
import foundation.stack.datamill.reflection.Outline;
Expand Down Expand Up @@ -219,13 +219,13 @@ public UpdateQueryExecution execute() {
}
}

private class SelectWhereClause extends WhereBuilderImpl<Observable<Row>> {
private class SelectWhereClause extends WhereBuilderImpl<ResultBuilder> {
public SelectWhereClause(StringBuilder query) {
super(query);
}

@Override
public Observable<Row> execute() {
public ResultBuilder execute() {
if (!parameters.isEmpty()) {
return QueryBuilderImpl.this.query(query.toString(), parameters.toArray(new Object[parameters.size()]));
} else {
Expand All @@ -248,14 +248,14 @@ public SelectQuery(Iterable<String> columns) {
}

@Override
public SelectWhereBuilder<Observable<Row>> from(String table) {
public SelectWhereBuilder<ResultBuilder> from(String table) {
query.append(SqlSyntax.SQL_FROM);
query.append(table);
return new SelectWhereClause(query);
}

@Override
public SelectWhereBuilder<Observable<Row>> from(Outline<?> outline) {
public SelectWhereBuilder<ResultBuilder> from(Outline<?> outline) {
return from(outline.pluralName());
}
}
Expand Down Expand Up @@ -295,8 +295,8 @@ public InsertBuilder insertInto(Outline<?> outline) {
return insertInto(outline.pluralName());
}

protected abstract Observable<Row> query(String query);
protected abstract Observable<Row> query(String query, Object... parameters);
protected abstract ResultBuilder query(String query);
protected abstract ResultBuilder query(String query, Object... parameters);
protected abstract UpdateQueryExecution update(String query, Object... parameters);

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
package foundation.stack.datamill.db.test;

import foundation.stack.datamill.db.DatabaseClient;
import foundation.stack.datamill.db.ResultBuilder;
import foundation.stack.datamill.db.Row;
import foundation.stack.datamill.db.UpdateQueryExecution;
import rx.Observable;
import rx.functions.Func1;

import java.util.List;

/**
* @author Ravi Chodavarapu (rchodava@gmail.com)
*/
public class TestDatabaseClient extends DatabaseClient {
private static final Object[] EMPTY_ARRAY = new Object[0];

private final Database database;

public TestDatabaseClient(Database database) {
Expand Down Expand Up @@ -38,13 +44,28 @@ public void migrate() {
}

@Override
public Observable<Row> query(String sql) {
return database.query(sql);
public ResultBuilder query(String sql) {
return this.query(sql, EMPTY_ARRAY);
}

@Override
public Observable<Row> query(String sql, Object... parameters) {
return database.query(sql, parameters);
public ResultBuilder query(String sql, Object... parameters) {
return new ResultBuilder() {
@Override
public <T> Observable<List<T>> getAs(Func1<Row, T> transformer) {
return stream().map(transformer).toList();
}

@Override
public <T> Observable<T> getFirstAs(Func1<Row, T> transformer) {
return stream().map(transformer).first();
}

@Override
public Observable<Row> stream() {
return database.query(sql, parameters);
}
};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package foundation.stack.datamill.http.impl;

import com.google.common.collect.Multimap;
import foundation.stack.datamill.http.Body;
import foundation.stack.datamill.http.Response;
import foundation.stack.datamill.http.Route;
import foundation.stack.datamill.http.ServerRequest;
import foundation.stack.datamill.http.*;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
Expand Down Expand Up @@ -118,14 +115,19 @@ private void processRequest(ChannelHandlerContext context, HttpRequest originalR
if (responseObservable != null) {
threadPool.execute(() -> {
Response response = responseObservable.onErrorResumeNext(throwable -> {
Observable<Response> errorResponse = errorResponseConstructor.apply(serverRequest, throwable);
if (errorResponse != null) {
logger.debug("Error occurred handling request, invoking application error handler");
return errorResponse.onErrorResumeNext(Observable.just(null));
if (errorResponseConstructor != null) {
Observable<Response> errorResponse =
errorResponseConstructor.apply(serverRequest, throwable);
if (errorResponse != null) {
logger.debug("Error occurred handling request, invoking application error handler");
return errorResponse.onErrorResumeNext(Observable.just(null));
}
} else {
logger.debug("Error occurred handling request - no application error handler was available to handle it - {}", throwable);
}

return Observable.just(null);
}).toBlocking().lastOrDefault(null);
return Observable.just(new ResponseImpl(Status.INTERNAL_SERVER_ERROR));
}).toBlocking().lastOrDefault(new ResponseImpl(Status.NOT_FOUND));

sendResponse(context, originalRequest, response);
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package foundation.stack.datamill.db.impl;

import com.google.common.collect.ImmutableMap;
import foundation.stack.datamill.db.ResultBuilder;
import foundation.stack.datamill.db.Row;
import foundation.stack.datamill.db.UpdateQueryExecution;
import foundation.stack.datamill.reflection.Outline;
Expand Down Expand Up @@ -77,15 +78,15 @@ public boolean getLastWasUpdate() {
}

@Override
protected Observable<Row> query(String query) {
protected ResultBuilder query(String query) {
lastQuery = query;
lastWasUpdate = false;
lastParameters = new Object[0];
return null;
}

@Override
protected Observable<Row> query(String query, Object... parameters) {
protected ResultBuilder query(String query, Object... parameters) {
lastQuery = query;
lastParameters = parameters;
lastWasUpdate = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void databaseMethodsInvoked() {
client.changeCatalog("catalog");
verify(database).changeCatalog("catalog");

client.query("SELECT * FROM table");
client.query("SELECT * FROM table").stream();
verify(database).query("SELECT * FROM table", new Object[0]);

client.update("UPDATE table SET column = NULL", new Object[0]).count();
Expand All @@ -90,7 +90,7 @@ public void rowBuilding() {
client.select(testModelOutline.member(m -> m.getProperty()))
.from(testModelOutline)
.all()
.map(row -> testModelOutline.wrap(new TestModel())
.getFirstAs(row -> testModelOutline.wrap(new TestModel())
.set(m -> m.getProperty(), row.column(testModelOutline.member(m -> m.getProperty())))
.set(m -> m.getStringProperty(), row.column(testModelOutline.member(m -> m.getStringProperty())))
.unwrap())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void checkDatabaseRowExists(String tableName, String databaseUrl, String
}

private Observable<Row> executeSelect(String resolvedUrl, String sql, Object... parameters) {
return new DatabaseClient(resolvedUrl).query(sql, parameters);
return new DatabaseClient(resolvedUrl).query(sql, parameters).stream();
}

protected String buildQuery(String tableName, String testFragment, String criteriaFragment) {
Expand Down
4 changes: 2 additions & 2 deletions parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP-java6</artifactId>
<version>2.3.2</version>
<artifactId>HikariCP</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
Expand Down

0 comments on commit 0d29261

Please sign in to comment.