# core

> A simple DSL for running tasks on string inputs. Built with parsing URLs in mind.

In [None]:
#| default_exp core

A recipe consists of two lines.
```
input: <some_txt>
actions: <step_name>.<other_step_name>
```

`input` is a string (could be a URL, an IP address, a word, anything).
`actions` is a dot-separated string representation of the actions to apply to the `input` in a chained manner.

In [None]:
#| export

from fastcore.all import * 

class Recipe:
    "A recipe that can be loaded from a file and executed"
    def __init__(self, name=None, input=None, actions=None):
        # fastcore's `store_attr()` saves the constructor arguments as the
        # attributes `name`, `input` and `actions`.
        # `input` shadows the builtin, but it is part of the keyword interface
        # (and the key used in recipe files), so it is kept as is.
        store_attr()

    def __repr__(self):
        return basic_repr('name,input,actions')(self)

    @classmethod
    def from_file(cls, path):
        """Load recipe from a .recipe file

        Every line containing a ':' is read as `key: value`; the split happens on
        the first ':' only, so values such as URLs may contain further colons.
        Lines without a ':' are ignored, and a repeated key keeps its last value.

        The recipe name defaults to the file stem. A `name:` line in the file
        overrides that default instead of clashing with it.
        Keys other than `name`, `input` and `actions` are passed to the
        constructor unchanged and therefore raise `TypeError`.
        """
        p = Path(path)
        # Seed with the stem so that a `name:` line in the file can replace it
        fields = {'name': p.stem}
        for line in p.read_text().splitlines():
            key, sep, value = line.partition(':')
            if not sep: continue  # no ':' on this line
            fields[key.strip()] = value.strip()
        return cls(**fields)

Let's try to read in a recipe file.

In [None]:
import tempfile
import os

# Create the temp file. `delete=False` keeps it on disk after the `with` block
# closes it, so it can be re-opened by path below.
with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp:
    temp.write("""input: <some_txt>
actions: <step_name>.<other_step_name>""")
    temp_path = temp.name

try:
    # The recipe name is taken from the (random) temp file name
    recipe = Recipe.from_file(temp_path)
    print(recipe)
finally:
    # Manually delete the file when done, even if loading or printing failed
    os.unlink(temp_path)

Recipe(name='tmp9s5dlejd', input='<some_txt>', actions='<step_name>.<other_step_name>')


So far so good. But let's see if we can parse the `actions` string to a representation that will be easier to use.

In [None]:
#| exporti

def parse_action(action_str):
    """Parse a single action string like 'func|arg1=val1,arg2=val2' into (func_name, kwargs)

    Accepted forms:
    - `func`                     -> no arguments
    - `func|key=value`           -> one argument; the value may contain ',' and '='
    - `func|key1=val1,key2=val2` -> several arguments separated by ','
    - `join|sep=,`               -> a value that is exactly one comma

    Returns a tuple `(func_name, kwargs)` where `kwargs` maps argument names to
    string values. Names and values are stripped of surrounding whitespace.

    Raises `ValueError` when the argument text is malformed: a segment without
    '=', an empty argument name, a stray comma between pairs, an '=' inside a
    value when several arguments are given, or more than one '|'.
    """
    if '|' not in action_str: return action_str.strip(), {}
    # More than one '|' fails the unpacking below with ValueError
    fname, args = action_str.split('|')
    fname = fname.strip()
    # Nothing (or only whitespace) after '|' means "no arguments"
    if not args.strip(): return fname, {}

    kwargs = {}
    rest = args
    while rest:
        if '=' not in rest:
            raise ValueError(
                f"Invalid argument format in '{args}': Expected key=value but found '{rest}'.\n"
                "Arguments must be either:\n"
                "1. Single argument: func|key=any value with = or ,\n"
                "2. Multiple arguments: func|key1=val1,key2=val2"
            )
        # Split on the first '=' of the *remaining* text. Using the position of
        # the first '=' in the whole argument string only works when every key
        # has the same length.
        key, rest = rest.split('=', 1)
        if not key.strip():
            raise ValueError(
                f"Invalid argument format in '{args}': Found an empty argument name before '='."
            )

        if rest.startswith(','):
            # The value is a literal comma, e.g. 'join|sep=,'
            value, rest = ',', rest[1:]
            if rest:
                # Anything after the comma value must begin with the pair separator
                if not rest.startswith(','):
                    raise ValueError(
                        f"Invalid argument format in '{args}': Expected ',' between key=value pairs "
                        "after the ',' value."
                    )
                rest = rest[1:]
        elif ',' in rest and '=' in rest:
            # The first comma separates this value from the next key=value pair
            value, rest = rest.split(',', 1)
            # A comma before the next '=' means the value itself held a comma,
            # which is only allowed in the single-argument form
            if ',' in rest and ('=' not in rest or rest.index(',') < rest.index('=')):
                raise ValueError(
                    f"Invalid argument format in '{args}': Found a comma before key=value pair.\n"
                    "When using multiple arguments:\n"
                    "1. Use key1=val1,key2=val2 format\n"
                    "2. Commas can only appear between key=value pairs\n"
                    "Hint: If you need commas in a value, use only one argument (key=val1,val2)"
                )
        else:
            # Last argument: everything left is its value (it may contain ',' or '=')
            value, rest = rest, ''
        kwargs[key] = value

    # An '=' inside a value is only allowed when there is exactly one argument
    if len(kwargs) > 1 and any('=' in value for value in kwargs.values()):
        raise ValueError(
            f"Invalid argument format in '{args}': Found multiple equals signs in one argument.\n"
            "Arguments must be either:\n"
            "1. Single argument: func|key=any value with = or ,\n"
            "2. Multiple arguments: func|key1=val1,key2=val2\n"
            "Hint: You cannot mix these styles - choose one approach"
        )

    return fname, {k.strip(): v.strip() for k, v in kwargs.items()}

