Skip to content

Commit

Permalink
feat(spring-sdk): provide access to currentState on views
Browse files Browse the repository at this point in the history
  • Loading branch information
efgpinto committed Sep 9, 2022
1 parent 24462ca commit d7a4a02
Show file tree
Hide file tree
Showing 13 changed files with 368 additions and 33 deletions.
@@ -0,0 +1,17 @@
package wiring.view;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class UserWithVersion {
public final String name;
public final String email;
public final int version;

@JsonCreator
public UserWithVersion(@JsonProperty String name, @JsonProperty String email, @JsonProperty int version) {
this.name = name;
this.email = email;
this.version = version;
}
}
@@ -0,0 +1,31 @@
package wiring.view;

import kalix.javasdk.view.View;
import kalix.springsdk.annotations.Query;
import kalix.springsdk.annotations.Subscribe;
import kalix.springsdk.annotations.Table;
import org.springframework.web.bind.annotation.*;
import wiring.domain.User;
import wiring.valueentity.UserValueEntity;


@Table("users_view")
public class UserWithVersionView extends View<UserWithVersion> {

// when methods are annotated, it's implicitly a transform = true
@Subscribe.ValueEntity(UserValueEntity.class)
public UpdateEffect<UserWithVersion> onChange(UserWithVersion userView, User user) {
return effects().updateState(new UserWithVersion(user.name, user.email, userView.version + 1));
}

@Query("SELECT * FROM users_view WHERE email = :email")
@PostMapping("/users/by-email/{email}")
public UserWithVersion getUser(@PathVariable String email) {
return null;
}

@Override
public UserWithVersion emptyState() {
return new UserWithVersion("", "", 0);
}
}
39 changes: 32 additions & 7 deletions sdk/java-sdk/src/main/java/kalix/javasdk/Kalix.java
Expand Up @@ -266,12 +266,24 @@ private Kalix registerView(
Descriptors.FileDescriptor... additionalDescriptors) {

AnySupport anySupport = newAnySupport(additionalDescriptors);
return registerView(
factory, anySupport, descriptor, viewId, viewOptions, additionalDescriptors);
}

