# logycon-pipeline

A simple pipeline runner for Python tasks that lets you define and execute a series of tasks in sequence.
## Installation

```shell
pip install logycon-pipeline
```

## Usage

1. Create a pipeline specification file (e.g., `pipeline.json`):
   ```json
   {
     "name": "my-pipeline",
     "version": "1.0.0",
     "description": "My awesome pipeline",
     "tasks": [
       {
         "name": "task1",
         "script": "./scripts/task1.py",
         "env": {
           "CUSTOM_VAR": "value"
         }
       },
       {
         "name": "task2",
         "script": "./scripts/task2.py"
       }
     ]
   }
   ```

2. Use the pipeline in your code:
   ```python
   import asyncio

   from logycon_pipeline import Pipeline

   async def main():
       pipeline = Pipeline('./pipeline.json')
       try:
           results = await pipeline.run()
           print('Pipeline completed successfully:', results)
       except Exception as error:
           print('Pipeline failed:', error)

   if __name__ == '__main__':
       asyncio.run(main())
   ```

## Task Scripts

Each task script should be a Python script that:
- Performs the required operations
- Outputs its result as a JSON string on the last line of stdout
- Returns 0 on success, non-zero on failure
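The runner side of this contract might look like the following sketch. This is assumed behavior, not the library's actual implementation: it spawns the script with any `env` entries merged into the process environment (an assumption about how the spec's `env` map reaches the task), treats a non-zero exit code as failure, and parses the last line of stdout as JSON.

```python
import json
import os
import subprocess
import sys

def run_task(script, env=None):
    # Merge the task's "env" entries into the inherited environment
    # (assumption: this is how the spec's env map reaches the script).
    task_env = {**os.environ, **(env or {})}
    proc = subprocess.run(
        [sys.executable, script],
        capture_output=True, text=True, env=task_env,
    )
    if proc.returncode != 0:
        # A non-zero exit code signals failure.
        return {"success": False, "error": proc.stderr.strip()}
    # The task's result is the JSON string on the last line of stdout;
    # earlier lines (e.g. log output) are ignored.
    last_line = proc.stdout.strip().splitlines()[-1]
    return {"success": True, "data": json.loads(last_line)}
```

Emitting the result only on the last line keeps stdout free for ordinary logging during the task.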
Example task script:

```python
# scripts/task1.py
import json
import sys

def main():
    # Do something
    result = {"status": "success", "data": "some data"}
    print(json.dumps(result))
    return 0

if __name__ == '__main__':
    sys.exit(main())
```

## API

### `Pipeline`

The main class for running pipelines.
- `def __init__(self, spec_path: str)`: creates a new pipeline instance from a specification file.
- `async def run() -> List[TaskResult]`: runs all tasks in the pipeline and returns a list of results.
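As a sketch of consuming the returned list, using the `TaskResult` fields documented below (a stand-in dataclass is used here so the snippet runs without the library installed; real instances come from `pipeline.run()`):

```python
from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class TaskResult:
    # Stand-in mirroring the documented TaskResult fields.
    task: str
    success: bool
    data: Optional[Any] = None
    error: Optional[str] = None

def failed_tasks(results):
    # Names of tasks whose scripts exited with a non-zero code.
    return [r.task for r in results if not r.success]

results = [
    TaskResult(task="task1", success=True, data={"status": "success"}),
    TaskResult(task="task2", success=False, error="exit code 1"),
]
print(failed_tasks(results))  # prints ['task2']
```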
### Types

```python
class Task:
    name: str
    script: str
    env: Optional[Dict[str, str]]

class PipelineSpec:
    name: str
    version: str
    description: str
    tasks: List[Task]

class TaskResult:
    task: str
    success: bool
    data: Optional[Any]
    error: Optional[str]
```

## License

MIT