2020 Set ,
2121)
2222
23- from .exceptions import ContextNotPresent , DefinitionNotInContext
23+ from .exceptions import (
24+ ContextNotPresent ,
25+ DefinitionNotInContext ,
26+ ValidatorMissing ,
27+ )
2428from .types import Input , Parameter , Definition , Operation , Stage , DataFlow
2529from .base import (
2630 OperationException ,
@@ -122,6 +126,26 @@ async def inputs(self) -> AsyncIterator[Input]:
122126 for item in self .__inputs :
123127 yield item
124128
129+ def remove_input (self , item : Input ):
130+ for x in self .__inputs [:]:
131+ if x .uid == item .uid :
132+ self .__inputs .remove (x )
133+ break
134+
135+ def remove_unvalidated_inputs (self ) -> "MemoryInputSet" :
136+ """
137+ Removes `unvalidated` inputs from internal list and returns the same.
138+ """
139+ unvalidated_inputs = []
140+ for x in self .__inputs [:]:
141+ if not x .validated :
142+ unvalidated_inputs .append (x )
143+ self .__inputs .remove (x )
144+ unvalidated_input_set = MemoryInputSet (
145+ MemoryInputSetConfig (ctx = self .ctx , inputs = unvalidated_inputs )
146+ )
147+ return unvalidated_input_set
148+
125149
126150class MemoryParameterSetConfig (NamedTuple ):
127151 ctx : BaseInputSetContext
@@ -249,15 +273,19 @@ async def add(self, input_set: BaseInputSet):
249273 handle_string = handle .as_string ()
250274 # TODO These ctx.add calls should probably happen after inputs are in
251275 # self.ctxhd
276+
277+ # remove unvalidated inputs
278+ unvalidated_input_set = input_set .remove_unvalidated_inputs ()
279+
252280 # If the context for this input set does not exist create a
253281 # NotificationSet for it to notify the orchestrator
254282 if not handle_string in self .input_notification_set :
255283 self .input_notification_set [handle_string ] = NotificationSet ()
256284 async with self .ctx_notification_set () as ctx :
257- await ctx .add (input_set .ctx )
285+ await ctx .add (( None , input_set .ctx ) )
258286 # Add the input set to the incoming inputs
259287 async with self .input_notification_set [handle_string ]() as ctx :
260- await ctx .add (input_set )
288+ await ctx .add (( unvalidated_input_set , input_set ) )
261289 # Associate inputs with their context handle grouped by definition
262290 async with self .ctxhd_lock :
263291 # Create dict for handle_string if not present
@@ -921,6 +949,7 @@ async def run_dispatch(
921949 octx : BaseOrchestratorContext ,
922950 operation : Operation ,
923951 parameter_set : BaseParameterSet ,
952+ set_valid : bool = True ,
924953 ):
925954 """
926955 Run an operation in the background and add its outputs to the input
@@ -952,14 +981,14 @@ async def run_dispatch(
952981 if not key in expand :
953982 output = [output ]
954983 for value in output :
955- inputs .append (
956- Input (
957- value = value ,
958- definition = operation .outputs [key ],
959- parents = parents ,
960- origin = (operation .instance_name , key ),
961- )
984+ new_input = Input (
985+ value = value ,
986+ definition = operation .outputs [key ],
987+ parents = parents ,
988+ origin = (operation .instance_name , key ),
962989 )
990+ new_input .validated = set_valid
991+ inputs .append (new_input )
963992 except KeyError as error :
964993 raise KeyError (
965994 "Value %s missing from output:definition mapping %s(%s)"
@@ -1020,6 +1049,38 @@ async def operations_parameter_set_pairs(
10201049 ):
10211050 yield operation , parameter_set
10221051
1052+ async def validator_target_set_pairs (
1053+ self ,
1054+ octx : BaseOperationNetworkContext ,
1055+ rctx : BaseRedundancyCheckerContext ,
1056+ ctx : BaseInputSetContext ,
1057+ dataflow : DataFlow ,
1058+ unvalidated_input_set : BaseInputSet ,
1059+ ):
1060+ async for unvalidated_input in unvalidated_input_set .inputs ():
1061+ validator_instance_name = unvalidated_input .definition .validate
1062+ validator = dataflow .validators .get (validator_instance_name , None )
1063+ if validator is None :
1064+ raise ValidatorMissing (
1065+ "Validator with instance_name {validator_instance_name} not found"
1066+ )
1067+ # There is only one `input` in `validators`
1068+ input_name , input_definition = list (validator .inputs .items ())[0 ]
1069+ parameter = Parameter (
1070+ key = input_name ,
1071+ value = unvalidated_input .value ,
1072+ origin = unvalidated_input ,
1073+ definition = input_definition ,
1074+ )
1075+ parameter_set = MemoryParameterSet (
1076+ MemoryParameterSetConfig (ctx = ctx , parameters = [parameter ])
1077+ )
1078+ async for parameter_set , exists in rctx .exists (
1079+ validator , parameter_set
1080+ ):
1081+ if not exists :
1082+ yield validator , parameter_set
1083+
10231084
10241085@entrypoint ("memory" )
10251086class MemoryOperationImplementationNetwork (
@@ -1382,17 +1443,44 @@ async def run_operations_for_ctx(
13821443 task .print_stack (file = output )
13831444 self .logger .error ("%s" , output .getvalue ().rstrip ())
13841445 output .close ()
1446+
13851447 elif task is input_set_enters_network :
13861448 (
13871449 more ,
13881450 new_input_sets ,
13891451 ) = input_set_enters_network .result ()
1390- for new_input_set in new_input_sets :
1452+ for (
1453+ unvalidated_input_set ,
1454+ new_input_set ,
1455+ ) in new_input_sets :
1456+ async for operation , parameter_set in self .nctx .validator_target_set_pairs (
1457+ self .octx ,
1458+ self .rctx ,
1459+ ctx ,
1460+ self .config .dataflow ,
1461+ unvalidated_input_set ,
1462+ ):
1463+ await self .rctx .add (
1464+ operation , parameter_set
1465+ ) # is this required here?
1466+ dispatch_operation = await self .nctx .dispatch (
1467+ self , operation , parameter_set
1468+ )
1469+ dispatch_operation .operation = operation
1470+ dispatch_operation .parameter_set = (
1471+ parameter_set
1472+ )
1473+ tasks .add (dispatch_operation )
1474+ self .logger .debug (
1475+ "[%s]: dispatch operation: %s" ,
1476+ ctx_str ,
1477+ operation .instance_name ,
1478+ )
13911479 # forward inputs to subflow
13921480 await self .forward_inputs_to_subflow (
13931481 [x async for x in new_input_set .inputs ()]
13941482 )
1395- # Identify which operations have complete contextually
1483+ # Identify which operations have completed contextually
13961484 # appropriate input sets which haven't been run yet
13971485 async for operation , parameter_set in self .nctx .operations_parameter_set_pairs (
13981486 self .ictx ,
@@ -1402,6 +1490,9 @@ async def run_operations_for_ctx(
14021490 self .config .dataflow ,
14031491 new_input_set = new_input_set ,
14041492 ):
1493+ # Validation operations shouldn't be run here
1494+ if operation .validator :
1495+ continue
14051496 # Add inputs and operation to redundancy checker before
14061497 # dispatch
14071498 await self .rctx .add (operation , parameter_set )
0 commit comments