Skip to content

Commit

Permalink
Connection management to S3 is a thing that should happen
Browse files Browse the repository at this point in the history
Here, we have a very simple and generic async connection pool. It respects maximum inflight requests, max requests per connection, a time limit of usage. It's fairly simple
  • Loading branch information
mathgladiator committed Feb 3, 2023
1 parent a98e8b9 commit d32330c
Show file tree
Hide file tree
Showing 8 changed files with 581 additions and 0 deletions.
175 changes: 175 additions & 0 deletions common/src/main/java/org/adamalang/common/pool/AsyncPool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package org.adamalang.common.pool;

import org.adamalang.common.*;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/** An async pool designed for asynchronous connection management */
public class AsyncPool<R, S> {
private final SimpleExecutor executor;
private final TimeSource time;
private final int maxLifetimeMilliseconds;
private final int maxUsageCount;
private final HashMap<R, Pool<RefS>> pools;
private final PoolActions<R, S> actions;
private final int maxPoolSize;
private final int errorCodePoolTooLarge;

/** an item within the pool */
private class RefS implements PoolItem<S> {
private final Pool<RefS> pool; // the pool the item came from
private final S item; // the item being tracked
private final long created; // when we created the item
private int count; // the number of times we have used it

public RefS(Pool<RefS> pool, S item) {
this.pool = pool;
this.item = item;
this.created = time.nowMilliseconds();
this.count = 0;
}

@Override
public S item() {
return this.item;
}

@Override
public void returnToPool() {
executor.execute(new NamedRunnable("return-async-pool") {
@Override
public void execute() throws Exception {
count++;
if (count >= maxUsageCount) {
pool.bumpDown();
actions.destroy(item);
} else {
pool.add(RefS.this);
}
}
});
}

@Override
public void signalFailure() {
executor.execute(new NamedRunnable("failure-async-pool") {
@Override
public void execute() throws Exception {
pool.bumpDown();
actions.destroy(item);
}
});
}
}

public AsyncPool(SimpleExecutor executor, TimeSource time, int maxLifetimeMilliseconds, int maxUsageCount, int maxPoolSize, int errorCodePoolTooLarge, PoolActions<R, S> actions) {
this.executor = executor;
this.time = time;
this.maxLifetimeMilliseconds = maxLifetimeMilliseconds;
this.maxUsageCount = maxUsageCount;
this.pools = new HashMap<>();
this.actions = actions;
this.maxPoolSize = maxPoolSize;
this.errorCodePoolTooLarge = errorCodePoolTooLarge;
}

private Pool<RefS> poolOfWhileInExecutor(R request) {
Pool<RefS> pool = pools.get(request);
if (pool != null) {
return pool;
}
pool = new Pool<>();
pools.put(request, pool);
return pool;
}

protected int sweep() {
int cleaned = 0;
long now = time.nowMilliseconds();
Iterator<Map.Entry<R, Pool<RefS>>> itMap = pools.entrySet().iterator();
while (itMap.hasNext()) {
Map.Entry<R, Pool<RefS>> entry = itMap.next();
Pool<RefS> pool = entry.getValue();
Iterator<RefS> itValue = pool.iterator();
while (itValue.hasNext()) {
RefS candidate = itValue.next();
long age = now - candidate.created;
if (age >= maxLifetimeMilliseconds) {
pool.bumpDown();
itValue.remove();
actions.destroy(candidate.item);
cleaned++;
}
}
if (pool.size() == 0) {
itMap.remove();
}
}
return cleaned;
}

public void scheduleSweeping(AtomicBoolean alive) {
executor.schedule(new NamedRunnable("sweep") {
@Override
public void execute() throws Exception {
sweep();
if (alive.get()) {
executor.schedule(this, maxLifetimeMilliseconds);
}
}
}, maxLifetimeMilliseconds);
}

public void get(R request, Callback<PoolItem<S>> callback) {
executor.execute(new NamedRunnable("async-pool") {
@Override
public void execute() throws Exception {
// get the pool for the reuqest
Pool<RefS> pool = poolOfWhileInExecutor(request);
// start removing items from the pool
RefS item;
while ((item = pool.next()) != null) {
long age = time.nowMilliseconds() - item.created;
// if the item is young enough, then we found it
if (age < maxLifetimeMilliseconds) {
callback.success(item);
return;
} else {
// otherwise, the item is tool old so we terminate it and move on to the next item
pool.bumpDown();
actions.destroy(item.item);
}
}
// the pool is too big, so reject the request outright
if (pool.size() >= maxPoolSize) {
callback.failure(new ErrorCodeException(errorCodePoolTooLarge));
return;
}
// while in the executor, inform the pool that we are creating a new item, so account for it.
pool.bumpUp();
actions.create(request, new Callback<S>() {
@Override
public void success(S value) {
// it was a happy thing, so tell the client (it is accounted for within the counter)
callback.success(new RefS(pool, value));
}

@Override
public void failure(ErrorCodeException ex) {
executor.execute(new NamedRunnable("init-pool-failure") {
@Override
public void execute() throws Exception {
// we must jump back into the executor to account for the loss of the item's potential
pool.bumpDown();
callback.failure(ex);
}
});
}
});
}
});
}
}
47 changes: 47 additions & 0 deletions common/src/main/java/org/adamalang/common/pool/Pool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package org.adamalang.common.pool;