In [None]:
# Test valid cases: the result is always a (func_name, kwargs) tuple with string values
test_eq(parse_action("func"), ("func", {}))  # No '|' means no arguments
test_eq(parse_action("func|arg=val"), ("func", {"arg": "val"}))
test_eq(parse_action("func|arg=val1,val2"), ("func", {"arg": "val1,val2"}))  # Single arg with comma
# NOTE(review): both multi-argument cases in this cell use keys of equal length
# (arg1/arg2, key1/key2); keys of different lengths are not covered here.
test_eq(parse_action("func|arg1=val1,arg2=val2"), ("func", {"arg1": "val1", "arg2": "val2"}))
test_eq(parse_action("join|sep=,"), ("join", {"sep": ","}))  # The value is a literal comma
test_eq(parse_action("func|arg=a=b"), ("func", {"arg": "a=b"}))  # Single arg may contain '='

# Test invalid cases - comma before key=value
test_fail(lambda: parse_action("func|arg1=val1,val2,arg2=val3"), 
         contains="Invalid argument format in")
test_fail(lambda: parse_action("func|arg1=val1,badpart,arg2=val2"), 
         contains="Found a comma before key=value pair")

# Test invalid cases - multiple equals in multiple arguments
test_fail(lambda: parse_action("func|arg1=a=b,arg2=c"), 
         contains="Found multiple equals signs in one argument")
test_fail(lambda: parse_action("func|key1=val1,key2=a=b"), 
         contains="Found multiple equals signs in one argument")

# Show that single argument can have multiple equals
test_eq(parse_action("func|arg=a=b=c"), ("func", {"arg": "a=b=c"}))

This is good, but our specification calls for the ability to chain methods and parse them from a dot-separated string.

Let's implement this now.

In [None]:
#| exporti

def parse_actions(actions_str):
    "Split a chain like 'f1|a=1.f2|b=2' on every '.' and parse each step into (func_name, kwargs); returns an `L`"
    # The split is on every '.', so a '.' inside an argument value also ends a step
    steps = actions_str.split('.')
    return L(parse_action(step) for step in steps)

In [None]:
# Single action chain: even one action comes back wrapped in an `L`
test_eq(parse_actions("func"), 
       L([("func", {})]))

# Multiple actions without args: one (name, {}) pair per dot-separated step, in order
test_eq(parse_actions("func1.func2.func3"), 
       L([("func1", {}), ("func2", {}), ("func3", {})]))

# Multiple actions with args: argument values stay strings ("1", not 1)
test_eq(parse_actions("func1|a=1.func2|b=2.func3"), 
       L([("func1", {"a": "1"}), ("func2", {"b": "2"}), ("func3", {})]))

