Memory file -> numpy -> memory file #156

Open
Sangkwun opened this issue Dec 21, 2018 · 5 comments
@Sangkwun
Sangkwun commented Dec 21, 2018

Thanks for this great library.
As the title says, I want to decode a video held in memory as raw bytes into numpy arrays,
then encode the result back into raw video bytes.
I managed to convert the bytes to mp4 using the approach from the comment linked below.

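import subprocess
import ffmpeg

decode = (
    ffmpeg
    .input('pipe:')
    .output('pipe:')
    .get_args()
)
# Note: ffmpeg cannot infer a container from 'pipe:', so a real call needs format=... on the output.
# get_args() returns the arguments without the executable name, so prepend it.
p = subprocess.Popen(['ffmpeg'] + decode, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
output_data = p.communicate(input=input_data)[0]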

#49 (comment)

That works when no processing is needed.
But I want to process frames as in your tensorflow stream example.

So I tried using two processes, but it doesn't work and nothing gets processed.
The test below is for video (bytes) => numpy => video (file).

def start_encode_process():
    logger.info('Starting ffmpeg process1')
    args = (
        ffmpeg
        .input('pipe:')
        .output('pipe:', format='rawvideo', pix_fmt='rgb24')
        .compile()
    )
    return subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

def start_decode_process(out_filename, width, height):
    logger.info('Starting ffmpeg process2')
    args = (
        ffmpeg
        .input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(width, height))
        .output(out_filename, pix_fmt='yuv420p')
        .overwrite_output()
        .compile()
    )
    return subprocess.Popen(args, stdin=subprocess.PIPE)

process1 = start_encode_process()
process2 = start_decode_process(out_filename, width, height)
process1.stdin.write(video_byte)
while True:
    in_frame = read_frame(process1, width, height)
    out_frame = process_frame_simple(in_frame)
    write_frame(process2, out_frame)

I think the first problem is stdin. The byte-reading example above uses the communicate method to feed stdin,
but that method does not suit this case because I need to process the video frame by frame.
Do you have any ideas for this?

Thanks for reading.

@kkroening
Owner

Because you're reading and writing individual pipe file descriptors with a single python process, you're encountering a deadlock: The process1.stdin.write line blocks and waits until process1 is finished reading all the data that's written. Process1 reads a bit of data, does some work, then writes to its output stream. However, because nothing is reading from process1's output stream process1 blocks and waits. So then both your python process and process1 are blocked, and no progress is made.

The reason it's a problem in this example but fine in the tensorflow example is that each of the ffmpeg processes in the tensorflow example only uses one pipe, whereas process1 here has both an input and output pipe (and only a single python thread).

This is a common issue when working with blocking pipes, regardless of choice of language.

Here are a few options I can think of off the top of my head:

Option 1: Use threads (or python multiprocessing, gevent, etc) so that one thread is responsible for pumping data into process1, and another thread that's responsible for pumping data out of process1 and into process2. Both threads must be okay with being blocked.

Option 2: Don't use a stdin pipe for process1. If you don't need to feed data to process1 from the same python process/thread that's doing the in-memory numpy processing and can avoid doing so, then it gets a lot simpler because you avoid this deadlock scenario.

Option 3: Use non-blocking IO. Basically the process1.stdin.write gets replaced with a call that doesn't block so that you avoid the deadlock. (Error handling and system-specific quirks can be really annoying here though, so YMMV; from my experience this ends up being the most complex/error-prone solution unless you have something like gevent do it for you, but in that case see option 1)

Option 4: Run only one ffmpeg process at a time and use subprocess.communicate. The subprocess communicate method gets around deadlock issues with running a child process with both stdin+stdout pipes, but you'll need to have all the input data available beforehand, and then you'll have to process the output of process1 all at once, meaning the entire thing has to fit in memory. The processed data is then fed into process2. If you only have a few seconds of video and don't need it to run in realtime then this might be feasible (and pretty simple), otherwise it's completely impractical.
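
As a rough illustration of Option 1, here is a minimal sketch in which a helper thread pumps the input bytes into a child process while the main thread reads the child's output chunk by chunk. To keep it runnable without ffmpeg, the child is a stand-in Python one-liner that copies stdin to stdout. `pump_through`, `CHILD_CMD`, and the chunk size are hypothetical names and values chosen for this example; they are not part of ffmpeg-python.

```python
import subprocess
import sys
import threading

# Stand-in for an ffmpeg command line (assumption: any child that reads
# stdin and writes stdout behaves the same way as far as the pipes go).
CHILD_CMD = [
    sys.executable, '-c',
    'import sys, shutil; shutil.copyfileobj(sys.stdin.buffer, sys.stdout.buffer)',
]


def pump_through(data, chunk_size=64 * 1024):
    """Feed `data` to the child from a helper thread and collect its output."""
    proc = subprocess.Popen(CHILD_CMD, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    def feed():
        # This write may block whenever the child's stdin pipe is full. That is
        # fine because the main thread keeps draining stdout in the meantime.
        try:
            proc.stdin.write(data)
        finally:
            proc.stdin.close()  # signals EOF to the child

    feeder = threading.Thread(target=feed)
    feeder.start()

    chunks = []
    while True:
        chunk = proc.stdout.read(chunk_size)
        if not chunk:
            break
        chunks.append(chunk)  # per-frame processing would go here

    feeder.join()
    proc.stdout.close()
    proc.wait()
    return b''.join(chunks)


if __name__ == '__main__':
    payload = b'x' * (4 * 1024 * 1024)  # far larger than a typical pipe buffer
    result = pump_through(payload)
    print(len(result) == len(payload))
```

With real ffmpeg processes, the read loop would pull one frame's worth of bytes at a time from process1 and write the processed frame to process2, which only has a stdin pipe and so needs no extra thread.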

Some related, potentially useful search terms / research topics:

  • "Unix pipe deadlocks"
  • "Subprocess communicate deadlock"
  • "Non-blocking IO python"
  • etc

@kkroening
Owner

kkroening commented Dec 22, 2018

Above reading raw video byte use communicate method for stdin.

FYI, the communicate method is fundamentally different than the second example you posted above - which I think you're aware of, but for reference, you might want to take a look at subprocess Popen.communicate to see how it differs from working with individual pipes. (Also pay close attention to the note about buffering)
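
To make the contrast concrete, here is a small sketch of the communicate approach (Option 4 above). It uses a stand-in child process rather than ffmpeg, and `run_filter` and `CHILD_CMD` are hypothetical names. The point is that `communicate` feeds stdin and drains stdout and stderr for you, at the cost of holding the whole input and output in memory.

```python
import subprocess
import sys

# Stand-in child (assumption): upper-cases whatever arrives on stdin.
CHILD_CMD = [
    sys.executable, '-c',
    'import sys; sys.stdout.buffer.write(sys.stdin.buffer.read().upper())',
]


def run_filter(data):
    """Send all of `data` to the child at once and return its full output."""
    proc = subprocess.Popen(
        CHILD_CMD,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # communicate() writes the input and reads stdout/stderr concurrently,
    # then waits for the process to exit, so the pipes cannot deadlock.
    out, err = proc.communicate(input=data)
    if proc.returncode != 0:
        raise RuntimeError(err.decode('utf-8', errors='replace'))
    return out
```

Writing to `proc.stdin` and reading from `proc.stdout` by hand gives frame-by-frame control, but then keeping both pipes moving is the caller's job.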

@Sangkwun
Author

Thanks for your kind suggestion!
Following it, I tried to solve this with custom Process subclasses from multiprocessing.
I managed to produce an output file, but some frames are lost somewhere.

Q. Mismatch in the number of frames between input and output.

My test video has 2793 frames in total, but the output video has only 1396 frames.
The output plays 2x faster than the original because fewer frames are written at the same frame rate.

Get information from video in memory

First of all, I modified probe to read the video information from video bytes.
Someone may find this a useful hint.
I can make a PR later that combines it with the existing probe.

import json
import subprocess

import ffmpeg

def probe(video_byte, cmd='ffprobe'):
    """Run ffprobe on in-memory video bytes and return a JSON representation of the output.

    Raises:
        :class:`ffmpeg.Error`: if ffprobe returns a non-zero exit code,
            an :class:`Error` is raised with a generic error message.
            The stderr output can be retrieved by accessing the
            ``stderr`` property of the exception.
    """
    args = [cmd, '-show_format', '-show_streams', '-of', 'json', '-i', 'pipe:']
    p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # communicate() feeds stdin and collects stdout/stderr without deadlocking.
    out, err = p.communicate(input=video_byte)
    if p.returncode != 0:
        raise ffmpeg.Error('ffprobe', out, err)
    return json.loads(out.decode('utf-8'))
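
For readers who want the width, height, and frame rate out of that JSON, a possible helper is sketched below. It assumes the usual ffprobe JSON layout: a `streams` list whose video entries carry `codec_type`, `width`, `height`, and `r_frame_rate`. `video_info` is a hypothetical name, `nb_frames` is treated as optional because ffprobe does not report it for every container, and the sample dictionary is hand-written for illustration rather than real ffprobe output.

```python
from fractions import Fraction


def video_info(probe_result):
    """Extract basic video parameters from a parsed ffprobe JSON dict."""
    video_streams = [
        s for s in probe_result.get('streams', [])
        if s.get('codec_type') == 'video'
    ]
    if not video_streams:
        raise ValueError('no video stream found')
    stream = video_streams[0]
    nb_frames = stream.get('nb_frames')
    return {
        'width': int(stream['width']),
        'height': int(stream['height']),
        # r_frame_rate is a rational string such as "30000/1001".
        'frame_rate': Fraction(stream['r_frame_rate']),
        'nb_frames': int(nb_frames) if nb_frames is not None else None,
    }


# Hand-written sample with illustrative values (not real ffprobe output).
sample = {
    'streams': [
        {'codec_type': 'audio'},
        {'codec_type': 'video', 'width': 1280, 'height': 720,
         'r_frame_rate': '30/1', 'nb_frames': '2793'},
    ]
}
info = video_info(sample)
```

The width and height are what the rawvideo input of the second ffmpeg process needs for its `s='{}x{}'` argument.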

Processes for encode and decode

My ffmpeg processes are shown below.
I thought the vframes argument could handle this problem, but it did not work well.
The stdout pipe in the decode process is not needed now, but I left it in for testing.

def create_encode_process(frame_rate, nframes):
    logger.info('Create ffmpeg process1')
    args = (
        ffmpeg
        .input('pipe:')
        .output('pipe:', format='rawvideo', pix_fmt='rgb24', r=frame_rate, vframes=nframes)
        .compile()
    )
    return subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

def create_decode_process(out_filename, width, height, frame_rate, nframes):
    logger.info('Create ffmpeg process2')
    args = (
        ffmpeg
        .input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(width, height), r=frame_rate)
        .output(out_filename, pix_fmt='yuv420p', vframes=nframes)
        .overwrite_output()
        .compile()
    )
    return subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

My Feeder process

Below are the two Process subclasses: one for feeding data and one for the numpy processing.
I think this design is not good because the processes do not quit, so I need to fix that.

from multiprocessing import Process

import numpy as np


class Feeder(Process):
    def __init__(self, input, process):
        super().__init__()
        self._input = input
        self._process = process
        self._buffer = 1
        logger.info('Create Feeder')
    
    def run(self):
        logger.info('write data start')
        for i in range(0, len(self._input), self._buffer):
            self._process.stdin.write(
                self._input[i:i + self._buffer]
            )
        self._process.stdin.close()


class Processor(Process):
    def __init__(self, process1, process2, width, height, nframe):
        super().__init__()
        self._process1 = process1
        self._process2 = process2
        self._width = width
        self._height = height
        self._frame_size = self._width * self._height * 3
        self._nframe = nframe
        logger.info('Create Processor')

    def _read_frame(self, process):
        in_bytes = process.stdout.read(self._frame_size)

        frame = (
            np
            .frombuffer(in_bytes, np.uint8)
            .reshape([self._height, self._width, 3])  # rows (height) come first
        )

        return frame

    def _process_frame_simple(self, frame):
        '''Simple processing example: darken frame.'''
        return frame * 0.3

    def _write_frame(self, process, frame):
        logger.debug('Writing frame')
        process.stdin.write(
            frame
            .astype(np.uint8)
            .tobytes()
        )

    def run(self):
        n = 0
        while True:
            # Stray test read: it discards every other frame (see the follow-up comment below).
            self._process1.stdout.read(self._frame_size)
            in_bytes = self._process1.stdout.read(self._frame_size)
            if not in_bytes:
                break
            frame = (
                        np
                        .frombuffer(in_bytes, np.uint8)
                    )
            frame = self._process_frame_simple(frame)
            self._process2.stdin.write(
                frame
                .astype(np.uint8)
                .tobytes()
            )
            n += 1
            logger.info('{} frames'.format(n))
        self._process2.stdin.close()
        logger.info(len(self._process2.stdout.read()))

Further question

My final goal is to receive the output video data in memory. Does that need one more process to store it?

Anyway, I really appreciate your advice; it is helping me understand Python and ffmpeg.
Merry Christmas and Happy New Year.

@Sangkwun
Author

Sorry, the frame rate problem was caused by my own mistake.
I read stdout twice; one read was for testing, but I forgot to delete it.
Anyway, the frame count and frame rate no longer have a problem!
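
The effect of the stray read is easy to reproduce without ffmpeg. The sketch below substitutes an `io.BytesIO` buffer for process1's stdout and uses a tiny made-up frame size; `count_kept_frames` and `FRAME_SIZE` are hypothetical names. With 2793 input frames, throwing away one read per loop iteration leaves 1396 frames, which matches the numbers reported above.

```python
import io

FRAME_SIZE = 4  # made-up frame size in bytes, just for the demonstration


def count_kept_frames(stream, frame_size, stray_read):
    """Count frames that reach the output, optionally with the stray extra read."""
    kept = 0
    while True:
        if stray_read:
            stream.read(frame_size)  # result is thrown away: one frame lost
        in_bytes = stream.read(frame_size)
        if not in_bytes:
            break
        kept += 1
    return kept


total_frames = 2793
data = bytes(FRAME_SIZE * total_frames)
buggy = count_kept_frames(io.BytesIO(data), FRAME_SIZE, stray_read=True)
fixed = count_kept_frames(io.BytesIO(data), FRAME_SIZE, stray_read=False)
print(buggy, fixed)  # 1396 2793
```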

@fake-warrior8

I used the commands from the original post. However, I get some errors.

1. Could not find codec parameters for stream 0 (Video: mpeg4 (mp4v / 0x7634706D), none, 210 kb/s): unspecified size
Consider increasing the value for the 'analyzeduration' and 'probesize' options

2. [mov,mp4,m4a,3gp,3g2,mj2 @ 0x55ee01839e80] stream 0, offset 0x1bc655: partial file
pipe:: Invalid data found when processing input
Cannot determine format of input stream 0:0 after EOF
Error marking filters as finished
Conversion failed!

How can I solve these problems?
