Skip to content

Commit

Permalink
JBPM-6681 - Support for lazy loaded variables (#1095)
Browse files Browse the repository at this point in the history
  • Loading branch information
mswiderski authored and Tihomir Surdilovic committed Dec 14, 2017
1 parent 95a8ede commit ebc4f60
Show file tree
Hide file tree
Showing 13 changed files with 347 additions and 48 deletions.
Expand Up @@ -87,6 +87,7 @@
import org.kie.internal.runtime.manager.context.ProcessInstanceIdContext; import org.kie.internal.runtime.manager.context.ProcessInstanceIdContext;
import org.kie.internal.task.api.TaskModelFactory; import org.kie.internal.task.api.TaskModelFactory;
import org.kie.internal.task.api.TaskModelProvider; import org.kie.internal.task.api.TaskModelProvider;
import org.kie.internal.utils.LazyLoaded;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand Down Expand Up @@ -658,6 +659,12 @@ protected CaseFileInstance internalGetCaseFileInstance(String caseId, String dep
Map<String, Object> filteredData = authorizationManager.filterByDataAuthorization(caseId, caseFile, caseFile.getData()); Map<String, Object> filteredData = authorizationManager.filterByDataAuthorization(caseId, caseFile, caseFile.getData());
((CaseFileInstanceImpl)caseFile).setData(filteredData); ((CaseFileInstanceImpl)caseFile).setData(filteredData);


for (Object variable : caseFile.getData().values()) {
if (variable instanceof LazyLoaded<?>) {
((LazyLoaded<?>) variable).load();
}
}

return caseFile; return caseFile;
} }
logger.warn("Multiple case files found in working memory (most likely not using PER_CASE strategy), trying to filter out..."); logger.warn("Multiple case files found in working memory (most likely not using PER_CASE strategy), trying to filter out...");
Expand All @@ -671,7 +678,16 @@ protected CaseFileInstance internalGetCaseFileInstance(String caseId, String dep
// apply authorization // apply authorization
Map<String, Object> filteredData = authorizationManager.filterByDataAuthorization(caseId, caseFile, caseFile.getData()); Map<String, Object> filteredData = authorizationManager.filterByDataAuthorization(caseId, caseFile, caseFile.getData());
((CaseFileInstanceImpl)caseFile).setData(filteredData); ((CaseFileInstanceImpl)caseFile).setData(filteredData);


for (Object variable : caseFile.getData().values()) {
if (variable instanceof LazyLoaded<?>) {
((LazyLoaded<?>) variable).load();
}
}
} }


return caseFile; return caseFile;
} }


