Skip to content

Commit

Permalink
Implement FlowPipe input_transform support, and test output_transforms
Browse files Browse the repository at this point in the history
  • Loading branch information
willprice committed Apr 30, 2019
1 parent 7f21397 commit 0f73dc0
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 7 deletions.
9 changes: 8 additions & 1 deletion src/flowty/flow_pipe.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import time
from collections import deque
from typing import Iterator

from flowty.cv import Mat

Expand All @@ -20,9 +21,15 @@ def __init__(self,
self.stride = stride
self.dilation = dilation

def _frame_generator(self) -> Iterator:
for frame in iter(self.src):
for transform in self.input_transforms:
frame = transform(frame)
yield frame

def run(self):
frame_iter = self._frame_generator()
frame_queue = deque()
frame_iter = iter(self.src)
while len(frame_queue) < self.dilation:
frame_queue.append(next(frame_iter))
assert len(frame_queue) == self.dilation
Expand Down
34 changes: 28 additions & 6 deletions tests/unit/test_flow_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ def write(self, flow):


class TestFlowPipe:
def difference(reference, target):
print("target: {}, reference: {}".format(target ,reference))
return target - reference

def test_1_flow_field_between_2_frames(self):
flow = self.compute_flow([1, 2])
assert len(flow) == 1
Expand All @@ -30,16 +34,34 @@ def test_2_flow_fields_between_4_frames_when_dilation_is_2(self):
flow = self.compute_flow([1, 2, 3, 5], dilation=2)
assert flow == [np.array([2]), np.array([3])]

def compute_flow(self, frames, dilation=1, stride=1):
src = [np.array([f]) for f in frames]
def test_input_transform_is_applied_to_reference_frames(self):
flow = self.compute_flow([1, 2, 3],
flow_algorithm=lambda reference, target: reference,
input_transforms=[lambda f: f+1])
assert flow == [np.array([2]), np.array([3])]

def flow_algorithm(reference, target):
print("target: {}, reference: {}".format(target ,reference))
return target - reference
def test_input_transform_is_applied_to_target_frames(self):
flow = self.compute_flow([1, 2, 3],
flow_algorithm=lambda reference, target: target,
input_transforms=[lambda f: f+1])
assert flow == [np.array([3]), np.array([4])]

def test_output_transform_is_applied_to_flow(self):
flow = self.compute_flow([1, 2, 3],
output_transforms=[lambda f: f+1])
assert flow == [np.array([2]), np.array([2])]

def compute_flow(self, frames, flow_algorithm=difference, dilation=1, stride=1,
input_transforms=None, output_transforms=None):
src = [np.array([f]) for f in frames]

dest = RecordingDestination()
pipe = FlowPipe(
src, flow_algorithm, dest, dilation=dilation, stride=stride
src, flow_algorithm, dest, dilation=dilation, stride=stride,
input_transforms=input_transforms, output_transforms=output_transforms
)
pipe.run()
return dest.flow



0 comments on commit 0f73dc0

Please sign in to comment.