Skip to content

Commit

Permalink
Synchronized context and implemented context-preserving job execution.
Browse files Browse the repository at this point in the history
  • Loading branch information
nmihajlovski committed Jul 9, 2015
1 parent bedd3e7 commit 714f515
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 30 deletions.
26 changes: 14 additions & 12 deletions rapidoid-ctx/src/main/java/org/rapidoid/ctx/Ctx.java
Expand Up @@ -36,60 +36,62 @@ public class Ctx {


Ctx() {} Ctx() {}


public UserInfo user() { public synchronized UserInfo user() {
return user; return user;
} }


public void setUser(UserInfo user) { public synchronized void setUser(UserInfo user) {
this.user = user; this.user = user;
} }


@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <T> T exchange() { public synchronized <T> T exchange() {
return (T) exchange; return (T) exchange;
} }


public void setExchange(Object exchange) { public synchronized void setExchange(Object exchange) {
this.exchange = exchange; this.exchange = exchange;
} }


public Classes classes() { public synchronized Classes classes() {
return classes; return classes;
} }


public void setClasses(Classes classes) { public synchronized void setClasses(Classes classes) {
this.classes = classes; this.classes = classes;
} }


@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <P> P persistor() { public synchronized <P> P persistor() {
if (this.persistor == null) { if (this.persistor == null) {
this.persistor = Ctxs.createPersistor(); this.persistor = Ctxs.createPersistor();
} }


return (P) this.persistor; return (P) this.persistor;
} }


public void setPersistor(Object persistor) { public synchronized void setPersistor(Object persistor) {
this.persistor = persistor; this.persistor = persistor;
} }


public void clear() { public synchronized void clear() {
setClasses(null); setClasses(null);
setExchange(null); setExchange(null);
setUser(null); setUser(null);
setPersistor(null); setPersistor(null);
} }


public Ctx copy() { public synchronized Ctx copy() {
Ctx ctx = new Ctx(); Ctx ctx = new Ctx();
this.assignTo(ctx);
return ctx;
}


public synchronized void assignTo(Ctx ctx) {
ctx.classes = this.classes; ctx.classes = this.classes;
ctx.exchange = this.exchange; ctx.exchange = this.exchange;
ctx.user = this.user; ctx.user = this.user;
ctx.persistor = this.persistor; ctx.persistor = this.persistor;

return ctx;
} }


} }
31 changes: 17 additions & 14 deletions rapidoid-ctx/src/main/java/org/rapidoid/ctx/Ctxs.java
Expand Up @@ -32,8 +32,12 @@ public class Ctxs {


private Ctxs() {} private Ctxs() {}


public static Ctx get() {
return CTXS.get();
}

public static Ctx ctx() { public static Ctx ctx() {
Ctx ctx = CTXS.get(); Ctx ctx = get();


if (ctx == null) { if (ctx == null) {
throw new IllegalStateException("App ctx wasn't set!"); throw new IllegalStateException("App ctx wasn't set!");
Expand All @@ -43,26 +47,25 @@ public static Ctx ctx() {
} }


public static boolean hasContext() { public static boolean hasContext() {
return CTXS.get() != null; return get() != null;
} }


private static Ctx provide() { public static void reset() {
Ctx ctx = CTXS.get(); CTXS.remove();
}


if (ctx == null) { public static void attach(Ctx ctx) {
ctx = new Ctx(); if (!hasContext()) {
CTXS.set(ctx); CTXS.set(ctx);
} else {
throw new IllegalStateException("The context was already opened!");
} }

return ctx;
}

public static void reset() {
CTXS.remove();
} }


public static void open() { public static Ctx open() {
provide().clear(); Ctx ctx = new Ctx();
attach(ctx);
return ctx;
} }


public static void close() { public static void close() {
Expand Down
Expand Up @@ -20,20 +20,38 @@
* #L% * #L%
*/ */


import org.rapidoid.annotation.Authors;
import org.rapidoid.annotation.Since;
import org.rapidoid.ctx.Ctx;
import org.rapidoid.ctx.Ctxs; import org.rapidoid.ctx.Ctxs;
import org.rapidoid.log.Log;


public class WrapperJob implements Runnable { @Authors("Nikolche Mihajlovski")
@Since("4.1.0")
public class ContextPreservingJob implements Runnable {


private final Runnable job; private final Runnable job;


public WrapperJob(Runnable job) { private final Ctx ctx;

public ContextPreservingJob(Runnable job, Ctx ctx) {
this.job = job; this.job = job;
this.ctx = ctx;
} }


@Override @Override
public void run() { public void run() {
if (ctx != null) {
Ctxs.attach(ctx);
} else {
Ctxs.open();
}

try { try {
job.run(); job.run();
} catch (Throwable e) {
Log.error("Job execution failed!", e);
throw U.rte("Job execution failed!", e);
} finally { } finally {
Ctxs.close(); Ctxs.close();
} }
Expand Down
3 changes: 2 additions & 1 deletion rapidoid-utils/src/main/java/org/rapidoid/util/Jobs.java
Expand Up @@ -27,6 +27,7 @@
import org.rapidoid.annotation.Authors; import org.rapidoid.annotation.Authors;
import org.rapidoid.annotation.Since; import org.rapidoid.annotation.Since;
import org.rapidoid.config.Conf; import org.rapidoid.config.Conf;
import org.rapidoid.ctx.Ctxs;


@Authors("Nikolche Mihajlovski") @Authors("Nikolche Mihajlovski")
@Since("4.1.0") @Since("4.1.0")
Expand Down Expand Up @@ -54,7 +55,7 @@ public static ScheduledFuture<?> execute(Runnable job) {
} }


public static Runnable wrap(Runnable job) { public static Runnable wrap(Runnable job) {
return new WrapperJob(job); return new ContextPreservingJob(job, Ctxs.get());
} }


} }
Expand Up @@ -54,7 +54,7 @@ public static synchronized ScheduledFuture<?> job(Runnable job, long delay) {
} }


public static Runnable wrap(Runnable job) { public static Runnable wrap(Runnable job) {
return new WrapperJob(job); return Jobs.wrap(job);
} }


} }
81 changes: 81 additions & 0 deletions rapidoid-utils/src/test/java/org/rapidoid/util/JobsTest.java
@@ -0,0 +1,81 @@
package org.rapidoid.util;

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;
import org.rapidoid.annotation.Authors;
import org.rapidoid.annotation.Since;
import org.rapidoid.ctx.Ctxs;
import org.rapidoid.ctx.UserInfo;
import org.rapidoid.test.TestCommons;

/*
* #%L
* rapidoid-utils
* %%
* Copyright (C) 2014 - 2015 Nikolche Mihajlovski and contributors
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/

@Authors("Nikolche Mihajlovski")
@Since("4.1.0")
public class JobsTest extends TestCommons {

@Test(timeout = 10000)
public void testJobsExecution() {

int total = 300000;
final AtomicInteger counter = new AtomicInteger();

multiThreaded(1000, total, new Runnable() {

@Override
public void run() {
Ctxs.open();

final UserInfo user = new UserInfo();
user.username = rndStr(50);
Ctxs.ctx().setUser(user);
ensureUser(user);

ScheduledFuture<?> future = Jobs.execute(new Runnable() {
@Override
public void run() {
ensureUser(user);
counter.incrementAndGet();
}
});

try {
future.get();
} catch (Exception e) {
e.printStackTrace();
fail("The job throwed an exception!");
}

Ctxs.close();
}

});

eq(counter.get(), total);
}

private void ensureUser(UserInfo user) {
eq(Ctxs.ctx().user(), user);
}

}

0 comments on commit 714f515

Please sign in to comment.