-
Notifications
You must be signed in to change notification settings - Fork 20
/
Events.scala
455 lines (402 loc) · 18.2 KB
/
Events.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
package org.zalando.kanadi.api
import java.net.URI
import java.time.OffsetDateTime
import defaults._
import org.apache.pekko.http.scaladsl.HttpExt
import org.apache.pekko.http.scaladsl.marshalling.Marshal
import org.apache.pekko.http.scaladsl.model._
import org.apache.pekko.http.scaladsl.model.headers.RawHeader
import org.apache.pekko.http.scaladsl.unmarshalling.Unmarshal
import org.apache.pekko.stream.Materializer
import com.typesafe.scalalogging.{Logger, LoggerTakingImplicit}
import enumeratum._
import io.circe.Decoder.Result
import io.circe.syntax._
import io.circe.{Decoder, Encoder, Json}
import org.zalando.kanadi.models.HttpHeaders.XFlowID
import org.zalando.kanadi.models._
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
/** Base type for the three Nakadi event categories: [[Event.DataChange]],
  * [[Event.Business]] and [[Event.Undefined]]. Carries the event payload `data`.
  */
sealed abstract class Event[T](val data: T) {

  /** Returns the event's [[Metadata]] when the category defines it.
    * Undefined-category events carry no metadata, hence `None`.
    */
  def getMetadata: Option[Metadata] =
    this match {
      case _: Event.Undefined[_]           => None
      case dataChange: Event.DataChange[_] => Some(dataChange.metadata)
      case business: Event.Business[_]     => Some(business.metadata)
    }
}
object Event {

  /** "data" category event: a change (see [[DataOperation]]) applied to an entity of
    * type `dataType`, with mandatory [[Metadata]].
    */
  final case class DataChange[T](override val data: T,
                                 dataType: String,
                                 dataOperation: DataOperation,
                                 metadata: Metadata)
      extends Event[T](data)

  object DataChange {
    // Serializes the four fields under Nakadi's snake_case keys. Uses an explicit
    // field tuple instead of `DataChange.unapply(x).get` — the `.get` was a partial
    // Option accessor (and the unapply form is not Scala 3 friendly).
    implicit def eventDataChangeEncoder[T](implicit encoder: Encoder[T]): Encoder[DataChange[T]] =
      Encoder.forProduct4(
        "data",
        "data_type",
        "data_op",
        "metadata"
      )(x => (x.data, x.dataType, x.dataOperation, x.metadata))

    implicit def eventDataChangeDecoder[T](implicit decoder: Decoder[T]): Decoder[DataChange[T]] =
      Decoder.forProduct4(
        "data",
        "data_type",
        "data_op",
        "metadata"
      )(DataChange.apply)
  }

  /** "business" category event: the payload fields live at the top level of the JSON
    * object alongside a "metadata" field.
    */
  final case class Business[T](override val data: T, metadata: Metadata = Metadata()) extends Event[T](data)

  object Business {
    // The payload's own JSON is merged with a {"metadata": ...} object, so data
    // fields and metadata share the same top-level JSON object (Nakadi wire format).
    implicit def eventBusinessEncoder[T](implicit encoder: Encoder[T]): Encoder[Business[T]] =
      Encoder.instance[Business[T]] { x =>
        val metadata = Json.obj(
          "metadata" -> x.metadata.asJson
        )
        val data = x.data.asJson
        data.deepMerge(metadata)
      }

    // Mirror of the encoder: "metadata" is read from the top level and the remaining
    // cursor is decoded as the payload type T.
    implicit def eventBusinessDecoder[T](implicit
        decoder: Decoder[T]
    ): Decoder[Business[T]] =
      Decoder.instance[Business[T]] { c =>
        for {
          metadata <- c.downField("metadata").as[Metadata]
          data     <- c.as[T]
        } yield Business(data, metadata)
      }
  }

