diff --git a/.editorconfig b/.editorconfig
new file mode 100644
index 0000000..fb78925
--- /dev/null
+++ b/.editorconfig
@@ -0,0 +1,18 @@
+# http://EditorConfig.org
+
+# top-most EditorConfig file
+root = true
+
+[*]
+end_of_line = lf
+insert_final_newline = true
+charset = utf-8
+indent_size = 4
+indent_style = space
+trim_trailing_whitespace = true
+# from checkstyle:
+max_line_length = 120
+ij_continuation_indent_size = 4
+
+[{*.yml, *.yaml}]
+indent_size = 2
diff --git a/pom.xml b/pom.xml
index e776bf4..874a2ab 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,6 +20,7 @@
3.7.4
netty
mn.flux.response.bug.Application
+ 1.5.2.Final
@@ -70,6 +71,10 @@
micronaut-reactor-http-client
compile
+
+
+
+
jakarta.annotation
jakarta.annotation-api
@@ -85,6 +90,19 @@
micronaut-test-junit5
test
+
+
+ org.projectlombok
+ lombok
+ provided
+
+
+
+ org.mapstruct
+ mapstruct
+ ${version.org.mapstruct}
+
+
org.junit.jupiter
junit-jupiter-api
@@ -95,6 +113,25 @@
junit-jupiter-engine
test
+
+
+
+ org.testcontainers
+ cassandra
+ test
+
+
+
+ io.rest-assured
+ rest-assured
+ test
+
+
+
+ org.assertj
+ assertj-core
+ test
+
@@ -111,7 +148,33 @@
-
+
+
+ org.projectlombok
+ lombok
+ ${lombok.version}
+
+
+ org.mapstruct
+ mapstruct-processor
+ ${version.org.mapstruct}
+
+
+
+ com.datastax.oss
+ java-driver-mapper-processor
+ 4.14.1
+
+
+ io.micronaut
+ micronaut-inject-java
+ ${micronaut.version}
+
+
+ io.micronaut
+ micronaut-validation
+ ${micronaut.version}
+
io.micronaut
micronaut-http-validation
diff --git a/src/main/java/mn/flux/response/bug/EnumTypeConverters.java b/src/main/java/mn/flux/response/bug/EnumTypeConverters.java
new file mode 100644
index 0000000..1807719
--- /dev/null
+++ b/src/main/java/mn/flux/response/bug/EnumTypeConverters.java
@@ -0,0 +1,17 @@
+package mn.flux.response.bug;
+
+import io.micronaut.context.annotation.Factory;
+import io.micronaut.core.convert.ConversionContext;
+import io.micronaut.core.convert.TypeConverter;
+import jakarta.inject.Singleton;
+import java.util.Optional;
+import mn.flux.response.bug.model.ResourceName;
+
+@Factory
+public class EnumTypeConverters {
+ @Singleton
+ @SuppressWarnings({"unused"})
+ public TypeConverter stringToResourceNameConverter() {
+ return (String object, Class targetType, ConversionContext context) -> Optional.of(ResourceName.fromValue(object));
+ }
+}
diff --git a/src/main/java/mn/flux/response/bug/RevisionController.java b/src/main/java/mn/flux/response/bug/RevisionController.java
new file mode 100644
index 0000000..06b4a3a
--- /dev/null
+++ b/src/main/java/mn/flux/response/bug/RevisionController.java
@@ -0,0 +1,21 @@
+package mn.flux.response.bug;
+
+import io.micronaut.http.annotation.Controller;
+import io.micronaut.http.annotation.Get;
+import io.micronaut.http.annotation.PathVariable;
+import lombok.RequiredArgsConstructor;
+import mn.flux.response.bug.model.ResourceName;
+import reactor.core.publisher.Flux;
+
+@Controller(RevisionController.REVISION_BASE_URI)
+@RequiredArgsConstructor
+public class RevisionController {
+ public static final String REVISION_BASE_URI = "/revision";
+
+ private final RevisionService revisionService;
+
+ @Get("/{resourceName}/{resourceId}")
+ public Flux getAll(@PathVariable ResourceName resourceName, @PathVariable String resourceId) {
+ return revisionService.findAllBy(resourceId, resourceName);
+ }
+}
diff --git a/src/main/java/mn/flux/response/bug/RevisionDto.java b/src/main/java/mn/flux/response/bug/RevisionDto.java
new file mode 100644
index 0000000..ac057ac
--- /dev/null
+++ b/src/main/java/mn/flux/response/bug/RevisionDto.java
@@ -0,0 +1,38 @@
+package mn.flux.response.bug;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import io.micronaut.core.annotation.Introspected;
+import java.time.Instant;
+import java.util.UUID;
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import mn.flux.response.bug.model.PublishStatus;
+import mn.flux.response.bug.model.ResourceName;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@Introspected
+public class RevisionDto {
+
+ @NotBlank
+ private String resourceId;
+
+ @JsonInclude(Include.NON_NULL)
+ private UUID id;
+
+ @NotNull
+ private ResourceName resourceName;
+
+ private Instant date;
+
+ private String comment;
+
+ private PublishStatus revisionStatus;
+
+ private Long revisionNumber;
+}
diff --git a/src/main/java/mn/flux/response/bug/RevisionMapper.java b/src/main/java/mn/flux/response/bug/RevisionMapper.java
new file mode 100644
index 0000000..02888ac
--- /dev/null
+++ b/src/main/java/mn/flux/response/bug/RevisionMapper.java
@@ -0,0 +1,15 @@
+package mn.flux.response.bug;
+
+import mn.flux.response.bug.model.Revision;
+import org.mapstruct.Mapper;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@Mapper(componentModel = "jsr330")
+public abstract class RevisionMapper {
+ protected abstract RevisionDto toDto(Revision revision);
+
+ public Flux toDto(Flux revisions) {
+ return revisions.flatMap(revision -> Mono.just(toDto(revision)));
+ }
+}
diff --git a/src/main/java/mn/flux/response/bug/RevisionService.java b/src/main/java/mn/flux/response/bug/RevisionService.java
new file mode 100644
index 0000000..53f82d9
--- /dev/null
+++ b/src/main/java/mn/flux/response/bug/RevisionService.java
@@ -0,0 +1,20 @@
+package mn.flux.response.bug;
+
+import jakarta.inject.Singleton;
+import lombok.RequiredArgsConstructor;
+import mn.flux.response.bug.model.ResourceName;
+import mn.flux.response.bug.repository.RevisionDao;
+import reactor.core.publisher.Flux;
+
+@Singleton
+@RequiredArgsConstructor
+public class RevisionService {
+
+ private final RevisionMapper revisionMapper;
+ private final RevisionDao revisionDao;
+
+ public Flux findAllBy(String resourceId, ResourceName resourceName) {
+ return Flux.from(revisionDao.findAllBy(resourceId, resourceName))
+ .transform(revisionMapper::toDto);
+ }
+}
diff --git a/src/main/java/mn/flux/response/bug/model/PublishStatus.java b/src/main/java/mn/flux/response/bug/model/PublishStatus.java
new file mode 100644
index 0000000..a4b3992
--- /dev/null
+++ b/src/main/java/mn/flux/response/bug/model/PublishStatus.java
@@ -0,0 +1,28 @@
+package mn.flux.response.bug.model;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonValue;
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor
+public enum PublishStatus {
+ DRAFT("draft"),
+ PUBLISHED("published");
+
+ private final String jsonValue;
+
+ @JsonCreator
+ public static ResourceName fromValue(String value) {
+ for (ResourceName contentType : ResourceName.values()) {
+ if (contentType.getJsonValue().equalsIgnoreCase(value)) {
+ return contentType;
+ }
+ }
+ throw new IllegalArgumentException("Unknown ResourceName value");
+ }
+
+ @JsonValue
+ public String getJsonValue() {
+ return jsonValue;
+ }
+}
diff --git a/src/main/java/mn/flux/response/bug/model/ResourceName.java b/src/main/java/mn/flux/response/bug/model/ResourceName.java
new file mode 100644
index 0000000..2f539a5
--- /dev/null
+++ b/src/main/java/mn/flux/response/bug/model/ResourceName.java
@@ -0,0 +1,30 @@
+package mn.flux.response.bug.model;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonValue;
+import io.micronaut.core.annotation.Introspected;
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor
+@Introspected
+public enum ResourceName {
+ ARTICLE("articles"),
+ VIDEO("videos");
+
+ private final String jsonValue;
+
+ @JsonCreator
+ public static ResourceName fromValue(String value) {
+ for (ResourceName contentType : ResourceName.values()) {
+ if (contentType.getJsonValue().equalsIgnoreCase(value)) {
+ return contentType;
+ }
+ }
+ throw new IllegalArgumentException("Unknown ResourceName value");
+ }
+
+ @JsonValue
+ public String getJsonValue() {
+ return jsonValue;
+ }
+}
diff --git a/src/main/java/mn/flux/response/bug/model/Revision.java b/src/main/java/mn/flux/response/bug/model/Revision.java
new file mode 100644
index 0000000..ba1d848
--- /dev/null
+++ b/src/main/java/mn/flux/response/bug/model/Revision.java
@@ -0,0 +1,40 @@
+package mn.flux.response.bug.model;
+
+import com.datastax.oss.driver.api.mapper.annotations.ClusteringColumn;
+import com.datastax.oss.driver.api.mapper.annotations.Entity;
+import com.datastax.oss.driver.api.mapper.annotations.PartitionKey;
+import java.time.Instant;
+import java.util.UUID;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Entity
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@Builder
+public class Revision {
+
+ @PartitionKey
+ private String resourceId;
+
+ @ClusteringColumn(1)
+ private ResourceName resourceName;
+
+ @ClusteringColumn(2)
+ private UUID id;
+
+ private UUID userId;
+
+ private Instant date;
+
+ private String comment;
+
+ private String revisionData;
+
+ private PublishStatus revisionStatus;
+
+ private Long revisionNumber;
+}
diff --git a/src/main/java/mn/flux/response/bug/repository/RepositoryConfig.java b/src/main/java/mn/flux/response/bug/repository/RepositoryConfig.java
new file mode 100644
index 0000000..4039a98
--- /dev/null
+++ b/src/main/java/mn/flux/response/bug/repository/RepositoryConfig.java
@@ -0,0 +1,28 @@
+package mn.flux.response.bug.repository;
+
+import com.datastax.oss.driver.api.core.CqlIdentifier;
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.type.codec.ExtraTypeCodecs;
+import com.datastax.oss.driver.api.core.type.codec.registry.MutableCodecRegistry;
+import io.micronaut.context.annotation.Factory;
+import io.micronaut.context.annotation.Requirements;
+import io.micronaut.context.annotation.Requires;
+import jakarta.inject.Singleton;
+import mn.flux.response.bug.model.PublishStatus;
+import mn.flux.response.bug.model.ResourceName;
+
+@Factory
+@Requirements(@Requires(beans = SchemaBootstrap.class))
+public class RepositoryConfig {
+
+ @Singleton
+ @SuppressWarnings({"unused"})
+ public RevisionDao revisionDao(CqlSession cqlSession) {
+ MutableCodecRegistry registry = (MutableCodecRegistry) cqlSession.getContext().getCodecRegistry();
+ registry.register(ExtraTypeCodecs.enumNamesOf(ResourceName.class));
+ registry.register(ExtraTypeCodecs.enumNamesOf(PublishStatus.class));
+
+ RevisionDaoFactory revisionDaoFactory = new RevisionDaoFactoryBuilder(cqlSession).build();
+ return revisionDaoFactory.createRevisionDao(CqlIdentifier.fromCql(SchemaBootstrap.KEYSPACE_NAME));
+ }
+}
diff --git a/src/main/java/mn/flux/response/bug/repository/RevisionDao.java b/src/main/java/mn/flux/response/bug/repository/RevisionDao.java
new file mode 100644
index 0000000..1a0d401
--- /dev/null
+++ b/src/main/java/mn/flux/response/bug/repository/RevisionDao.java
@@ -0,0 +1,19 @@
+package mn.flux.response.bug.repository;
+
+import com.datastax.dse.driver.api.core.cql.reactive.ReactiveResultSet;
+import com.datastax.dse.driver.api.mapper.reactive.MappedReactiveResultSet;
+import com.datastax.oss.driver.api.mapper.annotations.Dao;
+import com.datastax.oss.driver.api.mapper.annotations.Insert;
+import com.datastax.oss.driver.api.mapper.annotations.Select;
+import mn.flux.response.bug.model.ResourceName;
+import mn.flux.response.bug.model.Revision;
+
+@Dao
+public interface RevisionDao {
+
+ @Select(orderBy = {"resource_name ASC", "id DESC"}, limit = ":maxResults")
+ MappedReactiveResultSet findAllBy(String resourceId, ResourceName resourceName);
+
+ @Insert
+ ReactiveResultSet save(Revision revision);
+}
diff --git a/src/main/java/mn/flux/response/bug/repository/RevisionDaoFactory.java b/src/main/java/mn/flux/response/bug/repository/RevisionDaoFactory.java
new file mode 100644
index 0000000..528d142
--- /dev/null
+++ b/src/main/java/mn/flux/response/bug/repository/RevisionDaoFactory.java
@@ -0,0 +1,14 @@
+package mn.flux.response.bug.repository;
+
+
+import com.datastax.oss.driver.api.core.CqlIdentifier;
+import com.datastax.oss.driver.api.mapper.annotations.DaoFactory;
+import com.datastax.oss.driver.api.mapper.annotations.DaoKeyspace;
+import com.datastax.oss.driver.api.mapper.annotations.Mapper;
+
+@Mapper
+public interface RevisionDaoFactory {
+
+ @DaoFactory
+ RevisionDao createRevisionDao(@DaoKeyspace CqlIdentifier keyspace);
+}
diff --git a/src/main/java/mn/flux/response/bug/repository/SchemaBootstrap.java b/src/main/java/mn/flux/response/bug/repository/SchemaBootstrap.java
new file mode 100644
index 0000000..265d2c1
--- /dev/null
+++ b/src/main/java/mn/flux/response/bug/repository/SchemaBootstrap.java
@@ -0,0 +1,50 @@
+package mn.flux.response.bug.repository;
+
+import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createKeyspace;
+import static com.datastax.oss.driver.api.querybuilder.SchemaBuilder.createTable;
+import static java.time.Duration.ofSeconds;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
+import com.datastax.oss.driver.api.core.type.DataTypes;
+import io.micronaut.context.annotation.Context;
+import jakarta.inject.Inject;
+import jakarta.inject.Singleton;
+
+@Singleton
+@Context
+public class SchemaBootstrap {
+
+ public static final String KEYSPACE_NAME = "revisions";
+ public static final String TABLE_REVISION_NAME = "revision";
+
+ /**
+ * Constructor initialises cassandra keyspace and table.
+ */
+ @Inject
+ public SchemaBootstrap(CqlSession cqlSession) {
+ cqlSession.execute(
+ createKeyspace(KEYSPACE_NAME)
+ .ifNotExists()
+ .withSimpleStrategy(1)
+ .build()
+ .setTimeout(ofSeconds(10)));
+
+ cqlSession.execute(
+ createTable(KEYSPACE_NAME, TABLE_REVISION_NAME)
+ .ifNotExists()
+ .withPartitionKey("resource_id", DataTypes.TEXT)
+ .withClusteringColumn("resource_name", DataTypes.TEXT)
+ .withClusteringColumn("id", DataTypes.TIMEUUID)
+ .withColumn("date", DataTypes.TIMESTAMP)
+ .withColumn("revision_data", DataTypes.TEXT)
+ .withColumn("comment", DataTypes.TEXT)
+ .withColumn("user_id", DataTypes.UUID)
+ .withColumn("revision_status", DataTypes.TEXT)
+ .withColumn("revision_number", DataTypes.BIGINT)
+ .withClusteringOrder("resource_name", ClusteringOrder.ASC)
+ .withClusteringOrder("id", ClusteringOrder.DESC)
+ .build()
+ .setTimeout(ofSeconds(10)));
+ }
+}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 16f0a8e..aae8e54 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -1,13 +1,28 @@
micronaut:
application:
name: mnFluxResponseBug
+
+#tracing:
+# zipkin:
+# enabled: true
+
cassandra:
default:
- clusterName: '"myCluster"'
- contactPoint: '"localhost"'
- port: 9042
- maxSchemaAgreementWaitSeconds: 20
- ssl: true
+ basic:
+ contact-points:
+ - "${cdb.host}:${cdb.port}"
+ load-balancing-policy:
+ local-datacenter: "${cdb.datacenter}"
+ class: DefaultLoadBalancingPolicy
+ request:
+ timeout: 5s
+ consistency: "LOCAL_QUORUM"
+
+cdb:
+ host: "localhost"
+ port: 9043
+ datacenter: "datacenter1"
+
netty:
default:
allocator:
diff --git a/src/test/java/mn/flux/response/bug/RevisionControllerTest.java b/src/test/java/mn/flux/response/bug/RevisionControllerTest.java
new file mode 100644
index 0000000..16fb361
--- /dev/null
+++ b/src/test/java/mn/flux/response/bug/RevisionControllerTest.java
@@ -0,0 +1,139 @@
+package mn.flux.response.bug;
+
+import static io.restassured.RestAssured.given;
+import static java.util.Objects.isNull;
+import static mn.flux.response.bug.RevisionController.REVISION_BASE_URI;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.dockerjava.api.model.Ulimit;
+import io.micronaut.core.annotation.NonNull;
+import io.micronaut.http.HttpStatus;
+import io.micronaut.runtime.server.EmbeddedServer;
+import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
+import io.micronaut.test.support.TestPropertyProvider;
+import io.restassured.RestAssured;
+import io.restassured.builder.RequestSpecBuilder;
+import io.restassured.config.HttpClientConfig;
+import io.restassured.config.LogConfig;
+import io.restassured.http.ContentType;
+import io.restassured.response.Response;
+import jakarta.inject.Inject;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.IntStream;
+import mn.flux.response.bug.model.PublishStatus;
+import mn.flux.response.bug.model.ResourceName;
+import mn.flux.response.bug.model.Revision;
+import mn.flux.response.bug.repository.RevisionDao;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.testcontainers.containers.CassandraContainer;
+import org.testcontainers.utility.DockerImageName;
+import reactor.core.publisher.Mono;
+
+@MicronautTest
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+class RevisionControllerTest implements TestPropertyProvider {
+
+ @Inject
+ EmbeddedServer embeddedServer;
+
+ @Inject
+ RevisionDao revisionDao;
+
+ @Inject
+ RevisionMapper mapper;
+
+ @Inject
+ ObjectMapper objectMapper;
+
+ @Test
+ void getRevisions_bug_JsonEOFException() {
+ // given
+ Revision revision = Revision.builder()
+ .revisionNumber(ThreadLocalRandom.current().nextLong())
+ .id(UUIDs.timeBased())
+ .resourceName(ResourceName.ARTICLE)
+ .resourceId(UUID.randomUUID().toString())
+ .revisionStatus(PublishStatus.DRAFT)
+ .revisionNumber(1L)
+ .comment("no comment")
+ .build();
+ Mono.from(revisionDao.save(revision)).block();
+
+ String uri = String.format("/%s/%s", revision.getResourceName().getJsonValue(), revision.getResourceId());
+
+ // usually fails somewhere around 1500-3000 loops.
+ IntStream.range(0, 30000)
+ .forEach(i -> {
+ try {
+ System.out.println(i);
+
+ // when
+ Response response = given()
+ .when()
+ .get(uri)
+ .then()
+ .statusCode(HttpStatus.OK.getCode())
+ .and()
+ .extract().response();
+
+ // then
+ objectMapper.readValue(response.getBody().asString(), new TypeReference>() {
+ });
+ } catch (JsonProcessingException e) {
+ fail(e.getMessage(), e);
+ }
+ });
+ }
+
+ @BeforeEach
+ void setup() {
+ RestAssured.config = RestAssured.config()
+ .httpClient(HttpClientConfig.httpClientConfig()
+ .setParam("http.socket.timeout", 30000)
+ .setParam("http.connection.timeout", 30000))
+ .logConfig(LogConfig.logConfig().enableLoggingOfRequestAndResponseIfValidationFails());
+
+ RestAssured.requestSpecification = new RequestSpecBuilder()
+ .setBasePath(REVISION_BASE_URI)
+ .setPort(embeddedServer.getPort())
+ .setContentType(ContentType.JSON)
+ .build();
+ }
+
+ private static final String CASSANDRA_DOCKER_IMAGE = "cassandra:3.11.2";
+ private static final String JVM_EXTRA_OPTS = "-Xms512M -Xmx512M -Xmn128M -Dcassandra.skip_wait_for_gossip_to_settle=0 -Dcassandra.load_ring_state=false";
+
+ protected static CassandraContainer> CASSANDRA_CONTAINER;
+
+ @Override
+ @NonNull
+ public Map getProperties() {
+ if (isNull(CASSANDRA_CONTAINER)) {
+ CASSANDRA_CONTAINER = new CassandraContainer<>(DockerImageName.parse(CASSANDRA_DOCKER_IMAGE));
+
+ CASSANDRA_CONTAINER.withCreateContainerCmdModifier(cmd ->
+ Objects.requireNonNull(cmd.getHostConfig())
+ .withUlimits(List.of(new Ulimit("nofile", 65535L, 65535L))));
+
+ CASSANDRA_CONTAINER.withEnv("JVM_EXTRA_OPTS", JVM_EXTRA_OPTS);
+ CASSANDRA_CONTAINER.withReuse(true);
+ CASSANDRA_CONTAINER.start();
+ }
+
+ return Map.of(
+ "cdb.host", CASSANDRA_CONTAINER.getContainerIpAddress(),
+ "cdb.port", CASSANDRA_CONTAINER.getMappedPort(CassandraContainer.CQL_PORT).toString(),
+ "cdb.datacenter", "datacenter1"
+ );
+ }
+}
diff --git a/src/test/resources/application-test.yml b/src/test/resources/application-test.yml
new file mode 100644
index 0000000..2479e03
--- /dev/null
+++ b/src/test/resources/application-test.yml
@@ -0,0 +1,3 @@
+micronaut:
+ server:
+ port: ${random.port}