Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
92 commits
Select commit Hold shift + click to select a range
2cef977
Adjust padding to accommodate good enough headers and don't include …
akrambek Oct 25, 2023
d201582
Merge branch 'develop' into feature/consumer-group-cont
akrambek Oct 25, 2023
76bf9de
Merge branch 'feature/consumer-group-cont' into develop
akrambek Oct 26, 2023
29ae79c
Merge branch 'aklivity:develop' into develop
akrambek Oct 30, 2023
ec1b39e
Merge branch 'aklivity:develop' into develop
akrambek Oct 30, 2023
51a9f0e
Merge branch 'aklivity:develop' into develop
akrambek Oct 31, 2023
4394783
Merge branch 'aklivity:develop' into develop
akrambek Oct 31, 2023
e8696ce
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
51c37b1
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
5da5f04
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
db1e17c
Merge branch 'aklivity:develop' into develop
akrambek Nov 4, 2023
40f73dc
Merge branch 'aklivity:develop' into develop
akrambek Nov 6, 2023
d1a0492
Merge branch 'aklivity:develop' into develop
akrambek Nov 23, 2023
45799ce
Merge branch 'aklivity:develop' into develop
akrambek Nov 29, 2023
1e55162
Merge branch 'aklivity:develop' into develop
akrambek Nov 30, 2023
fedc41f
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
18a8d74
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
f160aad
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
e0e7d5a
Merge branch 'aklivity:develop' into develop
akrambek Dec 6, 2023
9f4a8a6
Merge branch 'aklivity:develop' into develop
akrambek Dec 8, 2023
456f111
Merge branch 'aklivity:develop' into develop
akrambek Dec 8, 2023
0d27262
Merge branch 'aklivity:develop' into develop
akrambek Dec 9, 2023
9fe7a91
Merge branch 'aklivity:develop' into develop
akrambek Dec 11, 2023
7e3d237
Merge branch 'aklivity:develop' into develop
akrambek Dec 12, 2023
33c4411
Merge branch 'aklivity:develop' into develop
akrambek Dec 13, 2023
fe9e318
Merge branch 'aklivity:develop' into develop
akrambek Dec 14, 2023
d8b5e5c
Merge branch 'aklivity:develop' into develop
akrambek Dec 14, 2023
ebca7ef
Merge branch 'aklivity:develop' into develop
akrambek Dec 18, 2023
5e3e059
Merge branch 'aklivity:develop' into develop
akrambek Dec 22, 2023
ee71db9
Merge branch 'aklivity:develop' into develop
akrambek Dec 24, 2023
0b7a15a
Merge branch 'aklivity:develop' into develop
akrambek Dec 25, 2023
be13489
Merge branch 'aklivity:develop' into develop
akrambek Dec 26, 2023
95df84c
Merge branch 'aklivity:develop' into develop
akrambek Dec 26, 2023
3ebdbf5
Merge branch 'aklivity:develop' into develop
akrambek Dec 28, 2023
24ad9e1
Merge branch 'aklivity:develop' into develop
akrambek Dec 30, 2023
6d21fec
Merge branch 'aklivity:develop' into develop
akrambek Dec 31, 2023
368a0a6
Merge branch 'aklivity:develop' into develop
akrambek Dec 31, 2023
7069f1a
Merge branch 'aklivity:develop' into develop
akrambek Jan 2, 2024
09b7041
Merge branch 'aklivity:develop' into develop
akrambek Jan 3, 2024
98f1faa
Merge branch 'aklivity:develop' into develop
akrambek Jan 4, 2024
371391a
Merge branch 'aklivity:develop' into develop
akrambek Jan 5, 2024
c6a0882
Merge branch 'aklivity:develop' into develop
akrambek Jan 8, 2024
f99f009
Merge branch 'aklivity:develop' into develop
akrambek Jan 9, 2024
a110b68
Merge branch 'aklivity:develop' into develop
akrambek Jan 11, 2024
80c4625
Merge branch 'aklivity:develop' into develop
akrambek Jan 16, 2024
6617e20
Merge branch 'aklivity:develop' into develop
akrambek Jan 19, 2024
dea9f53
Merge branch 'aklivity:develop' into develop
akrambek Jan 20, 2024
b74db57
Merge branch 'aklivity:develop' into develop
akrambek Jan 23, 2024
7af00ab
WIP
akrambek Jan 23, 2024
59113ca
WIP
akrambek Jan 24, 2024
8e0f2c5
WIP
akrambek Jan 24, 2024
a6309d6
WIP
akrambek Jan 25, 2024
2ca7d2b
WIP
akrambek Jan 25, 2024
1778cdc
WIP
akrambek Jan 26, 2024
40f4916
WIP
akrambek Jan 29, 2024
eaf19f8
WIP
akrambek Jan 30, 2024
4617b54
Merge branch 'aklivity:develop' into develop
akrambek Jan 30, 2024
448f4a5
Support of catalog in grpc
akrambek Jan 31, 2024
4d2dd7b
WIP
akrambek Jan 31, 2024
a96c0f7
Added test
akrambek Jan 31, 2024
b3b421d
Merge branch 'aklivity:develop' into develop
akrambek Jan 31, 2024
9808063
Fix NPE
akrambek Jan 31, 2024
0981d06
Fix checkstyle
akrambek Jan 31, 2024
aad450c
Apply feedback from PR
akrambek Jan 31, 2024
84c2d43
Apply feedback
akrambek Feb 1, 2024
ad978c5
WIP
akrambek Feb 1, 2024
d6ae81b
Refactor binding binding catalog config
akrambek Feb 1, 2024
73d64b1
Merge branch 'aklivity:develop' into develop
akrambek Feb 1, 2024
81ea8f3
Merge branch 'develop' into story/697
akrambek Feb 1, 2024
8aeb377
Fix catch up merge
akrambek Feb 1, 2024
d9df80c
Revert "Fix catch up merge"
akrambek Feb 1, 2024
26e9f6c
Fix compilation error
akrambek Feb 1, 2024
5da65fe
Fix remaining issues after catch up merge
akrambek Feb 1, 2024
b5da3ca
Apply feedback from PR
akrambek Feb 1, 2024
7038697
Fix potential NPE
akrambek Feb 1, 2024
f9ddcc6
Add line separator
akrambek Feb 1, 2024
d95b3ce
Apply feedback from PR
akrambek Feb 1, 2024
08dcb73
Apply feedback from PR
akrambek Feb 1, 2024
6a39338
Fix checkstyle
akrambek Feb 1, 2024
7bcb511
Fix checkstyle
akrambek Feb 1, 2024
cdad780
Fix checkstyle
akrambek Feb 1, 2024
5860171
Apply feedback from PR
akrambek Feb 2, 2024
a71577e
Apply remaining changes
akrambek Feb 2, 2024
64022a5
Apply feedback from PR
akrambek Feb 2, 2024
1edd300
Fix typo
akrambek Feb 2, 2024
6fbe66c
Apply feedback from PR
akrambek Feb 2, 2024
9f19eb2
Revert back the change
akrambek Feb 2, 2024
c26d704
Revert back the change
akrambek Feb 2, 2024
70f82f0
Revert back the change
akrambek Feb 2, 2024
7cfb3fc
Remove extra line
akrambek Feb 2, 2024
9a4aaa7
Remove extra line
akrambek Feb 2, 2024
f825538
Ignore test due to sporadic github action failure
akrambek Feb 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import static io.aklivity.zilla.runtime.binding.grpc.internal.types.stream.GrpcType.BASE64;
import static io.aklivity.zilla.runtime.binding.grpc.internal.types.stream.GrpcType.TEXT;
import static io.aklivity.zilla.runtime.engine.catalog.CatalogHandler.NO_SCHEMA_ID;
import static java.util.Arrays.asList;
import static java.util.stream.Collectors.toList;