  /** "undefined" category event: the JSON is exactly the payload, no envelope. */
  final case class Undefined[T](override val data: T) extends Event[T](data)

  object Undefined {
    implicit def eventUndefinedEncoder[T](implicit encoder: Encoder[T]): Encoder[Undefined[T]] =
      Encoder.instance[Undefined[T]] { x =>
        x.data.asJson
      }

    implicit def eventUndefinedDecoder[T](implicit
        decoder: Decoder[T]
    ): Decoder[Undefined[T]] =
      Decoder.instance[Undefined[T]] { c =>
        for {
          data <- c.as[T]
        } yield Undefined(data)
      }
  }

  // Dispatches to the category-specific encoder; the type parameter in the patterns
  // is erased at runtime, but that is safe here since only the constructor is matched.
  implicit def eventEncoder[T](implicit encoder: Encoder[T]): Encoder[Event[T]] =
    Encoder.instance[Event[T]] {
      case e: Event.DataChange[T] => e.asJson
      case e: Event.Business[T]   => e.asJson
      case e: Event.Undefined[T]  => e.asJson
    }

  // Category detection by shape: "data_op" + "metadata" => DataChange,
  // "metadata" only => Business, anything else => Undefined.
  implicit def eventDecoder[T](implicit decoder: Decoder[T]): Decoder[Event[T]] =
    Decoder.instance[Event[T]] { c =>
      val dataOpR   = c.downField("data_op").as[Option[String]]
      val metadataR = c.downField("metadata").as[Option[Metadata]]
      (for {
        dataOp   <- dataOpR
        metadata <- metadataR
      } yield (dataOp, metadata) match {
        case (Some(_), Some(_)) =>
          c.as[Event.DataChange[T]]: Result[Event[T]]
        case (None, Some(_)) =>
          c.as[Event.Business[T]]: Result[Event[T]]
        case _ =>
          c.as[Event.Undefined[T]]: Result[Event[T]]
      }).joinRight
    }
}
/** The kind of change a [[Event.DataChange]] describes. `id` is the single-letter
  * wire representation used by Nakadi (e.g. "C" for create) and doubles as the
  * enumeratum entry name.
  */
sealed abstract class DataOperation(val id: String) extends EnumEntry with Product with Serializable {
  override val entryName: String = id
}
object DataOperation extends Enum[DataOperation] {
  // Populated at compile time by enumeratum's findValues macro from the case
  // objects declared in this object.
  val values = findValues

  final case object Create   extends DataOperation("C")
  final case object Update   extends DataOperation("U")
  final case object Delete   extends DataOperation("D")
  final case object Snapshot extends DataOperation("S")

  // JSON (de)serialization via the entryName, i.e. the single-letter ids above.
  implicit val dataOperationEncoder: Encoder[DataOperation] =
    enumeratum.Circe.encoder(DataOperation)

  implicit val dataOperationDecoder: Decoder[DataOperation] =
    enumeratum.Circe.decoder(DataOperation)
}
/** Metadata attached to business and data-change events.
  *
  * `eid` and `occurredAt` are producer-supplied (defaulted to a random id and "now");
  * the remaining optional fields are either producer hints (partition, compaction key,
  * span context) or filled in by the broker (receivedAt, publishedBy).
  */
final case class Metadata(eid: EventId = EventId.random,
                          occurredAt: OffsetDateTime = OffsetDateTime.now,
                          eventType: Option[EventTypeName] = None,
                          receivedAt: Option[OffsetDateTime] = None,
                          parentEids: Option[List[EventId]] = None,
                          flowId: Option[FlowId] = None,
                          partition: Option[Partition] = None,
                          partitionCompactionKey: Option[PartitionCompactionKey] = None,
                          spanCtx: Option[SpanCtx] = None,
                          publishedBy: Option[PublishedBy] = None)
object Metadata {
  import org.zalando.kanadi.models.codec.FlowIdCodec._

