From f74f7ee6ed279e68fddd315bad8970b5ba66174c Mon Sep 17 00:00:00 2001 From: Jadiel deArmas Date: Mon, 15 Apr 2019 15:45:57 -0400 Subject: [PATCH] First commit --- .gitignore | 123 ++++++++++++++++++++++++++++++ LICENSE | 21 +++++ README.md | 24 ++++++ samples/car_counting.py | 19 +++++ setup.py | 36 +++++++++ videoflow/__init__.py | 1 + videoflow/broker/__init__.py | 0 videoflow/consumers/__init__.py | 1 + videoflow/consumers/consumer.py | 6 ++ videoflow/core/__init__.py | 2 + videoflow/core/graph.py | 11 +++ videoflow/core/node.py | 25 ++++++ videoflow/core/stream.py | 31 ++++++++ videoflow/core/task.py | 0 videoflow/processors/__init__.py | 0 videoflow/processors/processor.py | 9 +++ videoflow/producers/__init__.py | 2 + videoflow/producers/producer.py | 6 ++ videoflow/producers/video.py | 22 ++++++ videoflow/version.py | 1 + 20 files changed, 340 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 README.md create mode 100644 samples/car_counting.py create mode 100644 setup.py create mode 100644 videoflow/__init__.py create mode 100644 videoflow/broker/__init__.py create mode 100644 videoflow/consumers/__init__.py create mode 100644 videoflow/consumers/consumer.py create mode 100644 videoflow/core/__init__.py create mode 100644 videoflow/core/graph.py create mode 100644 videoflow/core/node.py create mode 100644 videoflow/core/stream.py create mode 100644 videoflow/core/task.py create mode 100644 videoflow/processors/__init__.py create mode 100644 videoflow/processors/processor.py create mode 100644 videoflow/producers/__init__.py create mode 100644 videoflow/producers/producer.py create mode 100644 videoflow/producers/video.py create mode 100644 videoflow/version.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1fcca43 --- /dev/null +++ b/.gitignore @@ -0,0 +1,123 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don’t work, or not +# install all needed dependencies. +#Pipfile.lock + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..4157c2d --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +The MIT License + +Copyright (c) 2019 Jadiel de Armas + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..0034623 --- /dev/null +++ b/README.md @@ -0,0 +1,24 @@ +# Flujo + +**Flujo** is a Python library for stream processing. The library is designed to facilitate easy and quick definition of streaming applications. Yet, it can be also very efficient. + +It can be used in any domain. For example, in the vision domain it can be used to quickly develop video streaming analytics applications such as people detection, people tracking, people counting, etc. + +See the [**samples**](./samples/) folder for sample applications. + +## Building Simple Flow applications + +A flow application usually consists of three parts: + +1. In the first part you define a flow as a directed acyclic graph. A flow is made of producers, processors and consumers. Producers create data and add it to the flow (commonly they will get the data from a source that is external to the flow and add it to it). Processors read data from the flow and write data back to the flow. Consumers read data from the flow but do not write back. + +2. Once a flow is defined you can start it. Starting the flow means that the producers start putting data into the flow and processors and consumers start receiving data. Starting the flow also means allocating resources for producers, processors and consumers. + +3. Once the flow starts, you can also stop it. When you stop the flow, it will happen organically. Producers will stop producing data. The rest of the nodes in the flow will continue running until the pipes run dry. The resources used in the flow are deallocated progressively (not all at the same time). For example, when a producer stops producing data, it deallocates itself and all the resources that are exclusive to him. + +You can stop part of a flow instead of the entire flow. For example, in Figure 1, if producer B receives signal to stop producing data, then the pipe that connects nodes B, C, D, E will eventually run dry and the nodes will deallocate themselves. The rest of the flow in the graph will be left untouched. (remember to connect another producer F to B, as well as make other independent children of F). + + +### Meaning +**Flujo** is the word for **flow** in Spanish. + diff --git a/samples/car_counting.py b/samples/car_counting.py new file mode 100644 index 0000000..b79c42e --- /dev/null +++ b/samples/car_counting.py @@ -0,0 +1,19 @@ +''' +An idea of how a programmer should write a streaming program +''' + +from flujo.core import Stream +from flujo.producers import VideoReader +from flujo.processors.vision import Detector, Tracker, Counter, ImageAnnotator +from flujo.consumers import StreamingServer, EndpointPublisher + +video_reader = VideoReader() +detector = Detector()(video_reader) +tracker = Tracker()(detector) +counter = Counter()(tracker) +video_annotator = ImageAnnotator()(video_reader, detector, tracker, counter) +stream_server = StreamingServer()(video_annotator) +results_publisher = EndpointPublisher()(counter) + +stream = Stream(consumers = [video_annotator, results_publisher]) +stream_handler = stream.run() # So that the run method is non-blocking \ No newline at end of file diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..b033d83 --- /dev/null +++ b/setup.py @@ -0,0 +1,36 @@ +"""Setup script for realpython-reader""" + +import os.path +from setuptools import setup + +# The directory containing this file +HERE = os.path.abspath(os.path.dirname(__file__)) + +# The text of the README file +with open(os.path.join(HERE, "README.md")) as fid: + README = fid.read() + +# This call to setup() does all the work +exec(open('videoflow/version.py').read()) +setup( + name = "videoflow", + version = __version__, + description="Python video streams processing library", + long_description = README, + long_description_content_type = "text/markdown", + url = "https://github.com/jadielam/videoflow", + author = "Jadiel de Armas", + author_email = "jadielam@gmail.com", + license = "MIT", + classifiers = [ + "License :: OSI Approved :: MIT License", + "Programming Language :: Python", + "Programming Language :: Python :: 2", + "Programming Language :: Python :: 3", + ], + packages = ["videoflow"], + include_package_data = True, + install_requires = [ + "opencv-python" + ] +) \ No newline at end of file diff --git a/videoflow/__init__.py b/videoflow/__init__.py new file mode 100644 index 0000000..7152555 --- /dev/null +++ b/videoflow/__init__.py @@ -0,0 +1 @@ +from .version import __version__ \ No newline at end of file diff --git a/videoflow/broker/__init__.py b/videoflow/broker/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/videoflow/consumers/__init__.py b/videoflow/consumers/__init__.py new file mode 100644 index 0000000..09c568e --- /dev/null +++ b/videoflow/consumers/__init__.py @@ -0,0 +1 @@ +from .consumer import Consumer \ No newline at end of file diff --git a/videoflow/consumers/consumer.py b/videoflow/consumers/consumer.py new file mode 100644 index 0000000..430ce4c --- /dev/null +++ b/videoflow/consumers/consumer.py @@ -0,0 +1,6 @@ +from ..core.node import Leaf + +class Consumer(Leaf): + def consume(item): + raise NotImplemented('consume function needs to be implemented\ + by subclass') diff --git a/videoflow/core/__init__.py b/videoflow/core/__init__.py new file mode 100644 index 0000000..90f2643 --- /dev/null +++ b/videoflow/core/__init__.py @@ -0,0 +1,2 @@ +from .stream import Stream +from .graph import add_node \ No newline at end of file diff --git a/videoflow/core/graph.py b/videoflow/core/graph.py new file mode 100644 index 0000000..b12d47e --- /dev/null +++ b/videoflow/core/graph.py @@ -0,0 +1,11 @@ +class ExecutionGraph: + pass + +class TaskRegistry: + pass + +_execution_graph = Graph() + +def add_task(): + pass + diff --git a/videoflow/core/node.py b/videoflow/core/node.py new file mode 100644 index 0000000..d486ac0 --- /dev/null +++ b/videoflow/core/node.py @@ -0,0 +1,25 @@ +class Node(): + def __init__(self): + self._parents = None + + def __repr__(self): + return self.__class__.__name__ + + def __call__(self, *parents): + self._parents = [] + for parent in parents: + assert isinstance(parent, Node) and not isinstance(parent, Leaf), + '%s is not a non-leaf node' % str(parent) + self._parents.append(parent) + + @property + def parents(self): + return self._parents + +class Leaf(Node): + def __init__(self): + pass + + + + diff --git a/videoflow/core/stream.py b/videoflow/core/stream.py new file mode 100644 index 0000000..a9fa16f --- /dev/null +++ b/videoflow/core/stream.py @@ -0,0 +1,31 @@ +from ..consumers import Consumer + +class Stream: + def __init__(self, *consumers): + for consumer in consumers: + assert isinstance(consumer, Consumer), '%s is not instance of Consumer' % str(consumer) + self._consumers = consumers + + def _compile(self): + pass + + def run(self): + #1. Build graph from consumers all the way to producers + for consumer in self._consumers: + consumer. + + #2. Create pub/sub plumbing and task wrappers around producers, + #consumers and processors. The task wrapper publishes and subscribes + #from pub/sub channels, and passes specific data from those channels + #to the processors and consumers. + + #3. Build a topological sort of the graph from producers to consumers + + #4. Build task registry. + + #4. For each of the tasks, put them to run in the topological order. + + #5. Return a stream handler that lets you stop the flow into the stream + #by turning off the producers, and turning off everyone else after + #that in the topological sort. + a = 5 \ No newline at end of file diff --git a/videoflow/core/task.py b/videoflow/core/task.py new file mode 100644 index 0000000..e69de29 diff --git a/videoflow/processors/__init__.py b/videoflow/processors/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/videoflow/processors/processor.py b/videoflow/processors/processor.py new file mode 100644 index 0000000..b21b101 --- /dev/null +++ b/videoflow/processors/processor.py @@ -0,0 +1,9 @@ +from ..core.node import Node + +class Processor(Node): + def __init__(self): + self._parents = None + + def process(input): + raise NotImplemented("process function needs to be \ + implemented by subclass") diff --git a/videoflow/producers/__init__.py b/videoflow/producers/__init__.py new file mode 100644 index 0000000..d9d585a --- /dev/null +++ b/videoflow/producers/__init__.py @@ -0,0 +1,2 @@ +from .producer import Producer +from .video import VideoProducer \ No newline at end of file diff --git a/videoflow/producers/producer.py b/videoflow/producers/producer.py new file mode 100644 index 0000000..85f3eb3 --- /dev/null +++ b/videoflow/producers/producer.py @@ -0,0 +1,6 @@ +from ..core.node import Node + +class Producer(Node): + def __iter__(self): + raise NotImplemented('Method needs to be implemented by subclass') + diff --git a/videoflow/producers/video.py b/videoflow/producers/video.py new file mode 100644 index 0000000..1558ce8 --- /dev/null +++ b/videoflow/producers/video.py @@ -0,0 +1,22 @@ +import cv2 + +from .producer import Producer + +class VideoProducer(Producer): + def __init__(self, video_file : str): + ''' + Arguments: + - video_file: path to video file + ''' + self._video_file = video_file + + def __iter__(self): + video = cv2.VideoCapture(self._video_file) + while (video.isOpened()): + success, frame = video.read() + if not success: + break + yield frame + video.release() + + diff --git a/videoflow/version.py b/videoflow/version.py new file mode 100644 index 0000000..5e3048b --- /dev/null +++ b/videoflow/version.py @@ -0,0 +1 @@ +__version__ = '0.1' \ No newline at end of file