import java.util.Iterator;
import java.util.LinkedList;

/** a pool is a simple list wrapper where we count the inflight items; consumers of the pool are expected to return the item back to the pool */
public class Pool<S> {
private final LinkedList<S> queue;
private int size;

public Pool() {
this.queue = new LinkedList<>();
this.size = 0;
}

/** increase the size of the pool */
public void bumpUp() {
size++;
}

/** decrease the size of the pool */
public void bumpDown() {
size--;
}

/** @return the size of the pool */
public int size() {
return size;
}

/** add an item back into the pool as available */
public void add(S item) {
queue.add(item);
}

public Iterator<S> iterator() {
return queue.iterator();
}

/** remove the next item from the queue if available */
public S next() {
if (queue.size() > 0) {
return queue.removeFirst();
}
return null;
}
}
12 changes: 12 additions & 0 deletions common/src/main/java/org/adamalang/common/pool/PoolActions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.adamalang.common.pool;

import org.adamalang.common.Callback;

/** The pool is asynchronous and has to make external calls to create and kill items within the pool */
public interface PoolActions<R, S> {
/** request an item to be created to be placed within the pool */
public void create(R request, Callback<S> created);

/** destroy an item as it is leaving the pool */
public void destroy(S item);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.adamalang.common.pool;

import org.adamalang.common.Callback;
import org.adamalang.common.ErrorCodeException;

/** adapt an existing callback to register success/failure signals onto a PoolItem */
public class PoolCallbackWrapper<X, S> implements Callback<X> {
private final Callback<X> wrapped;
private final PoolItem<S> item;

public PoolCallbackWrapper(Callback<X> wrapped, PoolItem<S> item) {
this.wrapped = wrapped;
this.item = item;
}

@Override
public void success(X value) {
try {
wrapped.success(value);
} finally {
item.returnToPool();
}
}

@Override
public void failure(ErrorCodeException ex) {
try {
wrapped.failure(ex);
} finally {
item.signalFailure();
}
}
}
13 changes: 13 additions & 0 deletions common/src/main/java/org/adamalang/common/pool/PoolItem.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.adamalang.common.pool;

/** a wrapper around an item which is used to report status on the item */
public interface PoolItem<S> {
/** @return the item */
public S item();

/** signal the item has a failure and should not be re-used again */
public void signalFailure();

/** return the item to the pool */
public void returnToPool();
}
Loading

0 comments on commit d32330c

Please sign in to comment.