In [None]:
# Complex example with spaces: whitespace around function names, argument
# names and argument values is stripped by `parse_action`
test_eq(parse_actions(" download_html | timeout=30 . select | css=section#foundations . html2md "), 
       L([
           ("download_html", {"timeout": "30"}),
           ("select", {"css": "section#foundations"}),
           ("html2md", {})
       ]))

In [None]:
# Empty string: "".split('.') is [''], so the result is one action with an
# empty name rather than an empty list
test_eq(parse_actions(""), L([("", {})]))

Let's now look at calling actual functions based on our list of tuples representation of the action string.

To understand this bit of code, we need to know what `Transform` from fastcore can do.

Here is a basic example demonstrating its capabilities that we will use:

from fastcore.transform import *

### A simple transform that converts text to uppercase
```
class UpperCase(Transform):
    def encodes(self, x): return x.upper()

upper = UpperCase()
test_eq(upper('hello'), 'HELLO')
```

### Transforms can be composed into a pipeline
```
class AddExclamation(Transform):
    def encodes(self, x): return x + '!'
    
pipe = Pipeline([upper, AddExclamation()])
test_eq(pipe('hello'), 'HELLO!')
```

The key features of Transform we'll be using are:
- It provides a standard interface for functions that transform data
- Transforms can be composed into pipelines
- Each transform can have its own parameters
- The pipeline handles passing output from one transform to the next

In our recipe system, we'll use Transform to wrap our functions and create pipelines from our action strings. This gives us a clean way to chain operations while maintaining flexibility in how each operation works.

In [None]:
#| export

# Maps an action name to the Transform wrapping the registered function
_registry = {}

def recipe_transform(name=None):
    "Decorator factory: register the decorated function as a recipe transform under `name` (default: the function's own name)"
    def _register(func):
        key = name if name else func.__name__
        wrapped = Transform(func)
        # Copy the docstring onto the wrapper; `list_actions` reads it back from the registry
        wrapped.__doc__ = func.__doc__
        # Registering an already-used name replaces the earlier entry
        _registry[key] = wrapped
        # Hand back the plain function, so the decorated name is not the Transform
        return func
    return _register

In [None]:
#| exporti

def get_transform(fname, kwargs):
    "Return the transform registered as `fname` with `kwargs` pre-bound; raise ValueError listing the known names otherwise"
    if fname in _registry:
        return partialler(_registry[fname], **kwargs)
    raise ValueError(f"Transform {fname} not found. Available: {list(_registry.keys())}")

In [None]:
# Test transform registration
@recipe_transform()
def upper(x):
    "Convert input string to uppercase"
    return x.upper()

# An explicit name decouples the action name ('lower') from the function name
@recipe_transform('lower')
def _lower(x):
    "Convert input string to lowercase"
    return x.lower()

# Check registry contents (this assumes nothing else has been registered yet)
test_eq(list(_registry.keys()), ['upper', 'lower'])

# Test transform retrieval and execution
t1 = get_transform('upper', {})
test_eq(t1('hello'), 'HELLO')

t2 = get_transform('lower', {})
test_eq(t2('HELLO'), 'hello')

# Test transform with arguments
@recipe_transform()
def repeat(x, n=1):
    "Repeats the string `n` times"
    # Arguments parsed from an action string arrive as strings, hence int(n)
    return x * int(n)

t3 = get_transform('repeat', {'n': '3'})
test_eq(t3('ha'), 'hahaha')

# Test error for unknown transform.
# `test_fail` also fails when no exception is raised at all; a bare
# try/except would pass silently in that case.
test_fail(lambda: get_transform('unknown', {}),
         contains=f"Transform unknown not found. Available: {list(_registry.keys())}")

The `create_pipeline` function is the heart of our recipe system. It takes an action string and creates a pipeline that can:
1. Execute transforms in sequence, passing output from one to the next
2. Handle nested recipes by detecting when a transform returns a `Recipes` collection
3. Apply remaining transforms to each recipe in the collection

In [None]:
#| exporti

