Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 41 additions & 99 deletions src/main/java/co/zeroae/gate/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,16 @@

import com.amazonaws.util.Base64;
import com.amazonaws.xray.AWSXRay;
import com.amazonaws.xray.entities.Subsegment;

import gate.*;
import gate.creole.ResourceInstantiationException;
import gate.util.GateException;
import gate.util.persistence.PersistenceManager;

import com.jakewharton.disklrucache.DiskLruCache;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import javax.xml.stream.XMLStreamException;
import java.io.*;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLStreamHandler;
import java.util.*;
Expand Down Expand Up @@ -56,8 +52,8 @@ public class App implements RequestHandler<APIGatewayProxyRequestEvent, APIGatew
"Gate Exporters", Utils::loadExporters
);

private static final DiskLruCache cache = AWSXRay.createSegment(
"Cache Init", App::initializeCache);
private static final DocumentLRUCache cache = AWSXRay.createSegment("Cache Init",
() -> new DocumentLRUCache(App.CACHE_DIR, App.CACHE_DIR_USAGE));

private static final URLStreamHandler b64Handler = new Handler();

Expand All @@ -75,60 +71,28 @@ public APIGatewayProxyResponseEvent handleRequest(APIGatewayProxyRequestEvent in
}
return null;
}).get();
final DocumentExporter exporter = exporters.get(responseType);
if (exporter == null) {
throw new IOException("Unsupported response content type.");
}
if (responseType != null)
response.getHeaders().put("Content-Type", responseType.split(";")[0].trim());

final DocumentExporter exporter = exporters.get(responseType);
if (exporter == null)
throw new IOException("Unsupported response content type.");