Expand Down
4 changes: 4 additions & 0 deletions jbpm-document/pom.xml
Expand Up @@ -37,6 +37,10 @@
<groupId>org.kie</groupId> <groupId>org.kie</groupId>
<artifactId>kie-api</artifactId> <artifactId>kie-api</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.kie</groupId>
<artifactId>kie-internal</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.drools</groupId> <groupId>org.drools</groupId>
<artifactId>drools-core</artifactId> <artifactId>drools-core</artifactId>
Expand Down
Expand Up @@ -23,6 +23,7 @@ public interface Document extends Serializable {


public static final String DOCUMENT_DATE_PATTERN = "yyyy-MM-dd HH:mm:ss"; public static final String DOCUMENT_DATE_PATTERN = "yyyy-MM-dd HH:mm:ss";
public static final String PROPERTIES_SEPARATOR = "####"; public static final String PROPERTIES_SEPARATOR = "####";
public static final String UPDATED_ATTRIBUTE = "_UPDATED_";


void setIdentifier(String identifier); void setIdentifier(String identifier);


Expand Down
Expand Up @@ -34,11 +34,15 @@ public class DocumentMarshallingStrategy extends AbstractDocumentMarshallingStra
private DocumentStorageService documentStorageService; private DocumentStorageService documentStorageService;


public DocumentMarshallingStrategy() { public DocumentMarshallingStrategy() {
documentStorageService = DocumentStorageServiceProvider.get().getStorageService(); this.documentStorageService = DocumentStorageServiceProvider.get().getStorageService();
} }


public DocumentMarshallingStrategy(String path) { public DocumentMarshallingStrategy(String path) {
documentStorageService = DocumentStorageServiceProvider.get().getStorageService(); this.documentStorageService = DocumentStorageServiceProvider.get().getStorageService();
}

public DocumentMarshallingStrategy(DocumentStorageService documentStorageService) {
this.documentStorageService = documentStorageService;
} }


@Override @Override
Expand All @@ -48,40 +52,22 @@ public Document buildDocument( String name, long size, Date lastModified, Map<St


@Override @Override
public void write(ObjectOutputStream os, Object object) throws IOException { public void write(ObjectOutputStream os, Object object) throws IOException {
Document document = (Document) object; throw new UnsupportedOperationException("write is not supported anymore, use marshal instead");

if (document != null && document.getContent() != null) {
documentStorageService.saveDocument(document, document.getContent());
}
os.writeUTF(document.getIdentifier());
os.writeUTF(document.getClass().getCanonicalName());
os.writeUTF(document.getLink());
} }


public Object read(ObjectInputStream os) throws IOException, ClassNotFoundException { public Object read(ObjectInputStream os) throws IOException, ClassNotFoundException {
String objectId = os.readUTF(); throw new UnsupportedOperationException("read is not supported anymore, use unmarshal instead");
String canonicalName = os.readUTF();
String link = os.readUTF();
try {
Document doc = documentStorageService.getDocument(objectId);
Document document = (Document) Class.forName(canonicalName).newInstance();
document.setIdentifier(objectId);
document.setLink(link);
document.setName(doc.getName());
document.setSize(doc.getSize());
document.setLastModified(doc.getLastModified());
document.setAttributes(doc.getAttributes());
document.setContent(doc.getContent());
return document;
} catch(Exception e) {
throw new RuntimeException("Cannot read document", e);
}
} }


@Override @Override
public byte[] marshal(Context context, ObjectOutputStream objectOutputStream, Object o) throws IOException { public byte[] marshal(Context context, ObjectOutputStream objectOutputStream, Object o) throws IOException {
Document document = (Document) o; Document document = (Document) o;
documentStorageService.saveDocument(document, document.getContent()); String updatedAttribute = document.getAttribute(Document.UPDATED_ATTRIBUTE);
if (Boolean.parseBoolean(updatedAttribute)) {
// store via service only when it was actually updated
documentStorageService.saveDocument(document, document.getContent());
document.addAttribute(Document.UPDATED_ATTRIBUTE, "false");
}
ByteArrayOutputStream buff = new ByteArrayOutputStream(); ByteArrayOutputStream buff = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(buff); ObjectOutputStream oos = new ObjectOutputStream(buff);
oos.writeUTF(document.getIdentifier()); oos.writeUTF(document.getIdentifier());
Expand All @@ -91,28 +77,24 @@ public byte[] marshal(Context context, ObjectOutputStream objectOutputStream, Ob
return buff.toByteArray(); return buff.toByteArray();
} }


@SuppressWarnings({"resource", "unused"})
@Override @Override
public Object unmarshal(Context context, ObjectInputStream objectInputStream, byte[] object, ClassLoader classLoader) throws IOException, ClassNotFoundException { public Object unmarshal(Context context, ObjectInputStream objectInputStream, byte[] object, ClassLoader classLoader) throws IOException, ClassNotFoundException {
DroolsObjectInputStream is = new DroolsObjectInputStream(new ByteArrayInputStream(object), classLoader); DroolsObjectInputStream is = new DroolsObjectInputStream(new ByteArrayInputStream(object), classLoader);
// first we read out the object id and class name we stored during marshaling // first we read out the object id and class name we stored during marshaling
String objectId = is.readUTF(); String objectId = is.readUTF();
String canonicalName = is.readUTF(); String canonicalName = is.readUTF();
String link = is.readUTF(); String link = is.readUTF();
Document document = null; Document storedDoc = null;
try { try {
document = (Document) Class.forName(canonicalName).newInstance(); storedDoc = documentStorageService.getDocument(objectId);
Document storedDoc = documentStorageService.getDocument(objectId); storedDoc.setLink( link );
document.setIdentifier(storedDoc.getIdentifier()); // when loaded, mark it as not updated to avoid not needed marshalling
document.setName( storedDoc.getName() ); storedDoc.addAttribute(Document.UPDATED_ATTRIBUTE, "false");
document.setLink( link );
document.setLastModified( storedDoc.getLastModified() );
document.setSize( storedDoc.getSize() );
document.setAttributes( storedDoc.getAttributes() );
document.setContent(storedDoc.getContent());
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException("Cannot read document from storage service", e); throw new RuntimeException("Cannot read document from storage service", e);
} }
return document; return storedDoc;
} }


@Override @Override
Expand Down
Expand Up @@ -51,6 +51,13 @@ public interface DocumentStorageService {
* @return The java.io.File identified with the id * @return The java.io.File identified with the id
*/ */
Document getDocument(String id); Document getDocument(String id);

/**
* Loads document content
* @param id unique id of the document
* @return loaded document's content
*/
byte[] loadContent(String id);


/** /**
* Deletes the File identified by the given id * Deletes the File identified by the given id
Expand Down
Expand Up @@ -26,24 +26,33 @@
import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlRootElement;


import org.jbpm.document.Document; import org.jbpm.document.Document;
import org.jbpm.document.service.DocumentStorageService;
import org.kie.internal.utils.LazyLoaded;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


@XmlAccessorType(XmlAccessType.FIELD) @XmlAccessorType(XmlAccessType.FIELD)
@XmlRootElement(name = "document-object") @XmlRootElement(name = "document-object")
public class DocumentImpl implements Document { public class DocumentImpl implements Document, LazyLoaded<DocumentStorageService> {


private static final long serialVersionUID = -7422666286189013484L; private static final long serialVersionUID = -7422666286189013484L;


private static final Logger logger = LoggerFactory.getLogger(DocumentImpl.class);

private String identifier = ""; private String identifier = "";
private String name; private String name;
private String link = ""; private String link = "";
private long size; private long size;
private Date lastModified; private Date lastModified;
private byte[] content; private byte[] content;
private Map<String, String> attributes; private Map<String, String> attributes;

private transient DocumentStorageService service;


public DocumentImpl() { public DocumentImpl() {
// Setting default values for identifier && download link // Setting default values for identifier && download link
this.identifier = UUID.randomUUID().toString(); this.identifier = UUID.randomUUID().toString();
this.attributes = new HashMap<String, String>();
} }


public DocumentImpl(String identifier, public DocumentImpl(String identifier,
Expand Down Expand Up @@ -155,10 +164,13 @@ public Map<String, String> getAttributes() {
@Override @Override
public void setContent(byte[] content) { public void setContent(byte[] content) {
this.content = content; this.content = content;
addAttribute(UPDATED_ATTRIBUTE, "true");
} }


@Override @Override
public byte[] getContent() { public byte[] getContent() {
load();

return content; return content;
} }


Expand All @@ -167,4 +179,22 @@ public String toString() {
SimpleDateFormat sdf = new SimpleDateFormat(DOCUMENT_DATE_PATTERN); SimpleDateFormat sdf = new SimpleDateFormat(DOCUMENT_DATE_PATTERN);
return name + PROPERTIES_SEPARATOR + size + PROPERTIES_SEPARATOR + ((lastModified != null) ? sdf.format(lastModified) : "") + PROPERTIES_SEPARATOR + identifier; return name + PROPERTIES_SEPARATOR + size + PROPERTIES_SEPARATOR + ((lastModified != null) ? sdf.format(lastModified) : "") + PROPERTIES_SEPARATOR + identifier;
} }

/*
* lazy load support
*/

@Override
public void setLoadService(DocumentStorageService service) {
this.service = service;
}

@Override
public void load() {
if (content == null && service != null && identifier != null) {
content = service.loadContent(identifier);
} else {
logger.debug("Cannot load content due to missing service {} or identifier {}", service, identifier);
}
}
} }
Expand Up @@ -104,13 +104,13 @@ public Document getDocument(String id) {
if (file.exists() && !file.isFile() && !ArrayUtils.isEmpty(file.listFiles())) { if (file.exists() && !file.isFile() && !ArrayUtils.isEmpty(file.listFiles())) {
try { try {
File destination = file.listFiles()[0]; File destination = file.listFiles()[0];
Document doc = new DocumentImpl(id, DocumentImpl doc = new DocumentImpl(id,
destination.getName(), destination.getName(),
destination.length(), destination.length(),
new Date(destination.lastModified())); new Date(destination.lastModified()));
doc.setContent(FileUtils.readFileToByteArray(destination)); doc.setLoadService(this);
return doc; return doc;
} catch (IOException e) { } catch (Exception e) {
log.error("Error loading document '{}': {}", log.error("Error loading document '{}': {}",
id, id,
e); e);
Expand Down Expand Up @@ -217,4 +217,20 @@ public List<Document> listDocuments(Integer page, Integer pageSize) {
} }
return listOfDocs; return listOfDocs;
} }

@Override
public byte[] loadContent(String id) {
File file = getFileByPath(id);

if (file.exists() && !file.isFile() && !ArrayUtils.isEmpty(file.listFiles())) {
try {
File destination = file.listFiles()[0];
return FileUtils.readFileToByteArray(destination);
} catch (IOException e) {
log.error("Unable to laod content due to {}", e.getMessage(), e);
}
}

return null;
}
} }
Expand Up @@ -22,9 +22,11 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;


import org.jbpm.document.Document; import org.jbpm.document.Document;
import org.jbpm.document.Documents; import org.jbpm.document.Documents;
import org.jbpm.document.service.impl.CustomDocumentStorageServiceImpl;
import org.jbpm.document.service.impl.DocumentImpl; import org.jbpm.document.service.impl.DocumentImpl;
import org.junit.Test; import org.junit.Test;


Expand Down Expand Up @@ -78,6 +80,34 @@ public void testNoDocumentsMarshallUnmarshall() throws IOException, ClassNotFoun
assertEquals(docs.getDocuments().size(), unmarshalledDocuments.getDocuments().size()); assertEquals(docs.getDocuments().size(), unmarshalledDocuments.getDocuments().size());
} }