def create_pipeline(actions_str):
    "Build a callable that runs the transforms named in `actions_str` in order, handing off to `Recipes.run` when a step returns `Recipes`"
    actions = parse_actions(actions_str)
    # Unknown action names fail here, when the pipeline is built, not when it runs
    transforms = L(actions).map(lambda spec: get_transform(*spec))

    def _as_action_str(fname, kwargs):
        # Rebuild the 'name|k1=v1,k2=v2' text form of one parsed action
        arg_str = ','.join(f'{k}={v}' for k, v in kwargs.items())
        return f"{fname}|{arg_str}"

    def run_pipeline(x):
        for idx, step in enumerate(transforms):
            x = step(x)
            if not isinstance(x, Recipes): continue
            # A step produced Recipes: the steps not yet run become the actions
            # of every recipe in it (replacing whatever actions it had)
            remaining = '.'.join(_as_action_str(n, kw) for n, kw in actions[idx+1:])
            if remaining:
                for recipe in x.recipes: recipe.actions = remaining
            return x.run()
        return x

    return run_pipeline

In [None]:
#| export

class Recipes:
    "A group of `Recipe` objects that can be read from one file and run as a batch"
    def __init__(self, recipes=None):
        self.recipes = L(recipes or [])

    @classmethod
    def from_file(cls, path):
        "Read a file of recipe blocks (separated by a blank line) into a `Recipes` collection"
        text = Path(path).read_text()
        # Blocks are delimited by exactly '\n\n'; blocks that are empty after stripping are dropped
        blocks = [b for b in (chunk.strip() for chunk in text.split('\n\n')) if b]

        loaded = []
        for block in blocks:
            # Each 'key: value' line (split on the first ':') becomes one field; other lines are skipped
            pairs = (line.split(':', 1) for line in block.splitlines() if ':' in line)
            fields = {key.strip(): value.strip() for key, value in pairs}
            if fields: loaded.append(Recipe(**fields))

        return cls(loaded)

    def run(self, separator='\n\n'):
        "Run every recipe in order and join the results with `separator`"
        # `str.join` needs every recipe result to be a string
        outputs = []
        for recipe in self.recipes:
            outputs.append(recipe.run())
        return separator.join(outputs)

    def __repr__(self):
        count = len(self.recipes)
        return f"Recipes({count} recipes)"

In [None]:
# Register all transforms we need
@recipe_transform()
def double(x): 
    "Multiply input by 2"
    return x * 2

# NOTE: 'upper' was already registered in an earlier cell; registering it again
# replaces that registry entry.
@recipe_transform()
def upper(x):
    "Convert input string to uppercase"
    return x.upper()

# NOTE(review): the docstring mentions an optional separator, but this function
# takes no separator argument and always splits on whitespace.
@recipe_transform()
def split(x):
    "Split string into list of strings using optional separator"
    return x.split()

@recipe_transform()
def join(x, sep=' '):
    "Join list of strings using separator (default: space)"
    return sep.join(x)

