Skip to content

Commit

Permalink
Merge pull request #103 from demsey/serializable-tspace
Browse files Browse the repository at this point in the history
Serializable TSpace
  • Loading branch information
ar committed Mar 6, 2016
2 parents 0751888 + 4605063 commit 3338f38
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 3 deletions.
2 changes: 1 addition & 1 deletion jpos/src/main/java/org/jpos/q2/iso/ChannelAdaptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class ChannelAdaptor
extends QBeanSupport
implements ChannelAdaptorMBean, Channel, Loggeable
{
Space sp;
protected Space sp;
private ISOChannel channel;
String in, out, ready, reconnect;
long delay;
Expand Down
71 changes: 69 additions & 2 deletions jpos/src/main/java/org/jpos/space/TSpace.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.jpos.space;
import org.jpos.util.Loggeable;
import java.io.PrintStream;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.TimeUnit;

Expand All @@ -37,7 +38,7 @@ public class TSpace<K,V> implements LocalSpace<K,V>, Loggeable, Runnable {
private static final long GCLONG = 60*1000;
private static final long NRD_RESOLUTION = 500L;
private static final int MAX_ENTRIES_IN_DUMP = 1000;
private Set[] expirables;
private final Set[] expirables;
private long lastLongGC = System.currentTimeMillis();

public TSpace () {
Expand All @@ -46,6 +47,8 @@ public TSpace () {
expirables = new Set[] { new HashSet<K>(), new HashSet<K>() };
SpaceFactory.getGCExecutor().scheduleAtFixedRate(this, GCDELAY, GCDELAY, TimeUnit.MILLISECONDS);
}

@Override
public void out (K key, V value) {
if (key == null || value == null)
throw new NullPointerException ("key=" + key + ", value=" + value);
Expand All @@ -58,6 +61,8 @@ public void out (K key, V value) {
if (sl != null)
notifyListeners(key, value);
}

@Override
public void out (K key, V value, long timeout) {
if (key == null || value == null)
throw new NullPointerException ("key=" + key + ", value=" + value);
Expand All @@ -77,16 +82,22 @@ public void out (K key, V value, long timeout) {
if (sl != null)
notifyListeners(key, value);
}

@Override
public synchronized V rdp (Object key) {
if (key instanceof Template)
return (V) getObject ((Template) key, false);
return (V) getHead (key, false);
}

@Override
public synchronized V inp (Object key) {
if (key instanceof Template)
return (V) getObject ((Template) key, true);
return (V) getHead (key, true);
}

@Override
public synchronized V in (Object key) {
Object obj;
while ((obj = inp (key)) == null) {
Expand All @@ -96,6 +107,8 @@ public synchronized V in (Object key) {
}
return (V) obj;
}

@Override
public synchronized V in (Object key, long timeout) {
Object obj;
long now = System.currentTimeMillis();
Expand All @@ -109,6 +122,8 @@ public synchronized V in (Object key, long timeout) {
}
return (V) obj;
}

@Override
public synchronized V rd (Object key) {
Object obj;
while ((obj = rdp (key)) == null) {
Expand All @@ -118,6 +133,8 @@ public synchronized V rd (Object key) {
}
return (V) obj;
}

@Override
public synchronized V rd (Object key, long timeout) {
Object obj;
long now = System.currentTimeMillis();
Expand All @@ -131,13 +148,17 @@ public synchronized V rd (Object key, long timeout) {
}
return (V) obj;
}

@Override
public synchronized void nrd (Object key) {
while (rdp (key) != null) {
try {
this.wait (NRD_RESOLUTION);
} catch (InterruptedException ignored) { }
}
}

@Override
public synchronized V nrd (Object key, long timeout) {
Object obj;
long now = System.currentTimeMillis();
Expand All @@ -151,20 +172,24 @@ public synchronized V nrd (Object key, long timeout) {
}
return (V) obj;
}

@Override
public void run () {
try {
gc();
} catch (Exception e) {
e.printStackTrace(); // this should never happen
}
}

public void gc () {
gc(0);
if (System.currentTimeMillis() - lastLongGC > GCLONG) {
gc(1);
lastLongGC = System.currentTimeMillis();
}
}

private void gc (int generation) {
Set<K> exps = expirables[generation];
synchronized (this) {
Expand All @@ -186,21 +211,28 @@ private void gc (int generation) {
}
}

@Override
public synchronized int size (Object key) {
int size = 0;
List l = (List) entries.get (key);
if (l != null)
size = l.size();
return size;
}

@Override
public synchronized void addListener (Object key, SpaceListener listener) {
getSL().out (key, listener);
}

@Override
public synchronized void addListener
(Object key, SpaceListener listener, long timeout)
{
getSL().out (key, listener, timeout);
}

@Override
public synchronized void removeListener
(Object key, SpaceListener listener)
{
Expand All @@ -211,9 +243,12 @@ public synchronized void addListener (Object key, SpaceListener listener) {
public boolean isEmpty() {
return entries.isEmpty();
}

@Override
public synchronized Set<K> getKeySet() {
return new HashSet<K>(entries.keySet());
}

public String getKeysAsString () {
StringBuilder sb = new StringBuilder();
Object[] keys;
Expand All @@ -227,6 +262,8 @@ public String getKeysAsString () {
}
return sb.toString();
}

@Override
public void dump(PrintStream p, String indent) {
Object[] keys;
int size = entries.size();
Expand Down Expand Up @@ -254,6 +291,7 @@ public void dump(PrintStream p, String indent) {
}
p.printf("%s gcinfo: %d,%d%n", indent, exp0, exp1);
}

public void notifyListeners (Object key, Object value) {
Object[] listeners = null;
synchronized (this) {
Expand All @@ -273,6 +311,8 @@ public void notifyListeners (Object key, Object value) {
}
}
}

@Override
public void push (K key, V value) {
if (key == null || value == null)
throw new NullPointerException ("key=" + key + ", value=" + value);
Expand All @@ -287,6 +327,7 @@ public void push (K key, V value) {
notifyListeners(key, value);
}

@Override
public void push (K key, V value, long timeout) {
if (key == null || value == null)
throw new NullPointerException ("key=" + key + ", value=" + value);
Expand All @@ -308,6 +349,7 @@ public void push (K key, V value, long timeout) {
notifyListeners(key, value);
}

@Override
public void put (K key, V value) {
if (key == null || value == null)
throw new NullPointerException ("key=" + key + ", value=" + value);
Expand All @@ -321,6 +363,8 @@ public void put (K key, V value) {
if (sl != null)
notifyListeners(key, value);
}

@Override
public void put (K key, V value, long timeout) {
if (key == null || value == null)
throw new NullPointerException ("key=" + key + ", value=" + value);
Expand All @@ -340,13 +384,17 @@ public void put (K key, V value, long timeout) {
if (sl != null)
notifyListeners(key, value);
}

@Override
public boolean existAny (K[] keys) {
for (K key : keys) {
if (rdp(key) != null)
return true;
}
return false;
}

@Override
public boolean existAny (K[] keys, long timeout) {
long now = System.currentTimeMillis();
long end = now + timeout;
Expand All @@ -361,26 +409,30 @@ public boolean existAny (K[] keys, long timeout) {
}
return false;
}

/**
* unstandard method (required for space replication) - use with care
* @return underlying entry map
*/
public Map getEntries () {
return entries;
}

/**
* unstandard method (required for space replication) - use with care
* @param entries underlying entry map
*/
public void setEntries (Map entries) {
this.entries = entries;
}

private List getList (Object key) {
List l = (List) entries.get (key);
if (l == null)
entries.put (key, l = new LinkedList());
return l;
}

private Object getHead (Object key, boolean remove) {
Object obj = null;
List l = (List) entries.get (key);
Expand Down Expand Up @@ -408,6 +460,7 @@ private Object getHead (Object key, boolean remove) {
}
return obj;
}

private Object getObject (Template tmpl, boolean remove) {
Object obj = null;
List l = (List) entries.get (tmpl.getKey());
Expand All @@ -433,21 +486,28 @@ private Object getObject (Template tmpl, boolean remove) {
}
return obj;
}

private TSpace getSL() {
synchronized (this) {
if (sl == null)
sl = new TSpace();
}
return sl;
}

private void registerExpirable(K k, long t) {
expirables[t > GCLONG ? 1 : 0].add(k);
}

private void unregisterExpirable(Object k) {
for (Set<K> s : expirables)
s.remove(k);
}
static class Expirable implements Comparable {

static class Expirable implements Comparable, Serializable {

static final long serialVersionUID = 0xA7F22BF5;

Object value;
long expires;

Expand All @@ -456,18 +516,24 @@ public Expirable (Object value, long expires) {
this.value = value;
this.expires = expires;
}

public boolean isExpired () {
return expires < System.currentTimeMillis ();
}

@Override
public String toString() {
return getClass().getName()
+ "@" + Integer.toHexString(hashCode())
+ ",value=" + value.toString()
+ ",expired=" + isExpired ();
}

public Object getValue() {
return isExpired() ? null : value;
}

@Override
public int compareTo (Object obj) {
Expirable other = (Expirable) obj;
long otherExpires = other.expires;
Expand All @@ -479,4 +545,5 @@ else if (expires < otherExpires)
return 1;
}
}

}

0 comments on commit 3338f38

Please sign in to comment.