Skip to content

Commit

Permalink
First commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Jadiel deArmas authored and Jadiel deArmas committed Apr 15, 2019
0 parents commit f74f7ee
Show file tree
Hide file tree
Showing 20 changed files with 340 additions and 0 deletions.
123 changes: 123 additions & 0 deletions .gitignore
@@ -0,0 +1,123 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
.python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don’t work, or not
# install all needed dependencies.
#Pipfile.lock

# celery beat schedule file
celerybeat-schedule

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/
21 changes: 21 additions & 0 deletions LICENSE
@@ -0,0 +1,21 @@
The MIT License

Copyright (c) 2019 Jadiel de Armas

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
24 changes: 24 additions & 0 deletions README.md
@@ -0,0 +1,24 @@
# Flujo

**Flujo** is a Python library for stream processing. The library is designed to facilitate easy and quick definition of streaming applications. Yet, it can be also very efficient.

It can be used in any domain. For example, in the vision domain it can be used to quickly develop video streaming analytics applications such as people detection, people tracking, people counting, etc.

See the [**samples**](./samples/) folder for sample applications.

## Building Simple Flow applications

A flow application usually consists of three parts:

1. In the first part you define a flow as a directed acyclic graph. A flow is made of producers, processors and consumers. Producers create data and add it to the flow (commonly they will get the data from a source that is external to the flow and add it to it). Processors read data from the flow and write data back to the flow. Consumers read data from the flow but do not write back.

2. Once a flow is defined you can start it. Starting the flow means that the producers start putting data into the flow and processors and consumers start receiving data. Starting the flow also means allocating resources for producers, processors and consumers.

3. Once the flow starts, you can also stop it. When you stop the flow, it will happen organically. Producers will stop producing data. The rest of the nodes in the flow will continue running until the pipes run dry. The resources used in the flow are deallocated progressively (not all at the same time). For example, when a producer stops producing data, it deallocates itself and all the resources that are exclusive to him.

You can stop part of a flow instead of the entire flow. For example, in Figure 1, if producer B receives signal to stop producing data, then the pipe that connects nodes B, C, D, E will eventually run dry and the nodes will deallocate themselves. The rest of the flow in the graph will be left untouched. (remember to connect another producer F to B, as well as make other independent children of F).


### Meaning
**Flujo** is the word for **flow** in Spanish.

19 changes: 19 additions & 0 deletions samples/car_counting.py
@@ -0,0 +1,19 @@
'''
An idea of how a programmer should write a streaming program
'''

from flujo.core import Stream
from flujo.producers import VideoReader
from flujo.processors.vision import Detector, Tracker, Counter, ImageAnnotator
from flujo.consumers import StreamingServer, EndpointPublisher

video_reader = VideoReader()
detector = Detector()(video_reader)
tracker = Tracker()(detector)
counter = Counter()(tracker)
video_annotator = ImageAnnotator()(video_reader, detector, tracker, counter)
stream_server = StreamingServer()(video_annotator)
results_publisher = EndpointPublisher()(counter)

stream = Stream(consumers = [video_annotator, results_publisher])
stream_handler = stream.run() # So that the run method is non-blocking
36 changes: 36 additions & 0 deletions setup.py
@@ -0,0 +1,36 @@
"""Setup script for realpython-reader"""

import os.path
from setuptools import setup

# The directory containing this file
HERE = os.path.abspath(os.path.dirname(__file__))

# The text of the README file
with open(os.path.join(HERE, "README.md")) as fid:
README = fid.read()

