Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Add configurable timeout for publishing #1583

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ public void whenBulkSendSuccessfullyThenUpdateBatchItemStatus() {
items.add(item);
}

kafkaTopicRepository.syncPostBatch(topicId, items, null, null, false);
kafkaTopicRepository.syncPostBatch(topicId, items, null, null,
Optional.empty(), false);

for (int i = 0; i < 10; i++) {
assertThat(items.get(i).getResponse().getPublishingStatus(), equalTo(EventPublishingStatus.SUBMITTED));
Expand All @@ -261,7 +262,8 @@ public void whenSendBatchWithItemHeadersThenCheckBatchStatus() {
item.setOwner(new EventOwnerHeader("unit", "Nakadi"));
items.add(item);
}
kafkaTopicRepository.syncPostBatch(topicId, items, null, null, false);
kafkaTopicRepository.syncPostBatch(topicId, items, null, null,
Optional.empty(), false);

for (int i = 0; i < 10; i++) {
assertThat(items.get(i).getResponse().getPublishingStatus(), equalTo(EventPublishingStatus.SUBMITTED));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.zalando.nakadi.cache.EventTypeCache;
Expand Down Expand Up @@ -93,11 +94,13 @@ public EventPublishingController(final EventPublisher publisher,
public ResponseEntity postJsonEvents(@PathVariable final String eventTypeName,
@RequestBody final String eventsAsString,
final HttpServletRequest request,
final Client client)
final Client client,
final @RequestHeader(value = "X-TIMEOUT", required = false, defaultValue = "0")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
final @RequestHeader(value = "X-TIMEOUT", required = false, defaultValue = "0")
final @RequestHeader(value = "X-Timeout", required = false, defaultValue = "0")

Also, I wonder if this should be called more specifically, smt. like X-Client-Wait-Write-Timeout.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets be consistent, X-Nakadi-Cursors, X-Nakadi-Stream-Id hence X-Nakadi-Publishing-Timeout

Copy link
Member

@adyach adyach Jan 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defaultValue = "0", maybe it is possible to have default value from config 30 seconds

int publishTimeout)
throws AccessDeniedException, BlockedException, ServiceTemporarilyUnavailableException,
InternalNakadiException, EventTypeTimeoutException, NoSuchEventTypeException {
return postEventsWithMetrics(eventTypeName, eventsAsString,
toHeaderTagMap(request.getHeader(X_CONSUMER_TAG)), request, client, false);
return postEventsWithMetrics(PublishRequest.asPublish(eventTypeName, eventsAsString,
client, toHeaderTagMap(request.getHeader(X_CONSUMER_TAG)), publishTimeout));
}

@RequestMapping(
Expand All @@ -108,15 +111,18 @@ public ResponseEntity postJsonEvents(@PathVariable final String eventTypeName,
)
public ResponseEntity postBinaryEvents(@PathVariable final String eventTypeName,
final HttpServletRequest request,
final Client client)
final Client client,
final @RequestHeader(value = "X-TIMEOUT",
required = false, defaultValue = "0")
int publishTimeout)
throws AccessDeniedException, BlockedException, ServiceTemporarilyUnavailableException,
InternalNakadiException, EventTypeTimeoutException, NoSuchEventTypeException {

// TODO: check that event type schema type is AVRO!

try {
return postBinaryEvents(eventTypeName, request.getInputStream(),
toHeaderTagMap(request.getHeader(X_CONSUMER_TAG)), client, false);
return postBinaryEvents(PublishRequest.asPublish(eventTypeName, request.getInputStream(),
client, toHeaderTagMap(request.getHeader(X_CONSUMER_TAG)), publishTimeout));
} catch (IOException e) {
throw new InternalNakadiException("failed to parse batch", e);
}
Expand All @@ -134,46 +140,46 @@ public ResponseEntity deleteBinaryEvents(@PathVariable final String eventTypeNam
throws AccessDeniedException, BlockedException, ServiceTemporarilyUnavailableException,
InternalNakadiException, EventTypeTimeoutException, NoSuchEventTypeException {
try {
return postBinaryEvents(eventTypeName, request.getInputStream(), null, client, true);
return postBinaryEvents(PublishRequest.asDelete(eventTypeName, request.getInputStream(),
client));
} catch (IOException e) {
throw new InternalNakadiException("failed to parse batch", e);
}
}


private ResponseEntity postBinaryEvents(final String eventTypeName,
final InputStream batch,
final Map<HeaderTag, String> consumerTags,
final Client client,
final boolean delete) {
private ResponseEntity postBinaryEvents(final PublishRequest<? extends InputStream> request) {
TracingService.setOperationName("publish_events")
.setTag("event_type", eventTypeName)
.setTag("event_type", request.getEventTypeName())
.setTag("сontent-type", "application/avro-binary")
.setTag(Tags.SPAN_KIND_PRODUCER, client.getClientId());
.setTag(Tags.SPAN_KIND_PRODUCER, request.getClient().getClientId());

if (blacklistService.isProductionBlocked(eventTypeName, client.getClientId())) {
if (blacklistService.isProductionBlocked(request.getEventTypeName(),
request.getClient().getClientId())) {
throw new BlockedException("Application or event type is blocked");
}

final EventType eventType = eventTypeCache.getEventType(eventTypeName);
final EventType eventType = eventTypeCache.getEventType(request.getEventTypeName());

if (delete && eventType.getCleanupPolicy() == CleanupPolicy.DELETE) {
if (request.isDeleteRequest() &&
eventType.getCleanupPolicy() == CleanupPolicy.DELETE) {
throw new InvalidEventTypeException("It is not allowed to delete events from non compacted event type");
}

authValidator.authorizeEventTypeWrite(eventType);

final EventTypeMetrics eventTypeMetrics = eventTypeMetricRegistry.metricsFor(eventTypeName);
final EventTypeMetrics eventTypeMetrics = eventTypeMetricRegistry.metricsFor(request.getEventTypeName());
try {
final long startingNanos = System.nanoTime();
try {
final CountingInputStream countingInputStream = new CountingInputStream(batch);
final CountingInputStream countingInputStream = new CountingInputStream(request.getEventsRaw());
final List<NakadiRecord> nakadiRecords = nakadiRecordMapper.fromBytesBatch(countingInputStream);
final List<NakadiRecordResult> recordResults;
if (delete) {
if (request.isDeleteRequest()) {
recordResults = binaryPublisher.delete(nakadiRecords, eventType);
} else {
recordResults = binaryPublisher.publish(eventType, nakadiRecords, consumerTags);
recordResults = binaryPublisher.publish(eventType, nakadiRecords,
request.getConsumerTags(), request.getDesiredPublishingTimeout());
}
if (recordResults.isEmpty()) {
throw new InternalNakadiException("unexpected empty record result list, " +
Expand All @@ -188,7 +194,8 @@ private ResponseEntity postBinaryEvents(final String eventTypeName,
TracingService.setTag("slo_bucket", TracingService.getSLOBucketName(totalSizeBytes));

reportMetrics(eventTypeMetrics, result, totalSizeBytes, eventCount);
reportSLOs(startingNanos, totalSizeBytes, eventCount, result, eventTypeName, client);
reportSLOs(startingNanos, totalSizeBytes, eventCount, result,
request.getEventTypeName(), request.getClient());

if (result.getStatus() == EventPublishingStatus.FAILED) {
TracingService.setErrorFlag();
Expand All @@ -214,26 +221,22 @@ public ResponseEntity deleteEvents(@PathVariable final String eventTypeName,
@RequestBody final String eventsAsString,
final HttpServletRequest request,
final Client client) {
return postEventsWithMetrics(eventTypeName, eventsAsString, null, request, client, true);
return postEventsWithMetrics(PublishRequest.asDelete(eventTypeName, eventsAsString,
client));
}

private ResponseEntity postEventsWithMetrics(final String eventTypeName,
final String eventsAsString,
final Map<HeaderTag, String> consumerTags,
final HttpServletRequest request,
final Client client,
final boolean delete) {
private ResponseEntity postEventsWithMetrics(final PublishRequest<String> request) {
TracingService.setOperationName("publish_events")
.setTag("event_type", eventTypeName)
.setTag(Tags.SPAN_KIND_PRODUCER, client.getClientId());
.setTag("event_type", request.getEventTypeName())
.setTag(Tags.SPAN_KIND_PRODUCER, request.getClient().getClientId());

if (blacklistService.isProductionBlocked(eventTypeName, client.getClientId())) {
if (blacklistService.
isProductionBlocked(request.getEventTypeName(), request.getClient().getClientId())) {
throw new BlockedException("Application or event type is blocked");
}
final EventTypeMetrics eventTypeMetrics = eventTypeMetricRegistry.metricsFor(eventTypeName);
final EventTypeMetrics eventTypeMetrics = eventTypeMetricRegistry.metricsFor(request.getEventTypeName());
try {
final ResponseEntity response = postEventInternal(
eventTypeName, eventsAsString, consumerTags, eventTypeMetrics, client, request, delete);
final ResponseEntity response = postEventInternal(request, eventTypeMetrics);
eventTypeMetrics.incrementResponseCount(response.getStatusCode().value());
return response;
} catch (final NoSuchEventTypeException exception) {
Expand All @@ -245,32 +248,29 @@ private ResponseEntity postEventsWithMetrics(final String eventTypeName,
}
}

private ResponseEntity postEventInternal(final String eventTypeName,
final String eventsAsString,
final Map<HeaderTag, String> consumerTags,
final EventTypeMetrics eventTypeMetrics,
final Client client,
final HttpServletRequest request,
final boolean delete)
private ResponseEntity postEventInternal(final PublishRequest<String> request,
final EventTypeMetrics eventTypeMetrics)
throws AccessDeniedException, ServiceTemporarilyUnavailableException, InternalNakadiException,
EventTypeTimeoutException, NoSuchEventTypeException {
final long startingNanos = System.nanoTime();
try {
final EventPublishResult result;

final int totalSizeBytes = eventsAsString.getBytes(Charsets.UTF_8).length;
final int totalSizeBytes = request.getEventsRaw().getBytes(Charsets.UTF_8).length;
TracingService.setTag("slo_bucket", TracingService.getSLOBucketName(totalSizeBytes));

if (delete) {
result = publisher.delete(eventsAsString, eventTypeName);
if (request.isDeleteRequest()) {
result = publisher.delete(request.getEventsRaw(), request.getEventTypeName());
} else {
result = publisher.publish(eventsAsString, eventTypeName, consumerTags);
result = publisher.publish(request.getEventsRaw(), request.getEventTypeName(),
request.getConsumerTags(), request.getDesiredPublishingTimeout());
}
// FIXME: there should be a more direct way to get the input batch size
final int eventCount = result.getResponses().size();

reportMetrics(eventTypeMetrics, result, totalSizeBytes, eventCount);
reportSLOs(startingNanos, totalSizeBytes, eventCount, result, eventTypeName, client);
reportSLOs(startingNanos, totalSizeBytes, eventCount, result,
request.getEventTypeName(), request.getClient());

TracingService.setTag("number_of_events", eventCount);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package org.zalando.nakadi;

import org.zalando.nakadi.domain.HeaderTag;
import org.zalando.nakadi.exceptions.runtime.InvalidPublishingParamException;
import org.zalando.nakadi.security.Client;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;

public class PublishRequest<T> {
private final String eventTypeName;
private final T eventsRaw;
private final Client client;
private final Map<HeaderTag, String> consumerTags;
private final Optional<Integer> desiredPublishingTimeout;
private final boolean isDeleteRequest;

public PublishRequest(final String eventTypeName,
final T eventsRaw,
final Client client,
final Map<HeaderTag, String> consumerTags,
final int desiredPublishingTimeout,
final boolean isDeleteRequest) {
this.eventTypeName = eventTypeName;
this.eventsRaw = eventsRaw;
this.client = client;
this.consumerTags = consumerTags;
//TODO: better way to get max timeout instead of hardcoding
if (desiredPublishingTimeout < 0 || desiredPublishingTimeout > 30_000) {
throw new InvalidPublishingParamException("X-TIMEOUT cannot be less than 0 or greater than 30000 ms");
}
//0 means either nothing was supplied or 0 was supplied, in both cases it means we will leave
//the timeout to be current default
this.desiredPublishingTimeout = Optional.of(desiredPublishingTimeout).filter(v -> v != 0);
Comment on lines +29 to +35
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This constructor contains api level validation of this field.
I suggest to move "outside" this logic to the callers of the constructor methods.
(Like it's now for validation of other attributes passed here).
This should make this class POJO.

this.isDeleteRequest = isDeleteRequest;
}

public String getEventTypeName() {
return eventTypeName;
}

public T getEventsRaw() {
return eventsRaw;
}

public Client getClient() {
return client;
}

public Map<HeaderTag, String> getConsumerTags() {
return consumerTags;
}

public Optional<Integer> getDesiredPublishingTimeout() {
return desiredPublishingTimeout;
}

public boolean isDeleteRequest() {
return isDeleteRequest;
}

@Override
public String toString() {
return "PublishRequest{" +
"eventTypeName='" + eventTypeName + '\'' +
", eventsAsString='" + eventsRaw + '\'' +
", client=" + client +
", consumerTags=" + consumerTags +
", desiredPublishingTimeout=" + desiredPublishingTimeout +
", isDeleteRequest=" + isDeleteRequest +
'}';
}

public static <T> PublishRequest<T> asPublish(final String eventTypeName,
final T eventsRaw,
final Client client,
final Map<HeaderTag, String> consumerTags,
final int desiredPublishingTimeout) {
return new PublishRequest<>(eventTypeName, eventsRaw, client,
consumerTags, desiredPublishingTimeout, false);
}

public static <T> PublishRequest<T> asDelete(final String eventTypeName,
final T eventsRaw,
final Client client) {
return new PublishRequest<>(eventTypeName, eventsRaw, client,
Collections.emptyMap(), 0, true);
}

}
Loading