@@ -713,3 +713,192 @@ def modification_timestamp(self):
    @staticmethod
    def max_samples():
        return qdb.util.max_preparation_samples()

    def add_default_workflow(self, user):
        """Creates the default processing workflow for this preparation

        Parameters
        ----------
        user : qiita_db.user.User
            The user that requested to add the default workflow

        Returns
        -------
        ProcessingWorkflow
            The workflow created

        Raises
        ------
        ValueError
            If this preparation doesn't have valid workflows
            If this preparation has been fully processed
        """
        # helper functions to avoid duplication of code

        def _get_node_info(node):
            # retrieves the merging scheme of a node
            parent = list(wk.graph.predecessors(node))
            if parent:
                parent = parent.pop()
                pdp = parent.default_parameter
                pcmd = pdp.command
                pparams = pdp.values
            else:
                pcmd = None
                pparams = {}

            dp = node.default_parameter
            cparams = dp.values
            ccmd = dp.command

            parent_cmd_name = None
            parent_merging_scheme = None
            if pcmd is not None:
                parent_cmd_name = pcmd.name
                parent_merging_scheme = pcmd.merging_scheme

            return qdb.util.human_merging_scheme(
                ccmd.name, ccmd.merging_scheme, parent_cmd_name,
                parent_merging_scheme, cparams, [], pparams)

        def _get_predecessors(node):
            # recursive method to get the predecessors of a given node
            pred = None  # nodes without predecessors (roots) return None
            for pnode in wk.graph.predecessors(node):
                pred = _get_predecessors(pnode)
                cxns = {x[0]: x[2]
                        for x in wk.graph.get_edge_data(
                            pnode, node)['connections'].connections}
                data = [pnode, node, cxns]
                if pred is None:
                    pred = [data]
                else:
                    pred.append(data)
            return pred

        # Note: we are going to use the final BIOMs to figure out which
        # processing steps are missing, going from the back (end) to the
        # front, as this prevents generating unnecessary steps (i.e. steps
        # already provided by another command), like "Split Library of
        # Demuxed" when "Split per Sample" is already generated.
        #
        # The steps to generate the default workflow are as follows:
        # 1. retrieve all valid merging schemes from the valid jobs in the
        #    current preparation
        # 2. retrieve all the valid workflows for the preparation data type
        #    and find the final BIOMs missing from the valid available
        #    merging schemes
        # 3. loop over the missing merging schemes and create the commands
        #    missing to get to those processed samples and add them to a new
        #    workflow
        # 4. return the newly created workflow

        # 1.
        prep_jobs = [j for c in self.artifact.descendants.nodes()
                     for j in c.jobs(show_hidden=True)
                     if j.command.software.type == 'artifact transformation']
        merging_schemes = {
            qdb.archive.Archive.get_merging_scheme_from_job(j): {
                x: y.id for x, y in j.outputs.items()}
            for j in prep_jobs if j.status == 'success' and not j.hidden}
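        # at this point merging_schemes maps each human-readable merging
        # scheme (as returned by get_merging_scheme_from_job) to the
        # {output name: artifact id} of the successful, visible job that
        # generated it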

        # 2.
        pt_dt = self.data_type()
        workflows = [wk for wk in qdb.software.DefaultWorkflow.iter()
                     if pt_dt in wk.data_type]
        if not workflows:
            raise ValueError(f'This preparation data type: "{pt_dt}" does '
                             'not have valid workflows')
        missing_artifacts = dict()
        for wk in workflows:
            missing_artifacts[wk] = dict()
            for node, degree in wk.graph.out_degree():
                if degree != 0:
                    continue
                mscheme = _get_node_info(node)
                if mscheme not in merging_schemes:
                    missing_artifacts[wk][mscheme] = node
            if not missing_artifacts[wk]:
                del missing_artifacts[wk]
        if not missing_artifacts:
            raise ValueError('This preparation is complete')

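        # at this point missing_artifacts maps every applicable default
        # workflow to the {merging scheme: terminal node} pairs whose final
        # BIOMs have not been generated yet; step 3 walks each of those
        # terminal nodes back to an existing artifact and queues the
        # intermediate commands in a single new workflow
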
        # 3.
        workflow = None
        for wk, wk_data in missing_artifacts.items():
            previous_jobs = dict()
            for ma, node in wk_data.items():
                predecessors = _get_predecessors(node)
                predecessors.reverse()
                cmds_to_create = []
                init_artifacts = None
                for i, (pnode, cnode, cxns) in enumerate(predecessors):
                    cdp = cnode.default_parameter
                    cdp_cmd = cdp.command
                    params = cdp.values.copy()

                    icxns = {y: x for x, y in cxns.items()}
                    reqp = {x: icxns[y[1][0]]
                            for x, y in cdp_cmd.required_parameters.items()}
                    cmds_to_create.append([cdp_cmd, params, reqp])

                    info = _get_node_info(pnode)
                    if info in merging_schemes:
                        if set(merging_schemes[info]) >= set(cxns):
                            init_artifacts = merging_schemes[info]
                            break
                if init_artifacts is None:
                    pdp = pnode.default_parameter
                    pdp_cmd = pdp.command
                    params = pdp.values.copy()
                    reqp = {x: y[1][0]
                            for x, y in pdp_cmd.required_parameters.items()}
                    cmds_to_create.append([pdp_cmd, params, reqp])

                    init_artifacts = {
                        self.artifact.artifact_type: self.artifact.id}

                cmds_to_create.reverse()
                current_job = None
                for i, (cmd, params, rp) in enumerate(cmds_to_create):
                    previous_job = current_job
                    if previous_job is None:
                        req_params = dict()
                        for iname, dname in rp.items():
                            if dname not in init_artifacts:
                                msg = (f'Missing Artifact type: "{dname}" in '
                                       'this preparation; are you missing a '
                                       'step to start?')
                                raise ValueError(msg)
                            req_params[iname] = init_artifacts[dname]
                    else:
                        req_params = dict()
                        connections = dict()
                        for iname, dname in rp.items():
                            req_params[iname] = f'{previous_job.id}{dname}'
                            connections[dname] = iname
                    params.update(req_params)
                    job_params = qdb.software.Parameters.load(
                        cmd, values_dict=params)

                    # if an equivalent job already exists in the workflow,
                    # reuse it instead of adding a duplicated command
                    if job_params in previous_jobs.values():
                        for x, y in previous_jobs.items():
                            if job_params == y:
                                current_job = x
                        continue

                    if workflow is None:
                        PW = qdb.processing_job.ProcessingWorkflow
                        workflow = PW.from_scratch(user, job_params)
                        current_job = [j for j in workflow.graph.nodes()][0]
                    else:
                        if previous_job is None:
                            current_job = workflow.add(
                                job_params, req_params=req_params)
                        else:
                            current_job = workflow.add(
                                job_params, req_params=req_params,
                                connections={previous_job: connections})

                    previous_jobs[current_job] = job_params

        return workflow
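
For context, a minimal sketch of how the new method might be exercised from client code; the preparation id (1), the user email, and the final submit() call are illustrative assumptions, not part of this change:

# illustrative usage sketch only; prep id and user email are assumptions
import qiita_db as qdb

user = qdb.user.User('user@example.com')
pt = qdb.metadata_template.prep_template.PrepTemplate(1)

try:
    # builds a ProcessingWorkflow with every command still missing to reach
    # the final BIOMs of the default workflows for this data type
    workflow = pt.add_default_workflow(user)
except ValueError as e:
    # no valid default workflow for the data type, or nothing left to do
    print(e)
else:
    # the returned workflow can then be submitted for processing
    workflow.submit()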