  // Serializes all ten fields under Nakadi's snake_case keys. Uses an explicit
  // field tuple instead of `Metadata.unapply(x).get` — the `.get` was a partial
  // Option accessor (and the unapply form is not Scala 3 friendly).
  implicit val metadataEncoder: Encoder[Metadata] = Encoder.forProduct10(
    "eid",
    "occurred_at",
    "event_type",
    "received_at",
    "parent_eids",
    "flow_id",
    "partition",
    "partition_compaction_key",
    "span_ctx",
    "published_by"
  )(m =>
    (m.eid,
     m.occurredAt,
     m.eventType,
     m.receivedAt,
     m.parentEids,
     m.flowId,
     m.partition,
     m.partitionCompactionKey,
     m.spanCtx,
     m.publishedBy))

  implicit val metadataDecoder: Decoder[Metadata] = Decoder.forProduct10(
    "eid",
    "occurred_at",
    "event_type",
    "received_at",
    "parent_eids",
    "flow_id",
    "partition",
    "partition_compaction_key",
    "span_ctx",
    "published_by"
  )(Metadata.apply)
}
object Events {

  /** One entry of Nakadi's per-event publishing report: which event (`eid`, if it
    * could be determined), its final [[PublishingStatus]], the pipeline [[Step]]
    * it failed in, and a human-readable `detail` message.
    */
  final case class BatchItemResponse(eid: Option[EventId],
                                     publishingStatus: PublishingStatus,
                                     step: Option[Step],
                                     detail: Option[String])

  object BatchItemResponse {
    // Uses an explicit field tuple instead of `BatchItemResponse.unapply(x).get` —
    // the `.get` was a partial Option accessor (and the unapply form is not
    // Scala 3 friendly).
    implicit val batchItemResponseEncoder: Encoder[BatchItemResponse] =
      Encoder.forProduct4(
        "eid",
        "publishing_status",
        "step",
        "detail"
      )(x => (x.eid, x.publishingStatus, x.step, x.detail))

    implicit val batchItemResponseDecoder: Decoder[BatchItemResponse] =
      Decoder.forProduct4(
        "eid",
        "publishing_status",
        "step",
        "detail"
      )(BatchItemResponse.apply)
  }

  /** Outcome of publishing a single event, as reported by Nakadi. */
  sealed abstract class PublishingStatus(val id: String) extends EnumEntry with Product with Serializable {
    override val entryName = id
  }

  object PublishingStatus extends Enum[PublishingStatus] {
    val values = findValues

    final case object Submitted extends PublishingStatus("submitted")
    final case object Failed    extends PublishingStatus("failed")
    final case object Aborted   extends PublishingStatus("aborted")

    implicit val eventsErrorsPublishingStatusEncoder: Encoder[PublishingStatus] =
      enumeratum.Circe.encoder(PublishingStatus)

    implicit val eventsErrorsPublishingStatusDecoder: Decoder[PublishingStatus] =
      enumeratum.Circe.decoder(PublishingStatus)
  }

  /** Stage of Nakadi's publishing pipeline in which an event failed. */
  sealed abstract class Step(val id: String) extends EnumEntry with Product with Serializable {
    override val entryName = id
  }

  object Step extends Enum[Step] {
    val values = findValues

    final case object None         extends Step("none")
    final case object Validating   extends Step("validating")
    final case object Partitioning extends Step("partitioning")
    final case object Enriching    extends Step("enriching")
    final case object Publishing   extends Step("publishing")

    implicit val eventsErrorsStepEncoder: Encoder[Step] =
      enumeratum.Circe.encoder(Step)

    implicit val eventsErrorsStepDecoder: Decoder[Step] =
      enumeratum.Circe.decoder(Step)
  }

  /** Errors raised by the events API. */
  sealed abstract class Errors extends Exception

  object Errors {

