# Example usage

To use `nf_parser` in a project:

In [None]:
# Smoke test: importing the package and printing its version confirms that
# `nf_parser` is installed in the active environment.
import nf_parser

print(nf_parser.__version__)

In [8]:
from pathlib import Path

def get_sample(name: str = '1.nf'):
    """Return the text of a sample file from ``../tests/samples``.

    The location is relative to the current working directory, so this
    only works when run from a directory that sits next to ``tests``.

    Args:
        name: File name of the sample inside the samples directory.

    Returns:
        The whole file content as a string.
    """
    return Path(f"../tests/samples/{name}").read_text()
    

In [150]:
from lark import Lark

# Sample selection: the commented lines record which samples were tried;
# the active assignment below is the one actually parsed.
## test_file = get_sample('1.nf')  ## parsing
## test_file = get_sample('2.nf')  ## parsing
test_file = get_sample('2.nf') 
# The grammar is read from a path relative to the working directory; the
# commented line is an alternative location of the same grammar file.
#nextflow_grammar = Path("../nf_parser/grammer/nf.lark").read_text()
nextflow_grammar = Path("./grammers/nf.lark").read_text()
# Build a parser with lark's LALR backend and parse the sample into a tree.
# `tree` is reused by the transformer cell further down.
parser = Lark(nextflow_grammar, parser='lalr')
tree = parser.parse(test_file)
#print(tree)
# Indented, human-readable dump of the parse tree.
print(tree.pretty())

process
  align
  process_block
    directive
      publish_dir
        string	"${params.outdir}"
        map
          mode
          'copy'
    input
      comment	// some comment
      comment	/* file input_file */
      file
        variable	input_file
    output
      comment	// output comment
      comment	// output comment2
      file
        variable	output
    comment	// a comment
    script
      conditional_script
        if_script	if(1 == 1)
    // script comment
        """
        t_coffee -in $sequences > out_file
        """

    else if(2 == 2)
        """
        mafft --anysymbol --parttree --quiet $sequences > out_file
        """





**transformers**

In [99]:
## utils
def flatten_list(lst):
	"""Return a new flat list with the items of arbitrarily nested lists.

	Only ``list`` instances are expanded; tuples, strings and every other
	value are copied through unchanged. Item order is preserved
	(depth-first, left to right).

	The traversal uses an explicit stack instead of recursion, so the
	nesting depth is not limited by the interpreter's recursion limit.

	Args:
		lst: An iterable whose items may themselves be (nested) lists.

	Returns:
		A flat list of all non-list items.

	Raises:
		RecursionError: If a list (directly or indirectly) contains itself.
	"""
	flat = []
	# Each frame is (identity of the list being walked, iterator over it).
	stack = [(id(lst), iter(lst))]
	# Identities of the lists currently being walked, used to detect cycles.
	active = {id(lst)}
	while stack:
		owner, pending = stack[-1]
		for item in pending:
			if isinstance(item, list):
				if id(item) in active:
					raise RecursionError("flatten_list: list contains itself")
				active.add(id(item))
				stack.append((id(item), iter(item)))
				# Descend now; the parent iterator resumes after the child.
				break
			flat.append(item)
		else:
			# Iterator exhausted: this level is done.
			active.discard(owner)
			stack.pop()
	return flat

In [157]:
## constructs
from pydantic import validator, BaseModel, Field
from typing import List, Optional, Any, Union, Dict

class Comment(BaseModel):
	"""A comment captured from the parsed source."""

	text: str = Field(
		...,
		description="the comment text value",
	)

class Script(BaseModel):
	"""A script body of a process, with its kind and optional metadata."""

	# Kind of script; defaults to "bash" when not given.
	type: str = Field(
		"bash",
		description="the script type",
	)
	# Required raw source text of the script.
	code: str = Field(
		...,
		description="the script source code text",
	)
	# Only set for template-based scripts.
	template: Optional[str] = Field(
		None,
		description="path to shell script if template was provided. see [nextflow script template docs](https://www.nextflow.io/docs/latest/process.html#template)",
	)
	# Only set for scripts that belong to a conditional branch.
	condition: Optional[str] = Field(
		None,
		description="script conditional control flow expression type. see [nextflow conditional-scripts docs](https://www.nextflow.io/docs/latest/process.html#conditional-scripts)",
	)

