/
KafkaJsonSchemaContentMatcher.kt
83 lines (69 loc) · 2.38 KB
/
KafkaJsonSchemaContentMatcher.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package au.com.dius.pact.core.matchers
import au.com.dius.pact.core.model.ContentType
import au.com.dius.pact.core.model.OptionalBody
import au.com.dius.pact.core.support.Json
import au.com.dius.pact.core.support.Result
import au.com.dius.pact.core.support.json.JsonException
import au.com.dius.pact.core.support.json.JsonParser
import au.com.dius.pact.core.support.json.KafkaSchemaRegistryWireFormatter
import io.pact.plugins.jvm.core.InteractionContents
import io.github.oshai.kotlinlogging.KLogging
/**
 * [ContentMatcher] for bodies tagged as Kafka Schema Registry JSON.
 *
 * The actual body is first passed through
 * [KafkaSchemaRegistryWireFormatter.removeMagicBytes]; the result is then checked for
 * JSON validity and, when acceptable, compared by [JsonContentMatcher].
 */
class KafkaJsonSchemaContentMatcher : ContentMatcher {

  /**
   * Matches [actual] against [expected].
   *
   * Only the actual body is run through the wire formatter; the expected body is used as given.
   * When an expected body is present and the processed actual body is `null` or cannot be parsed
   * as JSON, a single mismatch at path `$` is returned instead of delegating.
   *
   * @param expected the body the interaction expects
   * @param actual the body that was received
   * @param context matching context forwarded unchanged to [JsonContentMatcher]
   * @return the mismatch result described above, or whatever [JsonContentMatcher.matchBody] returns
   */
  override fun matchBody(
    expected: OptionalBody,
    actual: OptionalBody,
    context: MatchingContext
  ): BodyMatchResult {
    // NOTE(review): presumably strips the Schema Registry wire-format prefix - confirm in the helper.
    val strippedBytes = KafkaSchemaRegistryWireFormatter.removeMagicBytes(actual.value)
    val strippedActual = actual.copy(value = strippedBytes)

    // The JSON check is only evaluated when an expected body is present (short-circuit).
    val actualIsUnusable = expected.isPresent() && !isEmptyOrParsableJson(strippedActual.value)

    return if (actualIsUnusable) {
      buildNotJsonResult(expected, strippedActual)
    } else {
      JsonContentMatcher.matchBody(expected, strippedActual, context)
    }
  }

  /**
   * Builds interaction contents from the `"body"` entry of [bodyConfig].
   *
   * The entry is converted with [Json.toJson], serialised, and wrapped in an [OptionalBody]
   * with content type [ContentType.KAFKA_SCHEMA_REGISTRY_JSON].
   *
   * @param bodyConfig configuration map; only the `"body"` key is read
   * @return always a [Result.Ok] holding a single [InteractionContents] with an empty first argument
   */
  override fun setupBodyFromConfig(
    bodyConfig: Map<String, Any?>
  ): Result<List<InteractionContents>, String> {
    val serialisedBytes = Json.toJson(bodyConfig["body"]).serialise().toByteArray()
    val body = OptionalBody.body(serialisedBytes, ContentType.KAFKA_SCHEMA_REGISTRY_JSON)
    val contents = InteractionContents("", body)
    return Result.Ok(listOf(contents))
  }

  /**
   * Produces the result reported when the actual body is not usable JSON: one [BodyMismatch]
   * under path `$`, carrying both bodies as strings. The first [BodyMatchResult] argument is
   * left `null`.
   */
  private fun buildNotJsonResult(
    expected: OptionalBody,
    actual: OptionalBody
  ): BodyMatchResult {
    val mismatch = BodyMismatch(
      expected.valueAsString(),
      actual.valueAsString(),
      "Expected json body but received '${actual.valueAsString()}'"
    )
    val rootItem = BodyItemMatchResult("$", listOf(mismatch))
    return BodyMatchResult(null, listOf(rootItem))
  }

  /**
   * Returns `true` for an empty byte array or one that [JsonParser.parseString] accepts;
   * returns `false` for `null` or when parsing throws a [JsonException].
   */
  private fun isEmptyOrParsableJson(payload: ByteArray?): Boolean {
    if (payload == null) {
      return false
    }
    if (payload.isEmpty()) {
      return true
    }
    try {
      JsonParser.parseString(String(payload))
    } catch (e: JsonException) {
      // A parse failure is an expected outcome here, so it is logged rather than propagated.
      logger.debug("Swallowed Exception deliberately", e)
      return false
    }
    return true
  }

  companion object : KLogging()
}