mroumanos/aiopypes

a-i-o-p-y-p-e-s

(pronounced "a-i-o-pipes")


Scalable asyncio pipelines in Python, made easy.

🧐 About

This package makes it easy to build asynchronous streams that balance and scale automatically. Built on pure Python with no dependencies, the framework suits variable, decoupled, task-based workloads such as web scraping and database management operations. It scales out of the box, with minimal hardware and code, to process 10k+/s on production loads.
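For readers new to the pattern, the balancing idea can be tried with the standard library alone. The following is a minimal sketch in plain asyncio, not aiopypes's implementation: the names `worker` and `run_pool` are hypothetical, and the doubling step stands in for real asynchronous work. Several workers share one queue, so whichever worker is free takes the next item.

```python
import asyncio


async def worker(name, queue, results):
    # Each worker pulls the next available item, so idle workers
    # pick up load without any explicit assignment.
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0)  # stand-in for real async work
            results.append((name, item * 2))
        finally:
            queue.task_done()


async def run_pool(items, n_workers=3):
    queue = asyncio.Queue()
    results = []
    for item in items:
        queue.put_nowait(item)
    workers = [
        asyncio.create_task(worker(f"worker-{i}", queue, results))
        for i in range(n_workers)
    ]
    await queue.join()  # wait until every queued item has been processed
    for w in workers:
        w.cancel()  # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return results


if __name__ == "__main__":
    out = asyncio.run(run_pool(range(10)))
    print(sorted(value for _, value in out))
```

A fixed pool like this balances load but does not scale; changing the number of workers at runtime is the part a scaler would add.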

Simple pipelines

from datetime import datetime

import aiopypes

app = aiopypes.App()

@app.task(interval=1.0)  # trigger task, runs on a 1.0 s interval
async def every_second():
    return datetime.utcnow()

@app.task()
async def task1(stream):
    async for s in stream:
        print(f"streaming from task1: {s}")
        yield s  # pass the item on to the next stage

if __name__ == '__main__':

    pipeline = every_second \
               .map(task1)

    pipeline.run()

To scaled pipelines

import aiohttp
import aiopypes

app = aiopypes.App()

@app.task(interval=0.1)
async def every_tenth_second():
    return "http://www.google.com"

@app.task(scaler=aiopypes.scale.TanhTaskScaler())  # this scales workers automatically to consume incoming requests
async def task1(stream):
    # aiohttp has no module-level get(); requests go through a ClientSession
    async with aiohttp.ClientSession() as session:
        async for url in stream:
            async with session.get(url) as response:
                yield url, response.status

@app.task()
async def task2(stream):
    async for url, status in stream:
        if status != 200:
            print(f"failed request: {url} ({status})")
        yield

if __name__ == '__main__':

    pipeline = every_tenth_second \
               .map(task1) \
               .reduce(task2)

    pipeline.run()
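This README does not describe how `TanhTaskScaler` chooses a worker count. As an illustration only, here is one way a tanh curve could map queue backlog to a number of workers: growth is fast while the backlog is small and levels off at a ceiling. The function `tanh_worker_count` and its parameters (`min_workers`, `max_workers`, `softness`) are hypothetical and are not part of aiopypes.

```python
import math


def tanh_worker_count(backlog, min_workers=1, max_workers=16, softness=50.0):
    """Map a queue backlog to a worker count using a tanh curve.

    Hypothetical helper for illustration; not the aiopypes implementation.
    """
    if backlog < 0:
        raise ValueError("backlog must be non-negative")
    # tanh is 0.0 for an empty queue and approaches 1.0 as the backlog grows
    fraction = math.tanh(backlog / softness)
    extra = (max_workers - min_workers) * fraction
    return min_workers + int(round(extra))


if __name__ == "__main__":
    for backlog in (0, 10, 50, 200, 1000):
        print(backlog, tanh_worker_count(backlog))
```

The appeal of a saturating curve in this sketch is that a sudden spike in backlog cannot push the worker count past `max_workers`, while `softness` controls how quickly the ceiling is approached.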

🏁 Getting Started

Start with a simple pipeline, and build out from there!

from datetime import datetime

import aiopypes

app = aiopypes.App()

@app.task(interval=1.0)  # trigger task, runs on a 1.0 s interval
async def every_second():
    return datetime.utcnow()

@app.task()
async def task1(stream):
    async for s in stream:
        print(f"streaming from task1: {s}")
        yield s  # pass the item on to the next stage

if __name__ == '__main__':

    pipeline = every_second \
               .map(task1)

    pipeline.run()

For more, see readthedocs

Prerequisites

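aiopypes is pure Python (3.5+) and has no required dependencies. Note that the examples in this README use async generators and f-strings, which need Python 3.6 or later, and the scaled example additionally uses aiohttp.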

Installing

Available on PyPI; install with pip:

pip install aiopypes

🔧 Running the tests

To be created!

Break down into end to end tests

To be created!

And coding style tests

To be created!

🎈 Usage

Import the library

import aiopypes

Create an App object

app = aiopypes.App()

Create a trigger task

@app.task(interval=1.0)
async def every_second():
    return 1

Create downtream tasks

@app.task()
async def task_n(stream):
    async for s in stream:
        # process "s" here
        yield

Create + configure the pipeline

pipeline = every_second \
            .map(task_n)

Run the pipeline

pipeline.run()

This will run continuously until interrupted.
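The task shape used in the steps above, an async generator that consumes a stream and yields results, can be tried without aiopypes. The sketch below uses only the standard library; `ticker`, `to_iso`, and `collect` are hypothetical names, and the `count` argument exists only to keep the demo finite, whereas a real trigger task runs until interrupted.

```python
import asyncio
from datetime import datetime, timezone


async def ticker(interval, count):
    # Source stage: emit a timestamp every `interval` seconds, `count` times.
    for _ in range(count):
        yield datetime.now(timezone.utc)
        await asyncio.sleep(interval)


async def to_iso(stream):
    # Downstream stage: consume one stream and yield a transformed one.
    async for stamp in stream:
        yield stamp.isoformat()


async def collect(stream):
    # Drain a stream into a list so the result can be inspected.
    return [item async for item in stream]


if __name__ == "__main__":
    stamps = asyncio.run(collect(to_iso(ticker(0.01, 3))))
    print(stamps)
```

Each stage only sees the stream handed to it, which is what lets stages be written and tested independently before they are wired together.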

⛏️ Built Using

✔️ TODO

  • Extend to multithreads
  • Extend to multiprocess
  • Build visualization server
  • Add pipeline pipe functions (join, head, ...)

✍️ Authors

🎉 Acknowledgements
