Skip to content

Commit

Permalink
Introduced micro-services utils (based on HTTP client).
Browse files Browse the repository at this point in the history
  • Loading branch information
nmihajlovski committed Jul 31, 2015
1 parent 3cc2ed6 commit 95e7288
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 16 deletions.
Expand Up @@ -42,8 +42,10 @@ public final void run() {
while (!Thread.interrupted()) { while (!Thread.interrupted()) {
try { try {
loop(); loop();
} catch (ThreadDeath e) {
throw e;
} catch (Throwable e) { } catch (Throwable e) {
Log.error("Exception occured while watching for changes", e); Log.error("Exception occured inside the thread loop!", e);
} }


U.sleep(sleepMs); U.sleep(sleepMs);
Expand Down
4 changes: 4 additions & 0 deletions rapidoid-http/src/main/java/org/rapidoid/http/HTTP.java
Expand Up @@ -48,6 +48,10 @@ public static byte[] post(String uri, Map<String, String> headers, Map<String, S
return post(uri, headers, data, files, null).get(); return post(uri, headers, data, files, null).get();
} }


public static byte[] post(String uri) {
return post(uri, null, null, null, null).get();
}

public static Future<byte[]> get(String uri, Callback<byte[]> callback) { public static Future<byte[]> get(String uri, Callback<byte[]> callback) {
return DEFAULT_CLIENT.get(uri, callback); return DEFAULT_CLIENT.get(uri, callback);
} }
Expand Down
22 changes: 10 additions & 12 deletions rapidoid-http/src/main/java/org/rapidoid/http/HttpClient.java
Expand Up @@ -36,6 +36,7 @@
import org.apache.http.entity.mime.MultipartEntityBuilder; import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients; import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.nio.entity.NByteArrayEntity;
import org.rapidoid.concurrent.Callback; import org.rapidoid.concurrent.Callback;
import org.rapidoid.concurrent.Callbacks; import org.rapidoid.concurrent.Callbacks;
import org.rapidoid.concurrent.Future; import org.rapidoid.concurrent.Future;
Expand Down Expand Up @@ -65,7 +66,7 @@ public Future<byte[]> post(String uri, Map<String, String> headers, Map<String,
data = U.safe(data); data = U.safe(data);
files = U.safe(files); files = U.safe(files);


HttpPost httppost = new HttpPost(uri); HttpPost req = new HttpPost(uri);


MultipartEntityBuilder builder = MultipartEntityBuilder.create(); MultipartEntityBuilder builder = MultipartEntityBuilder.create();


Expand All @@ -81,27 +82,24 @@ public Future<byte[]> post(String uri, Map<String, String> headers, Map<String,
builder = builder.addTextBody(entry.getKey(), entry.getValue(), contentType); builder = builder.addTextBody(entry.getKey(), entry.getValue(), contentType);
} }


httppost.setEntity(builder.build()); NByteArrayEntity entity = new NByteArrayEntity(builder.build().toString().getBytes());
req.setEntity(entity);


for (Entry<String, String> e : headers.entrySet()) { for (Entry<String, String> e : headers.entrySet()) {
httppost.addHeader(e.getKey(), e.getValue()); req.addHeader(e.getKey(), e.getValue());
} }


Log.debug("Starting HTTP POST request", "request", httppost.getRequestLine()); Log.debug("Starting HTTP POST request", "request", req.getRequestLine());


return execute(client, httppost, callback); return execute(client, req, callback);
} }


public Future<byte[]> get(String uri, Callback<byte[]> callback) { public Future<byte[]> get(String uri, Callback<byte[]> callback) {
try { HttpGet req = new HttpGet(uri);
HttpGet req = new HttpGet(uri);


Log.debug("Starting HTTP GET request", "request", req.getRequestLine()); Log.debug("Starting HTTP GET request", "request", req.getRequestLine());


return execute(client, req, callback); return execute(client, req, callback);
} catch (Throwable e) {
throw U.rte(e);
}
} }