Expand All @@ -28,24 +29,31 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.LongFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

import org.agrona.AsciiSequenceView;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.ObjectHashSet;

import io.aklivity.zilla.runtime.binding.grpc.config.GrpcMethodConfig;
import io.aklivity.zilla.runtime.binding.grpc.config.GrpcOptionsConfig;
import io.aklivity.zilla.runtime.binding.grpc.config.GrpcProtobufConfig;
import io.aklivity.zilla.runtime.binding.grpc.internal.types.Array32FW;
import io.aklivity.zilla.runtime.binding.grpc.internal.types.HttpHeaderFW;
import io.aklivity.zilla.runtime.binding.grpc.internal.types.String16FW;
import io.aklivity.zilla.runtime.binding.grpc.internal.types.String8FW;
import io.aklivity.zilla.runtime.binding.grpc.internal.types.stream.GrpcMetadataFW;
import io.aklivity.zilla.runtime.binding.grpc.internal.types.stream.GrpcType;
import io.aklivity.zilla.runtime.binding.grpc.internal.types.stream.HttpBeginExFW;
import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.config.BindingConfig;
import io.aklivity.zilla.runtime.engine.config.CatalogedConfig;
import io.aklivity.zilla.runtime.engine.config.KindConfig;
import io.aklivity.zilla.runtime.engine.config.SchemaConfig;

