Skip to content
This repository has been archived by the owner on Sep 28, 2021. It is now read-only.

Commit

Permalink
do not use Response for both request entity and deser errors
Browse files Browse the repository at this point in the history
  • Loading branch information
rouzwawi committed Apr 4, 2016
1 parent fc64e83 commit 4d38163
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 103 deletions.
5 changes: 5 additions & 0 deletions apollo-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@
<artifactId>config</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>io.javaslang</groupId>
<artifactId>javaslang</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
Expand Down
4 changes: 4 additions & 0 deletions apollo-entity/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
<artifactId>apollo-api</artifactId>
</dependency>

<dependency>
<groupId>io.javaslang</groupId>
<artifactId>javaslang</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package com.spotify.apollo.entity;

import com.spotify.apollo.RequestContext;
import com.spotify.apollo.Response;
import com.spotify.apollo.Status;
import com.spotify.apollo.route.AsyncHandler;
Expand All @@ -34,6 +35,7 @@
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

import javaslang.control.Either;
import okio.ByteString;

import static java.util.concurrent.CompletableFuture.completedFuture;
Expand Down Expand Up @@ -65,30 +67,32 @@ class CodecEntityMiddleware implements EntityMiddleware {

@Override
public <R> Middleware<SyncHandler<R>, SyncHandler<Response<ByteString>>>
serializerDirect(Class<? extends R> entityClass) {
serializerDirect(Class<? extends R> entityResponseClass) {
return inner -> inner
.map(this::ensureResponse)
.map(serialize(entityClass));
.map(Response::forPayload)
.map(serialize(entityResponseClass));
}

@Override
public <R> Middleware<SyncHandler<Response<R>>, SyncHandler<Response<ByteString>>>
serializerResponse(Class<? extends R> entityClass) {
return inner -> inner.map(serialize(entityClass));
serializerResponse(Class<? extends R> entityResponseClass) {
return inner -> inner
.map(serialize(entityResponseClass));
}

@Override
public <R> Middleware<AsyncHandler<R>, AsyncHandler<Response<ByteString>>>
asyncSerializerDirect(Class<? extends R> entityClass) {
asyncSerializerDirect(Class<? extends R> entityResponseClass) {
return inner -> inner
.map(this::ensureResponse)
.map(serialize(entityClass));
.map(Response::forPayload)
.map(serialize(entityResponseClass));
}

@Override
public <R> Middleware<AsyncHandler<Response<R>>, AsyncHandler<Response<ByteString>>>
asyncSerializerResponse(Class<? extends R> entityClass) {
return inner -> inner.map(serialize(entityClass));
asyncSerializerResponse(Class<? extends R> entityResponseClass) {
return inner -> inner
.map(serialize(entityResponseClass));
}

@Override
Expand All @@ -100,10 +104,8 @@ class CodecEntityMiddleware implements EntityMiddleware {
@Override
public <E, R> Middleware<EntityHandler<E, R>, SyncHandler<Response<ByteString>>>
direct(Class<? extends E> entityClass, Class<? extends R> entityResponseClass) {
return inner ->
deserialize(entityClass)
.flatMap(r -> ctx -> mapPayload(r, inner.invoke(ctx)))
.map(serialize(entityResponseClass));
return inner -> withEntity(inner.asResponseHandler(), entityClass)
.map(serialize(entityResponseClass));
}

@Override
Expand All @@ -115,10 +117,8 @@ class CodecEntityMiddleware implements EntityMiddleware {
@Override
public <E, R> Middleware<EntityResponseHandler<E, R>, SyncHandler<Response<ByteString>>>
response(Class<? extends E> entityClass, Class<? extends R> entityResponseClass) {
return inner ->
deserialize(entityClass)
.flatMap(r -> ctx -> flatMapPayload(r, inner.invoke(ctx)))
.map(serialize(entityResponseClass));
return inner -> withEntity(inner, entityClass)
.map(serialize(entityResponseClass));
}

@Override
Expand All @@ -130,10 +130,8 @@ class CodecEntityMiddleware implements EntityMiddleware {
@Override
public <E, R> Middleware<EntityAsyncHandler<E, R>, AsyncHandler<Response<ByteString>>>
asyncDirect(Class<? extends E> entityClass, Class<? extends R> entityResponseClass) {
return inner ->
Middleware.syncToAsync(deserialize(entityClass))
.flatMap(r -> ctx -> asyncMapPayload(r, inner.invoke(ctx)))
.map(serialize(entityResponseClass));
return inner -> withEntityAsync(inner.asResponseHandler(), entityClass)
.map(serialize(entityResponseClass));
}

@Override
Expand All @@ -145,39 +143,61 @@ class CodecEntityMiddleware implements EntityMiddleware {
@Override
public <E, R> Middleware<EntityAsyncResponseHandler<E, R>, AsyncHandler<Response<ByteString>>>
asyncResponse(Class<? extends E> entityClass, Class<? extends R> entityResponseClass) {
return inner ->
Middleware.syncToAsync(deserialize(entityClass))
.flatMap(r -> ctx -> asyncFlatMapPayload(r, inner.invoke(ctx)))
.map(serialize(entityResponseClass));
return inner -> withEntityAsync(inner, entityClass)
.map(serialize(entityResponseClass));
}

private <E> SyncHandler<Response<E>> deserialize(Class<? extends E> entityClass) {
return requestContext -> {
final Optional<ByteString> payloadOpt = requestContext.request().payload();
if (!payloadOpt.isPresent()) {
return Response.forStatus(
Status.BAD_REQUEST
.withReasonPhrase("Missing payload"));
}
private <E, R> SyncHandler<Response<R>> withEntity(
EntityResponseHandler<E, R> inner,
Class<? extends E> entityClass) {
return rc -> {
final Either<Response<?>, Response<R>> response =
deserialize(rc, entityClass)
.map(inner.invoke(rc));

final E entity;
try {
final ByteString byteString = payloadOpt.get();
entity = codec.read(byteString.toByteArray(), entityClass);
} catch (IOException e) {
LOG.warn("error", e);
return Response.forStatus(
Status.BAD_REQUEST
.withReasonPhrase("Payload parsing failed: " + e.getMessage()));
}
//noinspection unchecked
return response.getOrElseGet(left -> (Response<R>) left);
};
}

private <E, R> AsyncHandler<Response<R>> withEntityAsync(
EntityAsyncResponseHandler<E, R> inner,
Class<? extends E> entityClass) {
return rc -> {
final Either<Response<?>, CompletionStage<Response<R>>> response =
deserialize(rc, entityClass)
.map(inner.invoke(rc));

return Response.forPayload(entity);
//noinspection unchecked
return response.getOrElseGet(left -> completedFuture((Response<R>) left));
};
}

private <E> Function<Response<E>, Response<ByteString>> serialize(Class<? extends E> entityClass) {
private <E> Either<Response<?>, E> deserialize(RequestContext rc, Class<? extends E> entityClass) {
final Optional<ByteString> payloadOpt = rc.request().payload();
if (!payloadOpt.isPresent()) {
return Either.left(Response.forStatus(
Status.BAD_REQUEST
.withReasonPhrase("Missing payload")));
}

final E entity;
try {
final ByteString byteString = payloadOpt.get();
entity = codec.read(byteString.toByteArray(), entityClass);
} catch (IOException e) {
LOG.warn("error", e);
return Either.left(Response.forStatus(
Status.BAD_REQUEST
.withReasonPhrase("Payload parsing failed: " + e.getMessage())));
}

return Either.right(entity);
}

private <R> Function<Response<R>, Response<ByteString>> serialize(Class<? extends R> entityClass) {
return response -> {
final Optional<E> entityOpt = response.payload();
final Optional<R> entityOpt = response.payload();

if (!entityOpt.isPresent()) {
//noinspection unchecked
Expand All @@ -198,53 +218,4 @@ private <E> Function<Response<E>, Response<ByteString>> serialize(Class<? extend
.withHeader(CONTENT_TYPE, contentType);
};
}

private <E> Response<E> ensureResponse(E response) {
if (response instanceof Response) {
//noinspection unchecked
return (Response<E>) response;
}

return Response.forPayload(response);
}

private <E, R> Response<R> mapPayload(
Response<E> in,
Function<? super E, ? extends R> fn) {

//noinspection unchecked
return (in.payload().map(fn).isPresent())
? in.withPayload(in.payload().map(fn).get())
: (Response<R>) in;
}

private <E, R> Response<R> flatMapPayload(
Response<E> in,
Function<? super E, ? extends Response<? extends R>> fn) {

//noinspection unchecked
return (in.payload().isPresent())
? (Response<R>) fn.apply(in.payload().get()).withHeaders(in.headers())
: (Response<R>) in;
}

private <E, R> CompletionStage<Response<R>> asyncMapPayload(
Response<E> in,
Function<? super E, ? extends CompletionStage<? extends R>> fn) {

//noinspection unchecked
return (in.payload().isPresent())
? fn.apply(in.payload().get()).thenApply(in::withPayload)
: completedFuture((Response<R>) in);
}

private <E, R> CompletionStage<Response<R>> asyncFlatMapPayload(
Response<E> in,
Function<? super E, ? extends CompletionStage<? extends Response<? extends R>>> fn) {

//noinspection unchecked
return (in.payload().isPresent())
? fn.apply(in.payload().get()).thenApply(ret -> (Response<R>) ret.withHeaders(in.headers()))
: completedFuture((Response<R>) in);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,30 +76,38 @@ static EntityMiddleware jackson(ObjectMapper objectMapper, String contentType) {
asyncResponse(Class<? extends E> entityClass, Class<? extends R> entityResponseClass);

<R> Middleware<SyncHandler<R>, SyncHandler<Response<ByteString>>>
serializerDirect(Class<? extends R> entityClass);
serializerDirect(Class<? extends R> entityResponseClass);

<R> Middleware<SyncHandler<Response<R>>, SyncHandler<Response<ByteString>>>
serializerResponse(Class<? extends R> entityClass);
serializerResponse(Class<? extends R> entityResponseClass);

<R> Middleware<AsyncHandler<R>, AsyncHandler<Response<ByteString>>>
asyncSerializerDirect(Class<? extends R> entityClass);
asyncSerializerDirect(Class<? extends R> entityResponseClass);

<R> Middleware<AsyncHandler<Response<R>>, AsyncHandler<Response<ByteString>>>
asyncSerializerResponse(Class<? extends R> entityClass);
asyncSerializerResponse(Class<? extends R> entityResponseClass);

interface EntityHandler<E, R>
extends SyncHandler<Function<? super E, ? extends R>> {

default EntityResponseHandler<E, R> asResponseHandler() {
return rc -> e -> invoke(rc).andThen(Response::forPayload).apply(e);
}
}

interface EntityResponseHandler<E, R>
extends SyncHandler<Function<? super E, ? extends Response<? extends R>>> {
extends SyncHandler<Function<? super E, ? extends Response<R>>> {
}

interface EntityAsyncHandler<E, R>
extends SyncHandler<Function<? super E, ? extends CompletionStage<? extends R>>> {
extends SyncHandler<Function<? super E, ? extends CompletionStage<R>>> {

default EntityAsyncResponseHandler<E, R> asResponseHandler() {
return rc -> e -> invoke(rc).andThen(s -> s.thenApply(Response::forPayload)).apply(e);
}
}

interface EntityAsyncResponseHandler<E, R>
extends SyncHandler<Function<? super E, ? extends CompletionStage<? extends Response<? extends R>>>> {
extends SyncHandler<Function<? super E, ? extends CompletionStage<Response<R>>>> {
}
}

0 comments on commit 4d38163

Please sign in to comment.