private Future<byte[]> execute(CloseableHttpAsyncClient client, HttpRequestBase req, Callback<byte[]> callback) { private Future<byte[]> execute(CloseableHttpAsyncClient client, HttpRequestBase req, Callback<byte[]> callback) {
Expand Down
66 changes: 66 additions & 0 deletions rapidoid-http/src/main/java/org/rapidoid/http/Micro.java
@@ -0,0 +1,66 @@
package org.rapidoid.http;

/*
* #%L
* rapidoid-http
* %%
* Copyright (C) 2014 - 2015 Nikolche Mihajlovski and contributors
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/

import org.rapidoid.annotation.Authors;
import org.rapidoid.annotation.Since;
import org.rapidoid.concurrent.Callback;
import org.rapidoid.concurrent.Callbacks;
import org.rapidoid.concurrent.Future;
import org.rapidoid.concurrent.Futures;
import org.rapidoid.jackson.JSON;
import org.rapidoid.lambda.Mapper;
import org.rapidoid.util.U;

@Authors("Nikolche Mihajlovski")
@Since("4.1.0")
public class Micro {

private static final Mapper<byte[], Object> JSON_BYTES_TO_OBJ = new Mapper<byte[], Object>() {
@Override
public Object map(byte[] src) throws Exception {
return src != null ? JSON.parse(src, Object.class) : null;
}
};

public static <T> Future<T> get(final String uri, final Callback<T> callback) {
Mapper<byte[], T> mapper = U.cast(JSON_BYTES_TO_OBJ);
Callback<byte[]> cb = Callbacks.mapping(callback, mapper);
return Futures.mapping(HTTP.get(uri, cb), mapper);
}

@SuppressWarnings("unchecked")
public static <T> T get(final String uri) {
return (T) get(uri, null).get();
}

public static <T> Future<T> post(final String uri, final Callback<T> callback) {
Mapper<byte[], T> mapper = U.cast(JSON_BYTES_TO_OBJ);
Callback<byte[]> cb = Callbacks.mapping(callback, mapper);
return Futures.mapping(HTTP.post(uri, null, null, null, cb), mapper);
}

@SuppressWarnings("unchecked")
public static <T> T post(final String uri) {
return (T) post(uri, null).get();
}

}
Expand Up @@ -43,6 +43,7 @@ public void testHttpServer() {
HTTPServer server = HTTP.server().applications(WebAppGroup.main()).build().start(); HTTPServer server = HTTP.server().applications(WebAppGroup.main()).build().start();


eq(new String(HTTP.get("http://localhost:8080/")), "home"); eq(new String(HTTP.get("http://localhost:8080/")), "home");
eq(new String(HTTP.post("http://localhost:8080/")), "123");


server.shutdown(); server.shutdown();
} }
Expand Down
@@ -0,0 +1,102 @@
package org.rapidoid.test;

/*
* #%L
* rapidoid-integration-tests
* %%
* Copyright (C) 2014 - 2015 Nikolche Mihajlovski and contributors
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/

import java.util.concurrent.CountDownLatch;

import org.junit.Test;
import org.rapidoid.activity.RapidoidThread;
import org.rapidoid.annotation.Authors;
import org.rapidoid.annotation.Since;
import org.rapidoid.concurrent.Callback;
import org.rapidoid.http.HTTP;
import org.rapidoid.http.HTTPServer;
import org.rapidoid.http.Handler;
import org.rapidoid.http.HttpExchange;
import org.rapidoid.http.Micro;
import org.rapidoid.util.U;
import org.rapidoid.util.UTILS;
import org.rapidoid.webapp.WebApp;
import org.rapidoid.webapp.WebAppGroup;

@Authors("Nikolche Mihajlovski")
@Since("4.1.0")
public class MicroServicesTest extends IntegrationTestCommons {

@Test
public void testMicroserviceCommunication() {
WebApp app = WebAppGroup.openRootContext();

app.getRouter().generic(new Handler() {
@Override
public Object handle(HttpExchange x) throws Exception {
// return x.async();
return U.num(x.param("n")) + 1;
}
});
HTTPServer server = HTTP.server().applications(WebAppGroup.main()).build().start();

// a blocking call
eq(Micro.get("http://localhost:8080/?n=7"), 8);
eq(Micro.post("http://localhost:8080/?n=7"), 8);

int count = 10000;
final CountDownLatch latch = new CountDownLatch(count);
UTILS.startMeasure();

RapidoidThread loop = UTILS.loop(new Runnable() {
@Override
public void run() {
System.out.println(latch);
U.sleep(1000);
}
});

for (int i = 0; i < count; i++) {
final int expected = i + 1;

Callback<Integer> callback = new Callback<Integer>() {
@Override
public void onDone(Integer result, Throwable error) throws Exception {
if (result != null) {
eq(result.intValue(), expected);
} else {
System.out.println(error);
}
latch.countDown();
}
};

if (i % 2 == 0) {
Micro.get("http://localhost:8080/?n=" + i, callback);
} else {
Micro.post("http://localhost:8080/?n=" + i, callback);
}
}

U.wait(latch);
UTILS.endMeasure(count, "calls");

loop.interrupt();
server.shutdown();
}

}
4 changes: 4 additions & 0 deletions rapidoid-u/src/main/java/org/rapidoid/util/U.java
Expand Up @@ -540,6 +540,10 @@ public static RuntimeException rte(String message, Object... args) {
return rte(nice(message, args)); return rte(nice(message, args));
} }


public static RuntimeException cancelled() {
return rte("This operation was cancelled!");
}

public static RuntimeException notExpected() { public static RuntimeException notExpected() {
return rte("This operation is not expected to be called!"); return rte("This operation is not expected to be called!");
} }
Expand Down
10 changes: 7 additions & 3 deletions rapidoid-utils/src/main/java/org/rapidoid/util/UTILS.java
Expand Up @@ -729,13 +729,17 @@ public static <K, V> Map<K, V> cast(Map<?, ?> map) {
return (Map<K, V>) map; return (Map<K, V>) map;
} }


public static void loop(final Runnable loop) { public static RapidoidThread loop(final Runnable loop) {
new RapidoidThread() { RapidoidThread thread = new RapidoidThread() {
@Override @Override
protected void loop() { protected void loop() {
loop.run(); loop.run();
} }
}.start(); };

thread.start();

return thread;
} }


} }

0 comments on commit 95e7288

Please sign in to comment.