@Test
public void testSingleDocMarshallUnmarshallTracking() throws IOException, ClassNotFoundException {
final AtomicInteger counter = new AtomicInteger(0);
DocumentMarshallingStrategy docMarshallingStrategy = new DocumentMarshallingStrategy(new CustomDocumentStorageServiceImpl(){

@Override
public Document saveDocument(Document document, byte[] content) {
counter.incrementAndGet();
return super.saveDocument(document, content);
}

});
Document document = getDocument("docOne");
byte[] marshalledDocument = docMarshallingStrategy.marshal(null, null, document);
assertEquals(1, counter.get());
Document unmarshalledDocument = (Document) docMarshallingStrategy.unmarshal(null, null, marshalledDocument, this.getClass().getClassLoader());

assertEquals(document.getName(), unmarshalledDocument.getName());
assertEquals(document.getLink(), unmarshalledDocument.getLink());

// marhsall it again, it should not call the save on document service since document didn't change
marshalledDocument = docMarshallingStrategy.marshal(null, null, unmarshalledDocument);
assertEquals(1, counter.get());

unmarshalledDocument.setContent("updated content".getBytes());
marshalledDocument = docMarshallingStrategy.marshal(null, null, unmarshalledDocument);
assertEquals(2, counter.get());
}


private Document getDocument(String documentName) { private Document getDocument(String documentName) {
Document documentOne = new DocumentImpl(); Document documentOne = new DocumentImpl();
Expand Down
Expand Up @@ -60,6 +60,7 @@
import org.kie.internal.runtime.manager.context.CaseContext; import org.kie.internal.runtime.manager.context.CaseContext;
import org.kie.internal.runtime.manager.context.CorrelationKeyContext; import org.kie.internal.runtime.manager.context.CorrelationKeyContext;
import org.kie.internal.runtime.manager.context.ProcessInstanceIdContext; import org.kie.internal.runtime.manager.context.ProcessInstanceIdContext;
import org.kie.internal.utils.LazyLoaded;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand Down Expand Up @@ -412,7 +413,11 @@ public Object execute(org.kie.api.runtime.Context context) {
if (pi == null) { if (pi == null) {
throw new ProcessInstanceNotFoundException("Process instance with id " + processInstanceId + " was not found"); throw new ProcessInstanceNotFoundException("Process instance with id " + processInstanceId + " was not found");
} }
return pi.getVariable(variableName); Object variable = pi.getVariable(variableName);
if (variable instanceof LazyLoaded<?>) {
((LazyLoaded<?>) variable).load();
}
return variable;
} }
}); });
} catch(SessionNotFoundException e) { } catch(SessionNotFoundException e) {
Expand Down Expand Up @@ -445,7 +450,15 @@ public Map<String, Object> getProcessInstanceVariables(String deploymentId, Long
KieSession ksession = engine.getKieSession(); KieSession ksession = engine.getKieSession();
WorkflowProcessInstanceImpl pi = (WorkflowProcessInstanceImpl) ksession.getProcessInstance(processInstanceId, true); WorkflowProcessInstanceImpl pi = (WorkflowProcessInstanceImpl) ksession.getProcessInstance(processInstanceId, true);


return pi.getVariables(); Map<String, Object> variables = pi.getVariables();

for (Object variable : variables.values()) {
if (variable instanceof LazyLoaded<?>) {
((LazyLoaded<?>) variable).load();
}
}

return variables;
} catch(SessionNotFoundException e) { } catch(SessionNotFoundException e) {
throw new ProcessInstanceNotFoundException("Process instance with id " + processInstanceId + " was not found", e); throw new ProcessInstanceNotFoundException("Process instance with id " + processInstanceId + " was not found", e);
} finally { } finally {
Expand Down

0 comments on commit ebc4f60

Please sign in to comment.