Skip to content

Commit

Permalink
Nested job execution with fine-grained context setup.
Browse files Browse the repository at this point in the history
  • Loading branch information
nmihajlovski committed Feb 24, 2016
1 parent 8d36e57 commit 6431396
Show file tree
Hide file tree
Showing 15 changed files with 274 additions and 145 deletions.
@@ -1,8 +1,5 @@
package org.rapidoid.ctx;

import org.rapidoid.annotation.Authors;
import org.rapidoid.annotation.Since;

/*
* #%L
* rapidoid-commons
Expand All @@ -23,10 +20,33 @@
* #L%
*/

import org.rapidoid.annotation.Authors;
import org.rapidoid.annotation.Since;
import org.rapidoid.u.U;

import java.util.Set;

@Authors("Nikolche Mihajlovski")
@Since("5.0.0")
public interface JobStatusListener {
@Since("5.1.0")
public class Auth {

public static UserInfo user() {
Ctx ctx = Ctxs.get();
return ctx != null ? ctx.user() : null;
}

public static boolean isLoggedIn() {
return user() != null;
}

public static String username() {
UserInfo user = user();
return user != null ? user.username : null;
}

void onAsync();
public static Set<String> roles() {
UserInfo user = user();
return U.safe(user != null ? U.set(user.roles) : U.<String>set());
}

}
55 changes: 7 additions & 48 deletions rapidoid-commons/src/main/java/org/rapidoid/ctx/Ctx.java
Expand Up @@ -3,12 +3,9 @@
import org.rapidoid.annotation.Authors;
import org.rapidoid.annotation.Since;
import org.rapidoid.commons.Coll;
import org.rapidoid.job.Jobs;
import org.rapidoid.lambda.Lmbd;
import org.rapidoid.log.Log;

import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

/*
Expand Down Expand Up @@ -51,7 +48,7 @@ public class Ctx implements CtxMetadata {

private volatile ThreadLocal<Object> persisters = new ThreadLocal<Object>();

private final List<Object> allPersisters = Collections.synchronizedList(new ArrayList<Object>(5));
private final List<Object> persistersToClose = Collections.synchronizedList(new LinkedList<Object>());

private final Map<Object, Object> extras = Coll.synchronizedMap();

Expand Down Expand Up @@ -89,17 +86,14 @@ public synchronized <P> P persister() {
if (persister == null) {
persister = Ctxs.createPersister();
this.persisters.set(persister);
allPersisters.add(persister);
persistersToClose.add(persister);
}

return (P) persister;
}

public synchronized void setPersister(Object persister) {
this.persisters.set(persister);
if (!allPersisters.contains(persister)) {
allPersisters.add(persister);
}
}

public synchronized Ctx span() {
Expand Down Expand Up @@ -134,11 +128,11 @@ private synchronized void clear() {
this.exchange = null;
this.persisters = null;

for (Object persister : allPersisters) {
for (Object persister : persistersToClose) {
Ctxs.closePersister(persister);
}

allPersisters.clear();
persistersToClose.clear();
extras.clear();

closed = true;
Expand All @@ -154,9 +148,9 @@ private void ensureNotClosed() {
public String toString() {
final int maxLen = 10;
return prefixed("Ctx [id=" + id + ", tag=" + tag + ", user=" + user + ", exchange=" + exchange
+ ", referenceCounter=" + referenceCounter + ", persisters=" + persisters + ", closed=" + closed
+ ", allPersisters=" + (allPersisters != null ? toString(allPersisters, maxLen) : null) + ", extras="
+ (extras != null ? toString(extras.entrySet(), maxLen) : null) + "]");
+ ", referenceCounter=" + referenceCounter + ", closed=" + closed
+ ", persistersToClose=" + toString(persistersToClose, maxLen) + ", extras="
+ toString(extras.entrySet(), maxLen) + "]");
}

private String prefixed(String asStr) {
Expand All @@ -182,46 +176,11 @@ public boolean isClosed() {
return closed;
}

public static synchronized <T> T executeInCtx(CtxData cd, Callable<T> action) {
Ctx ctx = Ctxs.open("call");

ctx.setExchange(cd.exchange());
ctx.setUser(new UserInfo(cd.username(), cd.roles()));
Coll.assign(ctx.extras(), cd.extras());

if (cd.persister() != null) {
ctx.setPersister(cd.persister());
}

try {
return Lmbd.call(action);
} finally {
Ctxs.close();
}
}

public static synchronized void executeInCtx(String tag, Runnable action) {
Ctxs.open(tag);
try {
Jobs.execute(action);
} finally {
Ctxs.close();
}
}

public Map<Object, Object> extras() {
ensureNotClosed();
return extras;
}

public boolean isLoggedIn() {
return user() != null;
}

public String username() {
return isLoggedIn() ? user().username : null;
}

public String tag() {
return tag;
}
Expand Down
18 changes: 18 additions & 0 deletions rapidoid-commons/src/main/java/org/rapidoid/ctx/Ctxs.java
Expand Up @@ -2,7 +2,9 @@

import org.rapidoid.annotation.Authors;
import org.rapidoid.annotation.Since;
import org.rapidoid.commons.Coll;
import org.rapidoid.log.Log;
import org.rapidoid.u.U;

/*
* #%L
Expand Down Expand Up @@ -63,6 +65,20 @@ public static void attach(Ctx ctx) {
}
}

public static Ctx open(WithContext context) {
Ctx ctx = Ctxs.open(context.tag());

ctx.setExchange(context.exchange());
ctx.setUser(new UserInfo(context.username(), context.roles()));
Coll.assign(ctx.extras(), U.safe(context.extras()));

if (context.persister() != null) {
ctx.setPersister(context.persister());
}

return ctx;
}

public static Ctx open(String tag) {
Ctx ctx = new Ctx(tag);
Log.debug("Opening context", "ctx", ctx);
Expand Down Expand Up @@ -97,7 +113,9 @@ public static Object createPersister() {
}

public static void closePersister(Object persister) {
U.notNull(persisterProvider, "Ctxs.persisterProvider");
persisterProvider.closePersister(persister);
}


}
22 changes: 2 additions & 20 deletions rapidoid-commons/src/main/java/org/rapidoid/ctx/UserInfo.java
Expand Up @@ -33,10 +33,6 @@
@Since("2.0.0")
public class UserInfo implements Serializable {

private static final String USERNAME = "_USER.USERNAME";
private static final String EMAIL = "_USER.EMAIL";
private static final String NAME = "_USER.NAME";

private static final long serialVersionUID = 7062732348562440194L;

private static final UserInfo ANONYMOUS = new UserInfo("anonymous", null, "Anonymous", null, null,
Expand All @@ -52,9 +48,9 @@ public class UserInfo implements Serializable {

public final String oauthProvider;

public volatile Set<String> roles;
public final Set<String> roles;

public volatile Map<String, Boolean> is;
public final Map<String, Boolean> is;

public UserInfo(String username) {
this(username, username, username);
Expand Down Expand Up @@ -92,20 +88,6 @@ private static Map<String, Boolean> rolesMap(Set<String> roles) {
return rolesMap;
}

public static UserInfo from(Map<String, ?> scope) {
String username = (String) scope.get(USERNAME);
String email = (String) scope.get(EMAIL);
String name = (String) scope.get(NAME);

return username != null ? new UserInfo(username, email, name) : ANONYMOUS;
}

public void saveTo(Map<String, Serializable> scope) {
scope.put(USERNAME, this.username);
scope.put(EMAIL, this.email);
scope.put(NAME, this.name);
}

@Override
public String toString() {
return "UserInfo [username=" + username + ", email=" + email + ", name=" + name + ", oauthId=" + oauthId
Expand Down
24 changes: 14 additions & 10 deletions rapidoid-commons/src/main/java/org/rapidoid/ctx/With.java
Expand Up @@ -30,24 +30,28 @@
@Since("2.5.0")
public class With {

public static CtxData username(String username) {
return new CtxData().username(username);
public static WithContext tag(String tag) {
return new WithContext().tag(tag);
}

public static CtxData roles(Set<String> roles) {
return new CtxData().roles(roles);
public static WithContext username(String username) {
return new WithContext().username(username);
}

public static CtxData persister(Object persister) {
return new CtxData().persister(persister);
public static WithContext roles(Set<String> roles) {
return new WithContext().roles(roles);
}

public static CtxData exchange(Object exchange) {
return new CtxData().exchange(exchange);
public static WithContext persister(Object persister) {
return new WithContext().persister(persister);
}

public static CtxData extras(Map<String, Object> extras) {
return new CtxData().extras(extras);
public static WithContext exchange(Object exchange) {
return new WithContext().exchange(exchange);
}

public static WithContext extras(Map<String, Object> extras) {
return new WithContext().extras(extras);
}

}
Expand Up @@ -22,72 +22,83 @@

import org.rapidoid.annotation.Authors;
import org.rapidoid.annotation.Since;
import org.rapidoid.job.Jobs;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;

@Authors("Nikolche Mihajlovski")
@Since("2.5.0")
public class CtxData {
public class WithContext {

private String username = null;
private volatile String tag;

private Set<String> roles = null;
private volatile String username;

private Object persister = null;
private volatile Set<String> roles;

private Object exchange = null;
private volatile Object persister;

private Map<String, Object> extras = null;
private volatile Object exchange;

public synchronized CtxData username(String username) {
private volatile Map<String, Object> extras;

public String tag() {
return tag;
}

public WithContext tag(String tag) {
this.tag = tag;
return this;
}

public WithContext username(String username) {
this.username = username;
return this;
}

public synchronized String username() {
public String username() {
return this.username;
}

public synchronized CtxData roles(Set<String> roles) {
public WithContext roles(Set<String> roles) {
this.roles = roles;
return this;
}

public synchronized Set<String> roles() {
public Set<String> roles() {
return this.roles;
}

public synchronized CtxData persister(Object persister) {
public WithContext persister(Object persister) {
this.persister = persister;
return this;
}

public synchronized Object persister() {
public Object persister() {
return this.persister;
}

public synchronized CtxData exchange(Object exchange) {
public WithContext exchange(Object exchange) {
this.exchange = exchange;
return this;
}

public synchronized Object exchange() {
public Object exchange() {
return this.exchange;
}

public synchronized CtxData extras(Map<String, Object> extras) {
public WithContext extras(Map<String, Object> extras) {
this.extras = extras;
return this;
}

public synchronized Map<String, Object> extras() {
public Map<String, Object> extras() {
return extras;
}

public synchronized <T> T call(Callable<T> action) {
return Ctx.executeInCtx(this, action);
public void run(Runnable action) {
Jobs.executeInContext(this, action);
}

}

0 comments on commit 6431396

Please sign in to comment.