class Directive(BaseModel):
	"""A process directive: its name, its value and optional settings."""

	name: str = Field(..., description="the directive name")
	value: str = Field(..., description="the directive value")
	# Declared Optional because the default is None and callers pass
	# ``options=None`` explicitly for directives without options; a plain
	# ``Dict`` annotation rejects an explicit None under pydantic v2.
	options: Optional[Dict[str, Any]] = Field(None, description="the directive options")

class Input(BaseModel):
	"""A single typed declaration (qualifier plus value), e.g. ``file x``."""

	# Qualifier of the declaration.
	name: str = Field(
		...,
		description="the type",
	)
	# Either one value or a list of values.
	value: Union[str, List[str]] = Field(
		...,
		description="the value",
	)
	# Optional comment attached to the declaration.
	comment: Optional[Comment] = None

class Output(Input):
	"""Same fields as ``Input``; a distinct type so outputs can be told apart."""

class Process(BaseModel):
	"""A parsed process: its name plus the items collected from its body."""

	name: str = Field(..., description="the name")
	inputs: List[Input] = []
	outputs: List[Output] = []
	# These three default to None, so the annotations are Optional to match;
	# a plain ``List`` annotation rejects an explicit None under pydantic v2.
	comments: Optional[List[Comment]] = None
	scripts: Optional[List[Script]] = None
	directives: Optional[List[Directive]] = None


In [158]:
from lark import Transformer, v_args, Discard
from typing import List

def _make_directive_rule(name):
	"""Build the transformer callback for the directive rule called *name*.

	The generated callback logs its arguments under the rule's own name and
	wraps them in a ``Directive`` whose value is stringified.
	"""
	def rule(self, val, options=None):
		print(f'{name}:', val, options)
		return Directive(name=name, value=str(val), options=options)
	rule.__name__ = name
	rule.__qualname__ = f'NextflowTransformer.{name}'
	rule.__doc__ = f'Return a ``Directive`` named "{name}" for the given value/options.'
	return rule


def _build_declarations(items, model):
	"""Flatten *items* and turn every dict into a *model* instance.

	Non-dict items (e.g. ``Comment`` objects) are passed through unchanged.
	"""
	return [
		model(**entry) if isinstance(entry, dict) else entry
		for entry in flatten_list(items)
	]