# This call to setup() does all the work
exec(open('videoflow/version.py').read())
setup(
name = "videoflow",
version = __version__,
description="Python video streams processing library",
long_description = README,
long_description_content_type = "text/markdown",
url = "https://github.com/jadielam/videoflow",
author = "Jadiel de Armas",
author_email = "jadielam@gmail.com",
license = "MIT",
classifiers = [
"License :: OSI Approved :: MIT License",
"Programming Language :: Python",
"Programming Language :: Python :: 2",
"Programming Language :: Python :: 3",
],
packages = ["videoflow"],
include_package_data = True,
install_requires = [
"opencv-python"
]
)
1 change: 1 addition & 0 deletions videoflow/__init__.py
@@ -0,0 +1 @@
from .version import __version__
Empty file added videoflow/broker/__init__.py
Empty file.
1 change: 1 addition & 0 deletions videoflow/consumers/__init__.py
@@ -0,0 +1 @@
from .consumer import Consumer
6 changes: 6 additions & 0 deletions videoflow/consumers/consumer.py
@@ -0,0 +1,6 @@
from ..core.node import Leaf

class Consumer(Leaf):
def consume(item):
raise NotImplemented('consume function needs to be implemented\
by subclass')
2 changes: 2 additions & 0 deletions videoflow/core/__init__.py
@@ -0,0 +1,2 @@
from .stream import Stream
from .graph import add_node
11 changes: 11 additions & 0 deletions videoflow/core/graph.py
@@ -0,0 +1,11 @@
class ExecutionGraph:
pass

class TaskRegistry:
pass

_execution_graph = Graph()

def add_task():
pass

25 changes: 25 additions & 0 deletions videoflow/core/node.py
@@ -0,0 +1,25 @@
class Node():
def __init__(self):
self._parents = None

def __repr__(self):
return self.__class__.__name__

def __call__(self, *parents):
self._parents = []
for parent in parents:
assert isinstance(parent, Node) and not isinstance(parent, Leaf),
'%s is not a non-leaf node' % str(parent)
self._parents.append(parent)

@property
def parents(self):
return self._parents

class Leaf(Node):
def __init__(self):
pass




31 changes: 31 additions & 0 deletions videoflow/core/stream.py
@@ -0,0 +1,31 @@
from ..consumers import Consumer

class Stream:
def __init__(self, *consumers):
for consumer in consumers:
assert isinstance(consumer, Consumer), '%s is not instance of Consumer' % str(consumer)
self._consumers = consumers

def _compile(self):
pass

def run(self):
#1. Build graph from consumers all the way to producers
for consumer in self._consumers:
consumer.

#2. Create pub/sub plumbing and task wrappers around producers,
#consumers and processors. The task wrapper publishes and subscribes
#from pub/sub channels, and passes specific data from those channels
#to the processors and consumers.

#3. Build a topological sort of the graph from producers to consumers

#4. Build task registry.

#4. For each of the tasks, put them to run in the topological order.

#5. Return a stream handler that lets you stop the flow into the stream
#by turning off the producers, and turning off everyone else after
#that in the topological sort.
a = 5
Empty file added videoflow/core/task.py
Empty file.
Empty file.
9 changes: 9 additions & 0 deletions videoflow/processors/processor.py
@@ -0,0 +1,9 @@
from ..core.node import Node

class Processor(Node):
def __init__(self):
self._parents = None

def process(input):
raise NotImplemented("process function needs to be \
implemented by subclass")
2 changes: 2 additions & 0 deletions videoflow/producers/__init__.py
@@ -0,0 +1,2 @@
from .producer import Producer
from .video import VideoProducer
6 changes: 6 additions & 0 deletions videoflow/producers/producer.py
@@ -0,0 +1,6 @@
from ..core.node import Node

class Producer(Node):
def __iter__(self):
raise NotImplemented('Method needs to be implemented by subclass')

22 changes: 22 additions & 0 deletions videoflow/producers/video.py
@@ -0,0 +1,22 @@
import cv2

from .producer import Producer

class VideoProducer(Producer):
def __init__(self, video_file : str):
'''
Arguments:
- video_file: path to video file
'''
self._video_file = video_file

def __iter__(self):
video = cv2.VideoCapture(self._video_file)
while (video.isOpened()):
success, frame = video.read()
if not success:
break
yield frame
video.release()


1 change: 1 addition & 0 deletions videoflow/version.py
@@ -0,0 +1 @@
__version__ = '0.1'

0 comments on commit f74f7ee

Please sign in to comment.