Skip to content
This repository has been archived by the owner on May 9, 2019. It is now read-only.

Add readside processor to user #131

Merged
merged 8 commits into from
Aug 2, 2017
Merged

Add readside processor to user #131

merged 8 commits into from
Aug 2, 2017

Conversation

lakhina
Copy link
Contributor

@lakhina lakhina commented Jul 13, 2017

Phase one of #120 is complete. Now getUsers() provide you 10 users in nav list in with alphabatically least uuid

@lakhina lakhina changed the title Add readside processor to user credential [WIP] Add readside processor to user credential Jul 13, 2017
@lakhina lakhina force-pushed the readside branch 2 times, most recently from 40f14d3 to 5944d45 Compare July 13, 2017 12:09
@lakhina lakhina changed the title [WIP] Add readside processor to user credential [WIP] Add readside processor to user Jul 13, 2017
@lakhina lakhina changed the title [WIP] Add readside processor to user Add readside processor to user Jul 13, 2017
@lakhina lakhina requested review from jroper, TimMoore and ignasi35 and removed request for jroper and TimMoore July 13, 2017 14:34
Copy link
Contributor

@TimMoore TimMoore left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would recommend writing a test for this, like the ones for ItemRepository and TransactionRepository, so that it will be easier to catch bugs more quickly.

@@ -2,6 +2,7 @@

import com.lightbend.lagom.serialization.Jsonable;
import lombok.Value;
import lombok.experimental.Wither;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it used?

).withPathParamSerializer(
UUID.class, PathParamSerializers.required("UUID", UUID::fromString, UUID::toString)
).withHeaderFilter(SecurityHeaderFilter.INSTANCE);
// ).withPathParamSerializer(UUID.class, PathParamSerializers.required("UUID", UUID::fromString, UUID::toString));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The commented line can be deleted, since it is duplicated above.


user.getId(),
user.getEmail(),
user.getName()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't look quite right. The statement you are binding to is this:

"INSERT INTO User(UserId, Name, email, PasswordHash) VALUES (?, ?, ?, ?)"

You need to bind the same number of parameters, in the same order they appear in the statement.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, I would also say that you do not need to include the PasswordHash column in this table at all, since it is not used on the read side.


