@@ -37,7 +37,6 @@
import com .google .common .util .concurrent .ListenableFuture ;
import com .google .common .util .concurrent .SettableFuture ;
import com .thoughtworks .xstream .XStream ;
import com .thoughtworks .xstream .XStreamException ;
import com .thoughtworks .xstream .converters .Converter ;
import com .thoughtworks .xstream .converters .MarshallingContext ;
import com .thoughtworks .xstream .converters .UnmarshallingContext ;
@@ -201,7 +200,7 @@ public class CpsFlowExecution extends FlowExecution {
/*package*/ /*final*/ Map <String ,String > loadedScripts = new HashMap <String , String >();
private final boolean sandbox ;
private /*almost final*/ FlowExecutionOwner owner ;
private transient /*almost final*/ FlowExecutionOwner owner ;
/**
* Loading of the program is asynchronous because it requires us to re-obtain stateful objects.
@@ -225,10 +224,12 @@ public class CpsFlowExecution extends FlowExecution {
* Start nodes that have been created, whose {@link BlockEndNode} is not yet created.
*/
@ GuardedBy ("this" )
/*package*/ final Stack <BlockStartNode > startNodes = new Stack <BlockStartNode >();
/*package*/ /* almost final*/ Stack <BlockStartNode > startNodes = new Stack <BlockStartNode >();
private transient List <String > startNodesSerial ; // used only between unmarshal and onLoad
@ GuardedBy ("this" )
private final NavigableMap <Integer ,FlowHead > heads = new TreeMap ();
private /* almost final*/ NavigableMap <Integer ,FlowHead > heads = new TreeMap <Integer ,FlowHead >();
private transient Map <Integer ,String > headsSerial ; // used only between unmarshal and onLoad
private final AtomicInteger iota = new AtomicInteger ();
@@ -383,7 +384,23 @@ public int iota() {
}
@ Override
public void onLoad () {
public void onLoad (FlowExecutionOwner owner ) throws IOException {
this .owner = owner ;
storage = createStorage ();
// heads could not be restored in unmarshal, so doing that now:
heads = new TreeMap <Integer ,FlowHead >();
for (Map .Entry <Integer ,String > entry : headsSerial .entrySet ()) {
FlowHead h = new FlowHead (this , entry .getKey ());
h .setForDeserialize (storage .getNode (entry .getValue ()));
heads .put (h .getId (), h );
}
headsSerial = null ;
// Same for startNodes:
startNodes = new Stack <BlockStartNode >();
for (String id : startNodesSerial ) {
startNodes .add ((BlockStartNode ) storage .getNode (id ));
}
startNodesSerial = null ;
try {
if (!isComplete ())
loadProgramAsync (getProgramDataFile ());
@@ -766,12 +783,10 @@ void notifyListeners(FlowNode node) {
// so that the execution gets suspended while we are getting serialized
public static final class ConverterImpl implements Converter {
private final XStream xs ;
private final ReflectionProvider ref ;
private final Mapper mapper ;
public ConverterImpl (XStream xs ) {
this .xs = xs ;
this .ref = xs .getReflectionProvider ();
this .mapper = xs .getMapper ();
}
@@ -786,7 +801,6 @@ public void marshal(Object source, HierarchicalStreamWriter w, MarshallingContex
writeChild (w , context , "result" , e .result , Result .class );
writeChild (w , context , "script" , e .script , String .class );
writeChild (w , context , "loadedScripts" , e .loadedScripts , Map .class );
writeChild (w , context , "owner" , e .owner , Object .class );
if (e .user != null ) {
writeChild (w , context , "user" , e .user , String .class );
}
@@ -814,17 +828,15 @@ private <T> void writeChild(HierarchicalStreamWriter w, MarshallingContext conte
}
public Object unmarshal (HierarchicalStreamReader reader , final UnmarshallingContext context ) {
try {
CpsFlowExecution result ;
if (context .currentObject ()!=null ) {
result = (CpsFlowExecution ) context .currentObject ();
} else {
result = (CpsFlowExecution ) ref .newInstance (CpsFlowExecution .class );
}
FlowNodeStorage storage = result .storage ;
Stack <BlockStartNode > startNodes = new Stack <BlockStartNode >();
Map <Integer ,FlowHead > heads = new TreeMap <Integer , FlowHead >();
result .startNodesSerial = new ArrayList <String >();
result .headsSerial = new TreeMap <Integer ,String >();
while (reader .hasMoreChildren ()) {
reader .moveDown ();
@@ -843,39 +855,29 @@ public Object unmarshal(HierarchicalStreamReader reader, final UnmarshallingCont
setField (result , "loadedScripts" , loadedScripts );
} else
if (nodeName .equals ("owner" )) {
FlowExecutionOwner owner = (FlowExecutionOwner ) readChild (reader , context , Object .class , result );
setField (result , "owner" , owner );
setField (result , "storage" , storage = result .createStorage ());
readChild (reader , context , Object .class , result ); // for compatibility; discarded
} else
if (nodeName .equals ("user" )) {
String user = readChild (reader , context , String .class , result );
setField (result , "user" , user );
} else
if (nodeName .equals ("head" )) {
String [] head = readChild (reader , context , String .class , result ).split (":" );
FlowHead h = new FlowHead (result , Integer .parseInt (head [0 ]));
h .setForDeserialize (storage .getNode (head [1 ]));
heads .put (h .getId (), h );
result .headsSerial .put (Integer .parseInt (head [0 ]), head [1 ]);
} else
if (nodeName .equals ("iota" )) {
Integer iota = readChild (reader , context , Integer .class , result );
setField (result , "iota" , new AtomicInteger (iota ));
} else
if (nodeName .equals ("start" )) {
String id = readChild (reader , context , String .class , result );
startNodes . add (( BlockStartNode ) storage . getNode ( id ) );
result . startNodesSerial . add (id );
}
reader .moveUp ();
}
setField (result , "startNodes" , startNodes );
setField (result , "heads" , heads );
return result ;
} catch (IOException e ) {
throw new XStreamException ("Failed to read back CpsFlowExecution" ,e );
}
}
private void setField (CpsFlowExecution result , String fieldName , Object value ) {