Skip to content

Commit

Permalink
feat(queue): introduce Kafka Queue
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Oct 3, 2019
1 parent 83c11ee commit b4d0265
Show file tree
Hide file tree
Showing 61 changed files with 1,377 additions and 215 deletions.
42 changes: 28 additions & 14 deletions build.gradle
@@ -1,9 +1,10 @@
buildscript {
ext {
micronautVersion = "1.2.0"
micronautVersion = "1.2.3"
confluentVersion = "5.2.1"
kafkaVersion = "2.3.0"
avroVersion = "1.9.0"
lombokVersion = "1.18.10"
}
}

Expand All @@ -15,6 +16,9 @@ plugins {
id "net.ltgt.apt-idea" version "0.21"
id "com.github.johnrengelman.shadow" version "4.0.2"
id "application"

// test
id 'com.adarshr.test-logger' version '1.7.0'
}

idea {
Expand All @@ -32,6 +36,7 @@ sourceCompatibility = 11

dependencies {
compile project(":cli")
testCompile project(":repository-memory")
}


Expand All @@ -53,7 +58,10 @@ allprojects {
apply plugin:"java"
apply plugin:"net.ltgt.apt-eclipse"
apply plugin:"net.ltgt.apt-idea"
apply plugin: "io.spring.dependency-management"
apply plugin:"io.spring.dependency-management"

// test
apply plugin:"com.adarshr.test-logger"

dependencyManagement {
imports {
Expand All @@ -71,8 +79,8 @@ allprojects {
// utils
runtime "ch.qos.logback:logback-classic:1.2.3"
compile group: 'com.google.guava', name: 'guava', version: '27.1-jre'
compileOnly 'org.projectlombok:lombok:1.18.8'
annotationProcessor "org.projectlombok:lombok:1.18.8"
compileOnly 'org.projectlombok:lombok:' + lombokVersion
annotationProcessor "org.projectlombok:lombok:" + lombokVersion

// micronaut
annotationProcessor "io.micronaut:micronaut-inject-java"
Expand All @@ -96,6 +104,22 @@ allprojects {
// floworc
compile group: 'com.devskiller.friendly-id', name: 'friendly-id', version: '1.1.0'
}

// test
test {
useJUnitPlatform()

testLogging {
exceptionFormat = "full"
}
}

testlogger {
theme 'mocha-parallel'
showExceptions true
slowThreshold 2000
showStandardStreams true
}
}

/**********************************************************************************************************************\
Expand All @@ -115,16 +139,6 @@ run.jvmArgs(
"-Dcom.sun.management.jmxremote",
'-Dmicronaut.environments=dev,override'
)
/**********************************************************************************************************************\
* Test
**********************************************************************************************************************/
test {
useJUnitPlatform()

testLogging {
exceptionFormat = "full"
}
}

/**********************************************************************************************************************\
* Jar
Expand Down
2 changes: 1 addition & 1 deletion cli/build.gradle
Expand Up @@ -9,7 +9,7 @@ dependencies {
// modules
compile project(":core")

compile project(":repository-local")
compile project(":repository-memory")

compile project(":runner-memory")
compile project(":runner-kafka")
Expand Down
@@ -1,8 +1,8 @@
package org.floworc.core;
package org.floworc.cli;

import io.micronaut.configuration.picocli.PicocliRunner;
import org.floworc.core.commands.TestCommand;
import org.floworc.core.commands.WorkerCommand;
import org.floworc.cli.commands.TestCommand;
import org.floworc.cli.commands.WorkerCommand;
import picocli.CommandLine;

import java.util.concurrent.Callable;
Expand Down
55 changes: 55 additions & 0 deletions cli/src/main/java/org/floworc/cli/commands/TestCommand.java
@@ -0,0 +1,55 @@
package org.floworc.cli.commands;

import lombok.extern.slf4j.Slf4j;
import org.floworc.core.models.flows.Flow;
import org.floworc.core.repositories.FlowRepositoryInterface;
import org.floworc.core.repositories.LocalFlowRepositoryLoader;
import org.floworc.core.runners.RunnerUtils;
import org.floworc.runner.memory.MemoryRunner;
import picocli.CommandLine;

import javax.inject.Inject;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.TimeoutException;

@CommandLine.Command(
name = "test",
description = "test a flow"
)
@Slf4j
public class TestCommand implements Runnable {
@CommandLine.Parameters(description = "the flow file to test")
private Path file;

@Inject
private MemoryRunner runner;

@Inject
private LocalFlowRepositoryLoader repositoryLoader;

@Inject
private FlowRepositoryInterface flowRepository;

@Inject
private RunnerUtils runnerUtils;

public void run() {
try {
runner.run();
repositoryLoader.load(file.toFile());

List<Flow> all = flowRepository.findAll();
if (all.size() != 1) {
throw new IllegalArgumentException("Too many flow found, need 1, found " + all.size());
}

runnerUtils.runOne(all.get(0).getId());
runner.close();
} catch (IOException | TimeoutException e) {
throw new IllegalStateException(e);
}

}
}
@@ -1,4 +1,4 @@
package org.floworc.core.commands;
package org.floworc.cli.commands;

import io.micronaut.context.ApplicationContext;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -27,5 +27,7 @@ public void run() {
for (int i = 0; i < thread; i++) {
poolExecutor.execute(applicationContext.getBean(Worker.class));
}

log.info("Workers started with {} thread(s)", this.thread);
}
}
35 changes: 0 additions & 35 deletions cli/src/main/java/org/floworc/core/commands/TestCommand.java

This file was deleted.

38 changes: 38 additions & 0 deletions cli/src/main/resources/application.yml
@@ -1,3 +1,41 @@
micronaut:
application:
name: floworc

floworc:
kafka:
defaults:
topic:
partitions: 64
replication-factor: 1
properties:
compression.type: "lz4"

consumer:
properties:
isolation.level: "read_committed"
auto.offset.reset: "earliest"
enable.auto.commit: "false"

producer:
properties:
acks: "all"

stream:
properties:
processing.guarantee: "exactly_once"
replication.factor: "${floworc.kafka.defaults.topic.replication-factor}"
acks: "all"

topics:
org.floworc.core.models.executions.Execution:
name: "floworc_execution"
properties:
cleanup.policy: "compact"
retention.ms: "-1"

org.floworc.core.runners.WorkerTask:
name: "floworc_workertask"

org.floworc.core.runners.WorkerTaskResult:
name: "floworc_workertaskresult"
12 changes: 9 additions & 3 deletions cli/src/main/resources/logback.xml
Expand Up @@ -32,16 +32,22 @@
</encoder>
</appender>

<root level="INFO">
<root level="WARN">
<appender-ref ref="STDOUT" />
<appender-ref ref="STDERR" />
</root>

<logger name="org.apache" level="WARN" />
<logger name="io.confluent" level="WARN" />
<logger name="org.floworc" level="INFO" />
<logger name="flow" level="INFO" />

<logger name="org.floworc.runner.kafka.services" level="WARN" />

<!-- The configuration '%s' was supplied but isn't a known config. > https://github.com/apache/kafka/pull/5876 -->
<logger name="org.apache.kafka.clients.producer.ProducerConfig" level="ERROR" />
<logger name="org.apache.kafka.clients.admin.AdminClientConfig" level="ERROR" />
<logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="ERROR" />

<!--- Error registering AppInfo mbean -->
<logger name="org.apache.kafka.common.utils.AppInfoParser" level="ERROR" />

</configuration>
5 changes: 0 additions & 5 deletions core/build.gradle
Expand Up @@ -11,9 +11,4 @@ dependencies {

// validations
compile group: 'org.hibernate.validator', name: 'hibernate-validator', version: '6.0.17.Final'

// redis
implementation 'com.github.lettuce-io:lettuce-core:master'


}
@@ -1,6 +1,7 @@
package org.floworc.core.models.executions;

import lombok.*;
import lombok.Builder;
import lombok.Value;
import lombok.experimental.Wither;
import org.floworc.core.models.flows.State;
import org.floworc.core.models.tasks.Task;
Expand Down
Expand Up @@ -5,6 +5,10 @@
import org.floworc.core.runners.WorkerTaskResult;

public interface QueueFactoryInterface {
String EXECUTION_NAMED = "executionQueue";
String WORKERTASK_NAMED = "workerTaskQueue";
String WORKERTASKRESULT_NAMED = "workerTaskResultQueue";

QueueInterface<Execution> execution();

QueueInterface<WorkerTask> workerTask();
Expand Down
Expand Up @@ -6,7 +6,5 @@
public interface QueueInterface<T> extends Closeable {
void emit(T message);

void receive(Class consumerGroup, Consumer<T> consumer);

void ack(T message);
Runnable receive(Class consumerGroup, Consumer<T> consumer);
}
Expand Up @@ -6,7 +6,15 @@
import java.util.Optional;

public interface FlowRepositoryInterface {
Optional<Flow> getFlowById(String id);
Optional<Flow> findById(String id);

List<Flow> getFlows();
List<Flow> findAll();

void save(Flow flow);

void insert(Flow flow);

void update(Flow flow);

void delete(Flow flow);
}
@@ -0,0 +1,39 @@
package org.floworc.core.repositories;

import org.floworc.core.models.flows.Flow;
import org.floworc.core.serializers.YamlFlowParser;

import javax.inject.Inject;
import javax.inject.Singleton;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;

@Singleton
public class LocalFlowRepositoryLoader {
private static final YamlFlowParser yamlFlowParser = new YamlFlowParser();

@Inject
private FlowRepositoryInterface flowRepository;

public void load(URL basePath) throws IOException, URISyntaxException {
this.load(new File(basePath.toURI()));
}

public void load(File basePath) throws IOException {
List<Path> list = Files.walk(basePath.toPath())
.filter(path -> com.google.common.io.Files.getFileExtension(path.toString()).equals("yaml"))
.collect(Collectors.toList());

for (Path file: list) {
Flow parse = yamlFlowParser.parse(file.toFile());

flowRepository.save(parse);
}
}
}
@@ -1,7 +1,4 @@
package org.floworc.core.runners;

import io.micronaut.context.annotation.Prototype;

@Prototype
public interface ExecutionStateInterface extends Runnable {
}

0 comments on commit b4d0265

Please sign in to comment.