Skip to content

Commit

Permalink
Merge branch 'master' of github.com:reactica/rhte-demo
Browse files Browse the repository at this point in the history
# Conflicts:
#	setup/openshift/install-is-and-templates.sh
  • Loading branch information
tqvarnst committed Sep 16, 2018
2 parents 0a0c9a5 + e30fc9d commit 35066aa
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 44 deletions.
9 changes: 8 additions & 1 deletion billboard/pom.xml
Expand Up @@ -18,7 +18,6 @@

<properties>
<vertx.verticle>com.redhat.coderland.reactica.BillboardVerticle</vertx.verticle>
<vertx.health.path>/health</vertx.health.path>
<node_modules.path>${project.build.directory}/classes/webroot/node_modules</node_modules.path>
</properties>

Expand All @@ -39,6 +38,14 @@
<groupId>io.vertx</groupId>
<artifactId>vertx-rx-java2</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-config</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-config-yaml</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>amqp-verticle</artifactId>
Expand Down
31 changes: 31 additions & 0 deletions billboard/src/main/fabric8/deployment.yaml
@@ -0,0 +1,31 @@
spec:
template:
spec:
volumes:
- configMap:
name: reactica-config
optional: true
items:
- key: application.yaml
path: config.yml
name: config
containers:
- env:
- name: VERTX_CONFIG_PATH
value: '/deployments/conf/config.yml'
volumeMounts:
- name: config
mountPath: /deployments/conf

readinessProbe:
httpGet:
path: /health
port: 8080
scheme: HTTP
initialDelaySeconds: 10
livenessProbe:
httpGet:
path: /health
port: 8080
scheme: HTTP
initialDelaySeconds: 180
@@ -1,5 +1,6 @@
package com.redhat.coderland.reactica;