private Kalix registerView(
ViewFactory factory,
MessageCodec messageCodec,
Descriptors.ServiceDescriptor descriptor,
String viewId,
ViewOptions viewOptions,
Descriptors.FileDescriptor... additionalDescriptors) {

ViewService service =
new ViewService(
Optional.ofNullable(factory),
descriptor,
additionalDescriptors,
anySupport,
messageCodec,
viewId,
viewOptions);
services.put(descriptor.getFullName(), system -> service);
Expand Down Expand Up @@ -430,12 +442,25 @@ public <S, E extends EventSourcedEntity<S>> Kalix register(
* @return This stateful service builder.
*/
public Kalix register(ViewProvider<?, ?> provider) {
return lowLevel.registerView(
provider::newRouter,
provider.serviceDescriptor(),
provider.viewId(),
provider.options(),
provider.additionalDescriptors());
return provider
.alternativeCodec()
.map(
codec ->
lowLevel.registerView(
provider::newRouter,
codec,
provider.serviceDescriptor(),
provider.viewId(),
provider.options(),
provider.additionalDescriptors()))
.orElseGet(
() ->
lowLevel.registerView(
provider::newRouter,
provider.serviceDescriptor(),
provider.viewId(),
provider.options(),
provider.additionalDescriptors()));
}

/**
Expand Down
Expand Up @@ -35,7 +35,7 @@ final class ViewService(
val factory: Optional[ViewFactory],
override val descriptor: Descriptors.ServiceDescriptor,
override val additionalDescriptors: Array[Descriptors.FileDescriptor],
val anySupport: AnySupport,
val messageCodec: MessageCodec,
val viewId: String,
val viewOptions: Option[ViewOptions])
extends Service {
Expand All @@ -44,10 +44,10 @@ final class ViewService(
factory: Optional[ViewFactory],
descriptor: Descriptors.ServiceDescriptor,
additionalDescriptors: Array[Descriptors.FileDescriptor],
anySupport: AnySupport,
messageCodec: MessageCodec,
viewId: String,
viewOptions: ViewOptions) =
this(factory, descriptor, additionalDescriptors, anySupport, viewId, Some(viewOptions))
this(factory, descriptor, additionalDescriptors, messageCodec, viewId, Some(viewOptions))

override def resolvedMethods: Option[Map[String, ResolvedServiceMethod[_, _]]] =
factory.asScala.collect { case resolved: ResolvedEntityFactory =>
Expand Down Expand Up @@ -102,10 +102,10 @@ final class ViewsImpl(system: ActorSystem, _services: Map[String, ViewService],

val state: Option[Any] =
receiveEvent.bySubjectLookupResult.flatMap(row =>
row.value.map(scalaPb => service.anySupport.decodeMessage(scalaPb)))
row.value.map(scalaPb => service.messageCodec.decodeMessage(scalaPb)))

val commandName = receiveEvent.commandName
val msg = service.anySupport.decodeMessage(receiveEvent.payload.get)
val msg = service.messageCodec.decodeMessage(receiveEvent.payload.get)
val metadata = new MetadataImpl(receiveEvent.metadata.map(_.entries.toVector).getOrElse(Nil))
val context = new UpdateContextImpl(service.viewId, commandName, metadata)

Expand All @@ -122,7 +122,7 @@ final class ViewsImpl(system: ActorSystem, _services: Map[String, ViewService],
case ViewUpdateEffectImpl.Update(newState) =>
if (newState == null)
throw ViewException(context, "updateState with null state is not allowed.", None)
val serializedState = ScalaPbAny.fromJavaProto(service.anySupport.encodeJava(newState))
val serializedState = ScalaPbAny.fromJavaProto(service.messageCodec.encodeJava(newState))
val upsert = pv.Upsert(Some(pv.Row(value = Some(serializedState))))
val out = pv.ViewStreamOut(pv.ViewStreamOut.Message.Upsert(upsert))
Source.single(out)
Expand Down
Expand Up @@ -19,6 +19,7 @@
import com.example.Main;
import com.example.wiring.actions.echo.Message;
import com.example.wiring.valueentities.user.User;
import com.example.wiring.views.UserWithVersion;
import kalix.springsdk.KalixConfigurationTest;
import org.hamcrest.core.IsEqual;
import org.junit.jupiter.api.Assertions;
Expand All @@ -43,9 +44,11 @@
import static org.awaitility.Awaitility.await;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static java.time.temporal.ChronoUnit.SECONDS;
import static org.awaitility.Awaitility.await;

@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = Main.class)
Expand All @@ -55,7 +58,7 @@ public class SpringSdkWiringIntegrationTest {

@Autowired private WebClient webClient;

private Duration timeout = Duration.of(5, SECONDS);
private Duration timeout = Duration.of(10, SECONDS);

@Test
public void verifyEchoActionWiring() {
Expand Down Expand Up @@ -115,6 +118,48 @@ public void verifyCounterEventSourcedWiring() {
Assertions.assertEquals("\"200\"", counterGet);
}

@Test
public void verifyTransformedUserViewWiring() throws InterruptedException {

User u1 = new User("john@doe.com", "JohnDoe");
String userCreation =
webClient
.post()
.uri("/user/JohnDoe/" + u1.email + "/" + u1.name)
.retrieve()
.bodyToMono(String.class)
.block(timeout);
Assertions.assertEquals("\"Ok\"", userCreation);

String userUpdate =
webClient
.post()
.uri("/user/JohnDoe/" + u1.email + "/JohnDoeJr")
.retrieve()
.bodyToMono(String.class)
.block(timeout);
Assertions.assertEquals("\"Ok\"", userUpdate);

// the view is eventually updated
await()
.atMost(15, TimeUnit.of(SECONDS))
.until(
() -> {
try {
var userView =
webClient
.get()
.uri("/users/by-email/" + u1.email)
.retrieve()
.bodyToMono(UserWithVersion.class)
.block(timeout);
return userView != null && 2 == userView.version;
} catch (Exception e) {
return false;
}
});
}

@Test
public void verifyFindUsersByEmail() {

Expand Down Expand Up @@ -143,7 +188,7 @@ public void verifyFindUsersByEmail() {
new IsEqual("jane.example.com"));
}

// @Test
// @Test
public void verifyFindUsersByNameStreaming() {

{ // joe 1
Expand Down
Expand Up @@ -24,7 +24,7 @@ public class User {
public String name;

@JsonCreator
public User(@JsonProperty("email") String email, @JsonProperty("name") String name) {
public User(@JsonProperty String email, @JsonProperty String name) {
this.email = email;
this.name = name;
}
Expand Down
@@ -0,0 +1,32 @@
/*
* Copyright 2021 Lightbend Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example.wiring.views;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class UserWithVersion {
public final String email;
public final int version;

@JsonCreator
public UserWithVersion(
@JsonProperty("email") String email, @JsonProperty("version") int version) {
this.email = email;
this.version = version;
}
}
@@ -0,0 +1,42 @@
/*
* Copyright 2021 Lightbend Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example.wiring.views;

import com.example.wiring.valueentities.user.User;
import com.example.wiring.valueentities.user.UserEntity;
import kalix.javasdk.view.View;
import kalix.springsdk.annotations.Query;
import kalix.springsdk.annotations.Subscribe;
import kalix.springsdk.annotations.Table;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;

@Table("user_view")
public class UserWithVersionView extends View<UserWithVersion> {

@Subscribe.ValueEntity(UserEntity.class)
public UpdateEffect<UserWithVersion> onChange(UserWithVersion state, User user) {
if (state == null) return effects().updateState(new UserWithVersion(user.email, 1));
else return effects().updateState(new UserWithVersion(user.email, state.version + 1));
}

@Query("SELECT * FROM user_view WHERE email = :email")
@GetMapping("/users/by-email/{email}")
public UserWithVersion getUser(@PathVariable String email) {
return null;
}
}
Expand Up @@ -90,21 +90,36 @@ private[impl] object ViewDescriptorFactory extends ComponentDescriptorFactory {
.map { method =>
// validate that all updates return the same type
val valueEntityClass = method.getAnnotation(classOf[Subscribe.ValueEntity]).value().asInstanceOf[Class[_]]
val valueEntityEventClass = valueEntityClass.getGenericSuperclass
.asInstanceOf[ParameterizedType]
.getActualTypeArguments
.head
.asInstanceOf[Class[_]]
previousValueEntityClass match {
case Some(`valueEntityClass`) => // ok
case Some(other) =>
throw InvalidComponentException(
s"All update methods must return the same type, but [${method.getName}] returns [${valueEntityClass.getName}] while a previous update method returns [${other.getName}]")
case None => previousValueEntityClass = Some(valueEntityClass)
}
// FIXME validate that transform method accepts value entity state type

method.getParameterTypes.toList match {
case p1 :: Nil if p1 != valueEntityEventClass =>
throw InvalidComponentException(
s"Method [${method.getName}] annotated with '@Subscribe' should either receive a single parameter of type [${valueEntityEventClass.getName}] or two ordered parameters of type [${tableType.getName}, ${valueEntityEventClass.getName}]")
case p1 :: p2 :: Nil if p1 != tableType || p2 != valueEntityEventClass =>
throw InvalidComponentException(
s"Method [${method.getName}] annotated with '@Subscribe' should either receive a single parameter of type [${valueEntityEventClass.getName}] or two ordered parameters of type [${tableType.getName}, ${valueEntityEventClass.getName}]")
case _ => // happy days, dev did good with the signature
}

// event sourced or topic subscription updates
val methodOptionsBuilder = kalix.MethodOptions.newBuilder()
methodOptionsBuilder.setEventing(eventingInForValueEntity(method))
addTableOptionsToUpdateMethod(tableName, tableProtoMessageName, methodOptionsBuilder, true)

KalixMethod(RestServiceMethod(method)).withKalixOptions(methodOptionsBuilder.build())
KalixMethod(RestServiceMethod(method, None, method.getParameterCount - 1))
.withKalixOptions(methodOptionsBuilder.build())

}
.toSeq
Expand Down
Expand Up @@ -95,9 +95,10 @@ case class VirtualServiceMethod(component: Class[_], methodName: String, inputTy
* Build from methods annotated with @Subscription. Those methods are not annotated with Spring REST annotations, but
* they become a REST method at the end.
*/
case class RestServiceMethod(javaMethod: Method, optMethodName: Option[String] = None) extends AnyServiceMethod {
case class RestServiceMethod(javaMethod: Method, optMethodName: Option[String] = None, inputTypeParamIndex: Int = 0)
extends AnyServiceMethod {

val inputType: Class[_] = javaMethod.getParameterTypes()(0)
val inputType: Class[_] = javaMethod.getParameterTypes()(inputTypeParamIndex)

override def methodName: String = optMethodName.getOrElse(javaMethod.getName)

Expand Down

0 comments on commit d7a4a02

Please sign in to comment.