@v_args(inline=True)
class NextflowTransformer(Transformer):
	"""Turn a parsed Nextflow tree into pydantic model objects.

	Callbacks are named after the grammar's rules and terminals, which is
	why several of them deliberately share a name with a Python builtin
	(``float``, ``int``, ``map``, ``input``, ``exec``, ``tuple``).

	Because of ``@v_args(inline=True)`` every callback receives the children
	of its rule as separate positional arguments, unless the method is
	decorated with ``@v_args(inline=False)``, in which case it receives a
	single list of children.

	Many callbacks print their arguments; this is debugging output.
	"""

	# NOTE(review): not referenced anywhere in this class; it is a mutable
	# class-level list (shared by all instances) and the name is misspelled.
	# Kept as-is for backward compatibility.
	proceses: List[Process] = []

	# Terminal / leaf conversions. ``_`` is the transformer instance.
	CNAME = lambda _, v: str(v)
	STRING = lambda _, v: str(v)
	float = lambda _, v: float(v)
	int = lambda _, v: int(v)
	string = lambda _, v: str(v)
	variable = lambda _, v: str(v)
	# A map rule with one key and one value becomes a one-entry dict.
	map = lambda _, k, v: {k: v}

	def __init__(self):
		super().__init__()

	def dsl(self, item):
		"""Ignore the dsl node (returns ``None``)."""
		pass

	def comment(self, item):
		"""Wrap the comment text in a ``Comment`` model."""
		return Comment(text=str(item))

	# --- placeholders: these rules are only rendered as labelled strings ---

	def module_import(self, items):
		"""Return a labelled string for a module import."""
		return f'Module Import: {items}'

	def param(self, items):
		"""Return a labelled string for a param."""
		return f'Param: {items}'

	def function(self, items):
		"""Return a labelled string for a function."""
		return f'Function: {items}'

	def workflow(self, items):
		"""Return a labelled string for a workflow."""
		return f'Workflow: {items}'

	def process(self, name, items) -> Process:
		"""Build a ``Process`` from its name and the dict made by ``process_block``."""
		print('process:',items)
		return Process(name=name, **items)

	def workflow_block(self, items):
		"""Return a labelled string for a workflow block."""
		return f'Workflow Block: {items}'

	def workflow_input(self, items):
		"""Return a labelled string for the first element of a workflow input."""
		# NOTE(review): with inline arguments ``items`` is a single child, so
		# ``items[0]`` indexes into that child - confirm this is intended.
		return f'Workflow Input: {items[0]}'

	def main(self, items):
		"""Return a labelled string for a main section."""
		return f'Main: {items}'

	def emit(self, items):
		"""Return a labelled string for an emit section."""
		return f'Emit: {items}'

	def wf_output(self, items):
		"""Return a labelled string for a workflow output."""
		return f'Workflow Output: {items}'

	def wf_named_output(self, items):
		"""Return a labelled string for a named workflow output."""
		return f'Workflow Named Output: {items}'

	def channel(self, items):
		"""Return a labelled string for a channel."""
		return f'Channel: {items}'

	def function_call(self, items):
		"""Return a labelled string for a function call."""
		return f'Function Call: {items}'

	# --- process body ---

	@v_args(inline=False)
	def process_block(self, items):
		"""Sort the (flattened) children of a process body by model type.

		Returns:
			A dict with the keys ``inputs``, ``outputs``, ``comments``,
			``scripts`` and ``directives``, each holding a list. Children
			of any other type are dropped.
		"""
		inputs, outputs, comments, scripts, directives = [], [], [], [], []
		items = flatten_list(items)
		print("process_block:", items)
		for item in items:
			if isinstance(item, Comment):
				comments.append(item)
			# Output subclasses Input, so it has to be tested first.
			elif isinstance(item, Output):
				outputs.append(item)
			elif isinstance(item, Input):
				inputs.append(item)
			elif isinstance(item, Script):
				scripts.append(item)
			elif isinstance(item, Directive):
				directives.append(item)

		return {
			"inputs": inputs,
			"outputs": outputs,
			"comments": comments,
			"scripts": scripts,
			"directives": directives,
		}

	@v_args(inline=False)
	def input(self, items):
		"""Return the input section's children with dicts turned into ``Input``."""
		return _build_declarations(items, Input)

	@v_args(inline=False)
	def output(self, items):
		"""Return the output section's children with dicts turned into ``Output``."""
		return _build_declarations(items, Output)

	@v_args(inline=False)
	def script(self, items):
		"""Return the flattened children of a script section."""
		items = flatten_list(items)
		print('script:', items)
		return items

	def shell(self, items):
		"""Pass the child through unchanged."""
		return items

	def exec(self, val):
		"""Wrap the text in a ``Script`` of type "exec"."""
		## todo: improve grammar parsing, might break
		return Script(type="exec", code=str(val))

	def directive(self, item):
		"""Log and pass through the already-built directive."""
		print('directive:', item)
		return item

	def bash_script(self, val):
		"""Wrap the text in a ``Script`` of type "bash"."""
		## todo: check the shebang of script if provided to annotate correct script type
		return Script(type="bash", code=str(val))

	def shell_script(self, val):
		"""Wrap the text in a ``Script`` of type "shell"."""
		return Script(type="shell", code=str(val))

	def template(self, val):
		"""Wrap a template reference in a ``Script``; code and template both hold it."""
		# todo: get the script template source from the template path
		return Script(type="shell", code=str(val), template=str(val))

	def if_script(self, val):
		"""Wrap the text in a "bash" ``Script`` with condition "if"."""
		## todo: separate condition from expression
		return Script(type="bash", code=str(val), condition="if")

	def elif_script(self, val):
		"""Wrap the text in a "bash" ``Script`` with condition "elif"."""
		## todo: separate condition from expression
		return Script(type="bash", code=str(val), condition="elif")

	def else_script(self, val):
		"""Wrap the text in a "bash" ``Script`` with condition "else"."""
		## todo: separate condition from expression
		return Script(type="bash", code=str(val), condition="else")

	@v_args(inline=False)
	def conditional_script(self, items):
		"""Log and return the list of branch scripts unchanged."""
		print('conditional_script:', items)
		return items

	# --- input/output qualifiers: plain dicts, later turned into models ---

	def val(self, value):
		"""Return the name/value dict for a ``val`` qualifier."""
		return {"name":"val", "value":value}

	def file(self, value):
		"""Log and return the name/value dict for a ``file`` qualifier."""
		print('file:', value)
		return {"name":"file", "value":value}

	def path(self, value):
		"""Return the name/value dict for a ``path`` qualifier."""
		return {"name":"path", "value":value}

	def env(self, value):
		"""Return the name/value dict for an ``env`` qualifier."""
		return {"name":"env", "value":value}

	def stdin(self, value):
		"""Return the name/value dict for a ``stdin`` qualifier."""
		return {"name":"stdin", "value":value}

	def tuple(self, value):
		"""Return the name/value dict for a ``tuple`` qualifier."""
		return {"name":"tuple", "value":value}

	def each(self, value):
		"""Return the name/value dict for an ``each`` qualifier."""
		return {"name":"each", "value":value}

	# --- directives: every callback has the signature (val, options=None),
	# logs "<name>: val options" and returns a Directive with that name ---

	accelerator = _make_directive_rule("accelerator")
	before_script = _make_directive_rule("before_script")
	after_script = _make_directive_rule("after_script")
	cluster_options = _make_directive_rule("cluster_options")
	conda = _make_directive_rule("conda")
	cache = _make_directive_rule("cache")
	cpus = _make_directive_rule("cpus")
	container = _make_directive_rule("container")
	container_options = _make_directive_rule("container_options")
	debug = _make_directive_rule("debug")
	disk = _make_directive_rule("disk")
	echo = _make_directive_rule("echo")
	error_strategy = _make_directive_rule("error_strategy")
	executor = _make_directive_rule("executor")
	ext = _make_directive_rule("ext")
	fair = _make_directive_rule("fair")
	label = _make_directive_rule("label")
	machine_type = _make_directive_rule("machine_type")
	max_errors = _make_directive_rule("max_errors")
	max_forks = _make_directive_rule("max_forks")
	max_retries = _make_directive_rule("max_retries")
	memory = _make_directive_rule("memory")
	module = _make_directive_rule("module")
	penv = _make_directive_rule("penv")
	pod = _make_directive_rule("pod")
	publish_dir = _make_directive_rule("publish_dir")
	queue = _make_directive_rule("queue")
	resource_labels = _make_directive_rule("resource_labels")
	scratch = _make_directive_rule("scratch")
	spack = _make_directive_rule("spack")
	store_dir = _make_directive_rule("store_dir")
	stage_in_mode = _make_directive_rule("stage_in_mode")
	stage_out_mode = _make_directive_rule("stage_out_mode")
	tag = _make_directive_rule("tag")
	time = _make_directive_rule("time")

	# --- pass-through and stringifying rules ---

	def parameters(self, items):
		"""Pass the child through unchanged."""
		return items

	def arg(self, items):
		"""Pass the child through unchanged."""
		return items

	def identifier(self, val):
		"""Pass the child through unchanged."""
		return val

	def declaration(self, val):
		"""Pass the child through unchanged."""
		return val

	def value(self, val):
		"""Pass the child through unchanged."""
		return val

	def code_block(self, val):
		"""Return the child as a string."""
		return str(val)

	def statement(self, val):
		"""Return the child as a string."""
		return str(val)

	def operator(self, val):
		"""Return the child as a string."""
		## todo
		return str(val)

## testing transformer
# Apply the transformer to `tree` (the parse result from the earlier cell)
# and print the transformed result below the callbacks' debug output.
transformer=NextflowTransformer()
data = transformer.transform(tree)
print("\n\n--Results:--\n", data)


publish_dir: "${params.outdir}" {'mode': "'copy'"}
directive: name='publish_dir' value='"${params.outdir}"' options={'mode': "'copy'"}
file: input_file
file: output
conditional_script: [Script(type='bash', code='if(1 == 1)\n    // script comment\n        """\n        t_coffee -in $sequences > out_file\n        """\n\n    else if(2 == 2)\n        """\n        mafft --anysymbol --parttree --quiet $sequences > out_file\n        """\n\n', template=None, condition='if')]
script: [Script(type='bash', code='if(1 == 1)\n    // script comment\n        """\n        t_coffee -in $sequences > out_file\n        """\n\n    else if(2 == 2)\n        """\n        mafft --anysymbol --parttree --quiet $sequences > out_file\n        """\n\n', template=None, condition='if')]
process_block: [Directive(name='publish_dir', value='"${params.outdir}"', options={'mode': "'copy'"}), Comment(text='// some comment'), Comment(text='/* file input_file */'), Input(name='file', value='input_file', comment=None), Commen