Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge GlobalNode and MergeOutputNode #325

Merged
merged 6 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/ffmpeg/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from ffmpeg.schema import StreamType

from .dag.nodes import FilterableStream, FilterNode, InputNode, MergeOutputsNode, OutputNode, OutputStream
from .dag.nodes import FilterableStream, FilterNode, GlobalNode, GlobalStream, InputNode, OutputNode, OutputStream
from .streams.audio import AudioStream
from .streams.av import AVStream
from .streams.video import VideoStream
Expand Down Expand Up @@ -47,7 +47,7 @@ def output(*streams: FilterableStream, filename: str, **kwargs: Any) -> OutputSt
return OutputNode(filename=filename, inputs=streams, kwargs=tuple(kwargs.items())).stream()


def merge_outputs(*streams: OutputStream) -> OutputStream:
def merge_outputs(*streams: OutputStream) -> GlobalStream:
"""
Merge multiple output streams into one.

Expand All @@ -57,7 +57,7 @@ def merge_outputs(*streams: OutputStream) -> OutputStream:
Returns:
The merged output stream.
"""
return MergeOutputsNode(inputs=streams).stream()
return GlobalNode(inputs=streams).stream()


def vfilter(
Expand Down
3 changes: 1 addition & 2 deletions src/ffmpeg/dag/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .nodes import FilterableStream, FilterNode, GlobalNode, InputNode, MergeOutputsNode, OutputNode, OutputStream
from .nodes import FilterableStream, FilterNode, GlobalNode, InputNode, OutputNode, OutputStream
from .schema import Node, Stream

__all__ = [
Expand All @@ -10,5 +10,4 @@
"InputNode",
"OutputNode",
"OutputStream",
"MergeOutputsNode",
]
247 changes: 174 additions & 73 deletions src/ffmpeg/dag/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import os.path
import shlex
import subprocess
from dataclasses import dataclass
from dataclasses import dataclass, replace
from typing import TYPE_CHECKING, Any

from ..exceptions import FFMpegExecuteError, FFMpegTypeError, FFMpegValueError
Expand Down Expand Up @@ -287,45 +287,6 @@ def __post_init__(self) -> None:
assert self.index is not None, "Filter streams must have an index"


@dataclass(frozen=True, kw_only=True)
class GlobalNode(Node):
"""
A node that can be used to set global options
"""

inputs: tuple["OutputStream"]

@override
def repr(self) -> str:
return " ".join(self.get_args())

def stream(self) -> "OutputStream":
"""
Return the output stream of this node

Returns:
the output stream
"""
return OutputStream(node=self)

@override
def get_args(self, context: DAGContext = None) -> list[str]:
commands = []
for key, value in self.kwargs:
# Options which do not take arguments are boolean options,
# and set the corresponding value to true. They can be set to
# false by prefixing the option name with "no". For example
# using "-nofoo" will set the boolean option with name "foo" to false.
if isinstance(value, bool):
if value is True:
commands += [f"-{key}"]
else:
commands += [f"-no{key}"]
else:
commands += [f"-{key}", str(value)]
return commands


@dataclass(frozen=True, kw_only=True)
class InputNode(Node):
"""
Expand Down Expand Up @@ -434,9 +395,9 @@ def get_args(self, context: DAGContext = None) -> list[str]:

@dataclass(frozen=True, kw_only=True)
class OutputStream(Stream):
node: OutputNode | GlobalNode | MergeOutputsNode
node: OutputNode

def global_args(self, **kwargs: Any) -> "OutputStream":
def global_args(self, **kwargs: Any) -> GlobalStream:
"""
Add extra global command-line argument

Expand All @@ -448,7 +409,7 @@ def global_args(self, **kwargs: Any) -> "OutputStream":
"""
return GlobalNode(inputs=(self,), kwargs=tuple(kwargs.items())).stream()

def merge_outputs(self, *streams: OutputStream) -> OutputStream:
def merge_outputs(self, *streams: OutputStream) -> GlobalStream:
"""
Merge multiple output streams into one.

Expand All @@ -458,9 +419,9 @@ def merge_outputs(self, *streams: OutputStream) -> OutputStream:
Returns:
The merged output stream.
"""
return MergeOutputsNode(inputs=(self, *streams)).stream()
return GlobalNode(inputs=(self, *streams)).stream()

def overwrite_output(self) -> "OutputStream":
def overwrite_output(self) -> GlobalStream:
"""
Overwrite output files without asking (ffmpeg `-y` option)

Expand All @@ -478,6 +439,173 @@ def compile(
"""
Build command-line for invoking ffmpeg.

Args:
cmd: the command to invoke ffmpeg
overwrite_output: whether to overwrite output files without asking
auto_fix: whether to automatically fix the stream

Returns:
the command-line
"""
return self.global_args().compile(cmd, overwrite_output=overwrite_output, auto_fix=auto_fix)

def compile_line(
self,
cmd: str | list[str] = "ffmpeg",
overwrite_output: bool = False,
auto_fix: bool = True,
) -> str:
"""
Build command-line for invoking ffmpeg.

Args:
cmd: the command to invoke ffmpeg
overwrite_output: whether to overwrite output files without asking
auto_fix: whether to automatically fix the stream

Returns:
the command-line
"""
return self.global_args().compile_line(cmd, overwrite_output=overwrite_output, auto_fix=auto_fix)

def run_async(
self,
cmd: str | list[str] = "ffmpeg",
pipe_stdin: bool = False,
pipe_stdout: bool = False,
pipe_stderr: bool = False,
quiet: bool = False,
overwrite_output: bool = False,
auto_fix: bool = True,
) -> subprocess.Popen[bytes]:
"""
Run ffmpeg asynchronously.

Args:
cmd: the command to invoke ffmpeg
pipe_stdin: whether to pipe stdin
pipe_stdout: whether to pipe stdout
pipe_stderr: whether to pipe stderr
quiet: whether to pipe stderr to stdout
overwrite_output: whether to overwrite output files without asking
auto_fix: whether to automatically fix the stream

Returns:
the process
"""

return self.global_args().run_async(
cmd,
pipe_stdin=pipe_stdin,
pipe_stdout=pipe_stdout,
pipe_stderr=pipe_stderr,
quiet=quiet,
overwrite_output=overwrite_output,
auto_fix=auto_fix,
)

def run(
self,
cmd: str | list[str] = "ffmpeg",
capture_stdout: bool = False,
capture_stderr: bool = False,
input: bytes | None = None,
quiet: bool = False,
overwrite_output: bool = False,
auto_fix: bool = True,
) -> tuple[bytes, bytes]:
"""
Run ffmpeg synchronously.

Args:
cmd: the command to invoke ffmpeg
capture_stdout: whether to capture stdout
capture_stderr: whether to capture stderr
input: the input
quiet: whether to pipe stderr to stdout
overwrite_output: whether to overwrite output files without asking
auto_fix: whether to automatically fix the stream

Returns:
the stdout
the stderr
"""

return self.global_args().run(
cmd,
capture_stdout=capture_stdout,
capture_stderr=capture_stderr,
input=input,
quiet=quiet,
overwrite_output=overwrite_output,
auto_fix=auto_fix,
)


@dataclass(frozen=True, kw_only=True)
class GlobalNode(Node):
"""
A node that can be used to set global options
"""

inputs: tuple[OutputStream, ...]

@override
def repr(self) -> str:
return " ".join(self.get_args())

def stream(self) -> "GlobalStream":
"""
Return the output stream of this node

Returns:
the output stream
"""
return GlobalStream(node=self)

@override
def get_args(self, context: DAGContext = None) -> list[str]:
commands = []
for key, value in self.kwargs:
# Options which do not take arguments are boolean options,
# and set the corresponding value to true. They can be set to
# false by prefixing the option name with "no". For example
# using "-nofoo" will set the boolean option with name "foo" to false.
if isinstance(value, bool):
if value is True:
commands += [f"-{key}"]
else:
commands += [f"-no{key}"]
else:
commands += [f"-{key}", str(value)]
return commands


@dataclass(frozen=True, kw_only=True)
class GlobalStream(Stream):
node: GlobalNode

def global_args(self, **kwargs: Any) -> GlobalStream:
"""
Add extra global command-line argument

Args:
**kwargs: the extra arguments

Returns:
the output stream
"""
kwargs = dict(self.node.kwargs) | kwargs

new_node = replace(self.node, kwargs=tuple(kwargs.items()))
return new_node.stream()

def compile(
self, cmd: str | list[str] = "ffmpeg", overwrite_output: bool = False, auto_fix: bool = True
) -> list[str]:
"""
Build command-line for invoking ffmpeg.

Args:
cmd: the command to invoke ffmpeg
overwrite_output: whether to overwrite output files without asking
Expand All @@ -492,7 +620,7 @@ def compile(
cmd = [cmd]

if overwrite_output:
return cmd + compile(self, auto_fix=auto_fix) + ["-y"]
return self.global_args(y=True).compile(cmd, auto_fix=auto_fix)

return cmd + compile(self, auto_fix=auto_fix)

Expand Down Expand Up @@ -605,30 +733,3 @@ def run(
)

return stdout, stderr


@dataclass(frozen=True, kw_only=True)
class MergeOutputsNode(Node):
"""
A node that can be used to merge multiple outputs
"""

inputs: tuple[OutputStream, ...]

@override
def repr(self) -> str:
return "Merge"

def stream(self) -> "OutputStream":
"""
Return the output stream of this node

Returns:
the output stream
"""
return OutputStream(node=self)

@override
def get_args(self, context: DAGContext = None) -> list[str]:
# NOTE: the node just used to group outputs, no need to add any commands
return []
4 changes: 2 additions & 2 deletions src/ffmpeg/dag/tests/__snapshots__/test_nodes.ambr
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@
])
# ---
# name: test_node_prop[merge-output-node][__repr__]
"MergeOutputsNode(kwargs=(), inputs=(OutputStream(node=OutputNode(kwargs=(), inputs=(AVStream(node=InputNode(kwargs=(), inputs=(), filename='tmp1.mp4'), index=None),), filename='out1.mp4'), index=None), OutputStream(node=OutputNode(kwargs=(), inputs=(AVStream(node=InputNode(kwargs=(), inputs=(), filename='tmp2.mp4'), index=None),), filename='out2.mp4'), index=None)))"
"GlobalNode(kwargs=(), inputs=(OutputStream(node=OutputNode(kwargs=(), inputs=(AVStream(node=InputNode(kwargs=(), inputs=(), filename='tmp1.mp4'), index=None),), filename='out1.mp4'), index=None), OutputStream(node=OutputNode(kwargs=(), inputs=(AVStream(node=InputNode(kwargs=(), inputs=(), filename='tmp2.mp4'), index=None),), filename='out2.mp4'), index=None)))"
# ---
# name: test_node_prop[merge-output-node][f.repr]
'Merge'
''
# ---
# name: test_node_prop[merge-output-node][get_args]
list([
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[
"ffmpeg",
"-noy",
"-i",
"tmp1.mp4",
"-i",
"tmp2.mp4",
"-filter_complex",
"[0:v][1:v]concat[s0]",
"-map",
"[s0]",
"output.mp4"
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[
"-noy"
]
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Loading