    /** Raised when Nakadi rejects (part of) a batch; carries the per-event report. */
    final case class EventValidation(batchItemResponse: List[BatchItemResponse]) extends Errors {
      override def getMessage: String =
        s"Error publishing events, errors are ${batchItemResponse.mkString("\n")}"
    }
  }
}
/** Client for Nakadi's event publishing endpoint.
  *
  * @param baseUri
  *   Base URI of the Nakadi service.
  * @param authTokenProvider
  *   Optional provider of OAuth tokens; when present a token header is attached to
  *   every request.
  */
case class Events(baseUri: URI, authTokenProvider: Option[AuthTokenProvider] = None)(implicit
    kanadiHttpConfig: HttpConfig,
    exponentialBackoffConfig: ExponentialBackoffConfig,
    http: HttpExt,
    materializer: Materializer)
    extends EventsInterface {
  private val baseUri_                                = Uri(baseUri.toString)
  protected val logger: LoggerTakingImplicit[FlowId]  = Logger.takingImplicit[FlowId](classOf[Events])

  /** Publishes a batch of [[Event]] 's of this [[org.zalando.kanadi.models.EventTypeName]]. All items must be of the
    * EventType identified by name.
    *
    * Reception of Events will always respect the configuration of its [[org.zalando.kanadi.models.EventTypeName]] with
    * respect to validation, enrichment and partition. The steps performed on reception of incoming message are:
    *
    * Every validation rule specified for the [[EventType]] will be checked in order against the incoming Events.
    * Validation rules are evaluated in the order they are defined and the Event is rejected in the first case of
    * failure. If the offending validation rule provides information about the violation it will be included in the
    * BatchItemResponse. If the [[org.zalando.kanadi.models.EventTypeName]] defines schema validation it will be
    * performed at this moment. The size of each Event will also be validated. The maximum size per Event is 999,000
    * bytes. We use the batch input to measure the size of events, so unnecessary spaces, tabs, and carriage returns
    * will count towards the event size. Once the validation succeeded, the content of the Event is updated according to
    * the enrichment rules in the order the rules are defined in the [[EventType]]. No preexisting value might be
    * changed (even if added by an enrichment rule). Violations on this will force the immediate rejection of the Event.
    * The invalid overwrite attempt will be included in the item's BatchItemResponse object. The incoming Event's
    * relative ordering is evaluated according to the rule on the [[EventType]]. Failure to evaluate the rule will
    * reject the Event.
    *
    * Given the batched nature of this operation, any violation on validation or failures on enrichment or partitioning
    * will cause the whole batch to be rejected, i.e. none of its elements are pushed to the underlying broker.
    *
    * Failures on writing of specific partitions to the broker might influence other partitions. Failures at this stage
    * will fail only the affected partitions.
    *
    * @param name
    *   Name of the EventType
    * @param events
    *   The Event being published
    * @param encoder
    * @param flowId
    *   The flow id of the request, which is written into the logs and passed to called services. Helpful for
    *   operational troubleshooting and log analysis.
    * @tparam T
    * @return
    */
  def publish[T](name: EventTypeName, events: List[Event[T]], fillMetadata: Boolean = true)(implicit
      encoder: Encoder[T],
      flowId: FlowId = randomFlowId(),
      executionContext: ExecutionContext
  ): Future[Unit] =
    // Retry behaviour is opt-in via configuration; otherwise a single attempt is made.
    if (kanadiHttpConfig.failedPublishEventRetry) {
      publishWithRecover(name, events, List.empty, fillMetadata, exponentialBackoffConfig.initialDelay, count = 0)
    } else publishBase(name, events, fillMetadata)

  /** Recursive retry wrapper around [[publishBase]] with exponential backoff.
    *
    * @param currentNotValidEvents
    *   Accumulated per-event responses that will never be retried (validation
    *   failures), carried across recursive attempts for the final error report.
    * @param currentDuration
    *   Backoff delay used for the previous attempt; the next delay is derived from it.
    * @param count
    *   Zero-based attempt counter, compared against the configured maxRetries.
    */
  private[api] def publishWithRecover[T](name: EventTypeName,
                                         events: List[Event[T]],
                                         currentNotValidEvents: List[Events.BatchItemResponse],
                                         fillMetadata: Boolean,
                                         currentDuration: FiniteDuration,
                                         count: Int)(implicit
      encoder: Encoder[T],
      flowId: FlowId = randomFlowId(),
      executionContext: ExecutionContext
  ): Future[Unit] = {
    // Shared retry path for transport-level failures (connection drop, 5xx):
    // the whole remaining batch is resubmitted after the next backoff delay.
    def retryUnexpectedFailure(events: List[Event[T]],
                               count: Int,
                               e: Exception,
                               currentDuration: FiniteDuration): Future[Unit] = {
      val eventIds = events.flatMap(x => eventWithUndefinedEventIdFallback(x))
      if (count > exponentialBackoffConfig.maxRetries) {
        logger.error(
          s"Max retry failed for publishing events, event id's still not submitted are ${eventIds.map(_.id).mkString(",")}")
        Future.failed(e)
      } else {
        val newDuration = exponentialBackoffConfig.calculate(count, currentDuration)
        logger.warn(
          s"Events with eid's ${eventIds.map(_.id).mkString(",")} failed to submit, retrying in ${newDuration.toMillis} millis")
        // Schedule the next attempt without blocking a thread.
        org.apache.pekko.pattern.after(newDuration, http.system.scheduler)(
          publishWithRecover(name, events, currentNotValidEvents, fillMetadata, newDuration, count + 1))
      }
    }
    publishBase(name, events, fillMetadata).recoverWith {
      case Events.Errors.EventValidation(errors) =>
        if (count > exponentialBackoffConfig.maxRetries) {
          // Out of retries: report everything that never reached "submitted".
          val finalEvents =
            (errors ++ currentNotValidEvents).filter(_.publishingStatus != Events.PublishingStatus.Submitted)
          logger.error(
            s"Max retry failed for publishing events, event id's still not submitted are ${finalEvents.flatMap(_.eid.map(_.id)).mkString(",")}")
          Future.failed(Events.Errors.EventValidation(finalEvents))
        } else {
          val (noNeedToRetryResponse, toRetryResponse) = errors.partition(response =>
            // If there is a validation error sending the event there is no point in retrying it
            response.step
              .contains(Events.Step.Validating) || response.publishingStatus == Events.PublishingStatus.Submitted)
          // Keep only the events whose eids are in the retryable responses.
          val eventsToRetry = events.filter { event =>
            eventWithUndefinedEventIdFallback(event) match {
              case Some(eid) => toRetryResponse.exists(_.eid.contains(eid))
              case None      => true // Lets just retry events which don't have valid eid's
            }
          }
          val newDuration = exponentialBackoffConfig.calculate(count, currentDuration)
          logger.warn(
            s"Events with eid's ${toRetryResponse.flatMap(_.eid).map(_.id).mkString(",")} failed to submit, retrying in ${newDuration.toMillis} millis")
          // Log (once per attempt) the events that failed schema validation and
          // will therefore never be retried.
          val invalidSchemaEvents =
            noNeedToRetryResponse.filter(_.publishingStatus != Events.PublishingStatus.Submitted)
          if (invalidSchemaEvents.nonEmpty) {
            val errorDetails = invalidSchemaEvents
              .map { response =>
                val detail  = response.detail
                val eventId = response.eid.map(_.id)
                s"eid: ${eventId.getOrElse("N/A")}, detail: ${detail.getOrElse("N/A")}"
              }
              .mkString(",")
            logger.error(s"Events $errorDetails did not pass validation schema, not submitting")
          }
          val newNotValidEvents = (currentNotValidEvents ++ noNeedToRetryResponse).distinct
          org.apache.pekko.pattern.after(newDuration, http.system.scheduler)(
            publishWithRecover(name, eventsToRetry, newNotValidEvents, fillMetadata, newDuration, count + 1))
        }
      // NOTE(review): matches on the exception message text — brittle if the HTTP
      // client library ever rewords it; kept as-is.
      case e: RuntimeException
          if e.getMessage.contains(
            "The http server closed the connection unexpectedly before delivering responses for") =>
        retryUnexpectedFailure(events, count, e, currentDuration)
      case httpServiceError: HttpServiceError
          if httpServiceError.httpResponse.status.intValue().toString.startsWith("5") =>
        retryUnexpectedFailure(events, count, httpServiceError, currentDuration)
    }
  }

  /** If we have an event of type [[Event.Undefined]], this function will try and manually parse the event to see if it
    * has an "eid" field. The "eid" field is not mandatory in [[Event.Undefined]] however there is a chance it can still
    * be there.
    *
    * @param event
    * @param encoder
    * @tparam T
    * @return
    */
  private[api] def eventWithUndefinedEventIdFallback[T](event: Event[T])(implicit
      encoder: Encoder[T]): Option[EventId] =
    event.getMetadata.map(_.eid) orElse {
      // Fallback: search the serialized payload for any "eid" field (circe `\\`
      // collects all matches at any depth; the first decodable one wins).
      (event.data.asJson \\ "eid").headOption.flatMap { json =>
        json.as[EventId].toOption
      }
    }

  /** Single publish attempt: POSTs the batch to `/event-types/{name}/events` and
    * maps Nakadi's per-event error report (422/207) to [[Events.Errors.EventValidation]].
    */
  private[api] def publishBase[T](name: EventTypeName, events: List[Event[T]], fillMetadata: Boolean = true)(implicit
      encoder: Encoder[T],
      flowId: FlowId = randomFlowId(),
      executionContext: ExecutionContext
  ): Future[Unit] = {
    import org.mdedetrich.pekko.http.support.CirceHttpSupport._
    val uri =
      baseUri_.withPath(baseUri_.path / "event-types" / name.name / "events")
    val baseHeaders = List(RawHeader(XFlowID, flowId.value))
    // Optionally stamp the event type name into each event's metadata, without
    // overwriting one the caller already set. Undefined events have no metadata.
    val finalEvents = if (fillMetadata) {
      events.map {
        case e: Event.Business[_] =>
          e.copy(metadata = e.metadata.copy(eventType = Some(e.metadata.eventType.getOrElse(name))))
        case e: Event.DataChange[_] =>
          e.copy(metadata = e.metadata.copy(eventType = Some(e.metadata.eventType.getOrElse(name))))
        case e: Event.Undefined[_] => e
      }
    } else events
    for {
      headers <- authTokenProvider match {
        case None => Future.successful(baseHeaders)
        case Some(futureProvider) =>
          futureProvider.value().map { authToken =>
            toHeader(authToken) +: baseHeaders
          }
      }
      entity  <- Marshal(finalEvents).to[RequestEntity]
      request  = HttpRequest(HttpMethods.POST, uri, headers, entity)
      _        = logger.debug(request.toString)
      response <- http.singleRequest(request)
      result <- {
        response.status match {
          // 422 (whole batch rejected) and 207 (partial failure) both carry a
          // per-event BatchItemResponse report in the body.
          case StatusCodes.UnprocessableEntity | StatusCodes.MultiStatus =>
            Unmarshal(response.entity.httpEntity.withContentType(ContentTypes.`application/json`))
              .to[List[Events.BatchItemResponse]]
              .map(x => throw Events.Errors.EventValidation(x))
          case s if s.isSuccess() =>
            // Drain the response body so the connection can be reused.
            response.discardEntityBytes()
            Future.successful(())
          case _ =>
            processNotSuccessful(request, response)
        }
      }
    } yield result
  }
}