public final class GrpcBindingConfig
{
Expand All @@ -56,28 +64,41 @@ public final class GrpcBindingConfig
private static final byte[] HEADER_BIN_SUFFIX = new byte[4];
private static final byte[] GRPC_PREFIX = "grpc-".getBytes();
private static final byte[] BIN_SUFFIX = "-bin".getBytes();
private final HttpGrpcHeaderHelper helper;

public final long id;
public final String name;
public final KindConfig kind;
public final GrpcOptionsConfig options;
public final List<GrpcRouteConfig> routes;

private final GrpcProtobufParser parser;
private final HttpGrpcHeaderHelper helper;
private final Set<GrpcCatalogSchema> catalogs;

public GrpcBindingConfig(
BindingConfig binding,
MutableDirectBuffer metadataBuffer)
MutableDirectBuffer metadataBuffer,
LongFunction<CatalogHandler> supplyCatalog)
{
this.id = binding.id;
this.name = binding.name;
this.kind = binding.kind;
this.options = GrpcOptionsConfig.class.cast(binding.options);
this.routes = binding.routes.stream().map(GrpcRouteConfig::new).collect(toList());
this.parser = new GrpcProtobufParser();
this.helper = new HttpGrpcHeaderHelper(metadataBuffer);
Set<GrpcCatalogSchema> catalogs = new ObjectHashSet<>();
for (CatalogedConfig catalog : binding.catalogs)
{
CatalogHandler handler = supplyCatalog.apply(catalog.id);
for (SchemaConfig schema : catalog.schemas)
{
catalogs.add(new GrpcCatalogSchema(handler, schema.subject, schema.version));
}
}
this.catalogs = catalogs;
}