import com.redhat.coderland.reactica.model.Ride;
import com.redhat.coderland.reactica.model.User;
import io.reactivex.Completable;
import io.vertx.core.DeploymentOptions;
Expand All @@ -10,6 +11,7 @@
import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.ext.web.handler.sockjs.BridgeOptions;
import io.vertx.reactivex.CompletableHelper;
import io.vertx.reactivex.config.ConfigRetriever;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.eventbus.MessageConsumer;
import io.vertx.reactivex.ext.web.Router;
Expand All @@ -33,11 +35,21 @@ public class BillboardVerticle extends AbstractVerticle {
private WebClient client;
private JsonArray last = new JsonArray();


private long rideDuration;
private Integer numberOfUsers;

@Override
public void start(Future<Void> done) {
client = WebClient.create(vertx, new WebClientOptions().setDefaultHost("event-generator").setDefaultPort(8080));

initQueueEventsListener()
ConfigRetriever retriever = ConfigRetriever.create(vertx);

retriever.rxGetConfig().doOnSuccess(json -> {
configure(json);
retriever.listen(c -> configure(c.getNewConfiguration()));
}).ignoreElement()
.andThen(initQueueEventsListener())
.andThen(initNewUserListener())
.andThen(initWaitingTimeListener())
.andThen(deployAMQPVerticle())
Expand All @@ -47,6 +59,15 @@ public void start(Future<Void> done) {
.subscribe(CompletableHelper.toObserver(done));
}

private void configure(JsonObject json) {
if (json == null) {
return;
}
LOGGER.info("Configuring the billboard");
rideDuration = json.getLong("duration-in-seconds", Ride.DEFAULT_RIDE_DURATION);
numberOfUsers = json.getInteger("users-per-ride", Ride.DEFAULT_USER_ON_RIDE);
}

private Completable initWaitingTimeListener() {
MessageConsumer<JsonObject> consumer = vertx.eventBus().consumer("waiting-time");
consumer.handler(message -> {
Expand Down Expand Up @@ -126,11 +147,10 @@ private Completable initQueueEventsListener() {

long eta = 0;
if (state.equalsIgnoreCase(User.STATE_IN_QUEUE)) {
// TODO Read config map.
eta =
(Instant.now().toEpochMilli() / 1000
+ (numberOfPeopleWaiting.incrementAndGet() / 10 * 60)
) // 10 - number of user per ride, 60 - ride duration
+ (numberOfPeopleWaiting.incrementAndGet() / numberOfUsers * rideDuration)
)
* 1000; // To milliseconds
}

Expand Down
Expand Up @@ -10,6 +10,10 @@ public class Ride {
public static final String STATE_IN_PROGRESS = "IN_PROGRESS";
public static final String STATE_COMPLETED = "COMPLETED";

public static final long DEFAULT_RIDE_DURATION = 60;
public static final int DEFAULT_JITTER_DURATION = 10;
public static final int DEFAULT_USER_ON_RIDE = 10;

private String uuid;

private String state;
Expand Down
Expand Up @@ -29,10 +29,6 @@
public class RideSimulator extends AbstractVerticle {
private static final Logger LOGGER = LogManager.getLogger(UserSimulatorVerticle.class);

private static final long DEFAULT_RIDE_DURATION = 60;
private static final int DEFAULT_JITTER_DURATION = 10;
private static final int DEFAULT_USER_ON_RIDE = 10;

private AsyncCache<String, String> cache;
private Random random = new Random();

Expand Down Expand Up @@ -82,9 +78,9 @@ private void configure(JsonObject json) {
return;
}
LOGGER.info("Configuring the ride simulator");
duration = json.getLong("duration-in-seconds", DEFAULT_RIDE_DURATION);
jitter = json.getInteger("jitter-in-seconds", DEFAULT_JITTER_DURATION);
numberOfUsers = json.getInteger("users-per-ride", DEFAULT_USER_ON_RIDE);
duration = json.getLong("duration-in-seconds", Ride.DEFAULT_RIDE_DURATION);
jitter = json.getInteger("jitter-in-seconds", Ride.DEFAULT_JITTER_DURATION);
numberOfUsers = json.getInteger("users-per-ride", Ride.DEFAULT_USER_ON_RIDE);

boolean isCurrentlyEnabled = enabled;
enabled = json.getBoolean("enabled", true);
Expand Down
@@ -1,13 +1,10 @@
package com.redhat.coderland.reactica.qlc;

import com.redhat.coderland.reactica.model.Ride;
import com.redhat.coderland.reactica.model.User;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.CompletableHelper;
import io.vertx.reactivex.RxHelper;
import io.vertx.reactivex.core.AbstractVerticle;
import me.escoffier.reactive.rhdg.AsyncCache;
import me.escoffier.reactive.rhdg.DataGridClient;
Expand All @@ -17,21 +14,14 @@
import org.infinispan.client.hotrod.marshall.ProtoStreamMarshaller;
import org.infinispan.query.dsl.Query;
import org.infinispan.query.dsl.QueryFactory;
import org.infinispan.query.dsl.SortOrder;

import java.time.Duration;
import java.time.temporal.TemporalAmount;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class QueueLengthCalculator extends AbstractVerticle {
private static final Logger LOGGER = LogManager.getLogger("QueueLengthCalculator");
private static final String USER_PROTOBUFF_DEFINITION_FILE = "/user.proto";
private static final String USEREVENTS_CACHENAME = "userevents";

private static final long DEFAULT_RIDE_DURATION = 60;
private static final int DEFAULT_USER_ON_RIDE = 10;

private long duration;
private int numberOfUsers;

Expand All @@ -47,16 +37,16 @@ public void start(Future<Void> done) {
.addMarshaller(new ProtoStreamMarshaller())
.addProtoFile(USER_PROTOBUFF_DEFINITION_FILE, new UserMarshaller(), true)
)
.doOnSuccess(client -> LOGGER.info("Successfully created a Data Grid Client"))
.flatMap(client -> client.<String, User>getCache(USEREVENTS_CACHENAME))
.doOnSuccess(cache -> {
LOGGER.info("Successfully got the cache {} ", USEREVENTS_CACHENAME);
configure(config());
vertx.eventBus().<JsonObject>consumer("configuration", m -> configure(m.body().getJsonObject("ride-simulator")));
.doOnSuccess(client -> LOGGER.info("Successfully created a Data Grid Client"))
.flatMap(client -> client.<String, User>getCache(USEREVENTS_CACHENAME))
.doOnSuccess(cache -> {
LOGGER.info("Successfully got the cache {} ", USEREVENTS_CACHENAME);
configure(config());
vertx.eventBus().<JsonObject>consumer("configuration", m -> configure(m.body().getJsonObject("ride-simulator")));

vertx.setPeriodic(TimeUnit.SECONDS.toMillis(10), l -> query(cache));
vertx.setPeriodic(TimeUnit.SECONDS.toMillis(10), l -> query(cache));

})
})
.ignoreElement()
.subscribe(CompletableHelper.toObserver(done));
}
Expand All @@ -76,7 +66,7 @@ private void query(AsyncCache<String, User> cache) {
long approxWaitTime = numberOfRidesToLastPerson * duration;
LOGGER.info("Calculated the approx waittime to " + approxWaitTime);
JsonObject qlcEventMessage = new JsonObject().put("calculated-wait-time", approxWaitTime);
LOGGER.info("Sending queue length event message: " + qlcEventMessage.encodePrettily() );
LOGGER.info("Sending queue length event message: " + qlcEventMessage.encodePrettily());
vertx.eventBus().send("to-qlc-queue", qlcEventMessage);
LOGGER.info("Message sent");
}
Expand All @@ -86,8 +76,8 @@ private void configure(JsonObject json) {
return;
}
LOGGER.info("Configuring the queue length calculator simulator");
duration = json.getLong("duration-in-seconds", DEFAULT_RIDE_DURATION);
numberOfUsers = json.getInteger("users-per-ride", DEFAULT_USER_ON_RIDE);
duration = json.getLong("duration-in-seconds", Ride.DEFAULT_RIDE_DURATION);
numberOfUsers = json.getInteger("users-per-ride", Ride.DEFAULT_USER_ON_RIDE);

}
}
26 changes: 16 additions & 10 deletions setup/openshift/install-is-and-templates.sh
Expand Up @@ -3,19 +3,19 @@
info "### INSTALLING IS FOR AMQ AND RDG"
if $(oc get is/jboss-datagrid72-openshift -n openshift > /dev/null 2>&1); then
oc replace -f https://raw.githubusercontent.com/jboss-container-images/jboss-datagrid-7-openshift-image/datagrid72/templates/datagrid72-image-stream.json -n openshift --as=system:admin
else
oc create -f https://raw.githubusercontent.com/jboss-container-images/jboss-datagrid-7-openshift-image/datagrid72/templates/datagrid72-image-stream.json -n openshift --as=system:admin
fi
else
oc create -f https://raw.githubusercontent.com/jboss-container-images/jboss-datagrid-7-openshift-image/datagrid72/templates/datagrid72-image-stream.json -n openshift --as=system:admin
fi

oc -n openshift import-image jboss-datagrid72-openshift:1.1 --as=system:admin > /dev/null && echo "jboss-datagrid72-openshift:1.1 image successfully imported"
oc -n openshift import-image jboss-datagrid72-openshift:1.1 --as=system:admin > /dev/null && echo "jboss-datagrid72-openshift:1.1 image successfully imported"

if $(oc get is/amq-broker-71-openshift -n openshift > /dev/null 2>&1); then
oc replace -f https://raw.githubusercontent.com/jboss-container-images/jboss-amq-7-broker-openshift-image/amq-broker-71/amq-broker-7-image-streams.yaml -n openshift --as=system:admin
else
oc create -f https://raw.githubusercontent.com/jboss-container-images/jboss-amq-7-broker-openshift-image/amq-broker-71/amq-broker-7-image-streams.yaml -n openshift --as=system:admin
fi
if $(oc get is/amq-broker-71-openshift -n openshift > /dev/null 2>&1); then
oc replace -f https://raw.githubusercontent.com/jboss-container-images/jboss-amq-7-broker-openshift-image/amq-broker-71/amq-broker-7-image-streams.yaml -n openshift --as=system:admin
else
oc create -f https://raw.githubusercontent.com/jboss-container-images/jboss-amq-7-broker-openshift-image/amq-broker-71/amq-broker-7-image-streams.yaml -n openshift --as=system:admin
fi

oc -n openshift import-image amq-broker-71-openshift:1.0 --as=system:admin > /dev/null && echo "amq-broker-71-openshift:1.0 image successfully imported"
oc -n openshift import-image jboss-amq-63:1.4 --as=system:admin > /dev/null && echo "jboss-amq-63:1.4 image successfully imported"

info "### INSTALLING TEMPLATES FOR AMQ AND RDG"

Expand All @@ -31,3 +31,9 @@ else
oc create -f https://github.com/jboss-container-images/jboss-amq-7-broker-openshift-image/raw/amq-broker-71/templates/amq-broker-71-basic.yaml -n openshift --as=system:admin
fi

# If it does not work, try:
# $ eval $(minishift docker-env)
# $ docker pull registry.access.redhat.com/redhat-openjdk-18/openjdk18-openshift:1.2
# $ docker pull registry.access.redhat.com/jboss-datagrid-7/datagrid72-openshift:1.1
# $ docker pull registry.access.redhat.com/amq-broker-7-tech-preview/amq-broker-71-openshift:1.0

0 comments on commit 35066aa

Please sign in to comment.