final FeatureMap featureMap = Factory.newFeatureMap();
final String bodyType = input.getHeaders().getOrDefault("Content-Type", "text/plain");
final String inputDigest = AWSXRay.createSubsegment("Message Digest",() -> {
String rv = Utils.computeMessageDigest(bodyType, input.getBody());
final String contentType = input.getHeaders().getOrDefault("Content-Type", "text/plain");
final String contentDigest = AWSXRay.createSubsegment("Message Digest",() -> {
String rv = Utils.computeMessageDigest(contentType, input.getBody());
AWSXRay.getCurrentSubsegment().putMetadata("SHA256", rv);
return rv;
});

featureMap.put(Document.DOCUMENT_MIME_TYPE_PARAMETER_NAME, bodyType);
if (!input.getIsBase64Encoded())
featureMap.put(Document.DOCUMENT_STRING_CONTENT_PARAMETER_NAME, input.getBody());
else {
// GATE FastInfosetFormat can not handle binary in the string content.
Handler.paths.put(inputDigest, input.getBody());
featureMap.put(
Document.DOCUMENT_URL_PARAMETER_NAME,
new URL("b64",
bodyType != null ? Base64.encodeAsString(bodyType.getBytes()) : null,
64,
inputDigest,
b64Handler));
}
putRequestBody(featureMap, contentType, contentDigest, input.getBody(), input.getIsBase64Encoded());

response.getHeaders().put("x-zae-gate-cache", "HIT");
final Document doc = cacheComputeIfNull(
inputDigest,
() -> {
final Subsegment subsegment = AWSXRay.beginSubsegment("Gate Execute");
final Corpus corpus = application.getCorpus();
final Document rv = (Document)Factory.createResource(
"gate.corpora.DocumentImpl",
featureMap
);
response.getHeaders().put("x-zae-gate-cache", "MISS");
try {
corpus.add(rv);
application.execute();
} catch (GateException e) {
subsegment.addException(e);
throw e;
} finally {
corpus.clear();
AWSXRay.endSubsegment();
}
return rv;
}
);
final Document doc = cache.computeIfNull(contentDigest, () -> {
response.getHeaders().put("x-zae-gate-cache", "MISS");
return execute(featureMap);
});
AWSXRay.beginSubsegment("Gate Export");
AWSXRay.getCurrentSubsegment().putMetadata("Content-Type", response.getHeaders().get("Content-Type"));
try {
Expand All @@ -152,41 +116,36 @@ public APIGatewayProxyResponseEvent handleRequest(APIGatewayProxyRequestEvent in
}
}

private void cachePutDocument(String key, Document doc) {
try {
DiskLruCache.Editor editor = cache.edit(key);
editor.set(0, doc.toXml());
editor.commit();
} catch (IOException e) {
logger.warn(e);
AWSXRay.getCurrentSubsegment().addException(e);
/**
 * Fills {@code featureMap} with the parameters that describe the request body.
 *
 * <p>The MIME type is always recorded. Plain bodies are passed as string content.
 * Base64-encoded bodies are registered in {@link Handler#paths} under
 * {@code contentDigest} and referenced through a {@code b64} URL whose host is the
 * Base64-encoded MIME type (or {@code null}), whose port is {@code 64}, and whose
 * file part is the digest.
 *
 * @param featureMap      map to populate
 * @param mimeType        MIME type of the body; may be {@code null}
 * @param contentDigest   digest used as the key for the Base64 body
 * @param content         request body, plain or Base64-encoded
 * @param isBase64Encoded whether {@code content} is Base64-encoded
 * @throws MalformedURLException if the {@code b64} URL cannot be constructed
 */
private void putRequestBody(FeatureMap featureMap, String mimeType, String contentDigest, String content, boolean isBase64Encoded) throws MalformedURLException {
    featureMap.put(Document.DOCUMENT_MIME_TYPE_PARAMETER_NAME, mimeType);

    if (!isBase64Encoded) {
        featureMap.put(Document.DOCUMENT_STRING_CONTENT_PARAMETER_NAME, content);
        return;
    }

    // GATE FastInfosetFormat can not handle binary in the string content,
    // so the body is registered with the handler and referenced by URL instead.
    Handler.paths.put(contentDigest, content);

    final String encodedMimeType;
    if (mimeType == null) {
        encodedMimeType = null;
    } else {
        encodedMimeType = Base64.encodeAsString(mimeType.getBytes());
    }
    final URL bodyUrl = new URL("b64", encodedMimeType, 64, contentDigest, b64Handler);
    featureMap.put(Document.DOCUMENT_URL_PARAMETER_NAME, bodyUrl);
}

private Document cacheComputeIfNull(String key, Utils.TextProcessor processor) throws GateException {
private Document execute(FeatureMap docFeatureMap) throws GateException {
AWSXRay.beginSubsegment("Gate Execute");
try {
final DiskLruCache.Snapshot snapshot = cache.get(key);
if (snapshot == null) {
final Document doc = processor.process();
AWSXRay.createSubsegment("Cache Edit", () -> cachePutDocument(key, doc));
return doc;
} else {
AWSXRay.beginSubsegment("Cache Read");
try {
return Utils.xmlToDocument(new InputStreamReader(snapshot.getInputStream(0)));
} catch (ResourceInstantiationException | XMLStreamException e) {
logger.warn(e);
AWSXRay.getCurrentSubsegment().addException(e);
cache.remove(key);
return processor.process();
} finally {
AWSXRay.endSubsegment();
}
}
} catch (IOException e) {
logger.warn(e);
final Document rv = (Document) Factory.createResource("gate.corpora.DocumentImpl", docFeatureMap);
application.getCorpus().add(rv);
application.execute();
return rv;
} catch (GateException e) {
AWSXRay.getCurrentSubsegment().addException(e);
return processor.process();
throw e;
} finally {
application.getCorpus().clear();
AWSXRay.endSubsegment();
}
}

Expand Down Expand Up @@ -242,21 +201,4 @@ private static CorpusController loadApplication() {
}
}

private static DiskLruCache initializeCache() {
File cacheDir = new File(CACHE_DIR);
if (!cacheDir.exists() && !cacheDir.mkdirs()) {
throw new RuntimeException("Unable to create cache directory '" + cacheDir.getName() + "'.");
}
for (File file: Objects.requireNonNull(cacheDir.listFiles())) file.delete();
try {
long usableSpace = (long) (cacheDir.getUsableSpace()*CACHE_DIR_USAGE);
return DiskLruCache.open(cacheDir,
1,
1,
usableSpace);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

}
91 changes: 91 additions & 0 deletions src/main/java/co/zeroae/gate/DocumentLRUCache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package co.zeroae.gate;

import com.amazonaws.xray.AWSXRay;
import com.jakewharton.disklrucache.DiskLruCache;
import gate.Document;
import gate.creole.ResourceInstantiationException;
import gate.util.GateException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import javax.xml.stream.XMLStreamException;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

/**
 * Disk-backed LRU cache of GATE {@link Document}s.
 *
 * <p>Documents are stored as the XML produced by {@link Document#toXml()} and
 * restored with {@code Utils.xmlToDocument}. Cache failures are logged and
 * recorded on the current X-Ray subsegment but never propagated: a failed read
 * is reported as a miss and a failed write is dropped.
 */
class DocumentLRUCache {
    private static final int VERSION = 1;
    private static final int VALUE_COUNT = 1;

    private static final Logger logger = LogManager.getLogger(DocumentLRUCache.class);

    private final DiskLruCache cache;

    /**
     * Opens a cache rooted at {@code cacheDir}.
     *
     * @param cacheDir directory that holds the cache; created if missing, and
     *                 emptied of existing files on start-up
     * @param maxUsage fraction of the directory's usable space the cache may occupy
     * @throws RuntimeException if the directory cannot be created, listed, or opened
     */
    DocumentLRUCache(String cacheDir, double maxUsage) {
        cache = initializeCache(cacheDir, maxUsage);
    }

    /**
     * Returns the cached document for {@code key}, or computes it with
     * {@code supplier}, stores it, and returns it when there is no usable entry.
     *
     * @param key      cache key
     * @param supplier produces the document on a miss
     * @return the cached or freshly computed document
     * @throws GateException if {@code supplier} fails
     */
    public Document computeIfNull(String key, Utils.GATESupplier<Document> supplier) throws GateException {
        Document rv = get(key);
        if (rv == null) {
            rv = supplier.get();
            put(key, rv);
        }
        return rv;
    }

    /**
     * Looks up {@code key} in the cache.
     *
     * @param key cache key
     * @return the deserialized document, or {@code null} when the key is absent,
     *         the entry cannot be parsed (it is then evicted), or an I/O error occurs
     */
    public Document get(String key) {
        AWSXRay.beginSubsegment("Cache Read");
        try {
            final DiskLruCache.Snapshot snapshot = cache.get(key);
            if (snapshot == null) {
                return null;
            }
            final Document doc = deserialize(snapshot);
            if (doc == null) {
                // The entry is unreadable; evict it now that the snapshot is closed
                // so the next put can replace it.
                cache.remove(key);
            }
            return doc;
        } catch (IOException e) {
            logger.warn(e);
            AWSXRay.getCurrentSubsegment().addException(e);
            return null;
        } finally {
            AWSXRay.endSubsegment();
        }
    }

    /**
     * Parses the single value of {@code snapshot} into a document and closes the snapshot.
     *
     * @return the document, or {@code null} if the stored XML cannot be parsed
     */
    private static Document deserialize(DiskLruCache.Snapshot snapshot) throws IOException {
        AWSXRay.beginSubsegment("Deserialize");
        // try-with-resources releases the snapshot's file handle on every path.
        try (DiskLruCache.Snapshot entry = snapshot) {
            // Editor.set writes UTF-8, so decode with the same charset rather than
            // the platform default.
            return Utils.xmlToDocument(
                    new InputStreamReader(entry.getInputStream(0), StandardCharsets.UTF_8));
        } catch (ResourceInstantiationException | XMLStreamException e) {
            logger.warn(e);
            AWSXRay.getCurrentSubsegment().addException(e);
            return null;
        } finally {
            AWSXRay.endSubsegment();
        }
    }

    /**
     * Stores {@code doc} under {@code key}. I/O failures are logged and recorded
     * on the X-Ray subsegment; the write is skipped if the key is already being edited.
     *
     * @param key cache key
     * @param doc document to serialize and store
     */
    public void put(String key, Document doc) {
        AWSXRay.beginSubsegment("Cache Edit");
        try {
            // Serialize before opening the editor so a serialization failure
            // cannot leave an edit dangling on this key.
            final String xml = doc.toXml();
            final DiskLruCache.Editor editor = cache.edit(key);
            if (editor == null) {
                // Another edit of this key is in progress; caching is best-effort.
                logger.debug("Skipping cache write; an edit is already in progress for key " + key);
                return;
            }
            try {
                editor.set(0, xml);
            } catch (IOException | RuntimeException e) {
                // Release the editor, otherwise the key can never be edited again.
                editor.abort();
                throw e;
            }
            editor.commit();
        } catch (IOException e) {
            logger.warn(e);
            AWSXRay.getCurrentSubsegment().addException(e);
        } finally {
            AWSXRay.endSubsegment();
        }
    }

    /**
     * Prepares the cache directory and opens the underlying {@link DiskLruCache}.
     *
     * <p>The directory is created if needed, its existing files are deleted, and the
     * cache's maximum size is set to the directory's usable space times {@code maxUsage}.
     */
    private static DiskLruCache initializeCache(String cacheDir, double maxUsage) {
        final File cacheDirPath = new File(cacheDir);
        if (!cacheDirPath.exists() && !cacheDirPath.mkdirs()) {
            throw new RuntimeException("Unable to create cache directory '" + cacheDirPath.getName() + "'.");
        }
        // listFiles() returns null when the path is not a directory or cannot be read.
        final File[] staleFiles = cacheDirPath.listFiles();
        if (staleFiles == null) {
            throw new IllegalStateException("Unable to list cache directory '" + cacheDirPath + "'.");
        }
        for (File file : staleFiles) {
            if (!file.delete()) {
                logger.warn("Unable to delete stale cache file '" + file + "'.");
            }
        }
        try {
            final long usableSpace = (long) (cacheDirPath.getUsableSpace() * maxUsage);
            return DiskLruCache.open(cacheDirPath, VERSION, VALUE_COUNT, usableSpace);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
5 changes: 3 additions & 2 deletions src/main/java/co/zeroae/gate/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@

public class Utils {

interface TextProcessor {
Document process() throws GateException;
/**
 * A supplier of results whose computation may fail with a {@link GateException}.
 *
 * @param <T> the type of result supplied
 */
@FunctionalInterface
interface GATESupplier<T> {
/**
 * Produces a result.
 *
 * @return the supplied value
 * @throws GateException if the value cannot be produced
 */
T get() throws GateException;
}

/**
Expand Down