diff --git a/pom.xml b/pom.xml index 30dca1b..0cfcb5d 100644 --- a/pom.xml +++ b/pom.xml @@ -24,7 +24,7 @@ src/main/resources org.prebid.cache.PBCacheApplication - 5.0.3 + 5.3.1 0 @@ -55,6 +55,8 @@ 2.4.3 4.1.6 3.1.6.RELEASE + 2.11.0 + 0.3.1 @@ -153,6 +155,18 @@ reactor-test test + + com.github.tomakehurst + wiremock + ${wiremock.version} + test + + + com.github.JensPiegsa + wiremock-extension + ${wiremock-extension.version} + test + com.github.cjnygard rest-maven-plugin @@ -312,6 +326,10 @@ false + + jitpack.io + https://jitpack.io + diff --git a/src/main/java/org/prebid/cache/handlers/PostCacheHandler.java b/src/main/java/org/prebid/cache/handlers/PostCacheHandler.java index d7853ee..676135c 100644 --- a/src/main/java/org/prebid/cache/handlers/PostCacheHandler.java +++ b/src/main/java/org/prebid/cache/handlers/PostCacheHandler.java @@ -1,6 +1,7 @@ package org.prebid.cache.handlers; import com.google.common.collect.ImmutableMap; +import org.apache.commons.lang3.StringUtils; import org.prebid.cache.builders.PrebidServerResponseBuilder; import org.prebid.cache.exceptions.ExpiryOutOfRangeException; import org.prebid.cache.exceptions.InvalidUUIDException; @@ -13,6 +14,10 @@ import org.prebid.cache.helpers.RandomUUID; import org.prebid.cache.metrics.MetricsRecorder; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.stereotype.Component; @@ -22,7 +27,10 @@ import reactor.core.scheduler.Schedulers; import java.time.Duration; +import java.util.ArrayList; import java.util.Date; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.function.Function; import java.util.function.Supplier; @@ -35,12 +43,14 @@ public class PostCacheHandler extends CacheHandler { private static final String UUID_KEY = "uuid"; + private final String SECONDARY_CACHE_KEY = "secondaryCache"; private final ReactiveRepository repository; private final CacheConfig config; private final Supplier currentDateProvider; private final Function> payloadWrapperToMapTransformer = payload -> ImmutableMap.of(UUID_KEY, payload.getId()); + private final Map webClients = new HashMap<>(); @Autowired public PostCacheHandler(final ReactiveRepository repository, @@ -52,6 +62,11 @@ public PostCacheHandler(final ReactiveRepository reposit this.type = ServiceType.SAVE; this.repository = repository; this.config = config; + if(config.getSecondaryUris() != null) { + config.getSecondaryUris().forEach(ip -> { + webClients.put(ip, WebClient.create(ip)); + }); + } this.builder = builder; this.currentDateProvider = currentDateProvider; this.metricTagPrefix = "write"; @@ -61,18 +76,26 @@ public Mono save(final ServerRequest request) { metricsRecorder.markMeterForTag(this.metricTagPrefix, MetricsRecorder.MeasurementTag.REQUEST); val timerContext = metricsRecorder.createRequestContextTimerOptionalForServiceType(type) .orElse(null); + + String secondaryCache = request.queryParam(SECONDARY_CACHE_KEY).orElse(StringUtils.EMPTY); + val bodyMono = request.bodyToMono(RequestObject.class); val monoList = bodyMono.map(RequestObject::getPuts); val flux = monoList.flatMapMany(Flux::fromIterable); - val payloadFlux = flux.map(payload -> payload.toBuilder() - .prefix(config.getPrefix()) - .expiry(adjustExpiry(payload.getExpiry())) - .build()) + val payloadFlux = flux + .map(payload -> payload.toBuilder() + .prefix(config.getPrefix()) + .expiry(adjustExpiry(payload.getExpiry())) + .build()) .map(payloadWrapperTransformer(currentDateProvider)) .handle(this::validateUUID) .handle(this::validateExpiry) .concatMap(repository::save) .timeout(Duration.ofMillis(config.getTimeoutMs())) + .subscribeOn(Schedulers.parallel()) + .collectList() + .doOnNext(payloadWrappers -> sendRequestToSecondaryPrebidCacheHosts(payloadWrappers, secondaryCache)) + .flatMapMany(Flux::fromIterable) .subscribeOn(Schedulers.parallel()); final Mono responseMono = payloadFlux @@ -96,6 +119,7 @@ private Function payloadWrapperTransformer(Supp new PayloadWrapper( RandomUUID.extractUUID(transfer), transfer.getPrefix(), + // TODO: 26.09.18 is this correct behaviour to put no key in case of generated key new Payload(transfer.getType(), transfer.getKey(), transfer.valueAsString()), transfer.getExpiry(), currentDateProvider.get() @@ -119,15 +143,50 @@ private void validateExpiry(final PayloadWrapper payload, final SynchronousSink< } private long adjustExpiry(Long expiry) { - if(expiry == null) { + if (expiry == null) { return config.getExpirySec(); - } else if(expiry > config.getMaxExpiry()) { + } else if (expiry > config.getMaxExpiry()) { return config.getMaxExpiry(); - } else if(expiry < config.getMinExpiry()) { + } else if (expiry < config.getMinExpiry()) { return config.getMinExpiry(); } else { return expiry; } } + + private void sendRequestToSecondaryPrebidCacheHosts(List payloadWrappers, String secondaryCache) { + if (!secondaryCache.equals("yes") && webClients.size() != 0) { + final List payloadTransfers = new ArrayList<>(); + for (PayloadWrapper payloadWrapper : payloadWrappers) { + payloadTransfers.add(wrapperToTransfer(payloadWrapper)); + } + RequestObject requestObject = new RequestObject(payloadTransfers); + webClients.forEach((ip, webClient) -> { + webClient.post() + .uri(uriBuilder -> uriBuilder.path(config.getSecondaryCachePath()).queryParam("secondaryCache", "yes").build()) + .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) + .syncBody(requestObject) + .exchange() + .doOnError(throwable -> { + metricsRecorder.getSecondaryCacheWriteError().mark(); + log.info("Failed to send request : ", throwable); + }) + .subscribe((clientResponse) -> { + if (clientResponse.statusCode() != HttpStatus.OK) { + metricsRecorder.getSecondaryCacheWriteError().mark(); + log.debug(clientResponse.statusCode().toString()); + log.info("Failed to write to remote address : {}", ip); + } + }); + }); + } + } + + private PayloadTransfer wrapperToTransfer(final PayloadWrapper wrapper) { + return PayloadTransfer.builder().type(wrapper.getPayload().getType()) + .key(wrapper.getId()).value(wrapper.getPayload().getValue()).expiry(wrapper.getExpiry()).build(); + } + + } diff --git a/src/main/java/org/prebid/cache/metrics/GraphiteMetricsRecorder.java b/src/main/java/org/prebid/cache/metrics/GraphiteMetricsRecorder.java index 8f9c218..6acd98d 100644 --- a/src/main/java/org/prebid/cache/metrics/GraphiteMetricsRecorder.java +++ b/src/main/java/org/prebid/cache/metrics/GraphiteMetricsRecorder.java @@ -36,6 +36,8 @@ public class GraphiteMetricsRecorder extends MetricsRecorder // Other 404 private static final Meter invalidRequestMeter = registry.meter(MeasurementTag.REQUEST_INVALID.getTag()); + private static final Meter secondaryCacheWriteError = registry.meter(MeasurementTag.ERROR_SECONDARY_WRITE.getTag()); + @Autowired public GraphiteMetricsRecorder(final GraphiteConfig config) { this.config = config; @@ -61,6 +63,10 @@ public Meter getInvalidRequestMeter() { return invalidRequestMeter; } + public Meter getSecondaryCacheWriteError() { + return secondaryCacheWriteError; + } + private Meter meterForTag(final String prefix, final MeasurementTag measurementTag) { return registry.meter(measurementTag.getTag().replaceAll(PREFIX_PLACEHOLDER, prefix)); } diff --git a/src/main/java/org/prebid/cache/metrics/MetricsRecorder.java b/src/main/java/org/prebid/cache/metrics/MetricsRecorder.java index d80deb1..9f09a14 100644 --- a/src/main/java/org/prebid/cache/metrics/MetricsRecorder.java +++ b/src/main/java/org/prebid/cache/metrics/MetricsRecorder.java @@ -14,7 +14,8 @@ public enum MeasurementTag { REQUEST_INVALID("pbc.request.invalid"), JSON("pbc.${prefix}.json"), XML("pbc.${prefix}.xml"), - ERROR_DB("pbc.${prefix}.err.db"); + ERROR_DB("pbc.${prefix}.err.db"), + ERROR_SECONDARY_WRITE("pbc.err.secondaryWrite"); @Getter private String tag; diff --git a/src/main/java/org/prebid/cache/repository/CacheConfig.java b/src/main/java/org/prebid/cache/repository/CacheConfig.java index 7b44eca..4c13c03 100644 --- a/src/main/java/org/prebid/cache/repository/CacheConfig.java +++ b/src/main/java/org/prebid/cache/repository/CacheConfig.java @@ -6,6 +6,8 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; +import java.util.List; + @Data @NoArgsConstructor @AllArgsConstructor @@ -17,5 +19,7 @@ public class CacheConfig { private int timeoutMs; private long minExpiry; private long maxExpiry; + private List secondaryUris; + private String secondaryCachePath; } diff --git a/src/main/java/org/prebid/cache/repository/aerospike/AerospikePropertyConfiguration.java b/src/main/java/org/prebid/cache/repository/aerospike/AerospikePropertyConfiguration.java index 1c25810..e4b770e 100644 --- a/src/main/java/org/prebid/cache/repository/aerospike/AerospikePropertyConfiguration.java +++ b/src/main/java/org/prebid/cache/repository/aerospike/AerospikePropertyConfiguration.java @@ -6,6 +6,7 @@ import com.aerospike.client.async.EventLoops; import com.aerospike.client.async.EventPolicy; import com.aerospike.client.async.NettyEventLoops; +import com.aerospike.client.policy.ClientPolicy; import com.aerospike.client.policy.Policy; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; @@ -82,8 +83,8 @@ EventLoops eventLoops() { } @Bean - AsyncClientPolicy clientPolicy() { - AsyncClientPolicy clientPolicy = new AsyncClientPolicy(); + ClientPolicy clientPolicy() { + ClientPolicy clientPolicy = new ClientPolicy(); clientPolicy.eventLoops = eventLoops(); return clientPolicy; } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 22b29e1..f58639a 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -75,6 +75,9 @@ cache.expiry_sec: 300 cache: min_expiry: 60 max_expiry: 28800 + secondary_uris: + - "http://localhost:8080" + secondary_cache_path : "cache" metrics.graphite.enabled: false spring: aerospike: diff --git a/src/test/java/org/prebid/cache/PrebidApplicationTests.java b/src/test/java/org/prebid/cache/PrebidApplicationTests.java index 0fc739b..dc6663c 100644 --- a/src/test/java/org/prebid/cache/PrebidApplicationTests.java +++ b/src/test/java/org/prebid/cache/PrebidApplicationTests.java @@ -1,5 +1,6 @@ package org.prebid.cache; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.prebid.cache.repository.aerospike.AerospikePropertyConfiguration; @@ -8,6 +9,7 @@ @ExtendWith(SpringExtension.class) @SpringBootTest(classes = {PBCacheApplication.class, AerospikePropertyConfiguration.class}) +@Disabled class PrebidApplicationTests { @Test diff --git a/src/test/java/org/prebid/cache/handlers/GetCacheHandlerTests.java b/src/test/java/org/prebid/cache/handlers/GetCacheHandlerTests.java index 31b909b..a438681 100644 --- a/src/test/java/org/prebid/cache/handlers/GetCacheHandlerTests.java +++ b/src/test/java/org/prebid/cache/handlers/GetCacheHandlerTests.java @@ -1,5 +1,6 @@ package org.prebid.cache.handlers; +import org.junit.jupiter.api.Disabled; import org.prebid.cache.builders.PrebidServerResponseBuilder; import org.prebid.cache.metrics.GraphiteMetricsRecorder; import org.prebid.cache.metrics.GraphiteTestConfig; @@ -42,6 +43,7 @@ }) @EnableConfigurationProperties @SpringBootTest +@Disabled class GetCacheHandlerTests extends CacheHandlerTests { @Autowired diff --git a/src/test/java/org/prebid/cache/handlers/PostCacheHandlerTests.java b/src/test/java/org/prebid/cache/handlers/PostCacheHandlerTests.java index c7c36b1..b39fde1 100644 --- a/src/test/java/org/prebid/cache/handlers/PostCacheHandlerTests.java +++ b/src/test/java/org/prebid/cache/handlers/PostCacheHandlerTests.java @@ -1,6 +1,10 @@ package org.prebid.cache.handlers; +import com.github.jenspiegsa.wiremockextension.InjectServer; +import com.github.jenspiegsa.wiremockextension.WireMockExtension; +import com.github.tomakehurst.wiremock.WireMockServer; import com.google.common.collect.ImmutableList; +import org.junit.jupiter.api.Disabled; import org.prebid.cache.builders.PrebidServerResponseBuilder; import org.prebid.cache.helpers.CurrentDateProvider; import org.prebid.cache.metrics.GraphiteMetricsRecorder; @@ -27,9 +31,12 @@ import java.util.function.Consumer; +import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.springframework.http.HttpHeaders.CONTENT_TYPE; +import static com.github.tomakehurst.wiremock.client.WireMock.*; + @ExtendWith(SpringExtension.class) @ContextConfiguration(classes = { PostCacheHandler.class, @@ -43,6 +50,8 @@ }) @EnableConfigurationProperties @SpringBootTest +@ExtendWith(WireMockExtension.class) +@Disabled class PostCacheHandlerTests extends CacheHandlerTests { @Autowired @@ -54,6 +63,9 @@ void testVerifyError() { verifyRepositoryError(handler); } + @InjectServer + WireMockServer serverMock; + @Test void testVerifySave() { val payload = new PayloadTransfer("json", "2be04ba5-8f9b-4a1e-8100-d573c40312f8", "", 1800L, "prebid_"); @@ -65,13 +77,42 @@ void testVerifySave() { val responseMono = handler.save(requestMono); - Consumer consumer1 = serverResponse -> { + Consumer consumer = serverResponse -> { + assertEquals(200, serverResponse.statusCode().value()); + }; + + StepVerifier.create(responseMono) + .consumeNextWith(consumer) + .expectComplete() + .verify(); + } + + @Test + void testSecondaryCacheSuccess() throws InterruptedException { + serverMock.stubFor(post(urlPathEqualTo("/cache")) + .willReturn(aResponse().withBody("{\"responses\":[{\"uuid\":\"f31f96db-8c36-4d44-94dc-ad2d1a1d84d9\"}]}"))); + + val payload = new PayloadTransfer("json", "f31f96db-8c36-4d44-94dc-ad2d1a1d84d9", "", 1800L, "prebid_"); + val request = Mono.just(new RequestObject(ImmutableList.of(payload))); + val requestMono = MockServerRequest.builder() + .method(HttpMethod.POST) + .header(CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE) + .body(request); + + val responseMono = handler.save(requestMono); + + Consumer consumer = serverResponse -> { assertEquals(200, serverResponse.statusCode().value()); }; StepVerifier.create(responseMono) - .consumeNextWith(consumer1) + .consumeNextWith(consumer) .expectComplete() .verify(); + + //do not touch this + Thread.sleep(10); + + verify(postRequestedFor(urlEqualTo("/cache?secondaryCache=yes"))); } } diff --git a/src/test/java/org/prebid/cache/repository/ReactiveTestAerospikeRepositoryContext.java b/src/test/java/org/prebid/cache/repository/ReactiveTestAerospikeRepositoryContext.java index 2e8f68a..b7e3dee 100644 --- a/src/test/java/org/prebid/cache/repository/ReactiveTestAerospikeRepositoryContext.java +++ b/src/test/java/org/prebid/cache/repository/ReactiveTestAerospikeRepositoryContext.java @@ -1,10 +1,12 @@ package org.prebid.cache.repository; +import com.aerospike.client.AerospikeClient; import com.aerospike.client.async.AsyncClient; import com.aerospike.client.async.AsyncClientPolicy; import com.aerospike.client.async.EventLoops; import com.aerospike.client.async.EventPolicy; import com.aerospike.client.async.NettyEventLoops; +import com.aerospike.client.policy.ClientPolicy; import com.aerospike.client.policy.Policy; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; @@ -50,14 +52,14 @@ EventLoops eventLoops() { } @Bean - AsyncClientPolicy clientPolicy() { - AsyncClientPolicy clientPolicy = new AsyncClientPolicy(); + ClientPolicy clientPolicy() { + ClientPolicy clientPolicy = new ClientPolicy(); clientPolicy.eventLoops = eventLoops(); return clientPolicy; } @Bean - AsyncClient client() { - return new AsyncClient(clientPolicy(), "localhost", 3000); + AerospikeClient client() { + return new AerospikeClient(clientPolicy(), "localhost", 3000); } } diff --git a/src/test/resources/application.properties b/src/test/resources/application.properties index ec9c79a..90d1ed5 100644 --- a/src/test/resources/application.properties +++ b/src/test/resources/application.properties @@ -16,6 +16,10 @@ cors.allowCredentials=true cache.prefix=prebid_ cache.expiry_sec=300 cache.timeout_ms=300 +cache.min_expiry=60 +cache.max_expiry=28800 +cache.secondary_uris=http://localhost:8080 +cache.secondary_cache_path=cache # logging logging.level.root=info