@recipe_transform()
def split_into_two(x):
    "Split string into two equal parts. String length must be even."
    assert len(x) % 2 == 0
    # Returns a tuple (first_half, second_half)
    return x[:len(x)//2], x[len(x)//2:]

# Basic pipeline with single transform
pipe1 = create_pipeline("double")
test_eq(pipe1("ha"), "haha")

# Pipeline with multiple transforms
pipe2 = create_pipeline("double.upper")
test_eq(pipe2("ha"), "HAHA")  # double runs first, then upper

# Pipeline with an argument. split_into_two returns a tuple, and the expected
# result shows join applied to each half separately (presumably because
# fastcore's Transform is applied to every element of a tuple - TODO confirm).
pipe3 = create_pipeline("double.upper.split_into_two.join|sep=,")
test_eq(pipe3("ha"), ["H,A", "H,A"])

Let us also implement adding a `Recipe` to a `Recipe`.

The resultant `Recipe` will first apply the actions from `Recipe#1` and then will process the output with actions from `Recipe#2`.

In [None]:
#| export

@patch
def __add__(self:Recipe, other):
    """Combine two recipes, using output of self as input to other

    The result keeps `self.input`; `other.input` is not used.
    The name is `<self.name>_<other.name>` when both recipes are named, else None.
    The actions are `self.actions` followed by `other.actions`, joined with '.'.
    A recipe without actions (None or '') contributes nothing to the chain; if
    neither recipe has actions the result has `actions=None`.
    """
    name = f"{self.name}_{other.name}" if self.name and other.name else None
    # Skip missing action strings: formatting them directly would put a literal
    # 'None' (or an empty step) into the chain, which is not a registered action
    actions = '.'.join(a for a in (self.actions, other.actions) if a) or None
    return self.__class__(name=name, input=self.input, actions=actions)

In [None]:
# Test basic recipe combination
r1 = Recipe(name='first', input='hello', actions='upper')
r2 = Recipe(name='second', actions='split')  # no input: only the left recipe's input is carried over
combined = r1 + r2

test_eq(combined.name, 'first_second')
test_eq(combined.input, 'hello')
test_eq(combined.actions, 'upper.split')

# Test combining with missing names
r3 = Recipe(input='world', actions='upper')
r4 = Recipe(actions='split')
combined_no_names = r3 + r4

test_eq(combined_no_names.name, None)
test_eq(combined_no_names.input, 'world')
test_eq(combined_no_names.actions, 'upper.split')

# Test combining with one missing name: the combined name is only built when
# both recipes are named
r5 = Recipe(name='named', input='test', actions='upper')
r6 = Recipe(actions='split')
combined_one_name = r5 + r6

test_eq(combined_one_name.name, None)
test_eq(combined_one_name.input, 'test')
test_eq(combined_one_name.actions, 'upper.split')

# Test chaining multiple combinations: `+` is evaluated left to right, so this
# is (r7 + r8) + r9
r7 = Recipe(name='first', input='hello', actions='upper')
r8 = Recipe(name='second', actions='split')
r9 = Recipe(name='third', actions='join|sep=,')
chained = r7 + r8 + r9

test_eq(chained.name, 'first_second_third')
test_eq(chained.input, 'hello')
test_eq(chained.actions, 'upper.split.join|sep=,')

Now let's give our `Recipe` the ability to execute its actions.

In [None]:
#| export

@patch
def run(self:Recipe):
    "Run the recipe: feed `input` through the pipeline built from `actions` and return the result"
    if self.input is None: raise ValueError("Recipe requires input to run")
    if self.actions:
        return create_pipeline(self.actions)(self.input)
    # No actions (None or ''): the input is returned untouched
    return self.input

In [None]:
# Test basic recipe execution
r1 = Recipe(input='hello world', actions='upper')
test_eq(r1.run(), 'HELLO WORLD')

# Test recipe with multiple actions: the output of each action feeds the next
r2 = Recipe(input='hello world', actions='upper.split')
test_eq(r2.run(), ['HELLO', 'WORLD'])

# Test recipe with parameters
r3 = Recipe(input='hello world', actions='upper.split.join|sep=,')
test_eq(r3.run(), 'HELLO,WORLD')

# Test recipe combination and execution
r4 = Recipe(input='hello world', actions='upper')
r5 = Recipe(actions='split.join|sep=_')
combined = r4 + r5
test_eq(combined.run(), 'HELLO_WORLD')

# Test empty or None actions: both are falsy, so run() returns the input unchanged
r6 = Recipe(input='hello', actions='')
test_eq(r6.run(), 'hello')

r7 = Recipe(input='hello', actions=None)
test_eq(r7.run(), 'hello')

# Test missing input scenarios: input defaults to None, so both forms are equivalent
test_fail(lambda: Recipe(actions='upper').run(), 
         contains="Recipe requires input to run")

test_fail(lambda: Recipe(input=None, actions='upper').run(), 
         contains="Recipe requires input to run")

# Test recipe with invalid action: the name is looked up when run() builds the pipeline
test_fail(lambda: Recipe(input='hello', actions='invalid').run(), 
         contains="Transform invalid not found")

In [None]:
#| export

def list_actions():
    "Return a dict mapping every registered action name to its docstring"
    docs = {}
    for action_name, transform in _registry.items():
        docs[action_name] = transform.__doc__
    return docs

list_actions()

{'upper': 'Convert input string to uppercase',
 'lower': 'Convert input string to lowercase',
 'repeat': 'Repeats the string `n` times',
 'double': 'Multiply input by 2',
 'split': 'Split string into list of strings using optional separator',
 'join': 'Join list of strings using separator (default: space)',
 'split_into_two': 'Split string into two equal parts. String length must be even.'}

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()