private CompletionStage<List<BoundStatement>> insertUser(PUser user) {
return completedStatements(
(List<BoundStatement>) insertUserCreator(user)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This cast will fail: a BoundStatement cannot be cast to a List<BoundStatement>. You will need to wrap it in a list instead, such as Arrays.asList(insertUserCreator(user)).

"UserId UUID PRIMARY KEY, " +
"Name text, "+
"email text, " +
"PasswordHash text "+
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned in another comment, there's no need for the PasswordHash column, since it is never read.

pathCall("/api/user?pageNo&pageSize", this::getUsers)
).withPathParamSerializer(
UUID.class, PathParamSerializers.required("UUID", UUID::fromString, UUID::toString)
).withHeaderFilter(SecurityHeaderFilter.INSTANCE);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason for adding the SecurityHeaderFilter? I don't see it used anywhere.

private CompletionStage<Done> prepareInsertUserStatement() {

return session.
prepare("INSERT INTO User(" +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name of the table here ("User") does not match the table created above ("UserInfo").

);
}
private CompletionStage<Optional<Row>> selectUser(UUID UserId) {
return session.selectOne("SELECT * FROM UserCreator WHERE UserId = ?", UserId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This table name also does not match either of the other two named used.

@lakhina lakhina force-pushed the readside branch 5 times, most recently from d8bae49 to cd05b99 Compare July 15, 2017 19:08
"CREATE TABLE IF NOT EXISTS UserInfo (" +
"UserId UUID PRIMARY KEY, " +
"Name text, " +
"email text, " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what happened to the comment from @yg-apaza pointing out the same, but there is a syntax error here: trailing comma on the last column.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TimMoore @yg-apaza Thanks. I will fix it.

Copy link
Member

@octonato octonato Jul 28, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a syntax error, on the contrary.

If I remove the trailing comma I get a syntax error from the cassandra driver.
ItemRepository also have a comma separating the last column and the definition of a compound primary key.

There is two slightly different syntax.

CREATE TABLE demo (
 foo text PRIMARY KEY,
 bar text
)

No comma between foo column definition and PRIMARY KEY as we are marking foo as the primary key.

CREATE TABLE demo (
 foo text,
 bar text,
 PRIMARY KEY(foo, bar)
)

In the above example we do need a comma because we are defining a compound PK. In that case, "PRIMARY KEY" is not part of the column definition and therefore a comma is needed

Copy link
Member

@octonato octonato Jul 28, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TimMoore, I think that @yg-apaza's comment disappeared because she may have realised it and removed it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rcavalcanti I have changed this query now.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, seems that I had already checked out the one with the compound key that made the comments here obsolete. Sorry for the noisy on the line.

@TimMoore TimMoore assigned lakhina and unassigned yg-apaza Jul 18, 2017
@lakhina lakhina force-pushed the readside branch 2 times, most recently from 1594163 to af92ea0 Compare July 18, 2017 15:47
private final String name;
private final String email;

@JsonCreator
public User(@JsonProperty("id") UUID id, @JsonProperty("name") String name, @JsonProperty("email") String email) {
public User(@JsonProperty("id") UUID id, @JsonProperty("creatorId") UUID creatorId , @JsonProperty("name") String name, @JsonProperty("email") String email) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want it to be a timestamp (maybe called createdAt) rather than a UUID.

@@ -4,19 +4,23 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Value;

import java.sql.Timestamp;
import java.time.Instant;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that you have imported both Timestamp and Instant here... it would be better to use Instant and not Timestamp, because java.sql.Timestamp is specific to JDBC, which we are not using here.

private final String name;
private final String email;

@JsonCreator
public User(@JsonProperty("id") UUID id, @JsonProperty("name") String name, @JsonProperty("email") String email) {
public User(@JsonProperty("id") UUID id, @JsonProperty("createdAt") Timestamp createdAt , @JsonProperty("name") String name, @JsonProperty("email") String email) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This constructor should be removed, similar to all of the other @Value classes.

import java.util.Optional;
import java.util.UUID;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please be sure to clean unused imports.


public interface PUserCommand extends Jsonable {
@Value
final class CreatePUser implements PUserCommand, PersistentEntity.ReplyType<PUser> {
private final Timestamp createdAt;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than putting createdAt in the command, I think it's better to generate it within the persistent entity when a CreatePUser command is received.

return insertUserStatement.bind(

user.getId(),
user.getCreatedAt(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order to bind an Instant to a Cassandra query, you can use the optional InstantCodec provided by the Java Cassandra driver.

You can see an example of how to register a codec in ItemRepository (and there is a similar example in TransactionRepository).

@@ -1,5 +1,5 @@
play.http.secret.key = "somesecret"
online-auction.instruction.show = true

lagom.persistence.read-side.global-prepare-timeout = 60s
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this doesn't fix the problem for you, then it should probably be removed. We'll need to find some other solution.

@TimMoore
Copy link
Contributor

@lakhina the latest build has a new error:

 Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.SyntaxError: line 1:107 mismatched input '(' expecting ')' (..., email text PRIMARY KEY [(]createdAt...)

This indicates a syntax error in your CQL statement. Have a look at the stack trace to find where.

The error "Couldn't register the processor on the testkit" is a generic wrapper exception that could have a variety of different causes. It's important to always read the nested exceptions to find the root cause of the error.

@lakhina lakhina force-pushed the readside branch 2 times, most recently from 427c8d1 to 694a55d Compare July 25, 2017 18:04
Copy link
Contributor

@TimMoore TimMoore left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lakhina can you explain your thinking for "Change reply type of PUserCreated to Optional" please? I don't understand why this change was made.

@octonato
Copy link
Member

@lakhina I have been trying to trace why the build is failing.

Most of the times it fails with a timeout on the actor system when it tries to create the tables. I could find the cause of that yet. I will keep searching.

However, from time to time it succeeds and when it happens we get other errors on the UserRepositoryTest (see my comments there)


private CompletionStage<List<BoundStatement>> insertUser(PUser user) {
return completedStatements(
Arrays.asList(insertUserCreator(user))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we can remove the Arrays.asList because there is an overloaded completedStatements that gets a varargs.

}

PaginatedSequence<User> createdUsers = Await.result(userRepository.getUsers(1, 10));
assertEquals(25, createdUsers.getCount());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we insert a User on previous test, the count here will be 26.

We have two options:

  • make sure table is cleared before each test (costly)
  • cache the current count before insert the new Users and compare with it.
int size = 25
int initialCount = createdUsers.getCount();

// create users here

assertEquals(initialCount + size, createdUsers.getCount());

assertEquals(25, createdUsers.getCount());
assertEquals(10, createdUsers.getItems().size());
// default ordering is time DESC so page 2 of size 10 over a set of 25 returns item ids 5-14. On that seq, the fifth item is id=10
assertEquals("user10", createdUsers.getItems().get(4).getName());
Copy link
Member

@octonato octonato Jul 27, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm getting admin4 instead. Not user10

@lakhina
Copy link
Contributor Author

lakhina commented Jul 28, 2017

@rcavalcanti Thanks for the help. I have made the suggested changes but still the same error is persisting.

@octonato
Copy link
Member

@lakhina the fix I suggested won't fix the build problem (see previous comment).

What I observed is that when it happens to pass the stage of table creation, it does fail because of errors in the test itself.

Can you push your last changes? Thanks

private final PUser userCreated = new PUser(userId, createdAt, name, email, password);
int initialCount = createdUsers.getCount();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to have it inside the test method instead.

size is the number of users you plan to create inside shouldPaginateUserRetrieval method.
Preferably, we should use this value in the for loop as well.

for (int i = 0; i < size; i++) { ... }

The initialCount as well. This call will happen on class initialisation. At that point, the initial count is 0.

My suggestion was more something like the following:

 @Test
public void shouldPaginateUserRetrieval() throws InterruptedException, ExecutionException, TimeoutException {

    PaginatedSequence<User> usersCreatedBefore = Await.result(userRepository.getUsers(1, 10));
    int initialCount = usersCreatedBefore.getCount();
    int size = 25;

    for (int i = 0; i < size; i++) {
        feed(new PUserEvent.PUserCreated(buildFixture(createdAt, i)));
    }

    PaginatedSequence<User> createdUsers = Await.result(userRepository.getUsers(1, 10));
    assertEquals(size + initialCount, createdUsers.getCount());
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking back to the code I just pasted here, it is even better to call userRepository.countUsers() instead of userRepository.getUsers(1, 10). Certainly for the first call where we are only interested in the total current count.

assertEquals(10, createdUsers.getItems().size());
// default ordering is time DESC so page 2 of size 10 over a set of 25 returns item ids 5-14. On that seq, the fifth item is id=10
assertEquals("user10", createdUsers.getItems().get(4).getName());
assertEquals("admin4", createdUsers.getItems().get(4).getName());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that I was wrong about this one. I'm getting random values here.
I've seen the following values so far:
admin4
admin10
admin22
admin18

@lakhina
Copy link
Contributor Author

lakhina commented Jul 28, 2017

@rcavalcanti I have committed the changes. Please have a look.

import com.google.inject.AbstractModule;
import com.lightbend.lagom.javadsl.server.ServiceGuiceSupport;

public class UserModule extends AbstractModule implements ServiceGuiceSupport {
public class Module extends AbstractModule implements ServiceGuiceSupport {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could finally detect the origin of the broken build. I don't have yet full understand how does it happens, but it's related with the order of wiring of Guice modules.

It seems that when using a non-standard location for the Guice module, the bit that will make the akka cluster to join itself is affected by the bootstrap of cassandra. Probably by the loading of the Cassandra Module.

What I have observed is that sometimes (1 over 3 times), the cluster joining (JoinClusterImpl.join) is only called after some Cassandra setup.

2017-07-31 11:17:32,128 [INFO] from akka.cluster.Cluster(akka://ServiceTest_170731111724385) - Cluster Node [akka.tcp://ServiceTest_170731111724385@127.0.0.1:53609] - Starting up...
2017-07-31 11:17:32,214 [INFO] from akka.cluster.Cluster(akka://ServiceTest_170731111724385) - Cluster Node [akka.tcp://ServiceTest_170731111724385@127.0.0.1:53609] - Registered cluster JMX MBean [akka:type=Cluster]
2017-07-31 11:17:32,215 [INFO] from akka.cluster.Cluster(akka://ServiceTest_170731111724385) - Cluster Node [akka.tcp://ServiceTest_170731111724385@127.0.0.1:53609] - Started up successfully
2017-07-31 11:17:32,236 [INFO] from akka.cluster.Cluster(akka://ServiceTest_170731111724385) - Cluster Node [akka.tcp://ServiceTest_170731111724385@127.0.0.1:53609] - No seed-nodes configured, manual cluster join required
...
2017-07-31 11:17:35,577 [DEBUG] from com.datastax.driver.core.ControlConnection - [Control connection] Refreshing schema for servicetest_170731111724385
2017-07-31 11:17:39,781 [INFO] from org.apache.cassandra.auth.CassandraRoleManager - Created default superuser role 'cassandra'

// JoinClusterImpl.join is called here
2017-07-31 11:17:54,137 [INFO] from akka.cluster.Cluster(akka://ServiceTest_170731111724385) - Cluster Node [akka.tcp://ServiceTest_170731111724385@127.0.0.1:53609] - Node [akka.tcp://ServiceTest_170731111724385@127.0.0.1:53609] is JOINING, roles []
2017-07-31 11:17:54,150 [INFO] from akka.cluster.Cluster(akka://ServiceTest_170731111724385) - Cluster Node [akka.tcp://ServiceTest_170731111724385@127.0.0.1:53609] - Leader is moving node [akka.tcp://ServiceTest_170731111724385@127.0.0.1:53609] to [Up]

When this happens, the UserRepositoryTest fails because it depends on async call to ClusterStartupTaskActor in order to get an instance of CassandraOffsetDao which is required by the cassandra read side.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, that is some very good detective work! How strange.

We have noticed some other issues related to this in the past, for example, lagom/lagom#568. That issue pertains to JDBC rather than Cassandra, but I think it's similar in that the underlying issue is a lack of coordination in the initialization order. All of our ClusterStartupTask implementations kick off immediately as soon as they are defined.

@rcavalcanti: could you record your findings in a lagom/lagom issue? We should really fix this upstream.

PaginatedSequence<User> createdUsers = Await.result(userRepository.getUsers(1, 10));
assertEquals(size + initialCount, createdUsers.getCount());
assertEquals(10, createdUsers.getItems().size());
assertEquals("admin11", createdUsers.getItems().get(4).getName());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one will randomly fail because the ordering is not guaranteed.

UserRepository.selectUsers must include a order by createdAt to make this call deterministic.

public void shouldPaginateUserRetrieval() throws InterruptedException, ExecutionException, TimeoutException {

PaginatedSequence<User> usersCreatedBefore = Await.result(userRepository.getUsers(1, 10));
int initialCount = usersCreatedBefore.getCount();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we make UserRepository.countUsers() public, we can calculate the initialCount in one shot without the end fetch results in-memory.

int initialCount = Await.result(userRepository.countUsers());

@TimMoore
Copy link
Contributor

TimMoore commented Aug 1, 2017

With the latest changes from @rcavalcanti, it looks like the build failure is now a legitimate test failure (should be reproducible locally) rather than a problem with the framework.

Copy link
Contributor

@TimMoore TimMoore left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Green build ✅

Thanks for all of your effort on this @lakhina!

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Development

Successfully merging this pull request may close these issues.

4 participants