diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..72806b4
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,24 @@
+# Maven (examples)
+target
+dependency-reduced-pom.xml
+
+# IntelliJ IDEA
+.idea
+*.iml
+
+# Eclipse
+.classpath
+.project
+.settings
+bin
+
+# OS X
+.DS_Store
+
+# log
+logs/
+*.log
+
+#protobuf
+example/build/*
+test/test-integration/build/*
\ No newline at end of file
diff --git a/LICENSE b/LICENSE
deleted file mode 100644
index 261eeb9..0000000
--- a/LICENSE
+++ /dev/null
@@ -1,201 +0,0 @@
- Apache License
- Version 2.0, January 2004
- http://www.apache.org/licenses/
-
- TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
- 1. Definitions.
-
- "License" shall mean the terms and conditions for use, reproduction,
- and distribution as defined by Sections 1 through 9 of this document.
-
- "Licensor" shall mean the copyright owner or entity authorized by
- the copyright owner that is granting the License.
-
- "Legal Entity" shall mean the union of the acting entity and all
- other entities that control, are controlled by, or are under common
- control with that entity. For the purposes of this definition,
- "control" means (i) the power, direct or indirect, to cause the
- direction or management of such entity, whether by contract or
- otherwise, or (ii) ownership of fifty percent (50%) or more of the
- outstanding shares, or (iii) beneficial ownership of such entity.
-
- "You" (or "Your") shall mean an individual or Legal Entity
- exercising permissions granted by this License.
-
- "Source" form shall mean the preferred form for making modifications,
- including but not limited to software source code, documentation
- source, and configuration files.
-
- "Object" form shall mean any form resulting from mechanical
- transformation or translation of a Source form, including but
- not limited to compiled object code, generated documentation,
- and conversions to other media types.
-
- "Work" shall mean the work of authorship, whether in Source or
- Object form, made available under the License, as indicated by a
- copyright notice that is included in or attached to the work
- (an example is provided in the Appendix below).
-
- "Derivative Works" shall mean any work, whether in Source or Object
- form, that is based on (or derived from) the Work and for which the
- editorial revisions, annotations, elaborations, or other modifications
- represent, as a whole, an original work of authorship. For the purposes
- of this License, Derivative Works shall not include works that remain
- separable from, or merely link (or bind by name) to the interfaces of,
- the Work and Derivative Works thereof.
-
- "Contribution" shall mean any work of authorship, including
- the original version of the Work and any modifications or additions
- to that Work or Derivative Works thereof, that is intentionally
- submitted to Licensor for inclusion in the Work by the copyright owner
- or by an individual or Legal Entity authorized to submit on behalf of
- the copyright owner. For the purposes of this definition, "submitted"
- means any form of electronic, verbal, or written communication sent
- to the Licensor or its representatives, including but not limited to
- communication on electronic mailing lists, source code control systems,
- and issue tracking systems that are managed by, or on behalf of, the
- Licensor for the purpose of discussing and improving the Work, but
- excluding communication that is conspicuously marked or otherwise
- designated in writing by the copyright owner as "Not a Contribution."
-
- "Contributor" shall mean Licensor and any individual or Legal Entity
- on behalf of whom a Contribution has been received by Licensor and
- subsequently incorporated within the Work.
-
- 2. Grant of Copyright License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- copyright license to reproduce, prepare Derivative Works of,
- publicly display, publicly perform, sublicense, and distribute the
- Work and such Derivative Works in Source or Object form.
-
- 3. Grant of Patent License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- (except as stated in this section) patent license to make, have made,
- use, offer to sell, sell, import, and otherwise transfer the Work,
- where such license applies only to those patent claims licensable
- by such Contributor that are necessarily infringed by their
- Contribution(s) alone or by combination of their Contribution(s)
- with the Work to which such Contribution(s) was submitted. If You
- institute patent litigation against any entity (including a
- cross-claim or counterclaim in a lawsuit) alleging that the Work
- or a Contribution incorporated within the Work constitutes direct
- or contributory patent infringement, then any patent licenses
- granted to You under this License for that Work shall terminate
- as of the date such litigation is filed.
-
- 4. Redistribution. You may reproduce and distribute copies of the
- Work or Derivative Works thereof in any medium, with or without
- modifications, and in Source or Object form, provided that You
- meet the following conditions:
-
- (a) You must give any other recipients of the Work or
- Derivative Works a copy of this License; and
-
- (b) You must cause any modified files to carry prominent notices
- stating that You changed the files; and
-
- (c) You must retain, in the Source form of any Derivative Works
- that You distribute, all copyright, patent, trademark, and
- attribution notices from the Source form of the Work,
- excluding those notices that do not pertain to any part of
- the Derivative Works; and
-
- (d) If the Work includes a "NOTICE" text file as part of its
- distribution, then any Derivative Works that You distribute must
- include a readable copy of the attribution notices contained
- within such NOTICE file, excluding those notices that do not
- pertain to any part of the Derivative Works, in at least one
- of the following places: within a NOTICE text file distributed
- as part of the Derivative Works; within the Source form or
- documentation, if provided along with the Derivative Works; or,
- within a display generated by the Derivative Works, if and
- wherever such third-party notices normally appear. The contents
- of the NOTICE file are for informational purposes only and
- do not modify the License. You may add Your own attribution
- notices within Derivative Works that You distribute, alongside
- or as an addendum to the NOTICE text from the Work, provided
- that such additional attribution notices cannot be construed
- as modifying the License.
-
- You may add Your own copyright statement to Your modifications and
- may provide additional or different license terms and conditions
- for use, reproduction, or distribution of Your modifications, or
- for any such Derivative Works as a whole, provided Your use,
- reproduction, and distribution of the Work otherwise complies with
- the conditions stated in this License.
-
- 5. Submission of Contributions. Unless You explicitly state otherwise,
- any Contribution intentionally submitted for inclusion in the Work
- by You to the Licensor shall be under the terms and conditions of
- this License, without any additional terms or conditions.
- Notwithstanding the above, nothing herein shall supersede or modify
- the terms of any separate license agreement you may have executed
- with Licensor regarding such Contributions.
-
- 6. Trademarks. This License does not grant permission to use the trade
- names, trademarks, service marks, or product names of the Licensor,
- except as required for reasonable and customary use in describing the
- origin of the Work and reproducing the content of the NOTICE file.
-
- 7. Disclaimer of Warranty. Unless required by applicable law or
- agreed to in writing, Licensor provides the Work (and each
- Contributor provides its Contributions) on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- implied, including, without limitation, any warranties or conditions
- of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
- PARTICULAR PURPOSE. You are solely responsible for determining the
- appropriateness of using or redistributing the Work and assume any
- risks associated with Your exercise of permissions under this License.
-
- 8. Limitation of Liability. In no event and under no legal theory,
- whether in tort (including negligence), contract, or otherwise,
- unless required by applicable law (such as deliberate and grossly
- negligent acts) or agreed to in writing, shall any Contributor be
- liable to You for damages, including any direct, indirect, special,
- incidental, or consequential damages of any character arising as a
- result of this License or out of the use or inability to use the
- Work (including but not limited to damages for loss of goodwill,
- work stoppage, computer failure or malfunction, or any and all
- other commercial damages or losses), even if such Contributor
- has been advised of the possibility of such damages.
-
- 9. Accepting Warranty or Additional Liability. While redistributing
- the Work or Derivative Works thereof, You may choose to offer,
- and charge a fee for, acceptance of support, warranty, indemnity,
- or other liability obligations and/or rights consistent with this
- License. However, in accepting such obligations, You may act only
- on Your own behalf and on Your sole responsibility, not on behalf
- of any other Contributor, and only if You agree to indemnify,
- defend, and hold each Contributor harmless for any liability
- incurred by, or claims asserted against, such Contributor by reason
- of your accepting any such warranty or additional liability.
-
- END OF TERMS AND CONDITIONS
-
- APPENDIX: How to apply the Apache License to your work.
-
- To apply the Apache License to your work, attach the following
- boilerplate notice, with the fields enclosed by brackets "[]"
- replaced with your own identifying information. (Don't include
- the brackets!) The text should be enclosed in the appropriate
- comment syntax for the file format. We also recommend that a
- file or class name and description of purpose be included on the
- same "printed page" as the copyright notice for easier
- identification within third-party archives.
-
- Copyright [yyyy] [name of copyright owner]
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
diff --git a/README-zh.md b/README-zh.md
new file mode 100644
index 0000000..7665946
--- /dev/null
+++ b/README-zh.md
@@ -0,0 +1,155 @@
+## 如何使用java sdk
+### 1. import sdk
+对于 Maven 项目,将以下配置添加进 `pom.xml` 文件:
+```xml
+
+ ...
+
+ ...
+
+ io.mosn.layotto
+ runtime-sdk-parent
+ 1.0.0
+
+ ...
+
+ ...
+
+```
+
+### 2. 运行 examples 示例
+可以本地部署redis和Layotto,然后运行java应用示例,通过java sdk调Layotto,Layotto转发给redis
+
+#### 第一步:部署redis
+
+1. 取最新版的 Redis 镜像。
+ 这里我们拉取官方的最新版本的镜像:
+
+```shell
+docker pull redis:latest
+```
+
+2. 查看本地镜像
+ 使用以下命令来查看是否已安装了 redis:
+
+```shell
+docker images
+```
+
+3. 运行容器
+
+安装完成后,我们可以使用以下命令来运行 redis 容器:
+
+```shell
+docker run -itd --name redis-test -p 6380:6379 redis
+```
+
+参数说明:
+
+-p 6380:6379:映射容器服务的 6379 端口到宿主机的 6380 端口。外部可以直接通过宿主机ip:6380 访问到 Redis 的服务。
+
+#### 第二步:构建并运行Layotto
+
+clone仓库到本地:
+
+```sh
+git clone https://github.com/mosn/layotto.git
+```
+
+构建并运行Layotto:
+
+```bash
+# make sure you replace this` ${projectpath}` with your own project path.
+cd ${projectpath}/cmd/layotto
+go build
+./layotto start -c ../../configs/config_redis.json
+```
+
+构建java-sdk [Maven](https://maven.apache.org/install.html) (Apache Maven version 3.x) 项目:
+
+```sh
+# make sure you replace this` ${projectpath}` with your own project path.
+cd ${projectpath}/sdk/java-sdk
+mvn clean install
+```
+
+#### 第三步:运行java sdk示例
+通过以下Examples示例来了解如何使用SDK:
+* [Hello world](./examples/src/test/java/io/mosn/layotto/examples/helloworld)
+* [State management](./examples/src/test/java/io/mosn/layotto/examples/state)
+* [Pubsub API](./examples/src/test/java/io/mosn/layotto/examples/pubsub)
+* [File API](./examples/src/test/java/io/mosn/layotto/examples/file)
+
+## java sdk开发指南
+### java sdk职责
+1. sdk负责对Layotto的grpc API进行封装、不应该有任何中间件的定制逻辑,比如不应该出现redis、rocketmq等产品相关的逻辑。
+
+2. sdk需要把所有跟通信协议相关的东西(比如proto编译出来的stub类)屏蔽掉,请勿让public方法暴露出任何跟协议相关的东西,最好protected方法也不暴露proto相关的东西。
+ 这么做是因为将来可能改grpc API的package路径,甚至哪天不用grpc了(谁知道呢)。总之请让用户不用关心协议。
+
+举个例子, state API对应有个`deleteState`方法,需要传`DeleteStateRequest`对象。
+```java
+
+ /**
+ * Delete a state.
+ *
+ * @param request Request to delete a state.
+ */
+ void deleteState(DeleteStateRequest request);
+
+```
+这个`DeleteStateRequest`是sdk定义的,其实sdk会把它转成 `RuntimeProto.DeleteStateRequest` (proto编译出来的类) 。
+
+你可能会问:为什么不能直接传`RuntimeProto.DeleteStateRequest` 呢?
+
+这就是上面说的原因,sdk需要封装掉协议相关的东西
+
+### 想为某个grpc API提供java sdk,需要做哪些事情?
+举个例子,grpc API里添加了file API,现在想为java sdk开发file API相关功能,需要做哪些事情?
+
+1. 先找个java sdk的demo跑起来,然后看懂java sdk是怎么创建对象、怎么调用的。其实java sdk就是把grpc包了一层,封装掉grpc的一些stub类,逻辑不多。
+
+2. 参考pr [feat(java-sdk): java sdk support File API](https://github.com/mosn/layotto/pull/325) . 这个pr 给java sdk添加了file API相关功能
+
+### 如何格式化 java sdk 代码
+提交pull request之前先用maven编译一下
+
+```shell
+mvn clean compile
+```
+会自动格式化您的代码
+
+### 如何将proto文件编译成java代码
+
+#### 1. 下载编译工具 [protoc](https://github.com/protocolbuffers/protobuf/releases)
+my protoc version:
+```shell
+$ protoc --version
+libprotoc 3.11.2
+```
+
+#### 2. 修改对应`proto`文件生成类名包名等信息
+(需先修改文件内部service名)
+`spec/proto/runtime/v1/appcallback.proto` :
+```protobuf
+option java_outer_classname = "AppCallbackProto";
+option java_package = "spec.proto.runtime.v1";
+```
+`spec/proto/runtime/v1/runtime.proto` :
+```protobuf
+option java_outer_classname = "RuntimeProto";
+option java_package = "spec.proto.runtime.v1";
+```
+
+#### 3. 编译其对应`JAVA`文件
+```shell
+cd ${your PROJECT path}/spec/proto/runtime/v1
+protoc -I=. --java_out=../../../../sdk/java-sdk/sdk/src/main/java/ runtime.proto
+protoc -I=. --java_out=../../../../sdk/java-sdk/sdk/src/main/java/ appcallback.proto
+```
+
+[comment]: <> (PS: 建议用maven插件`protoc-gen-grpc-java`生成protobuf和grpc的java代码)
+
+[comment]: <> (如果您在使用 [IntelliJ IDEA](https://www.jetbrains.com/help/idea/discover-intellij-idea.html) ,双击 Maven插件中的 `compile` , IDE 会自动帮你编译 proto 文件:)
+
+[comment]: <> ()
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..ac984bd
--- /dev/null
+++ b/README.md
@@ -0,0 +1,99 @@
+[中文](./README-zh.md)
+## How to use java sdk
+### 1. import sdk
+For a Maven project, add the following to your `pom.xml` file:
+```xml
+
+ ...
+
+ ...
+
+ io.mosn.layotto
+ runtime-sdk-parent
+ 1.0.0
+
+ ...
+
+ ...
+
+```
+
+### 2. Run the examples
+Clone this repository including the submodules:
+
+```sh
+git clone https://github.com/mosn/layotto.git
+```
+
+Build and run Layotto:
+
+```bash
+# make sure you replace this` ${projectpath}` with your own project path.
+cd ${projectpath}/cmd/layotto
+go build
+./layotto start -c ../../configs/config_redis.json
+```
+
+Then head over to build the java-sdk [Maven](https://maven.apache.org/install.html) (Apache Maven version 3.x) project:
+
+```sh
+# make sure you replace this` ${projectpath}` with your own project path.
+cd ${projectpath}/sdk/java-sdk
+mvn clean install
+```
+
+
+
+
+Try the following examples to learn more about this SDK:
+* [Hello world](./examples/src/test/java/io/mosn/layotto/examples/helloworld)
+* [State management](./examples/src/test/java/io/mosn/layotto/examples/state)
+* [Pubsub API](./examples/src/test/java/io/mosn/layotto/examples/pubsub)
+* [File API](./examples/src/test/java/io/mosn/layotto/examples/file)
+
+## java sdk developer guide
+### How to format java sdk code
+Compile before submit your pull request:
+
+```shell
+mvn clean compile
+```
+It will format your code automatically.
+
+### How to generate a Java PROTO file
+
+#### 1. Download proto compiler [protoc](https://github.com/protocolbuffers/protobuf/releases)
+my protoc version:
+```shell
+$ protoc --version
+libprotoc 3.11.2
+```
+
+#### 2. Check `option` fields in these proto files
+Make sure these `option` fields have been configurated.
+
+spec/proto/runtime/v1/appcallback.proto :
+```protobuf
+option java_outer_classname = "AppCallbackProto";
+option java_package = "spec.proto.runtime.v1";
+```
+
+spec/proto/runtime/v1/runtime.proto :
+```protobuf
+option java_outer_classname = "RuntimeProto";
+option java_package = "spec.proto.runtime.v1";
+```
+
+#### 3. Compile them into corresponding `JAVA` files
+```shell
+# make sure you replace this `${your PROJECT path}` with your own project path.
+cd ${your PROJECT path}/spec/proto/runtime/v1
+protoc -I=. --java_out=../../../../sdk/java-sdk/sdk/src/main/java/ runtime.proto
+protoc -I=. --java_out=../../../../sdk/java-sdk/sdk/src/main/java/ appcallback.proto
+```
+
+[comment]: <> (PS: We recommend that you use the maven plugin `protoc-gen-grpc-java` to generate these protobuf and grpc related java code.)
+
+[comment]: <> (If you are using [IntelliJ IDEA](https://www.jetbrains.com/help/idea/discover-intellij-idea.html) ,just double click `compile` in the Maven tab and the IDE will generate proto files automatically:)
+
+[comment]: <> ()
diff --git a/examples/pom.xml b/examples/pom.xml
new file mode 100644
index 0000000..015e86b
--- /dev/null
+++ b/examples/pom.xml
@@ -0,0 +1,60 @@
+
+
+ 4.0.0
+
+
+ runtime-sdk-parent
+ io.mosn.layotto
+ 1.1.0-SNAPSHOT
+
+
+ examples
+
+
+
+ io.mosn.layotto
+ runtime-sdk
+
+
+
+
+ src/main/java
+
+
+ src/main/resources
+ false
+
+ **/**
+
+
+
+ src/test/java
+
+
+ src/test/resources
+ false
+
+ **/**
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-source-plugin
+ 3.0.1
+
+
+ attach-sources
+
+ jar
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/examples/src/test/java/io/mosn/layotto/examples/file/File.java b/examples/src/test/java/io/mosn/layotto/examples/file/File.java
new file mode 100644
index 0000000..43f2fc1
--- /dev/null
+++ b/examples/src/test/java/io/mosn/layotto/examples/file/File.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.examples.file;
+
+import io.mosn.layotto.v1.RuntimeClientBuilder;
+import io.mosn.layotto.v1.config.RuntimeProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import spec.sdk.runtime.v1.client.RuntimeClient;
+import spec.sdk.runtime.v1.domain.file.DelFileRequest;
+import spec.sdk.runtime.v1.domain.file.FileInfo;
+import spec.sdk.runtime.v1.domain.file.GetFileRequest;
+import spec.sdk.runtime.v1.domain.file.GetFileResponse;
+import spec.sdk.runtime.v1.domain.file.GetMetaRequest;
+import spec.sdk.runtime.v1.domain.file.GetMeteResponse;
+import spec.sdk.runtime.v1.domain.file.ListFileRequest;
+import spec.sdk.runtime.v1.domain.file.ListFileResponse;
+import spec.sdk.runtime.v1.domain.file.PutFileRequest;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Specially
+ *
+ * 1. add `"local:{}` to "files" node in layotto/configs/config_file.json
+ * 2. start server by `./layotto start -c ../../configs/config_file.json`
+ */
+public class File {
+
+ private static final Logger logger = LoggerFactory.getLogger(File.class.getName());
+
+ static String storeName = "local";
+ static String fileName = "/tmp/test.log";
+
+ public static void main(String[] args) throws Exception {
+
+ RuntimeClient client = new RuntimeClientBuilder()
+ .withPort(RuntimeProperties.DEFAULT_PORT)
+ .build();
+
+ putFile(client);
+ getFile(client);
+ listFile(client);
+ getFileMeta(client);
+ delFile(client);
+ }
+
+ public static void putFile(RuntimeClient client) throws Exception {
+
+ PutFileRequest request = new PutFileRequest();
+ request.setStoreName(storeName);
+ request.setFileName(fileName);
+
+ Map meta = new HashMap<>();
+ meta.put("FileMode", "521");
+ meta.put("FileFlag", "0777");
+ request.setMetaData(meta);
+
+ request.setIn(new ByteArrayInputStream("hello world".getBytes()));
+
+ client.putFile(request, 3000);
+ }
+
+ public static void getFile(RuntimeClient client) throws Exception {
+
+ GetFileRequest request = new GetFileRequest();
+ request.setStoreName(storeName);
+ request.setFileName(fileName);
+
+ Map meta = new HashMap<>();
+ meta.put("k1", "v1");
+ request.setMetaData(meta);
+
+ GetFileResponse resp = client.getFile(request, 3000);
+
+ InputStream reader = resp.getIn();
+
+ byte[] buf = new byte[128];
+ for (int len = reader.read(buf); len > 0; len = reader.read(buf)) {
+ logger.info(new String(buf, 0, len));
+ }
+ }
+
+ public static void delFile(RuntimeClient client) throws Exception {
+
+ DelFileRequest request = new DelFileRequest();
+ request.setStoreName(storeName);
+ request.setFileName(fileName);
+
+ client.delFile(request, 3000);
+ }
+
+ public static void listFile(RuntimeClient client) throws Exception {
+
+ ListFileRequest request = new ListFileRequest();
+ request.setStoreName(storeName);
+ request.setMarker("test.log");
+ request.setName("/tmp");
+ request.setPageSize(10);
+
+ ListFileResponse resp = client.listFile(request, 3000);
+
+ for (FileInfo f : resp.getFiles()) {
+ logger.info(f.getFileName());
+ }
+ }
+
+ public static void getFileMeta(RuntimeClient client) throws Exception {
+
+ GetMetaRequest request = new GetMetaRequest();
+ request.setStoreName(storeName);
+ request.setFileName(fileName);
+
+ GetMeteResponse response = client.getFileMeta(request, 3000);
+ logger.info(response.getLastModified());
+ logger.info("" + response.getMeta().size());
+ }
+}
diff --git a/examples/src/test/java/io/mosn/layotto/examples/helloworld/Hello.java b/examples/src/test/java/io/mosn/layotto/examples/helloworld/Hello.java
new file mode 100644
index 0000000..da35ec3
--- /dev/null
+++ b/examples/src/test/java/io/mosn/layotto/examples/helloworld/Hello.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.examples.helloworld;
+
+import io.mosn.layotto.v1.RuntimeClientBuilder;
+import io.mosn.layotto.v1.config.RuntimeProperties;
+import spec.sdk.runtime.v1.client.RuntimeClient;
+
+public class Hello {
+
+ public static void main(String[] args) {
+ RuntimeClient client = new RuntimeClientBuilder()
+ .withPort(RuntimeProperties.DEFAULT_PORT)
+ .build();
+
+ String resp = client.sayHello("helloworld");
+ if (!"greeting, helloworld".equals(resp)) {
+ throw new RuntimeException("Unexpected result:" + resp);
+ }
+ System.out.println(resp);
+ }
+}
diff --git a/examples/src/test/java/io/mosn/layotto/examples/pubsub/publisher/Publisher.java b/examples/src/test/java/io/mosn/layotto/examples/pubsub/publisher/Publisher.java
new file mode 100644
index 0000000..e7b14d9
--- /dev/null
+++ b/examples/src/test/java/io/mosn/layotto/examples/pubsub/publisher/Publisher.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.examples.pubsub.publisher;
+
+import io.mosn.layotto.v1.RuntimeClientBuilder;
+import io.mosn.layotto.v1.config.RuntimeProperties;
+import spec.sdk.runtime.v1.client.RuntimeClient;
+
+public class Publisher {
+
+ public static void main(String[] args) {
+ RuntimeClient client = new RuntimeClientBuilder()
+ .withPort(RuntimeProperties.DEFAULT_PORT)
+ .build();
+
+ client.publishEvent("redis", "hello", "world".getBytes());
+ client.publishEvent("redis", "topic1", "message1".getBytes());
+ }
+}
diff --git a/examples/src/test/java/io/mosn/layotto/examples/pubsub/subscriber/Subscriber.java b/examples/src/test/java/io/mosn/layotto/examples/pubsub/subscriber/Subscriber.java
new file mode 100644
index 0000000..887a64a
--- /dev/null
+++ b/examples/src/test/java/io/mosn/layotto/examples/pubsub/subscriber/Subscriber.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.examples.pubsub.subscriber;
+
+import com.alibaba.fastjson.JSON;
+import io.mosn.layotto.examples.pubsub.subscriber.impl.RawSubscriber;
+import io.mosn.layotto.v1.RuntimeServerGrpc;
+
+import java.util.concurrent.Semaphore;
+
+public class Subscriber {
+
+ /**
+ * This is the entry point for this example app, which subscribes to a topic.
+ *
+ * @throws Exception An Exception on startup.
+ */
+ public static void main(String[] args) throws Exception {
+ RuntimeServerGrpc srv = new RuntimeServerGrpc(9999);
+ RawSubscriber pubsub = new RawSubscriber("redis");
+ pubsub.subscribe("hello", request -> {
+ String value = new String(request.getData());
+ assertEquals(value, "world");
+ System.out.println(JSON.toJSONString(request));
+ });
+ pubsub.subscribe("topic1", request -> {
+ String value = new String(request.getData());
+ assertEquals(value, "message1");
+ System.out.println(JSON.toJSONString(request));
+ });
+ srv.registerPubSubCallback(pubsub.getComponentName(), pubsub);
+ Semaphore sm = new Semaphore(0);
+ srv.start();
+ sm.acquire();
+ }
+
+ private static void assertEquals(Object actualResult, Object expected) {
+ if (actualResult == expected || actualResult.equals(expected)) {
+ return;
+ }
+ String msg = "Unexpected result:" + actualResult;
+ throw new RuntimeException(msg);
+ }
+}
\ No newline at end of file
diff --git a/examples/src/test/java/io/mosn/layotto/examples/pubsub/subscriber/impl/EventListener.java b/examples/src/test/java/io/mosn/layotto/examples/pubsub/subscriber/impl/EventListener.java
new file mode 100644
index 0000000..8f1c523
--- /dev/null
+++ b/examples/src/test/java/io/mosn/layotto/examples/pubsub/subscriber/impl/EventListener.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.examples.pubsub.subscriber.impl;
+
+import spec.sdk.runtime.v1.domain.pubsub.TopicEventRequest;
+
+public interface EventListener {
+
+ void onEvent(TopicEventRequest request) throws Exception;
+}
diff --git a/examples/src/test/java/io/mosn/layotto/examples/pubsub/subscriber/impl/RawSubscriber.java b/examples/src/test/java/io/mosn/layotto/examples/pubsub/subscriber/impl/RawSubscriber.java
new file mode 100644
index 0000000..3121c4e
--- /dev/null
+++ b/examples/src/test/java/io/mosn/layotto/examples/pubsub/subscriber/impl/RawSubscriber.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.examples.pubsub.subscriber.impl;
+
+import io.mosn.layotto.v1.callback.component.pubsub.Subscriber;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import spec.sdk.runtime.v1.domain.pubsub.TopicEventRequest;
+import spec.sdk.runtime.v1.domain.pubsub.TopicEventResponse;
+import spec.sdk.runtime.v1.domain.pubsub.TopicEventResponseStatus;
+import spec.sdk.runtime.v1.domain.pubsub.TopicSubscription;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Raw pubsub client.
+ */
+public class RawSubscriber implements Subscriber {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RawSubscriber.class);
+
+ private final Map listeners = new ConcurrentHashMap<>();
+ private final String componentName;
+
+ public RawSubscriber(String pubsubName) {
+ componentName = pubsubName;
+ }
+
+ @Override
+ public String getComponentName() {
+ return componentName;
+ }
+
+ public void subscribe(String topic, EventListener listener) {
+ if (listeners.putIfAbsent(topic, listener) != null) {
+ throw new IllegalArgumentException("Listener for topic " + topic + " already exists!");
+ }
+ }
+
+ @Override
+ public Set listTopicSubscriptions() {
+ final HashSet subscriptions = new HashSet<>();
+ for (String topic : listeners.keySet()) {
+ final TopicSubscription subscription = new TopicSubscription();
+ subscription.setTopic(topic);
+ subscription.setPubsubName(componentName);
+ subscriptions.add(subscription);
+ }
+ return subscriptions;
+ }
+
+ @Override
+ public TopicEventResponse onTopicEvent(TopicEventRequest request) {
+ final String topic = request.getTopic();
+ final EventListener eventListener = listeners.get(topic);
+ if (eventListener == null) {
+ LOG.error("Cannot find listener for topic:[{}]", topic);
+ TopicEventResponse resp = new TopicEventResponse();
+ resp.setStatus(TopicEventResponseStatus.DROP);
+ }
+ try {
+ eventListener.onEvent(request);
+ final TopicEventResponse response = new TopicEventResponse();
+ response.setStatus(TopicEventResponseStatus.SUCCESS);
+ return response;
+ } catch (Exception e) {
+ final TopicEventResponse response = new TopicEventResponse();
+ response.setStatus(TopicEventResponseStatus.RETRY);
+ return response;
+ }
+ }
+}
diff --git a/examples/src/test/java/io/mosn/layotto/examples/state/RedisCRUD.java b/examples/src/test/java/io/mosn/layotto/examples/state/RedisCRUD.java
new file mode 100644
index 0000000..34e4b13
--- /dev/null
+++ b/examples/src/test/java/io/mosn/layotto/examples/state/RedisCRUD.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.examples.state;
+
+import io.mosn.layotto.v1.RuntimeClientBuilder;
+import io.mosn.layotto.v1.config.RuntimeProperties;
+import spec.sdk.runtime.v1.client.RuntimeClient;
+import spec.sdk.runtime.v1.domain.state.GetBulkStateRequest;
+import spec.sdk.runtime.v1.domain.state.State;
+import spec.sdk.runtime.v1.domain.state.TransactionalStateOperation;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class RedisCRUD {
+ static String storeName = "redis";
+ static String key1 = "key1";
+ static String key2 = "key2";
+ static String key3 = "key3";
+
+ public static void main(String[] args) {
+ // build RuntimeClient
+ RuntimeClient client = new RuntimeClientBuilder()
+ .withPort(RuntimeProperties.DEFAULT_PORT)
+ .build();
+ // saveState
+ client.saveState(storeName, key1, "v11");
+ // getState
+ State state = client.getState(storeName, key1, String.class);
+ assertEquals(state.getKey(), key1);
+ assertEquals(state.getValue(), "v11");
+ System.out.println("get state key:" + state.getKey() + " value:" + state.getValue());
+
+ // deleteState
+ client.deleteState(storeName, key1);
+
+ // getState
+ state = client.getState(storeName, key1, String.class);
+ assertEquals(state.getKey(), key1);
+ // TODO: currently Redis component can't tell the difference between null and 'non exist'
+ //assertEquals(state.getValue(), null);
+ assertEquals(state.getValue(), "");
+ System.out.println("get state after delete. key:" + state.getKey() + " value:" + state.getValue());
+
+ // saveBulkState
+ List> list = new ArrayList<>();
+ State> state1 = new State<>(key1, "v1", null, null);
+ State> state2 = new State<>(key2, "v2", null, null);
+ list.add(state2);
+ list.add(state1);
+ client.saveBulkState(storeName, list);
+
+ // execute transaction
+ List> operationList = new ArrayList<>();
+ operationList.add(new TransactionalStateOperation<>(TransactionalStateOperation.OperationType.UPSERT,
+ new State<>(key2, new TestClass(key2), "")));
+
+ operationList.add(new TransactionalStateOperation<>(TransactionalStateOperation.OperationType.UPSERT,
+ new State<>(key3, "v3", "")));
+
+ client.executeStateTransaction(storeName, operationList);
+
+ // getBulkState
+ List keys = new ArrayList<>();
+ keys.add(key3);
+ keys.add(key1);
+ GetBulkStateRequest req = new GetBulkStateRequest(storeName, keys);
+ List> bulkState = client.getBulkState(req, String.class);
+ assertTrue(bulkState.size() == 2);
+ for (State st : bulkState) {
+ String key = st.getKey();
+ if (key.equals(key1)) {
+ assertEquals(st.getValue(), "v1");
+ } else if (key.equals(key3)) {
+ assertEquals(st.getValue(), "v3");
+ } else {
+ throw new RuntimeException("Unexpected key:" + key);
+ }
+ }
+
+ keys = new ArrayList<>();
+ keys.add(key2);
+ req = new GetBulkStateRequest(storeName, keys);
+ List> resp = client.getBulkState(req, TestClass.class);
+ assertTrue(resp.size() == 1);
+ assertEquals(resp.get(0).getValue().name, key2);
+
+ }
+
+ private static void assertTrue(boolean b) {
+ if (!b) {
+ throw new RuntimeException("Assertion fail");
+ }
+ }
+
+ private static void assertEquals(String actualResult, String expected) {
+ if (actualResult == expected || actualResult.equals(expected)) {
+ return;
+ }
+ throw new RuntimeException("Unexpected result:" + actualResult);
+ }
+
+ public static class TestClass {
+ String name;
+
+ public TestClass(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Getter method for property name.
+ *
+ * @return property value of name
+ */
+ public String getName() {
+ return name;
+ }
+ }
+}
diff --git a/img.png b/img.png
new file mode 100644
index 0000000..a826585
Binary files /dev/null and b/img.png differ
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..d57ab0a
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,263 @@
+
+
+ 4.0.0
+
+ io.mosn.layotto
+ runtime-sdk-parent
+ 1.1.0-SNAPSHOT
+ pom
+
+ runtime-sdk-parent
+ SDK for Runtime
+ https://github.com/mosn/layotto
+
+
+ sdk
+ examples
+ sdk-reactor
+ spec
+
+
+
+ 3.17.2
+ 1.34.1
+ 1.7.30
+ 1.2.69
+ 4.13.1
+ 3.11.2
+
+
+
+
+
+
+ io.mosn.layotto
+ runtime-sdk
+ ${project.version}
+
+
+ io.mosn.layotto
+ runtime-spec-pb
+ ${project.version}
+
+
+
+ com.alibaba
+ fastjson
+ ${fastjson.version}
+
+
+
+
+ org.slf4j
+ slf4j-api
+ ${slf4j.version}
+
+
+
+
+ io.grpc
+ grpc-all
+ ${grpc.version}
+
+
+ io.grpc
+ grpc-netty
+ ${grpc.version}
+
+
+ io.grpc
+ grpc-protobuf
+ ${grpc.version}
+
+
+ io.grpc
+ grpc-stub
+ ${grpc.version}
+
+
+
+ org.mockito
+ mockito-core
+ ${mockito.version}
+ test
+
+
+ junit
+ junit
+ ${junit.version}
+ test
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-install-plugin
+ 2.5.2
+
+
+ org.apache.maven.plugins
+ maven-deploy-plugin
+ 2.8.2
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.6.0
+
+ 1.8
+ 1.8
+
+
+
+ com.googlecode.maven-java-formatter-plugin
+ maven-java-formatter-plugin
+ 0.4
+
+
+
+ format
+
+
+
+
+ ${user.dir}/tools/codestyle/formatter.xml
+ UTF-8
+
+ spec/proto/runtime/v1/**
+
+
+
+
+
+ com.mycila
+ license-maven-plugin
+ 3.0
+
+
+ generate-sources
+
+ remove
+ format
+
+
+
+
+ true
+ ${user.dir}/tools/codestyle/HEADER
+
+ **/src/main/java/**
+ **/src/test/java/**
+
+ true
+
+ SLASHSTAR_STYLE
+
+
+
+
+
+
+
+
+ The Apache License, Version 2.0
+ http://www.apache.org/licenses/LICENSE-2.0.txt
+
+
+
+
+
+ seeflood
+ 349895584@qq.com
+ MOSN
+ https://mosn.io/
+
+
+ MentosL
+ 1367654518@qq.com
+ MOSN
+ https://mosn.io/
+
+
+ ZLBer
+ 1098294815@qq.com
+ MOSN
+ https://mosn.io/
+
+
+ kevinten10
+ http://www.kevinten.com/
+
+
+
+
+ http://github.com/mosn/layotto
+ scm:git:https://github.com/mosn/layotto.git
+
+
+
+ release
+
+
+
+ org.apache.maven.plugins
+ maven-javadoc-plugin
+ 2.10.4
+
+
+ attach-javadocs
+
+ jar
+
+
+
+
+
+ org.sonatype.plugins
+ nexus-staging-maven-plugin
+ 1.6.7
+ true
+
+ ossrh
+ https://s01.oss.sonatype.org/
+ false
+
+
+
+ org.apache.maven.plugins
+ maven-gpg-plugin
+ 1.6
+
+
+ sign-artifacts
+ verify
+
+ sign
+
+
+
+
+
+
+
+
+ ossrh
+ https://s01.oss.sonatype.org/content/repositories/snapshots
+
+
+ ossrh
+ https://s01.oss.sonatype.org/service/local/staging/deploy/maven2/
+
+
+
+
+
diff --git a/sdk-reactor/pom.xml b/sdk-reactor/pom.xml
new file mode 100644
index 0000000..ae43a86
--- /dev/null
+++ b/sdk-reactor/pom.xml
@@ -0,0 +1,53 @@
+
+
+
+ 4.0.0
+
+
+ io.mosn.layotto
+ runtime-sdk-parent
+ 1.1.0-SNAPSHOT
+
+
+ runtime-sdk-reactor
+
+ runtime-sdk-reactor
+ reactor-style SDK for Runtime
+ jar
+
+
+
+ io.mosn.layotto
+ runtime-spec-pb
+
+
+ org.slf4j
+ slf4j-api
+
+
+ io.projectreactor
+ reactor-core
+ 3.3.11.RELEASE
+
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ 2.11.3
+
+
+
+ com.squareup.okhttp3
+ okhttp
+ 4.9.0
+
+
+
+ org.apache.tomcat
+ annotations-api
+ 6.0.53
+ provided
+
+
+
diff --git a/sdk-reactor/src/main/java/io/mosn/layotto/v1/client/package-info.java b/sdk-reactor/src/main/java/io/mosn/layotto/v1/client/package-info.java
new file mode 100644
index 0000000..3b6fc72
--- /dev/null
+++ b/sdk-reactor/src/main/java/io/mosn/layotto/v1/client/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * SDK JVM Client.
+ */
+package io.mosn.layotto.v1.client;
\ No newline at end of file
diff --git a/sdk-reactor/src/main/java/io/mosn/layotto/v1/client/reactor/AbstractLayottoReactorClient.java b/sdk-reactor/src/main/java/io/mosn/layotto/v1/client/reactor/AbstractLayottoReactorClient.java
new file mode 100644
index 0000000..bca2ae6
--- /dev/null
+++ b/sdk-reactor/src/main/java/io/mosn/layotto/v1/client/reactor/AbstractLayottoReactorClient.java
@@ -0,0 +1,300 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.v1.client.reactor;
+
+import io.mosn.layotto.v1.serializer.LayottoObjectSerializer;
+import reactor.core.publisher.Mono;
+import spec.sdk.reactor.v1.domain.core.invocation.HttpExtension;
+import spec.sdk.reactor.v1.domain.core.invocation.InvokeMethodRequest;
+import spec.sdk.reactor.v1.domain.core.pubsub.PublishEventRequest;
+import spec.sdk.reactor.v1.domain.core.state.DeleteStateRequest;
+import spec.sdk.reactor.v1.domain.core.state.ExecuteStateTransactionRequest;
+import spec.sdk.reactor.v1.domain.core.state.GetBulkStateRequest;
+import spec.sdk.reactor.v1.domain.core.state.GetStateRequest;
+import spec.sdk.reactor.v1.domain.core.state.SaveStateRequest;
+import spec.sdk.reactor.v1.domain.core.state.State;
+import spec.sdk.reactor.v1.domain.core.state.StateOptions;
+import spec.sdk.reactor.v1.domain.core.state.TransactionalStateOperation;
+import spec.sdk.reactor.v1.utils.TypeRef;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+abstract class AbstractLayottoReactorClient implements LayottoReactorClient {
+
+ /**
+ * A utility class for serialize and deserialize the transient objects.
+ */
+ protected LayottoObjectSerializer objectSerializer;
+
+ /**
+ * A utility class for serialize and deserialize state objects.
+ */
+ protected LayottoObjectSerializer stateSerializer;
+
+ /**
+ * Common constructor for implementations of this class.
+ *
+ * @param objectSerializer Serializer for transient request/response objects.
+ * @param stateSerializer Serializer for state objects.
+ */
+ AbstractLayottoReactorClient(LayottoObjectSerializer objectSerializer,
+ LayottoObjectSerializer stateSerializer) {
+ this.objectSerializer = objectSerializer;
+ this.stateSerializer = stateSerializer;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono publishEvent(String pubsubName, String topicName, Object data) {
+ return this.publishEvent(pubsubName, topicName, data, null);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono publishEvent(String pubsubName, String topicName, Object data, Map metadata) {
+ PublishEventRequest req = new PublishEventRequest(pubsubName, topicName, data)
+ .setMetadata(metadata);
+ return this.publishEvent(req).then();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono invokeMethod(String appId,
+ String methodName,
+ Object data,
+ HttpExtension httpExtension,
+ Map metadata,
+ TypeRef type) {
+ InvokeMethodRequest req = new InvokeMethodRequest(appId, methodName)
+ .setBody(data)
+ .setHttpExtension(httpExtension)
+ .setContentType(objectSerializer.getContentType());
+ return this.invokeMethod(req, type);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono invokeMethod(String appId,
+ String methodName,
+ Object request,
+ HttpExtension httpExtension,
+ Map metadata,
+ Class clazz) {
+ return this.invokeMethod(appId, methodName, request, httpExtension, metadata, TypeRef.get(clazz));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono invokeMethod(String appId, String methodName, HttpExtension httpExtension,
+ Map metadata, TypeRef type) {
+ return this.invokeMethod(appId, methodName, null, httpExtension, metadata, type);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono invokeMethod(String appId, String methodName, HttpExtension httpExtension,
+ Map metadata, Class clazz) {
+ return this.invokeMethod(appId, methodName, null, httpExtension, metadata, TypeRef.get(clazz));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono invokeMethod(String appId, String methodName, Object request, HttpExtension httpExtension,
+ TypeRef type) {
+ return this.invokeMethod(appId, methodName, request, httpExtension, null, type);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono invokeMethod(String appId, String methodName, Object request, HttpExtension httpExtension,
+ Class clazz) {
+ return this.invokeMethod(appId, methodName, request, httpExtension, null, TypeRef.get(clazz));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono invokeMethod(String appId, String methodName, Object request, HttpExtension httpExtension) {
+ return this.invokeMethod(appId, methodName, request, httpExtension, null, TypeRef.BYTE_ARRAY).then();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono invokeMethod(String appId, String methodName, Object request, HttpExtension httpExtension,
+ Map metadata) {
+ return this.invokeMethod(appId, methodName, request, httpExtension, metadata, TypeRef.BYTE_ARRAY).then();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono invokeMethod(String appId, String methodName, HttpExtension httpExtension,
+ Map metadata) {
+ return this.invokeMethod(appId, methodName, null, httpExtension, metadata, TypeRef.BYTE_ARRAY).then();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono invokeMethod(String appId, String methodName, byte[] request, HttpExtension httpExtension,
+ Map metadata) {
+ return this.invokeMethod(appId, methodName, request, httpExtension, metadata, TypeRef.BYTE_ARRAY);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono> getState(String storeName, State state, TypeRef type) {
+ return this.getState(storeName, state.getKey(), state.getOptions(), type);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono> getState(String storeName, State state, Class clazz) {
+ return this.getState(storeName, state.getKey(), state.getOptions(), TypeRef.get(clazz));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono> getState(String storeName, String key, TypeRef type) {
+ return this.getState(storeName, key, null, type);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono> getState(String storeName, String key, Class clazz) {
+ return this.getState(storeName, key, null, TypeRef.get(clazz));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono> getState(String storeName, String key, StateOptions options, TypeRef type) {
+ GetStateRequest request = new GetStateRequest(storeName, key)
+ .setStateOptions(options);
+ return this.getState(request, type);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono> getState(String storeName, String key, StateOptions options, Class clazz) {
+ return this.getState(storeName, key, options, TypeRef.get(clazz));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono>> getBulkState(String storeName, List keys, TypeRef type) {
+ return this.getBulkState(new GetBulkStateRequest(storeName, keys), type);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono>> getBulkState(String storeName, List keys, Class clazz) {
+ return this.getBulkState(storeName, keys, TypeRef.get(clazz));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono executeStateTransaction(String storeName, List> operations) {
+ ExecuteStateTransactionRequest request = new ExecuteStateTransactionRequest(storeName)
+ .setOperations(operations);
+ return executeStateTransaction(request).then();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono saveBulkState(String storeName, List> states) {
+ SaveStateRequest request = new SaveStateRequest(storeName)
+ .setStates(states);
+ return this.saveBulkState(request).then();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono saveState(String storeName, String key, Object value) {
+ return this.saveState(storeName, key, null, value, null);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono saveState(String storeName, String key, String etag, Object value, StateOptions options) {
+ State> state = new State<>(key, value, etag, options);
+ return this.saveBulkState(storeName, Collections.singletonList(state));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono deleteState(String storeName, String key) {
+ return this.deleteState(storeName, key, null, null);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Mono deleteState(String storeName, String key, String etag, StateOptions options) {
+ DeleteStateRequest request = new DeleteStateRequest(storeName, key)
+ .setEtag(etag)
+ .setStateOptions(options);
+ return deleteState(request).then();
+ }
+}
diff --git a/sdk-reactor/src/main/java/io/mosn/layotto/v1/client/reactor/LayottoReactorClient.java b/sdk-reactor/src/main/java/io/mosn/layotto/v1/client/reactor/LayottoReactorClient.java
new file mode 100644
index 0000000..dc8ea27
--- /dev/null
+++ b/sdk-reactor/src/main/java/io/mosn/layotto/v1/client/reactor/LayottoReactorClient.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.v1.client.reactor;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import spec.sdk.reactor.v1.client.CloudRuntimesClient;
+import spec.sdk.reactor.v1.domain.core.configuration.ConfigurationItem;
+import spec.sdk.reactor.v1.domain.core.configuration.ConfigurationRequestItem;
+import spec.sdk.reactor.v1.domain.core.configuration.SaveConfigurationRequest;
+import spec.sdk.reactor.v1.domain.core.configuration.SubConfigurationResp;
+import spec.sdk.reactor.v1.domain.core.invocation.HttpExtension;
+import spec.sdk.reactor.v1.domain.core.invocation.InvokeMethodRequest;
+import spec.sdk.reactor.v1.domain.core.pubsub.PublishEventRequest;
+import spec.sdk.reactor.v1.domain.core.state.DeleteStateRequest;
+import spec.sdk.reactor.v1.domain.core.state.ExecuteStateTransactionRequest;
+import spec.sdk.reactor.v1.domain.core.state.GetBulkStateRequest;
+import spec.sdk.reactor.v1.domain.core.state.GetStateRequest;
+import spec.sdk.reactor.v1.domain.core.state.SaveStateRequest;
+import spec.sdk.reactor.v1.domain.core.state.State;
+import spec.sdk.reactor.v1.domain.core.state.StateOptions;
+import spec.sdk.reactor.v1.domain.core.state.TransactionalStateOperation;
+import spec.sdk.reactor.v1.utils.TypeRef;
+
+import java.util.List;
+import java.util.Map;
+
+public interface LayottoReactorClient extends CloudRuntimesClient {
+
+ @Override
+ Mono waitForSidecar(int timeoutInMilliseconds);
+
+ @Override
+ Mono shutdown();
+
+ @Override
+ void close() throws Exception;
+
+ @Override
+ Mono>> getConfiguration(ConfigurationRequestItem configurationRequestItem,
+ TypeRef type);
+
+ @Override
+ Mono saveConfiguration(SaveConfigurationRequest saveConfigurationRequest);
+
+ @Override
+ Mono deleteConfiguration(ConfigurationRequestItem configurationRequestItem);
+
+ @Override
+ Flux> subscribeConfiguration(ConfigurationRequestItem configurationRequestItem,
+ TypeRef type);
+
+ @Override
+ Mono invokeMethod(String appId, String methodName, Object data, HttpExtension httpExtension,
+ Map metadata, TypeRef type);
+
+ @Override
+ Mono invokeMethod(String appId, String methodName, Object request, HttpExtension httpExtension,
+ Map metadata, Class clazz);
+
+ @Override
+ Mono invokeMethod(String appId, String methodName, Object request, HttpExtension httpExtension,
+ TypeRef type);
+
+ @Override
+ Mono invokeMethod(String appId, String methodName, Object request, HttpExtension httpExtension,
+ Class clazz);
+
+ @Override
+ Mono invokeMethod(String appId, String methodName, HttpExtension httpExtension,
+ Map metadata, TypeRef type);
+
+ @Override
+ Mono invokeMethod(String appId, String methodName, HttpExtension httpExtension,
+ Map metadata, Class clazz);
+
+ @Override
+ Mono invokeMethod(String appId, String methodName, Object request, HttpExtension httpExtension,
+ Map metadata);
+
+ @Override
+ Mono invokeMethod(String appId, String methodName, Object request, HttpExtension httpExtension);
+
+ @Override
+ Mono invokeMethod(String appId, String methodName, HttpExtension httpExtension, Map metadata);
+
+ @Override
+ Mono invokeMethod(String appId, String methodName, byte[] request, HttpExtension httpExtension,
+ Map metadata);
+
+ @Override
+ Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef type);
+
+ @Override
+ Mono publishEvent(String pubsubName, String topicName, Object data);
+
+ @Override
+ Mono publishEvent(String pubsubName, String topicName, Object data, Map metadata);
+
+ @Override
+ Mono publishEvent(PublishEventRequest request);
+
+ @Override
+ Mono> getState(String storeName, State state, TypeRef type);
+
+ @Override
+ Mono> getState(String storeName, State state, Class clazz);
+
+ @Override
+ Mono> getState(String storeName, String key, TypeRef type);
+
+ @Override
+ Mono> getState(String storeName, String key, Class clazz);
+
+ @Override
+ Mono> getState(String storeName, String key, StateOptions options, TypeRef type);
+
+ @Override
+ Mono> getState(String storeName, String key, StateOptions options, Class clazz);
+
+ @Override
+ Mono> getState(GetStateRequest request, TypeRef type);
+
+ @Override
+ Mono>> getBulkState(String storeName, List keys, TypeRef type);
+
+ @Override
+ Mono>> getBulkState(String storeName, List keys, Class clazz);
+
+ @Override
+ Mono>> getBulkState(GetBulkStateRequest request, TypeRef type);
+
+ @Override
+ Mono executeStateTransaction(String storeName, List> operations);
+
+ @Override
+ Mono executeStateTransaction(ExecuteStateTransactionRequest request);
+
+ @Override
+ Mono saveBulkState(String storeName, List> states);
+
+ @Override
+ Mono saveBulkState(SaveStateRequest request);
+
+ @Override
+ Mono saveState(String storeName, String key, Object value);
+
+ @Override
+ Mono saveState(String storeName, String key, String etag, Object value, StateOptions options);
+
+ @Override
+ Mono deleteState(String storeName, String key);
+
+ @Override
+ Mono deleteState(String storeName, String key, String etag, StateOptions options);
+
+ @Override
+ Mono deleteState(DeleteStateRequest request);
+}
diff --git a/sdk-reactor/src/main/java/io/mosn/layotto/v1/client/reactor/LayottoReactorClientBuilder.java b/sdk-reactor/src/main/java/io/mosn/layotto/v1/client/reactor/LayottoReactorClientBuilder.java
new file mode 100644
index 0000000..ae51d48
--- /dev/null
+++ b/sdk-reactor/src/main/java/io/mosn/layotto/v1/client/reactor/LayottoReactorClientBuilder.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.v1.client.reactor;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.mosn.layotto.v1.config.Properties;
+import io.mosn.layotto.v1.serializer.DefaultObjectSerializer;
+import io.mosn.layotto.v1.serializer.LayottoObjectSerializer;
+import io.mosn.layotto.v1.value.LayottoApiProtocol;
+import spec.proto.runtime.v1.RuntimeGrpc;
+
+import java.io.Closeable;
+
+/**
+ * A builder for the LayottoClient, Currently only gRPC Client will be supported.
+ */
+public class LayottoReactorClientBuilder {
+
+ /**
+ * Determine if this builder will create GRPC clients instead of HTTP clients.
+ */
+ private final LayottoApiProtocol apiProtocol;
+
+ /**
+ * Serializer used for request and response objects in LayottoClient.
+ */
+ private LayottoObjectSerializer objectSerializer;
+
+ /**
+ * Serializer used for state objects in LayottoClient.
+ */
+ private LayottoObjectSerializer stateSerializer;
+
+ /**
+ * Creates a constructor for LayottoClient.
+ *
+ * {@link DefaultObjectSerializer} is used for object and state serializers by default but is not recommended
+ * for production scenarios.
+ */
+ public LayottoReactorClientBuilder() {
+ this.objectSerializer = new DefaultObjectSerializer();
+ this.stateSerializer = new DefaultObjectSerializer();
+ this.apiProtocol = Properties.API_PROTOCOL.get();
+ }
+
+ /**
+ * Sets the serializer for objects to be sent and received from Layotto.
+ * See {@link DefaultObjectSerializer} as possible serializer for non-production scenarios.
+ *
+ * @param objectSerializer Serializer for objects to be sent and received from Layotto.
+ * @return This instance.
+ */
+ public LayottoReactorClientBuilder withObjectSerializer(LayottoObjectSerializer objectSerializer) {
+ if (objectSerializer == null) {
+ throw new IllegalArgumentException("Object serializer is required");
+ }
+
+ if (objectSerializer.getContentType() == null || objectSerializer.getContentType().isEmpty()) {
+ throw new IllegalArgumentException("Content Type should not be null or empty");
+ }
+
+ this.objectSerializer = objectSerializer;
+ return this;
+ }
+
+ /**
+ * Sets the serializer for objects to be persisted.
+ * See {@link DefaultObjectSerializer} as possible serializer for non-production scenarios.
+ *
+ * @param stateSerializer Serializer for objects to be persisted.
+ * @return This instance.
+ */
+ public LayottoReactorClientBuilder withStateSerializer(LayottoObjectSerializer stateSerializer) {
+ if (stateSerializer == null) {
+ throw new IllegalArgumentException("State serializer is required");
+ }
+
+ this.stateSerializer = stateSerializer;
+ return this;
+ }
+
+ /**
+ * Build an instance of the Client based on the provided setup.
+ *
+ * @return an instance of the setup Client
+ * @throws java.lang.IllegalStateException if any required field is missing
+ */
+ public LayottoReactorClient build() {
+ return buildLayottoClient(this.apiProtocol);
+ }
+
+ /**
+ * Creates an instance of a Layotto Client based on the chosen protocol.
+ *
+ * @param protocol Layotto API's protocol.
+ * @return the GRPC Client.
+ * @throws java.lang.IllegalStateException if either host is missing or if port is missing or a negative number.
+ */
+ private LayottoReactorClient buildLayottoClient(LayottoApiProtocol protocol) {
+ if (protocol == null) {
+ throw new IllegalStateException("Protocol is required.");
+ }
+
+ switch (protocol) {
+ case GRPC:
+ return buildLayottoClientGrpc();
+ default:
+ throw new IllegalStateException("Unsupported protocol: " + protocol.name());
+ }
+ }
+
+ /**
+ * Creates an instance of the GPRC Client.
+ *
+ * @return the GRPC Client.
+ * @throws java.lang.IllegalStateException if either host is missing or if port is missing or a negative number.
+ */
+ private LayottoReactorClient buildLayottoClientGrpc() {
+ int port = Properties.GRPC_PORT.get();
+ if (port <= 0) {
+ throw new IllegalArgumentException("Invalid port.");
+ }
+ ManagedChannel channel = ManagedChannelBuilder
+ .forAddress(Properties.SIDECAR_IP.get(), port)
+ .usePlaintext()
+ .build();
+ Closeable closeableChannel = () -> {
+ if (channel != null && !channel.isShutdown()) {
+ channel.shutdown();
+ }
+ };
+ RuntimeGrpc.RuntimeStub asyncStub = RuntimeGrpc.newStub(channel);
+ return new LayottoReactorClientGrpc(this.objectSerializer, this.stateSerializer, closeableChannel, asyncStub);
+ }
+}
diff --git a/sdk-reactor/src/main/java/io/mosn/layotto/v1/client/reactor/LayottoReactorClientGrpc.java b/sdk-reactor/src/main/java/io/mosn/layotto/v1/client/reactor/LayottoReactorClientGrpc.java
new file mode 100644
index 0000000..996502c
--- /dev/null
+++ b/sdk-reactor/src/main/java/io/mosn/layotto/v1/client/reactor/LayottoReactorClientGrpc.java
@@ -0,0 +1,624 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.v1.client.reactor;
+
+import com.google.common.base.Strings;
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Empty;
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ForwardingClientCall;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.stub.StreamObserver;
+import io.mosn.layotto.v1.config.Properties;
+import io.mosn.layotto.v1.exceptions.LayottoException;
+import io.mosn.layotto.v1.serializer.LayottoObjectSerializer;
+import io.mosn.layotto.v1.utils.GrpcWrapper;
+import io.mosn.layotto.v1.utils.NetworkUtils;
+import io.mosn.layotto.v1.value.Headers;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoSink;
+import reactor.util.context.Context;
+import spec.proto.runtime.v1.RuntimeGrpc;
+import spec.proto.runtime.v1.RuntimeProto;
+import spec.sdk.reactor.v1.domain.core.configuration.ConfigurationItem;
+import spec.sdk.reactor.v1.domain.core.configuration.ConfigurationRequestItem;
+import spec.sdk.reactor.v1.domain.core.configuration.SaveConfigurationRequest;
+import spec.sdk.reactor.v1.domain.core.configuration.SubConfigurationResp;
+import spec.sdk.reactor.v1.domain.core.invocation.HttpExtension;
+import spec.sdk.reactor.v1.domain.core.invocation.InvokeMethodRequest;
+import spec.sdk.reactor.v1.domain.core.pubsub.PublishEventRequest;
+import spec.sdk.reactor.v1.domain.core.state.DeleteStateRequest;
+import spec.sdk.reactor.v1.domain.core.state.ExecuteStateTransactionRequest;
+import spec.sdk.reactor.v1.domain.core.state.GetBulkStateRequest;
+import spec.sdk.reactor.v1.domain.core.state.GetStateRequest;
+import spec.sdk.reactor.v1.domain.core.state.SaveStateRequest;
+import spec.sdk.reactor.v1.domain.core.state.State;
+import spec.sdk.reactor.v1.domain.core.state.StateOptions;
+import spec.sdk.reactor.v1.domain.core.state.TransactionalStateOperation;
+import spec.sdk.reactor.v1.utils.TypeRef;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+public class LayottoReactorClientGrpc extends AbstractLayottoReactorClient {
+
+ /**
+ * The GRPC managed channel to be used.
+ */
+ private final Closeable channel;
+
+ /**
+ * The async gRPC stub.
+ */
+ private final RuntimeGrpc.RuntimeStub asyncStub;
+
+ /**
+ * Default access level constructor, in order to create an instance of this class.
+ *
+ * @param closeableChannel A closeable for a Managed GRPC channel
+ * @param asyncStub async gRPC stub
+ */
+ LayottoReactorClientGrpc(LayottoObjectSerializer objectSerializer,
+ LayottoObjectSerializer stateSerializer,
+ Closeable closeableChannel,
+ RuntimeGrpc.RuntimeStub asyncStub) {
+ super(objectSerializer, stateSerializer);
+ this.channel = closeableChannel;
+ this.asyncStub = intercept(asyncStub);
+ }
+
+ @Override
+ public Mono>> getConfiguration(ConfigurationRequestItem configurationRequestItem,
+ TypeRef type) {
+ // TODO: 2021/9/26
+ return null;
+ }
+
+ @Override
+ public Mono saveConfiguration(SaveConfigurationRequest saveConfigurationRequest) {
+ // TODO: 2021/9/26
+ return null;
+ }
+
+ @Override
+ public Mono deleteConfiguration(ConfigurationRequestItem configurationRequestItem) {
+ // TODO: 2021/9/26
+ return null;
+ }
+
+ @Override
+ public Flux> subscribeConfiguration(ConfigurationRequestItem configurationRequestItem,
+ TypeRef type) {
+ // TODO: 2021/9/26
+ return null;
+ }
+
+ @Override
+ public Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef type) {
+ try {
+ String appId = invokeMethodRequest.getAppId();
+ String method = invokeMethodRequest.getMethod();
+ Object body = invokeMethodRequest.getBody();
+ HttpExtension httpExtension = invokeMethodRequest.getHttpExtension();
+ RuntimeProto.InvokeServiceRequest envelope = this.buildInvokeServiceRequest(
+ httpExtension,
+ appId,
+ method,
+ body);
+ // Regarding missing metadata in method invocation for gRPC:
+ // gRPC to gRPC does not handle metadata in Layotto runtime proto.
+ // gRPC to HTTP does not map correctly in Layotto runtime as per https://github.com/layotto/layotto/issues/2342
+
+ return Mono.subscriberContext().flatMap(
+ context -> this.createMono(
+ it -> intercept(context, asyncStub).invokeService(envelope, it)
+ )
+ ).flatMap(
+ it -> {
+ try {
+ return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().getValue().toByteArray(), type));
+ } catch (IOException e) {
+ throw LayottoException.propagate(e);
+ }
+ }
+ );
+ } catch (Exception ex) {
+ return LayottoException.wrapMono(ex);
+ }
+ }
+
+ @Override
+ public Mono publishEvent(PublishEventRequest request) {
+ try {
+ String pubsubName = request.getPubsubName();
+ String topic = request.getTopic();
+ Object data = request.getData();
+ RuntimeProto.PublishEventRequest.Builder envelopeBuilder = RuntimeProto.PublishEventRequest.newBuilder()
+ .setTopic(topic)
+ .setPubsubName(pubsubName)
+ .setData(ByteString.copyFrom(objectSerializer.serialize(data)));
+
+ // Content-type can be overwritten on a per-request basis.
+ // It allows CloudEvents to be handled differently, for example.
+ String contentType = request.getContentType();
+ if (contentType == null || contentType.isEmpty()) {
+ contentType = objectSerializer.getContentType();
+ }
+ envelopeBuilder.setDataContentType(contentType);
+
+ Map metadata = request.getMetadata();
+ if (metadata != null) {
+ envelopeBuilder.putAllMetadata(metadata);
+ }
+
+ return Mono.subscriberContext().flatMap(
+ context ->
+ this.createMono(
+ it -> intercept(context, asyncStub).publishEvent(envelopeBuilder.build(), it)
+ )
+ ).then();
+ } catch (Exception ex) {
+ return LayottoException.wrapMono(ex);
+ }
+ }
+
+ @Override
+ public Mono> getState(GetStateRequest request, TypeRef type) {
+ try {
+ final String stateStoreName = request.getStoreName();
+ final String key = request.getKey();
+ final StateOptions options = request.getStateOptions();
+ final Map metadata = request.getMetadata();
+
+ if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
+ throw new IllegalArgumentException("State store name cannot be null or empty.");
+ }
+ if ((key == null) || (key.trim().isEmpty())) {
+ throw new IllegalArgumentException("Key cannot be null or empty.");
+ }
+ RuntimeProto.GetStateRequest.Builder builder = RuntimeProto.GetStateRequest.newBuilder()
+ .setStoreName(stateStoreName)
+ .setKey(key);
+ if (metadata != null) {
+ builder.putAllMetadata(metadata);
+ }
+ if (options != null && options.getConsistency() != null) {
+ builder.setConsistency(getGrpcStateConsistency(options));
+ }
+
+ RuntimeProto.GetStateRequest envelope = builder.build();
+
+ return Mono.subscriberContext().flatMap(
+ context ->
+ this.createMono(
+ it -> intercept(context, asyncStub).getState(envelope, it)
+ )
+ ).map(
+ it -> {
+ try {
+ return buildStateKeyValue(it, key, options, type);
+ } catch (IOException ex) {
+ throw LayottoException.propagate(ex);
+ }
+ }
+ );
+ } catch (Exception ex) {
+ return LayottoException.wrapMono(ex);
+ }
+ }
+
+ @Override
+ public Mono>> getBulkState(GetBulkStateRequest request, TypeRef type) {
+ try {
+ final String stateStoreName = request.getStoreName();
+ final List keys = request.getKeys();
+ final int parallelism = request.getParallelism();
+ final Map metadata = request.getMetadata();
+ if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
+ throw new IllegalArgumentException("State store name cannot be null or empty.");
+ }
+ if (keys == null || keys.isEmpty()) {
+ throw new IllegalArgumentException("Key cannot be null or empty.");
+ }
+
+ if (parallelism < 0) {
+ throw new IllegalArgumentException("Parallelism cannot be negative.");
+ }
+ RuntimeProto.GetBulkStateRequest.Builder builder = RuntimeProto.GetBulkStateRequest.newBuilder()
+ .setStoreName(stateStoreName)
+ .addAllKeys(keys)
+ .setParallelism(parallelism);
+ if (metadata != null) {
+ builder.putAllMetadata(metadata);
+ }
+
+ RuntimeProto.GetBulkStateRequest envelope = builder.build();
+
+ return Mono.subscriberContext().flatMap(
+ context -> this.createMono(it -> intercept(context, asyncStub)
+ .getBulkState(envelope, it)
+ )
+ ).map(
+ it ->
+ it
+ .getItemsList()
+ .stream()
+ .map(b -> {
+ try {
+ return buildStateKeyValue(b, type);
+ } catch (Exception e) {
+ throw LayottoException.propagate(e);
+ }
+ })
+ .collect(Collectors.toList())
+ );
+ } catch (Exception ex) {
+ return LayottoException.wrapMono(ex);
+ }
+ }
+
+ @Override
+ public Mono executeStateTransaction(ExecuteStateTransactionRequest request) {
+ try {
+ final String stateStoreName = request.getStateStoreName();
+ final List> operations = request.getOperations();
+ final Map metadata = request.getMetadata();
+ if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
+ throw new IllegalArgumentException("State store name cannot be null or empty.");
+ }
+ RuntimeProto.ExecuteStateTransactionRequest.Builder builder = RuntimeProto.ExecuteStateTransactionRequest
+ .newBuilder();
+ builder.setStoreName(stateStoreName);
+ if (metadata != null) {
+ builder.putAllMetadata(metadata);
+ }
+ for (TransactionalStateOperation> operation : operations) {
+ RuntimeProto.TransactionalStateOperation.Builder operationBuilder = RuntimeProto.TransactionalStateOperation
+ .newBuilder();
+ operationBuilder.setOperationType(operation.getOperation().toString().toLowerCase());
+ operationBuilder.setRequest(buildStateRequest(operation.getRequest()).build());
+ builder.addOperations(operationBuilder.build());
+ }
+ RuntimeProto.ExecuteStateTransactionRequest req = builder.build();
+
+ return Mono.subscriberContext().flatMap(
+ context -> this.createMono(it -> intercept(context, asyncStub).executeStateTransaction(req, it))
+ ).then();
+ } catch (Exception e) {
+ return LayottoException.wrapMono(e);
+ }
+ }
+
+ @Override
+ public Mono saveBulkState(SaveStateRequest request) {
+ try {
+ final String stateStoreName = request.getStoreName();
+ final List> states = request.getStates();
+ if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
+ throw new IllegalArgumentException("State store name cannot be null or empty.");
+ }
+ RuntimeProto.SaveStateRequest.Builder builder = RuntimeProto.SaveStateRequest.newBuilder();
+ builder.setStoreName(stateStoreName);
+ for (State> state : states) {
+ builder.addStates(buildStateRequest(state).build());
+ }
+ RuntimeProto.SaveStateRequest req = builder.build();
+
+ return Mono.subscriberContext().flatMap(
+ context -> this.createMono(it -> intercept(context, asyncStub).saveState(req, it))
+ ).then();
+ } catch (Exception ex) {
+ return LayottoException.wrapMono(ex);
+ }
+ }
+
+ @Override
+ public Mono deleteState(DeleteStateRequest request) {
+ try {
+ final String stateStoreName = request.getStateStoreName();
+ final String key = request.getKey();
+ final StateOptions options = request.getStateOptions();
+ final String etag = request.getEtag();
+ final Map metadata = request.getMetadata();
+
+ if ((stateStoreName == null) || (stateStoreName.trim().isEmpty())) {
+ throw new IllegalArgumentException("State store name cannot be null or empty.");
+ }
+ if ((key == null) || (key.trim().isEmpty())) {
+ throw new IllegalArgumentException("Key cannot be null or empty.");
+ }
+
+ RuntimeProto.StateOptions.Builder optionBuilder = null;
+ if (options != null) {
+ optionBuilder = RuntimeProto.StateOptions.newBuilder();
+ if (options.getConcurrency() != null) {
+ optionBuilder.setConcurrency(getGrpcStateConcurrency(options));
+ }
+ if (options.getConsistency() != null) {
+ optionBuilder.setConsistency(getGrpcStateConsistency(options));
+ }
+ }
+ RuntimeProto.DeleteStateRequest.Builder builder = RuntimeProto.DeleteStateRequest.newBuilder()
+ .setStoreName(stateStoreName)
+ .setKey(key);
+ if (metadata != null) {
+ builder.putAllMetadata(metadata);
+ }
+ if (etag != null) {
+ builder.setEtag(RuntimeProto.Etag.newBuilder().setValue(etag).build());
+ }
+
+ if (optionBuilder != null) {
+ builder.setOptions(optionBuilder.build());
+ }
+
+ RuntimeProto.DeleteStateRequest req = builder.build();
+
+ return Mono.subscriberContext().flatMap(
+ context -> this.createMono(it -> intercept(context, asyncStub).deleteState(req, it))
+ ).then();
+ } catch (Exception ex) {
+ return LayottoException.wrapMono(ex);
+ }
+ }
+
+ /**
+ * Builds the object io.layotto.{@link RuntimeProto.InvokeServiceRequest} to be send based on the parameters.
+ *
+ * @param httpExtension Object for HttpExtension
+ * @param appId The application id to be invoked
+ * @param method The application method to be invoked
+ * @param body The body of the request to be send as part of the invocation
+ * @param The Type of the Body
+ * @return The object to be sent as part of the invocation.
+ * @throws IOException If there's an issue serializing the request.
+ */
+ private RuntimeProto.InvokeServiceRequest buildInvokeServiceRequest(
+ HttpExtension httpExtension,
+ String appId,
+ String method,
+ K body) throws IOException {
+ if (httpExtension == null) {
+ throw new IllegalArgumentException("HttpExtension cannot be null. Use HttpExtension.NONE instead.");
+ }
+ RuntimeProto.CommonInvokeRequest.Builder requestBuilder = RuntimeProto.CommonInvokeRequest.newBuilder();
+ requestBuilder.setMethod(method);
+ if (body != null) {
+ byte[] byteRequest = objectSerializer.serialize(body);
+ Any data = Any.newBuilder().setValue(ByteString.copyFrom(byteRequest)).build();
+ requestBuilder.setData(data);
+ } else {
+ requestBuilder.setData(Any.newBuilder().build());
+ }
+ RuntimeProto.HTTPExtension.Builder httpExtensionBuilder = RuntimeProto.HTTPExtension.newBuilder();
+
+ httpExtensionBuilder.setVerb(RuntimeProto.HTTPExtension.Verb.valueOf(httpExtension.getMethod().toString()))
+ .setQuerystring(httpExtension.encodeQueryString());
+ requestBuilder.setHttpExtension(httpExtensionBuilder.build());
+
+ requestBuilder.setContentType(objectSerializer.getContentType());
+
+ RuntimeProto.InvokeServiceRequest.Builder envelopeBuilder = RuntimeProto.InvokeServiceRequest.newBuilder()
+ .setId(appId)
+ .setMessage(requestBuilder.build());
+ return envelopeBuilder.build();
+ }
+
+ private State buildStateKeyValue(
+ RuntimeProto.BulkStateItem item,
+ TypeRef type) throws IOException {
+ String key = item.getKey();
+ String error = item.getError();
+ if (!Strings.isNullOrEmpty(error)) {
+ return new State<>(key, error);
+ }
+
+ ByteString payload = item.getData();
+ byte[] data = payload == null ? null : payload.toByteArray();
+ T value = stateSerializer.deserialize(data, type);
+ String etag = item.getEtag();
+ if (etag.equals("")) {
+ etag = null;
+ }
+ return new State<>(key, value, etag, item.getMetadataMap(), null);
+ }
+
+ private State buildStateKeyValue(
+ RuntimeProto.GetStateResponse response,
+ String requestedKey,
+ StateOptions stateOptions,
+ TypeRef type) throws IOException {
+ ByteString payload = response.getData();
+ byte[] data = payload == null ? null : payload.toByteArray();
+ T value = stateSerializer.deserialize(data, type);
+ String etag = response.getEtag();
+ if (etag.equals("")) {
+ etag = null;
+ }
+ return new State<>(requestedKey, value, etag, response.getMetadataMap(), stateOptions);
+ }
+
+ private RuntimeProto.StateItem.Builder buildStateRequest(State state) throws IOException {
+ byte[] bytes = stateSerializer.serialize(state.getValue());
+
+ RuntimeProto.StateItem.Builder stateBuilder = RuntimeProto.StateItem.newBuilder();
+ if (state.getEtag() != null) {
+ stateBuilder.setEtag(RuntimeProto.Etag.newBuilder().setValue(state.getEtag()).build());
+ }
+ if (state.getMetadata() != null) {
+ stateBuilder.putAllMetadata(state.getMetadata());
+ }
+ if (bytes != null) {
+ stateBuilder.setValue(ByteString.copyFrom(bytes));
+ }
+ stateBuilder.setKey(state.getKey());
+ RuntimeProto.StateOptions.Builder optionBuilder = null;
+ if (state.getOptions() != null) {
+ StateOptions options = state.getOptions();
+ optionBuilder = RuntimeProto.StateOptions.newBuilder();
+ if (options.getConcurrency() != null) {
+ optionBuilder.setConcurrency(getGrpcStateConcurrency(options));
+ }
+ if (options.getConsistency() != null) {
+ optionBuilder.setConsistency(getGrpcStateConsistency(options));
+ }
+ }
+ if (optionBuilder != null) {
+ stateBuilder.setOptions(optionBuilder.build());
+ }
+ return stateBuilder;
+ }
+
+ private RuntimeProto.StateOptions.StateConsistency getGrpcStateConsistency(StateOptions options) {
+ switch (options.getConsistency()) {
+ case EVENTUAL:
+ return RuntimeProto.StateOptions.StateConsistency.CONSISTENCY_EVENTUAL;
+ case STRONG:
+ return RuntimeProto.StateOptions.StateConsistency.CONSISTENCY_STRONG;
+ default:
+ throw new IllegalArgumentException("Missing Consistency mapping to gRPC Consistency enum");
+ }
+ }
+
+ private RuntimeProto.StateOptions.StateConcurrency getGrpcStateConcurrency(StateOptions options) {
+ switch (options.getConcurrency()) {
+ case FIRST_WRITE:
+ return RuntimeProto.StateOptions.StateConcurrency.CONCURRENCY_FIRST_WRITE;
+ case LAST_WRITE:
+ return RuntimeProto.StateOptions.StateConcurrency.CONCURRENCY_LAST_WRITE;
+ default:
+ throw new IllegalArgumentException("Missing StateConcurrency mapping to gRPC Concurrency enum");
+ }
+ }
+
+ // -- Lifecycle Functions
+
+ @Override
+ public Mono waitForSidecar(int timeoutInMilliseconds) {
+ return Mono.fromRunnable(() -> {
+ try {
+ NetworkUtils.waitForSocket(Properties.SIDECAR_IP.get(), Properties.GRPC_PORT.get(), timeoutInMilliseconds);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @Override
+ public Mono shutdown() {
+ return Mono.subscriberContext()
+ // FIXME: 2021/9/26 Refer to Dapr
+ // .flatMap(context ->
+ // this.createMono(it ->
+ // intercept(context, asyncStub)
+ // .shutdown(Empty.getDefaultInstance(), it)))
+ .then();
+ }
+
+ private Mono createMono(Consumer> consumer) {
+ return Mono.create(sink ->
+ LayottoException
+ .wrap(() -> consumer.accept(createStreamObserver(sink)))
+ .run());
+ }
+
+ private StreamObserver createStreamObserver(MonoSink sink) {
+ return new StreamObserver() {
+ @Override
+ public void onNext(T value) {
+ sink.success(value);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ sink.error(LayottoException.propagate(new ExecutionException(t)));
+ }
+
+ @Override
+ public void onCompleted() {
+ sink.success();
+ }
+ };
+ }
+
+ /**
+ * Closes the ManagedChannel for GRPC.
+ *
+ * @throws IOException on exception.
+ * @see io.grpc.ManagedChannel#shutdown()
+ */
+ @Override
+ public void close() throws Exception {
+ if (channel != null) {
+ LayottoException
+ .wrap(() -> {
+ channel.close();
+ return true;
+ })
+ .call();
+ }
+ }
+
+ /**
+ * Populates GRPC client with interceptors for telemetry.
+ *
+ * @param context Reactor's context.
+ * @param client GRPC client for Layotto.
+ * @return Client after adding interceptors.
+ */
+ private static RuntimeGrpc.RuntimeStub intercept(Context context, RuntimeGrpc.RuntimeStub client) {
+ return GrpcWrapper.intercept(context, client);
+ }
+
+ /**
+ * Populates GRPC client with interceptors.
+ *
+ * @param client GRPC client for Layotto.
+ * @return Client after adding interceptors.
+ */
+ private static RuntimeGrpc.RuntimeStub intercept(RuntimeGrpc.RuntimeStub client) {
+ ClientInterceptor interceptor = new ClientInterceptor() {
+ @Override
+ public ClientCall interceptCall(MethodDescriptor methodDescriptor,
+ CallOptions callOptions,
+ Channel channel) {
+ ClientCall clientCall = channel.newCall(methodDescriptor, callOptions);
+ return new ForwardingClientCall.SimpleForwardingClientCall(clientCall) {
+ @Override
+ public void start(final Listener responseListener, final Metadata metadata) {
+ String layottoApiToken = Properties.API_TOKEN.get();
+ if (layottoApiToken != null) {
+ metadata.put(Metadata.Key.of(Headers.DAPR_API_TOKEN, Metadata.ASCII_STRING_MARSHALLER),
+ layottoApiToken);
+ }
+ super.start(responseListener, metadata);
+ }
+ };
+ }
+ };
+ return client.withInterceptors(interceptor);
+ }
+}
diff --git a/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/BooleanProperty.java b/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/BooleanProperty.java
new file mode 100644
index 0000000..d2211cd
--- /dev/null
+++ b/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/BooleanProperty.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.v1.config;
+
+/**
+ * Boolean configuration property.
+ */
+public class BooleanProperty extends Property {
+
+ /**
+ * {@inheritDoc}
+ */
+ BooleanProperty(String name, String envName, Boolean defaultValue) {
+ super(name, envName, defaultValue);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected Boolean parse(String value) {
+ return Boolean.valueOf(value);
+ }
+}
diff --git a/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/GenericProperty.java b/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/GenericProperty.java
new file mode 100644
index 0000000..3c060b0
--- /dev/null
+++ b/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/GenericProperty.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.v1.config;
+
+import java.util.function.Function;
+
+/**
+ * Configuration property for any type.
+ */
+public class GenericProperty extends Property {
+
+ private final Function parser;
+
+ /**
+ * {@inheritDoc}
+ */
+ GenericProperty(String name, String envName, T defaultValue, Function parser) {
+ super(name, envName, defaultValue);
+ this.parser = parser;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected T parse(String value) {
+ return parser.apply(value);
+ }
+}
diff --git a/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/IntegerProperty.java b/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/IntegerProperty.java
new file mode 100644
index 0000000..8eb6e0b
--- /dev/null
+++ b/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/IntegerProperty.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.v1.config;
+
+/**
+ * Integer configuration property.
+ */
+public class IntegerProperty extends Property {
+
+ /**
+ * {@inheritDoc}
+ */
+ IntegerProperty(String name, String envName, Integer defaultValue) {
+ super(name, envName, defaultValue);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected Integer parse(String value) {
+ return Integer.valueOf(value);
+ }
+}
diff --git a/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/Properties.java b/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/Properties.java
new file mode 100644
index 0000000..78b0caa
--- /dev/null
+++ b/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/Properties.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.v1.config;
+
+
+import io.mosn.layotto.v1.value.LayottoApiProtocol;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Global properties for Layotto's SDK, using Supplier so they are dynamically resolved.
+ */
+public class Properties {
+
+ /**
+ * Layotto's default IP for gRPC communication.
+ */
+ private static final String DEFAULT_SIDECAR_IP = "127.0.0.1";
+
+ /**
+ * Layotto's default gRPC port.
+ */
+ private static final Integer DEFAULT_GRPC_PORT = 34904;
+
+ /**
+ * Layotto's default use of gRPC.
+ */
+ private static final LayottoApiProtocol DEFAULT_API_PROTOCOL = LayottoApiProtocol.GRPC;
+
+ /**
+ * Layotto's default String encoding: UTF-8.
+ */
+ private static final Charset DEFAULT_STRING_CHARSET = StandardCharsets.UTF_8;
+
+ /**
+ * IP for Layotto's sidecar.
+ */
+ public static final Property SIDECAR_IP = new StringProperty(
+ "layotto.sidecar.ip",
+ "LAYOTTO_SIDECAR_IP",
+ DEFAULT_SIDECAR_IP);
+
+ /**
+ * GRPC port for Layotto after checking system property and environment variable.
+ */
+ public static final Property GRPC_PORT = new IntegerProperty(
+ "layotto.grpc.port",
+ "LAYOTTO_GRPC_PORT",
+ DEFAULT_GRPC_PORT);
+
+ /**
+ * Determines if Layotto client will use gRPC to talk to Layotto's sidecar.
+ */
+ public static final Property API_PROTOCOL = new GenericProperty<>(
+ "layotto.api.protocol",
+ "LAYOTTO_API_PROTOCOL",
+ DEFAULT_API_PROTOCOL,
+ (s) -> LayottoApiProtocol.valueOf(s.toUpperCase()));
+
+ /**
+ * API token for authentication between App and Layotto's sidecar.
+ */
+ public static final Property API_TOKEN = new StringProperty(
+ "layotto.api.token",
+ "LAYOTTO_API_TOKEN",
+ null);
+
+ /**
+ * Determines which string encoding is used in Layotto's Java SDK.
+ */
+ public static final Property STRING_CHARSET = new GenericProperty<>(
+ "layotto.string.charset",
+ "LAYOTTO_STRING_CHARSET",
+ DEFAULT_STRING_CHARSET,
+ (s) -> Charset.forName(s));
+}
diff --git a/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/Property.java b/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/Property.java
new file mode 100644
index 0000000..f9801e1
--- /dev/null
+++ b/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/Property.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.v1.config;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A configuration property in the Layotto's SDK.
+ */
+public abstract class Property {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(Property.class.getName());
+
+ /**
+ * Property's name as a Java Property.
+ */
+ private final String name;
+
+ /**
+ * Property's name as a environment variable.
+ */
+ private final String envName;
+
+ /**
+ * Default value.
+ */
+ private final T defaultValue;
+
+ /**
+ * Instantiates a new configuration property.
+ *
+ * @param name Java property name.
+ * @param envName Environment variable name.
+ * @param defaultValue Default value.
+ */
+ Property(String name, String envName, T defaultValue) {
+ this.name = name;
+ this.envName = envName;
+ this.defaultValue = defaultValue;
+ }
+
+ /**
+ * Gets the Java property's name.
+ *
+ * @return Name.
+ */
+ public String getName() {
+ return this.name;
+ }
+
+ /**
+ * Gets the environment variable's name.
+ *
+ * @return Name.
+ */
+ public String getEnvName() {
+ return this.envName;
+ }
+
+ /**
+ * Gets the value defined by system property first, then env variable or sticks to default.
+ *
+ * @return Value from system property (1st) or env variable (2nd) or default (last).
+ */
+ public T get() {
+ String propValue = System.getProperty(this.name);
+ if (propValue != null && !propValue.trim().isEmpty()) {
+ try {
+ return this.parse(propValue);
+ } catch (IllegalArgumentException e) {
+ LOGGER.warn(String.format("Invalid value in property: %s", this.name));
+ // OK, we tried. Falling back to system environment variable.
+ }
+ }
+
+ String envValue = System.getenv(this.envName);
+ if (envValue != null && !envValue.trim().isEmpty()) {
+ try {
+ return this.parse(envValue);
+ } catch (IllegalArgumentException e) {
+ LOGGER.warn(String.format("Invalid value in environment variable: %s", this.envName));
+ // OK, we tried. Falling back to default.
+ }
+ }
+
+ return this.defaultValue;
+ }
+
+ /**
+ * Parses the value to the specific type.
+ *
+ * @param value String value to be parsed.
+ * @return Value in the specific type.
+ */
+ protected abstract T parse(String value);
+}
diff --git a/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/StringProperty.java b/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/StringProperty.java
new file mode 100644
index 0000000..34939aa
--- /dev/null
+++ b/sdk-reactor/src/main/java/io/mosn/layotto/v1/config/StringProperty.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.v1.config;
+
+/**
+ * String configuration property.
+ */
+public class StringProperty extends Property {
+
+ /**
+ * {@inheritDoc}
+ */
+ StringProperty(String name, String envName, String defaultValue) {
+ super(name, envName, defaultValue);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected String parse(String value) {
+ return value;
+ }
+}
diff --git a/sdk-reactor/src/main/java/io/mosn/layotto/v1/domain/CloudEvent.java b/sdk-reactor/src/main/java/io/mosn/layotto/v1/domain/CloudEvent.java
new file mode 100644
index 0000000..312c36d
--- /dev/null
+++ b/sdk-reactor/src/main/java/io/mosn/layotto/v1/domain/CloudEvent.java
@@ -0,0 +1,306 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.v1.domain;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * A cloud event in Layotto.
+ *
+ * @param The type of the payload.
+ */
+public final class CloudEvent {
+
+ /**
+ * Mime type used for CloudEvent.
+ */
+ public static final String CONTENT_TYPE = "application/cloudevents+json";
+
+ /**
+ * Shared Json serializer/deserializer as per Jackson's documentation.
+ */
+ protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+ .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
+ false)
+ .setSerializationInclusion(JsonInclude.Include.NON_NULL);
+
+ /**
+ * Identifier of the message being processed.
+ */
+ private String id;
+
+ /**
+ * Event's source.
+ */
+ private String source;
+
+ /**
+ * Envelope type.
+ */
+ private String type;
+
+ /**
+ * Version of the specification.
+ */
+ private String specversion;
+
+ /**
+ * Type of the data's content.
+ */
+ private String datacontenttype;
+
+ /**
+ * Cloud event specs says data can be a JSON object or string.
+ */
+ private T data;
+
+ /**
+ * Cloud event specs says binary data should be in data_base64.
+ */
+ @JsonProperty("data_base64")
+ private byte[] binaryData;
+
+ /**
+ * Instantiates a CloudEvent.
+ */
+ public CloudEvent() {
+ }
+
+ /**
+ * Instantiates a CloudEvent.
+ *
+ * @param id Identifier of the message being processed.
+ * @param source Source for this event.
+ * @param type Type of event.
+ * @param specversion Version of the event spec.
+ * @param datacontenttype Type of the payload.
+ * @param data Payload.
+ */
+ public CloudEvent(
+ String id,
+ String source,
+ String type,
+ String specversion,
+ String datacontenttype,
+ T data) {
+ this.id = id;
+ this.source = source;
+ this.type = type;
+ this.specversion = specversion;
+ this.datacontenttype = datacontenttype;
+ this.data = data;
+ }
+
+ /**
+ * Instantiates a CloudEvent.
+ *
+ * @param id Identifier of the message being processed.
+ * @param source Source for this event.
+ * @param type Type of event.
+ * @param specversion Version of the event spec.
+ * @param binaryData Payload.
+ */
+ public CloudEvent(
+ String id,
+ String source,
+ String type,
+ String specversion,
+ byte[] binaryData) {
+ this.id = id;
+ this.source = source;
+ this.type = type;
+ this.specversion = specversion;
+ this.datacontenttype = "application/octet-stream";
+ this.binaryData = binaryData == null ? null : Arrays.copyOf(binaryData, binaryData.length);
+ ;
+ }
+
+ /**
+ * Deserialize a message topic from Layotto.
+ *
+ * @param payload Payload sent from Layotto.
+ * @return Message (can be null if input is null)
+ * @throws IOException If cannot parse.
+ */
+ public static CloudEvent> deserialize(byte[] payload) throws IOException {
+ if (payload == null) {
+ return null;
+ }
+
+ return OBJECT_MAPPER.readValue(payload, CloudEvent.class);
+ }
+
+ /**
+ * Gets the identifier of the message being processed.
+ *
+ * @return Identifier of the message being processed.
+ */
+ public String getId() {
+ return id;
+ }
+
+ /**
+ * Sets the identifier of the message being processed.
+ *
+ * @param id Identifier of the message being processed.
+ */
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ /**
+ * Gets the event's source.
+ *
+ * @return Event's source.
+ */
+ public String getSource() {
+ return source;
+ }
+
+ /**
+ * Sets the event's source.
+ *
+ * @param source Event's source.
+ */
+ public void setSource(String source) {
+ this.source = source;
+ }
+
+ /**
+ * Gets the envelope type.
+ *
+ * @return Envelope type.
+ */
+ public String getType() {
+ return type;
+ }
+
+ /**
+ * Sets the envelope type.
+ *
+ * @param type Envelope type.
+ */
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ /**
+ * Gets the version of the specification.
+ *
+ * @return Version of the specification.
+ */
+ public String getSpecversion() {
+ return specversion;
+ }
+
+ /**
+ * Sets the version of the specification.
+ *
+ * @param specversion Version of the specification.
+ */
+ public void setSpecversion(String specversion) {
+ this.specversion = specversion;
+ }
+
+ /**
+ * Gets the type of the data's content.
+ *
+ * @return Type of the data's content.
+ */
+ public String getDatacontenttype() {
+ return datacontenttype;
+ }
+
+ /**
+ * Sets the type of the data's content.
+ *
+ * @param datacontenttype Type of the data's content.
+ */
+ public void setDatacontenttype(String datacontenttype) {
+ this.datacontenttype = datacontenttype;
+ }
+
+ /**
+ * Gets the cloud event data.
+ *
+ * @return Cloud event's data. As per specs, data can be a JSON object or string.
+ */
+ public T getData() {
+ return data;
+ }
+
+ /**
+ * Sets the cloud event data. As per specs, data can be a JSON object or string.
+ *
+ * @param data Cloud event's data. As per specs, data can be a JSON object or string.
+ */
+ public void setData(T data) {
+ this.data = data;
+ }
+
+ /**
+ * Gets the cloud event's binary data.
+ *
+ * @return Cloud event's binary data.
+ */
+ public byte[] getBinaryData() {
+ return this.binaryData == null ? null : Arrays.copyOf(this.binaryData, this.binaryData.length);
+ }
+
+ /**
+ * Sets the cloud event's binary data.
+ *
+ * @param binaryData Cloud event's binary data.
+ */
+ public void setBinaryData(byte[] binaryData) {
+ this.binaryData = binaryData == null ? null : Arrays.copyOf(binaryData, binaryData.length);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CloudEvent> that = (CloudEvent>) o;
+ return Objects.equals(id, that.id)
+ && Objects.equals(source, that.source)
+ && Objects.equals(type, that.type)
+ && Objects.equals(specversion, that.specversion)
+ && Objects.equals(datacontenttype, that.datacontenttype)
+ && Objects.equals(data, that.data)
+ && Arrays.equals(binaryData, that.binaryData);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, source, type, specversion, datacontenttype, data, binaryData);
+ }
+}
diff --git a/sdk-reactor/src/main/java/io/mosn/layotto/v1/exceptions/LayottoError.java b/sdk-reactor/src/main/java/io/mosn/layotto/v1/exceptions/LayottoError.java
new file mode 100644
index 0000000..c7ddaa0
--- /dev/null
+++ b/sdk-reactor/src/main/java/io/mosn/layotto/v1/exceptions/LayottoError.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.v1.exceptions;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import io.grpc.Status;
+
+/**
+ * Represents an error message from Layotto.
+ */
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
+public class LayottoError {
+
+ /**
+ * Error code.
+ */
+ private String errorCode;
+
+ /**
+ * Error Message.
+ */
+ private String message;
+
+ /**
+ * Error code from gRPC.
+ */
+ private Integer code;
+
+ /**
+ * Gets the error code.
+ *
+ * @return Error code.
+ */
+ public String getErrorCode() {
+ if ((errorCode == null) && (code != null)) {
+ return Status.fromCodeValue(code).getCode().name();
+ }
+ return errorCode;
+ }
+
+ /**
+ * Sets the error code.
+ *
+ * @param errorCode Error code.
+ * @return This instance.
+ */
+ public LayottoError setErrorCode(String errorCode) {
+ this.errorCode = errorCode;
+ return this;
+ }
+
+ /**
+ * Gets the error message.
+ *
+ * @return Error message.
+ */
+ public String getMessage() {
+ return message;
+ }
+
+ /**
+ * Sets the error message.
+ *
+ * @param message Error message.
+ * @return This instance.
+ */
+ public LayottoError setMessage(String message) {
+ this.message = message;
+ return this;
+ }
+}
diff --git a/sdk-reactor/src/main/java/io/mosn/layotto/v1/exceptions/LayottoException.java b/sdk-reactor/src/main/java/io/mosn/layotto/v1/exceptions/LayottoException.java
new file mode 100644
index 0000000..dedadef
--- /dev/null
+++ b/sdk-reactor/src/main/java/io/mosn/layotto/v1/exceptions/LayottoException.java
@@ -0,0 +1,203 @@
+/*
+ * Copyright 2021 Layotto Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.mosn.layotto.v1.exceptions;
+
+import io.grpc.StatusRuntimeException;
+import reactor.core.Exceptions;
+import reactor.core.publisher.Mono;
+
+import java.util.concurrent.Callable;
+
+/**
+ * A Layotto's specific exception.
+ */
+public class LayottoException extends RuntimeException {
+
+ /**
+ * Layotto's error code for this exception.
+ */
+ private final String errorCode;
+
+ /**
+ * New exception from a server-side generated error code and message.
+ *
+ * @param daprError Server-side error.
+ */
+ public LayottoException(LayottoError daprError) {
+ this(daprError.getErrorCode(), daprError.getMessage());
+ }
+
+ /**
+ * New exception from a server-side generated error code and message.
+ *
+ * @param daprError Client-side error.
+ * @param cause the cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A {@code null} value is
+ * permitted, and indicates that the cause is nonexistent or
+ * unknown.)
+ */
+ public LayottoException(LayottoError daprError, Throwable cause) {
+ this(daprError.getErrorCode(), daprError.getMessage(), cause);
+ }
+
+ /**
+ * Wraps an exception into a LayottoException.
+ *
+ * @param exception the exception to be wrapped.
+ */
+ public LayottoException(Throwable exception) {
+ this("UNKNOWN", exception.getMessage(), exception);
+ }
+
+ /**
+ * New Exception from a client-side generated error code and message.
+ *
+ * @param errorCode Client-side error code.
+ * @param message Client-side error message.
+ */
+ public LayottoException(String errorCode, String message) {
+ super(String.format("%s: %s", errorCode, message));
+ this.errorCode = errorCode;
+ }
+
+ /**
+ * New exception from a server-side generated error code and message.
+ *
+ * @param errorCode Client-side error code.
+ * @param message Client-side error message.
+ * @param cause the cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A {@code null} value is
+ * permitted, and indicates that the cause is nonexistent or
+ * unknown.)
+ */
+ public LayottoException(String errorCode, String message, Throwable cause) {
+ super(String.format("%s: %s", errorCode, emptyIfNull(message)), cause);
+ this.errorCode = errorCode;
+ }
+
+ /**
+ * Returns the exception's error code.
+ *
+ * @return Error code.
+ */
+ public String getErrorCode() {
+ return this.errorCode;
+ }
+
+ /**
+ * Wraps an exception into LayottoException (if not already LayottoException).
+ *
+ * @param exception Exception to be wrapped.
+ */
+ public static void wrap(Throwable exception) {
+ if (exception == null) {
+ return;
+ }
+
+ throw propagate(exception);
+ }
+
+ /**
+ * Wraps a callable with a try-catch to throw LayottoException.
+ *
+ * @param callable callable to be invoked.
+ * @param type to be returned
+ * @return object of type T.
+ */
+ public static Callable wrap(Callable callable) {
+ return () -> {
+ try {
+ return callable.call();
+ } catch (Exception e) {
+ wrap(e);
+ return null;
+ }
+ };
+ }
+
+ /**
+ * Wraps a runnable with a try-catch to throw LayottoException.
+ *
+ * @param runnable runnable to be invoked.
+ * @return object of type T.
+ */
+ public static Runnable wrap(Runnable runnable) {
+ return () -> {
+ try {
+ runnable.run();
+ } catch (Exception e) {
+ wrap(e);
+ }
+ };
+ }
+
+ /**
+ * Wraps an exception into LayottoException (if not already LayottoException).
+ *
+ * @param exception Exception to be wrapped.
+ * @param Mono's response type.
+ * @return Mono containing LayottoException.
+ */
+ public static