Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<properties>
<project.build.resources.directory>src/main/resources</project.build.resources.directory>
<main.class>org.prebid.cache.PBCacheApplication</main.class>
<junit-jupiter.version>5.0.3</junit-jupiter.version>
<junit-jupiter.version>5.3.1</junit-jupiter.version>

<!-- used for rpm packaging -->
<build.number>0</build.number>
Expand Down Expand Up @@ -55,6 +55,8 @@
<equals-verifier.version>2.4.3</equals-verifier.version>
<aerospike-client.version>4.1.6</aerospike-client.version>
<reactor-extra.version>3.1.6.RELEASE</reactor-extra.version>
<wiremock.version>2.11.0</wiremock.version>
<wiremock-extension.version>0.3.1</wiremock-extension.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -153,6 +155,18 @@
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock</artifactId>
<version>${wiremock.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.JensPiegsa</groupId>
<artifactId>wiremock-extension</artifactId>
<version>${wiremock-extension.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.cjnygard</groupId>
<artifactId>rest-maven-plugin</artifactId>
Expand Down Expand Up @@ -312,6 +326,10 @@
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>jitpack.io</id>
<url>https://jitpack.io</url>
</repository>
</repositories>

<pluginRepositories>
Expand Down
73 changes: 66 additions & 7 deletions src/main/java/org/prebid/cache/handlers/PostCacheHandler.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.prebid.cache.handlers;

import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
import org.prebid.cache.builders.PrebidServerResponseBuilder;
import org.prebid.cache.exceptions.ExpiryOutOfRangeException;
import org.prebid.cache.exceptions.InvalidUUIDException;
Expand All @@ -13,6 +14,10 @@
import org.prebid.cache.helpers.RandomUUID;
import org.prebid.cache.metrics.MetricsRecorder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.stereotype.Component;
Expand All @@ -22,7 +27,10 @@
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
Expand All @@ -35,12 +43,14 @@
public class PostCacheHandler extends CacheHandler {

private static final String UUID_KEY = "uuid";
private final String SECONDARY_CACHE_KEY = "secondaryCache";

private final ReactiveRepository<PayloadWrapper, String> repository;
private final CacheConfig config;
private final Supplier<Date> currentDateProvider;
private final Function<PayloadWrapper, Map<String, String>> payloadWrapperToMapTransformer = payload ->
ImmutableMap.of(UUID_KEY, payload.getId());
private final Map<String, WebClient> webClients = new HashMap<>();

@Autowired
public PostCacheHandler(final ReactiveRepository<PayloadWrapper, String> repository,
Expand All @@ -52,6 +62,11 @@ public PostCacheHandler(final ReactiveRepository<PayloadWrapper, String> reposit
this.type = ServiceType.SAVE;
this.repository = repository;
this.config = config;
if(config.getSecondaryUris() != null) {
config.getSecondaryUris().forEach(ip -> {
webClients.put(ip, WebClient.create(ip));
});
}
this.builder = builder;
this.currentDateProvider = currentDateProvider;
this.metricTagPrefix = "write";
Expand All @@ -61,18 +76,26 @@ public Mono<ServerResponse> save(final ServerRequest request) {
metricsRecorder.markMeterForTag(this.metricTagPrefix, MetricsRecorder.MeasurementTag.REQUEST);
val timerContext = metricsRecorder.createRequestContextTimerOptionalForServiceType(type)
.orElse(null);

String secondaryCache = request.queryParam(SECONDARY_CACHE_KEY).orElse(StringUtils.EMPTY);

val bodyMono = request.bodyToMono(RequestObject.class);
val monoList = bodyMono.map(RequestObject::getPuts);
val flux = monoList.flatMapMany(Flux::fromIterable);
val payloadFlux = flux.map(payload -> payload.toBuilder()
.prefix(config.getPrefix())
.expiry(adjustExpiry(payload.getExpiry()))
.build())
val payloadFlux = flux
.map(payload -> payload.toBuilder()
.prefix(config.getPrefix())
.expiry(adjustExpiry(payload.getExpiry()))
.build())
.map(payloadWrapperTransformer(currentDateProvider))
.handle(this::validateUUID)
.handle(this::validateExpiry)
.concatMap(repository::save)
.timeout(Duration.ofMillis(config.getTimeoutMs()))
.subscribeOn(Schedulers.parallel())
.collectList()
.doOnNext(payloadWrappers -> sendRequestToSecondaryPrebidCacheHosts(payloadWrappers, secondaryCache))
.flatMapMany(Flux::fromIterable)
.subscribeOn(Schedulers.parallel());

final Mono<ServerResponse> responseMono = payloadFlux
Expand All @@ -96,6 +119,7 @@ private Function<PayloadTransfer, PayloadWrapper> payloadWrapperTransformer(Supp
new PayloadWrapper(
RandomUUID.extractUUID(transfer),
transfer.getPrefix(),
// TODO: 26.09.18 is this correct behaviour to put no key in case of generated key
new Payload(transfer.getType(), transfer.getKey(), transfer.valueAsString()),
transfer.getExpiry(),
currentDateProvider.get()
Expand All @@ -119,15 +143,50 @@ private void validateExpiry(final PayloadWrapper payload, final SynchronousSink<
}

private long adjustExpiry(Long expiry) {
if(expiry == null) {
if (expiry == null) {
return config.getExpirySec();
} else if(expiry > config.getMaxExpiry()) {
} else if (expiry > config.getMaxExpiry()) {
return config.getMaxExpiry();
} else if(expiry < config.getMinExpiry()) {
} else if (expiry < config.getMinExpiry()) {
return config.getMinExpiry();
} else {
return expiry;
}
}

private void sendRequestToSecondaryPrebidCacheHosts(List<PayloadWrapper> payloadWrappers, String secondaryCache) {
if (!secondaryCache.equals("yes") && webClients.size() != 0) {
final List<PayloadTransfer> payloadTransfers = new ArrayList<>();
for (PayloadWrapper payloadWrapper : payloadWrappers) {
payloadTransfers.add(wrapperToTransfer(payloadWrapper));
}
RequestObject requestObject = new RequestObject(payloadTransfers);
webClients.forEach((ip, webClient) -> {
webClient.post()
.uri(uriBuilder -> uriBuilder.path(config.getSecondaryCachePath()).queryParam("secondaryCache", "yes").build())
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.syncBody(requestObject)
.exchange()
.doOnError(throwable -> {
metricsRecorder.getSecondaryCacheWriteError().mark();
log.info("Failed to send request : ", throwable);
})
.subscribe((clientResponse) -> {
if (clientResponse.statusCode() != HttpStatus.OK) {
metricsRecorder.getSecondaryCacheWriteError().mark();
log.debug(clientResponse.statusCode().toString());
log.info("Failed to write to remote address : {}", ip);
}
});
});
}
}

private PayloadTransfer wrapperToTransfer(final PayloadWrapper wrapper) {
return PayloadTransfer.builder().type(wrapper.getPayload().getType())
.key(wrapper.getId()).value(wrapper.getPayload().getValue()).expiry(wrapper.getExpiry()).build();
}


}

Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class GraphiteMetricsRecorder extends MetricsRecorder
// Other 404
private static final Meter invalidRequestMeter = registry.meter(MeasurementTag.REQUEST_INVALID.getTag());

private static final Meter secondaryCacheWriteError = registry.meter(MeasurementTag.ERROR_SECONDARY_WRITE.getTag());

@Autowired
public GraphiteMetricsRecorder(final GraphiteConfig config) {
this.config = config;
Expand All @@ -61,6 +63,10 @@ public Meter getInvalidRequestMeter() {
return invalidRequestMeter;
}

public Meter getSecondaryCacheWriteError() {
return secondaryCacheWriteError;
}

private Meter meterForTag(final String prefix, final MeasurementTag measurementTag) {
return registry.meter(measurementTag.getTag().replaceAll(PREFIX_PLACEHOLDER, prefix));
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/prebid/cache/metrics/MetricsRecorder.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ public enum MeasurementTag {
REQUEST_INVALID("pbc.request.invalid"),
JSON("pbc.${prefix}.json"),
XML("pbc.${prefix}.xml"),
ERROR_DB("pbc.${prefix}.err.db");
ERROR_DB("pbc.${prefix}.err.db"),
ERROR_SECONDARY_WRITE("pbc.err.secondaryWrite");

@Getter private String tag;

Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/prebid/cache/repository/CacheConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.List;

@Data
@NoArgsConstructor
@AllArgsConstructor
Expand All @@ -17,5 +19,7 @@ public class CacheConfig {
private int timeoutMs;
private long minExpiry;
private long maxExpiry;
private List<String> secondaryUris;
private String secondaryCachePath;
}

Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.aerospike.client.async.EventLoops;
import com.aerospike.client.async.EventPolicy;
import com.aerospike.client.async.NettyEventLoops;
import com.aerospike.client.policy.ClientPolicy;
import com.aerospike.client.policy.Policy;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
Expand Down Expand Up @@ -82,8 +83,8 @@ EventLoops eventLoops() {
}

@Bean
AsyncClientPolicy clientPolicy() {
AsyncClientPolicy clientPolicy = new AsyncClientPolicy();
ClientPolicy clientPolicy() {
ClientPolicy clientPolicy = new ClientPolicy();
clientPolicy.eventLoops = eventLoops();
return clientPolicy;
}
Expand Down
3 changes: 3 additions & 0 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ cache.expiry_sec: 300
cache:
min_expiry: 60
max_expiry: 28800
secondary_uris:
- "http://localhost:8080"
secondary_cache_path : "cache"
metrics.graphite.enabled: false
spring:
aerospike:
Expand Down
2 changes: 2 additions & 0 deletions src/test/java/org/prebid/cache/PrebidApplicationTests.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.prebid.cache;

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.prebid.cache.repository.aerospike.AerospikePropertyConfiguration;
Expand All @@ -8,6 +9,7 @@

@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = {PBCacheApplication.class, AerospikePropertyConfiguration.class})
@Disabled
class PrebidApplicationTests {

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.prebid.cache.handlers;

import org.junit.jupiter.api.Disabled;
import org.prebid.cache.builders.PrebidServerResponseBuilder;
import org.prebid.cache.metrics.GraphiteMetricsRecorder;
import org.prebid.cache.metrics.GraphiteTestConfig;
Expand Down Expand Up @@ -42,6 +43,7 @@
})
@EnableConfigurationProperties
@SpringBootTest
@Disabled
class GetCacheHandlerTests extends CacheHandlerTests {

@Autowired
Expand Down
45 changes: 43 additions & 2 deletions src/test/java/org/prebid/cache/handlers/PostCacheHandlerTests.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package org.prebid.cache.handlers;

import com.github.jenspiegsa.wiremockextension.InjectServer;
import com.github.jenspiegsa.wiremockextension.WireMockExtension;
import com.github.tomakehurst.wiremock.WireMockServer;
import com.google.common.collect.ImmutableList;
import org.junit.jupiter.api.Disabled;
import org.prebid.cache.builders.PrebidServerResponseBuilder;
import org.prebid.cache.helpers.CurrentDateProvider;
import org.prebid.cache.metrics.GraphiteMetricsRecorder;
Expand All @@ -27,9 +31,12 @@

import java.util.function.Consumer;

import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.springframework.http.HttpHeaders.CONTENT_TYPE;

import static com.github.tomakehurst.wiremock.client.WireMock.*;

@ExtendWith(SpringExtension.class)
@ContextConfiguration(classes = {
PostCacheHandler.class,
Expand All @@ -43,6 +50,8 @@
})
@EnableConfigurationProperties
@SpringBootTest
@ExtendWith(WireMockExtension.class)
@Disabled
class PostCacheHandlerTests extends CacheHandlerTests {

@Autowired
Expand All @@ -54,6 +63,9 @@ void testVerifyError() {
verifyRepositoryError(handler);
}

@InjectServer
WireMockServer serverMock;

@Test
void testVerifySave() {
val payload = new PayloadTransfer("json", "2be04ba5-8f9b-4a1e-8100-d573c40312f8", "", 1800L, "prebid_");
Expand All @@ -65,13 +77,42 @@ void testVerifySave() {

val responseMono = handler.save(requestMono);

Consumer<ServerResponse> consumer1 = serverResponse -> {
Consumer<ServerResponse> consumer = serverResponse -> {
assertEquals(200, serverResponse.statusCode().value());
};

StepVerifier.create(responseMono)
.consumeNextWith(consumer)
.expectComplete()
.verify();
}

@Test
void testSecondaryCacheSuccess() throws InterruptedException {
serverMock.stubFor(post(urlPathEqualTo("/cache"))
.willReturn(aResponse().withBody("{\"responses\":[{\"uuid\":\"f31f96db-8c36-4d44-94dc-ad2d1a1d84d9\"}]}")));

val payload = new PayloadTransfer("json", "f31f96db-8c36-4d44-94dc-ad2d1a1d84d9", "", 1800L, "prebid_");
val request = Mono.just(new RequestObject(ImmutableList.of(payload)));
val requestMono = MockServerRequest.builder()
.method(HttpMethod.POST)
.header(CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE)
.body(request);

val responseMono = handler.save(requestMono);

Consumer<ServerResponse> consumer = serverResponse -> {
assertEquals(200, serverResponse.statusCode().value());
};

StepVerifier.create(responseMono)
.consumeNextWith(consumer1)
.consumeNextWith(consumer)
.expectComplete()
.verify();

//do not touch this
Thread.sleep(10);

verify(postRequestedFor(urlEqualTo("/cache?secondaryCache=yes")));
}
}
Loading