Skip to content

Commit

Permalink
Merge pull request #4 from yukihiko-shinoda/support-sigterm
Browse files Browse the repository at this point in the history
Support sigterm
  • Loading branch information
yukihiko-shinoda committed Aug 16, 2021
2 parents a96e0df + cb80880 commit 58c84a6
Show file tree
Hide file tree
Showing 27 changed files with 325 additions and 188 deletions.
19 changes: 14 additions & 5 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
FROM mstmelody/python-ffmpeg:20201114221500
RUN python -m pip install --upgrade pip \
&& pip --no-cache-dir install pipenv
FROM jrottenberg/ffmpeg:4.4-ubuntu2004
# see: https://linuxize.com/post/how-to-install-python-3-9-on-ubuntu-20-04/
RUN apt-get update && apt-get install -y \
software-properties-common \
&& rm -rf /var/lib/apt/lists/* \
&& add-apt-repository ppa:deadsnakes/ppa
RUN apt-get update && apt-get install -y \
python3.9 \
python3-pip \
&& rm -rf /var/lib/apt/lists/*
# Switch default Python3 to Python 3.9
RUN update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.9 1
# see: https://pythonspeed.com/articles/activate-virtualenv-dockerfile/
RUN pip --no-cache-dir install pipenv
ENV PIPENV_VENV_IN_PROJECT=1
WORKDIR /workspace
# COPY ./Pipfile ./Pipfile.lock /workspace/
# RUN pipenv install --deploy --dev
COPY . /workspace
RUN pipenv install --skip-lock --dev
ENTRYPOINT [ "pipenv", "run" ]
CMD ["pytest"]
4 changes: 4 additions & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ name = "pypi"
asynccpu = "*"
asyncffmpeg = {path = ".", editable = true}
bump2version = "*"
# Hotfix for Requirementslib which is dependency of Pipenv
# see: https://github.com/sarugaku/requirementslib/issues/296
chardet = "*"
# Hotfix for Pipenv's Bug @see https://github.com/pypa/pipenv/issues/4101
colorama = "*"
coverage = "*"
Expand All @@ -21,6 +24,7 @@ pytest = "*"
pytest-resource-path = "*"
radon = "*"
xenon = "*"
types-psutil = "*"

[packages]
# To control ffmpeg by Python
Expand Down
2 changes: 1 addition & 1 deletion asyncffmpeg/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ class Error(Exception):
class FFmpegProcessError(Error):
"""FFmpeg process failed."""

def __init__(self, message, exit_code):
def __init__(self, message: str, exit_code: int) -> None:
super().__init__(message)
self.exit_code = exit_code
18 changes: 13 additions & 5 deletions asyncffmpeg/ffmpeg_coroutine.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""FFmpeg coroutine interface."""
import asyncio
from asyncio.exceptions import CancelledError
from logging import getLogger
from typing import Awaitable, Callable, Optional, Type, TypeVar
from signal import SIGTERM, signal
from typing import Any, Awaitable, Callable, NoReturn, Optional, Type, TypeVar

from asyncffmpeg.ffmpegprocess.interface import FFmpegProcess
from asyncffmpeg.type_alias import StreamSpec
Expand Down Expand Up @@ -35,14 +37,15 @@ async def execute(
self,
create_stream_spec: Callable[[], Awaitable[StreamSpec]],
*,
after_start: Optional[Callable[[TypeVarFFmpegProcess], Awaitable]] = None
after_start: Optional[Callable[[TypeVarFFmpegProcess], Awaitable[Any]]] = None
) -> None:
"""
Executes FFmpeg process.
This method defines workflow including interruption and logging.
"""
try:
self.logger.debug("FFmpeg coroutine start")
signal(SIGTERM, self.sigterm_hander)
self.ffmpeg_process = self.class_ffmpeg_process(self.time_to_force_termination, await create_stream_spec())
self.logger.debug("Instantiate FFmpeg process finish")
if after_start:
Expand All @@ -58,9 +61,14 @@ async def execute(
self.logger.info("FFmpeg process quit start")
await self.ffmpeg_process.quit(self.time_to_force_termination)
self.logger.info("FFmpeg process quit finish")
raise error
raise
except Exception as error:
self.logger.exception("%s", error)
raise error
self.logger.exception(str(error))
raise
finally:
self.logger.debug("FFmpeg coroutine finish")

# Reason: Can't collect coverage because of termination.
def sigterm_hander(self, _signum: int, _frame: Optional[Any]) -> NoReturn: # pragma: no cover
self.logger.debug("SIGTERM handler: Start")
raise CancelledError()
6 changes: 3 additions & 3 deletions asyncffmpeg/ffmpegprocess/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def __init__(self, time_to_force_termination: int) -> None:
self.realtime_pipe_reader = self.create_realtime_pipe_reader()

@abstractmethod
def create_popen(self) -> Popen:
def create_popen(self) -> Popen[bytes]:
raise NotImplementedError() # pragma: no cover

def create_realtime_pipe_reader(self) -> RealtimePipeReader:
Expand All @@ -51,7 +51,7 @@ async def wait(self) -> None:
self.logger.error("return_code = %d", return_code)
raise FFmpegProcessError(stderr, return_code)

async def quit(self, time_to_force_termination: Optional[int] = None) -> None:
async def quit(self, time_to_force_termination: Optional[float] = None) -> None:
"""
Quits FFmpeg process.
see: https://github.com/kkroening/ffmpeg-python/issues/162#issuecomment-571820244
Expand Down Expand Up @@ -83,5 +83,5 @@ def __init__(self, time_to_force_termination: int, stream_spec: StreamSpec) -> N
super().__init__(time_to_force_termination)

@abstractmethod
def create_popen(self) -> Popen:
def create_popen(self) -> Popen[bytes]:
raise NotImplementedError() # pragma: no cover
8 changes: 5 additions & 3 deletions asyncffmpeg/ffmpegprocess/posix.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Archive process."""
from subprocess import Popen
from typing import Optional

import ffmpeg

Expand All @@ -9,10 +10,11 @@
class FFmpegProcessPosix(FFmpegProcess):
"""FFmpeg process wrapping Popen object."""

def create_popen(self) -> Popen:
return ffmpeg.run_async(self.stream_spec, pipe_stdin=True, pipe_stdout=True, pipe_stderr=True)
def create_popen(self) -> Popen[bytes]:
# Reason: Requires to update ffmpeg-python side.
return ffmpeg.run_async(self.stream_spec, pipe_stdin=True, pipe_stdout=True, pipe_stderr=True) # type: ignore

async def quit(self, time_to_force_termination: int = None) -> None:
async def quit(self, time_to_force_termination: Optional[float] = None) -> None:
# Otherwise, we'll get OSError: [Errno 9] Bad file descriptor.
self.realtime_pipe_reader.stop()
await super().quit(time_to_force_termination)
5 changes: 3 additions & 2 deletions asyncffmpeg/ffmpegprocess/windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ def __init__(self, time_to_force_termination: int, argument: List[str]) -> None:
win32api.SetConsoleCtrlHandler(None, False)
win32api.SetConsoleCtrlHandler(self.handle, True)

def create_popen(self) -> Popen:
def create_popen(self) -> Popen[bytes]:
# Reason: This method is instead of ffmpeg.run_async(). pylint: disable=consider-using-with
return Popen(["ffmpeg", *self.argument], stdin=PIPE, stdout=PIPE, stderr=PIPE)

def handle(self, event):
def handle(self, event: int) -> int:
"""Handle console control events (like Ctrl-C)."""
if event in (
win32con.CTRL_C_EVENT,
Expand Down
6 changes: 4 additions & 2 deletions asyncffmpeg/ffmpegprocess/windows_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

# Reason: CREATE_NEW_PROCESS_GROUP is packaged only in Windows
from subprocess import CREATE_NEW_PROCESS_GROUP, PIPE, Popen # type: ignore
from typing import Optional

import ffmpeg

Expand All @@ -21,20 +22,21 @@
class FFmpegProcessWindowsWrapper(FFmpegProcess):
"""FFmpeg process wrapping Popen object."""

def create_popen(self) -> Popen:
def create_popen(self) -> Popen[bytes]:
argument = [
sys.executable,
str(Path(__file__).resolve().parent / "windows.py"),
str(self.time_to_force_termination),
*ffmpeg.get_args(self.stream_spec),
]
self.logger.debug(argument)
# Reason: This method is instead of ffmpeg.run_async(). pylint: disable=consider-using-with
return Popen(argument, creationflags=CREATE_NEW_PROCESS_GROUP, stdout=PIPE, stderr=PIPE)

def create_realtime_pipe_reader(self) -> RealtimePipeReader:
return StringRealtimePipeReader(self.popen)

async def quit(self, time_to_force_termination: int = None) -> None:
async def quit(self, time_to_force_termination: Optional[float] = None) -> None:
self.logger.info(self.realtime_pipe_reader.read_stdout())
self.logger.error(self.realtime_pipe_reader.read_stderr())
self.popen.wait(time_to_force_termination)
Expand Down
22 changes: 11 additions & 11 deletions asyncffmpeg/pipe/pipe_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,41 @@
from logging import getLogger
from queue import Queue
from threading import Event, Thread
from typing import IO
from typing import IO, List, Union


class PipeManager:
"""Logs pipe output and stores it into queue."""

def __init__(self, event: Event, pipe: IO):
def __init__(self, event: Event, pipe: IO[bytes]) -> None:
self.event = event
self.queue: Queue = Queue()
self.queue: Queue[bytes] = Queue()
self.logger = getLogger(__name__)
self.thread = self.create_thread(pipe)

def create_thread(self, pipe: IO):
def create_thread(self, pipe: IO[bytes]) -> Thread:
thread = Thread(target=self.log, args=(pipe,))
thread.daemon = True # thread dies with the program
thread.start()
return thread

@abstractmethod
def log(self, pipe: IO):
def log(self, pipe: IO[bytes]) -> None:
raise NotImplementedError() # pragma: no cover

@abstractmethod
def read(self):
def read(self) -> Union[str, List[bytes]]:
raise NotImplementedError() # pragma: no cover


class BytesPipeManager(PipeManager):
"""For bytes."""

def __init__(self, event: Event, pipe: IO, frame_bytes: int):
def __init__(self, event: Event, pipe: IO[bytes], frame_bytes: int):
self.frame_bytes = frame_bytes
super().__init__(event, pipe)

def log(self, pipe: IO):
def log(self, pipe: IO[bytes]) -> None:
with pipe:
try:
while True:
Expand All @@ -50,7 +50,7 @@ def log(self, pipe: IO):
except ValueError as error: # pragma: no cover
self.logger.info(error, exc_info=True)

def read(self):
def read(self) -> List[bytes]:
"""
Vacuums stderr by get_nowait().
see:
Expand All @@ -66,7 +66,7 @@ def read(self):
class StringPipeManager(PipeManager):
"""For strings."""

def log(self, pipe: IO):
def log(self, pipe: IO[bytes]) -> None:
with pipe:
try:
for line in iter(pipe.readline, b""):
Expand All @@ -78,7 +78,7 @@ def log(self, pipe: IO):
except ValueError as error: # pragma: no cover
self.logger.info(error, exc_info=True)

def read(self):
def read(self) -> str:
"""
Vacuums stderr by get_nowait().
see:
Expand Down
29 changes: 15 additions & 14 deletions asyncffmpeg/pipe/realtime_pipe_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,47 +7,47 @@
from abc import abstractmethod
from subprocess import Popen
from threading import Event
from typing import Optional
from typing import List, Optional, Union

from asyncffmpeg.pipe.pipe_manager import BytesPipeManager, StringPipeManager


class RealtimePipeReader:
"""Abstract class."""

def __init__(self):
def __init__(self) -> None:
self.event = Event()

@abstractmethod
def read_stdout(self):
def read_stdout(self) -> Union[str, List[bytes]]:
raise NotImplementedError() # pragma: no cover

@abstractmethod
def read_stderr(self):
def read_stderr(self) -> str:
raise NotImplementedError() # pragma: no cover

@abstractmethod
def stop(self):
def stop(self) -> None:
raise NotImplementedError() # pragma: no cover


class StringRealtimePipeReader(RealtimePipeReader):
"""For strings."""

def __init__(self, popen: Popen):
def __init__(self, popen: Popen[bytes]) -> None:
super().__init__()
assert popen.stdout is not None
assert popen.stderr is not None
self.pipe_manager_stdout = StringPipeManager(self.event, popen.stdout)
self.pipe_manager_stderr = StringPipeManager(self.event, popen.stderr)

def read_stdout(self):
def read_stdout(self) -> str:
return self.pipe_manager_stdout.read()

def read_stderr(self):
def read_stderr(self) -> str:
return self.pipe_manager_stderr.read()

def stop(self):
def stop(self) -> None:
self.event.set()
self.pipe_manager_stdout.thread.join()
self.pipe_manager_stderr.thread.join()
Expand All @@ -56,7 +56,7 @@ def stop(self):
class FFmpegRealtimePipeReader(RealtimePipeReader):
"""For FFmpeg."""

def __init__(self, popen: Popen, *, frame_bytes: Optional[int] = None):
def __init__(self, popen: Popen[bytes], *, frame_bytes: Optional[int] = None):
super().__init__()
assert popen.stdout is not None
assert popen.stderr is not None
Expand All @@ -65,13 +65,14 @@ def __init__(self, popen: Popen, *, frame_bytes: Optional[int] = None):
None if frame_bytes is None else BytesPipeManager(self.event, popen.stdout, frame_bytes)
)

def read_stdout(self):
return self.pipe_manager_stdout.read()
def read_stdout(self) -> List[bytes]:
# Reason: omit if statement for excluding None for performance.
return self.pipe_manager_stdout.read() # type: ignore

def read_stderr(self):
def read_stderr(self) -> str:
return self.pipe_manager_stderr.read()

def stop(self):
def stop(self) -> None:
self.event.set()
self.pipe_manager_stderr.thread.join()
if self.pipe_manager_stdout is not None:
Expand Down
7 changes: 4 additions & 3 deletions asyncffmpeg/type_alias.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
"""Type alias."""
from typing import Dict, List, Tuple, Union
from typing import Any, Dict, List, Tuple, Union

from ffmpeg.nodes import Stream
# Reason: Maybe, requires to update ffmpeg-python side.
from ffmpeg.nodes import Stream # type: ignore

__all__ = ["StreamSpec"]

StreamSpec = Union[None, Stream, List, Tuple, Dict]
StreamSpec = Union[None, Stream, List, Tuple[Any], Dict]
19 changes: 0 additions & 19 deletions mypy.ini

This file was deleted.

0 comments on commit 58c84a6

Please sign in to comment.