public GrpcRouteConfig resolve(
long authorization,
CharSequence service,
Expand Down Expand Up @@ -107,13 +128,12 @@ public GrpcMethodResult resolveMethod(
final CharSequence serviceName = serviceNameHeader != null ? serviceNameHeader : matcher.group(SERVICE_NAME);
final String methodName = matcher.group(METHOD);

final GrpcMethodConfig method = options.protobufs.stream()
.map(p -> p.services.stream().filter(s -> s.service.equals(serviceName)).findFirst().orElse(null))
.filter(Objects::nonNull)
.map(s -> s.methods.stream().filter(m -> m.method.equals(methodName)).findFirst().orElse(null))
.filter(Objects::nonNull)
.findFirst()
.orElse(null);
GrpcMethodConfig method = resolveMethod(catalogs, serviceName, methodName);

if (method == null && options != null)
{
method = resolveMethod(options.protobufs, serviceName, methodName);
}

if (method != null)
{
Expand All @@ -133,6 +153,36 @@ public GrpcMethodResult resolveMethod(
return methodResolver;
}

private GrpcMethodConfig resolveMethod(
Set<GrpcCatalogSchema> catalogs,
CharSequence serviceName,
String methodName)
{
return resolveMethod(catalogs.stream().map(GrpcCatalogSchema::resolveProtobuf), serviceName, methodName);
}

private GrpcMethodConfig resolveMethod(
List<GrpcProtobufConfig> protobufs,
CharSequence serviceName,
String methodName)
{
return resolveMethod(protobufs.stream(), serviceName, methodName);
}

private GrpcMethodConfig resolveMethod(
Stream<GrpcProtobufConfig> protobufs,
CharSequence serviceName,
String methodName)
{
return protobufs
.map(p -> p.services.stream().filter(s -> s.service.equals(serviceName)).findFirst().orElse(null))
.filter(Objects::nonNull)
.map(s -> s.methods.stream().filter(m -> m.method.equals(methodName)).findFirst().orElse(null))
.filter(Objects::nonNull)
.findFirst()
.orElse(null);
}

private static final class HttpGrpcHeaderHelper
{
private static final Pattern PERIOD_PATTERN = Pattern.compile("([0-9]+)([HMSmun])");
Expand Down Expand Up @@ -187,7 +237,6 @@ private static final class HttpGrpcHeaderHelper
public String16FW authority;
public String16FW te;


HttpGrpcHeaderHelper(
MutableDirectBuffer metadataBuffer)
{
Expand Down Expand Up @@ -350,4 +399,39 @@ private long parsePeriod(
return milliseconds;
}
}

final class GrpcCatalogSchema
{
final CatalogHandler handler;
final String subject;
final String version;

GrpcProtobufConfig protobuf;

int schemaId = NO_SCHEMA_ID;

GrpcCatalogSchema(
CatalogHandler handler,
String subject,
String version)
{
this.handler = handler;
this.subject = subject;
this.version = version;
}

private GrpcProtobufConfig resolveProtobuf()
{
final int newSchemaId = handler.resolve(subject, version);

if (schemaId != newSchemaId)
{
schemaId = newSchemaId;
String schema = handler.resolve(schemaId);
protobuf = parser.parse(null, schema);
}

return protobuf;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import static java.util.stream.Collectors.toList;

import java.util.List;
import java.util.Set;
import java.util.function.Function;

import jakarta.json.Json;
Expand All @@ -29,26 +28,19 @@
import jakarta.json.JsonValue;
import jakarta.json.bind.adapter.JsonbAdapter;

import org.agrona.collections.ObjectHashSet;
import org.antlr.v4.runtime.BailErrorStrategy;
import org.antlr.v4.runtime.CharStream;
import org.antlr.v4.runtime.CharStreams;
import org.antlr.v4.runtime.CommonTokenStream;
import org.antlr.v4.runtime.tree.ParseTreeWalker;

import io.aklivity.zilla.runtime.binding.grpc.config.GrpcOptionsConfig;
import io.aklivity.zilla.runtime.binding.grpc.config.GrpcProtobufConfig;
import io.aklivity.zilla.runtime.binding.grpc.config.GrpcServiceConfig;
import io.aklivity.zilla.runtime.binding.grpc.internal.GrpcBinding;
import io.aklivity.zilla.runtime.binding.grpc.internal.parser.Protobuf3Lexer;
import io.aklivity.zilla.runtime.binding.grpc.internal.parser.Protobuf3Parser;
import io.aklivity.zilla.runtime.engine.config.ConfigAdapterContext;
import io.aklivity.zilla.runtime.engine.config.OptionsConfig;
import io.aklivity.zilla.runtime.engine.config.OptionsConfigAdapterSpi;

public final class GrpcOptionsConfigAdapter implements OptionsConfigAdapterSpi, JsonbAdapter<OptionsConfig, JsonObject>
{
private static final String SERVICES_NAME = "services";

private final GrpcProtobufParser parser = new GrpcProtobufParser();

private Function<String, String> readURL;

@Override
Expand Down Expand Up @@ -111,18 +103,8 @@ private GrpcProtobufConfig asProtobuf(
JsonValue value)
{
final String location = ((JsonString) value).getString();
final String protoService = readURL.apply(location);
CharStream input = CharStreams.fromString(protoService);
Protobuf3Lexer lexer = new Protobuf3Lexer(input);
CommonTokenStream tokens = new CommonTokenStream(lexer);

Protobuf3Parser parser = new Protobuf3Parser(tokens);
parser.setErrorHandler(new BailErrorStrategy());
ParseTreeWalker walker = new ParseTreeWalker();
Set<GrpcServiceConfig> services = new ObjectHashSet<>();
GrpcServiceDefinitionListener listener = new GrpcServiceDefinitionListener(services);
walker.walk(listener, parser.proto());

return new GrpcProtobufConfig(location, services);
final String protobuf = readURL.apply(location);

return parser.parse(location, protobuf);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2021-2023 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.binding.grpc.internal.config;

import java.util.Set;

import org.agrona.collections.ObjectHashSet;
import org.antlr.v4.runtime.BailErrorStrategy;
import org.antlr.v4.runtime.CharStream;
import org.antlr.v4.runtime.CharStreams;
import org.antlr.v4.runtime.CommonTokenStream;
import org.antlr.v4.runtime.tree.ParseTreeWalker;

import io.aklivity.zilla.runtime.binding.grpc.config.GrpcProtobufConfig;
import io.aklivity.zilla.runtime.binding.grpc.config.GrpcServiceConfig;
import io.aklivity.zilla.runtime.binding.grpc.internal.parser.Protobuf3Lexer;
import io.aklivity.zilla.runtime.binding.grpc.internal.parser.Protobuf3Parser;

public final class GrpcProtobufParser
{
private final ParseTreeWalker walker;
private final BailErrorStrategy errorStrategy;
private final Protobuf3Lexer lexer;
private CommonTokenStream tokens;
private final Protobuf3Parser parser;

public GrpcProtobufParser()
{
this.walker = new ParseTreeWalker();
this.errorStrategy = new BailErrorStrategy();
this.lexer = new Protobuf3Lexer(null);
this.parser = new Protobuf3Parser(null);
this.tokens = new CommonTokenStream(lexer);
parser.setErrorHandler(errorStrategy);
}

public GrpcProtobufConfig parse(
String location,
String schema)
{
CharStream input = CharStreams.fromString(schema);
lexer.reset();
lexer.setInputStream(input);

tokens.setTokenSource(lexer);
parser.setTokenStream(tokens);

Set<GrpcServiceConfig> services = new ObjectHashSet<>();
GrpcServiceDefinitionListener listener = new GrpcServiceDefinitionListener(services);
walker.walk(listener, parser.proto());

return new GrpcProtobufConfig(location, services);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package io.aklivity.zilla.runtime.binding.grpc.internal.stream;

import java.util.function.LongFunction;
import java.util.function.LongUnaryOperator;

import org.agrona.DirectBuffer;
Expand Down Expand Up @@ -50,6 +51,7 @@
import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.binding.BindingHandler;
import io.aklivity.zilla.runtime.engine.binding.function.MessageConsumer;
import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.config.BindingConfig;

public class GrpcClientFactory implements GrpcStreamFactory
Expand Down Expand Up @@ -114,6 +116,7 @@ public class GrpcClientFactory implements GrpcStreamFactory
private final MutableDirectBuffer metadataBuffer;
private final MutableDirectBuffer extBuffer;
private final BindingHandler streamFactory;
private final LongFunction<CatalogHandler> supplyCatalog;
private final LongUnaryOperator supplyInitialId;
private final LongUnaryOperator supplyReplyId;
private final int httpTypeId;
Expand All @@ -130,6 +133,7 @@ public GrpcClientFactory(
this.metadataBuffer = new UnsafeBuffer(new byte[writeBuffer.capacity()]);
this.extBuffer = new UnsafeBuffer(new byte[writeBuffer.capacity()]);
this.streamFactory = context.streamFactory();
this.supplyCatalog = context::supplyCatalog;
this.supplyInitialId = context::supplyInitialId;
this.supplyReplyId = context::supplyReplyId;
this.httpTypeId = context.supplyTypeId(HTTP_TYPE_NAME);
Expand Down Expand Up @@ -159,7 +163,7 @@ public int routedTypeId()
public void attach(
BindingConfig binding)
{
GrpcBindingConfig grpcBinding = new GrpcBindingConfig(binding, metadataBuffer);
GrpcBindingConfig grpcBinding = new GrpcBindingConfig(binding, metadataBuffer, supplyCatalog);
bindings.put(binding.id, grpcBinding);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static java.time.Instant.now;

import java.util.function.Consumer;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;
import java.util.function.LongUnaryOperator;

Expand Down Expand Up @@ -62,6 +63,7 @@
import io.aklivity.zilla.runtime.engine.binding.BindingHandler;
import io.aklivity.zilla.runtime.engine.binding.function.MessageConsumer;
import io.aklivity.zilla.runtime.engine.buffer.BufferPool;
import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.concurrent.Signaler;
import io.aklivity.zilla.runtime.engine.config.BindingConfig;

Expand Down Expand Up @@ -134,6 +136,7 @@ public final class GrpcServerFactory implements GrpcStreamFactory
private final BufferPool bufferPool;
private final Signaler signaler;
private final BindingHandler streamFactory;
private final LongFunction<CatalogHandler> supplyCatalog;
private final LongUnaryOperator supplyInitialId;
private final LongUnaryOperator supplyReplyId;
private final LongSupplier supplyTraceId;
Expand Down Expand Up @@ -235,6 +238,7 @@ public GrpcServerFactory(
this.bufferPool = context.bufferPool();
this.signaler = context.signaler();
this.streamFactory = context.streamFactory();
this.supplyCatalog = context::supplyCatalog;
this.supplyInitialId = context::supplyInitialId;
this.supplyReplyId = context::supplyReplyId;
this.supplyTraceId = context::supplyTraceId;
Expand All @@ -259,7 +263,7 @@ public int routedTypeId()
public void attach(
BindingConfig binding)
{
GrpcBindingConfig grpcBinding = new GrpcBindingConfig(binding, metadataBuffer);
GrpcBindingConfig grpcBinding = new GrpcBindingConfig(binding, metadataBuffer, supplyCatalog);
bindings.put(binding.id, grpcBinding);
}

Expand Down
Loading
Loading