
Data (not file or network protocol) interface to libav #448

Closed
davidavdav opened this issue Oct 26, 2018 · 27 comments
@davidavdav

Hi,

We've been struggling (#445) with using libav to decode packets of compressed audio that come into our server via a dedicated websocket connection.

It appears that the only way to send data to libav is through some file name (which may actually be a URL of some sort). So our solution is currently to use a unix named pipe, av.open(pipe) a container, then read packets and frames from the stream in the container. This worked OK-ish, until we hit #445.

## consume reads data from a named pipe and returns the decompressed
## audio through the Queue framesq. It should be called from a
## separate process.
def consume(pipename, framesq, doneq, sr):
    try:
        container = av.open(pipename, "r", options={'max_delay': '10000'})
        audio = container.streams[0]

        resampler = av.AudioResampler(
            format=av.AudioFormat("s16p").packed,
            layout='mono',
            rate=sr,
        )

        for packet in container.demux(audio):
            for frame in packet.decode():
                frame.pts = None
                frame = resampler.resample(frame)
                framesq.put(frame.planes[0].to_bytes())

    except Exception as e:
        logger.debug("Error in AV consumption: %r", e)

    doneq.put(True)  ## signal end of data

In light of the solution in #325, I've been experimenting with UDP and unix domain sockets to set up the transport of data, to circumvent the problem that the ffmpeg options don't seem to make much of a difference. However, I can't get any of these to work (a boilerplate example would be appreciated :-).

But really, we would just want to send the actual compressed data (including headers etc.) to libav synchronously, and receive the decompressed data whenever libav has enough information to output another frame. But we can't find an entry point in pyav / libav / ffmpeg to do such a thing, hence the workaround above.

Does anyone know if it would be possible to get a chunk-by-chunk implementation of the decoding?

@jlaine
Collaborator

jlaine commented Nov 5, 2018

If you want to feed raw packet data into a decoder, you don't need to use av.open at all, you can also construct Packet instances and feed them into CodecContext.decode. This is for example what aiortc does for H.264 decoding:

https://github.com/jlaine/aiortc/blob/ee7e745397f40094fb52cccbb6105f0297c11dda/aiortc/codecs/h264.py#L93

@jlaine
Collaborator

jlaine commented Nov 6, 2018

@davidavdav Could you let me know whether this answers your question so we can close the issue?

@davidavdav
Author

Hello @jlaine, thanks, I will have to look into this. One thing I don't really understand is that the repository you point at seems to be a completely different thing---are you suggesting we should use that instead of pyav?

@davidavdav
Author

Or are you suggesting I can use av.packet.Packet(encoded_frame_data) directly? Is there documentation of this interface? I would need to know how to find packets/frames in the data stream, currently av.open does that for me.

@jlaine
Collaborator

jlaine commented Nov 6, 2018

What I was suggesting was indeed using Packet() and feeding it to a CodecContext; the link was just an example of a codebase that does something similar: it receives H.264 data over RTP (where you use a websocket) and feeds it to a decoder.

WebSockets are message-oriented, so why not leverage this to ensure you send one packet in each message?

@davidavdav
Author

Data comes in in chunks whose boundaries are decided by the webm encoder of Chrome/Firefox. I would assume these are whole numbers of packets, but I don't know whether each chunk is a single packet or more. I suppose I could try and see.

@mikeboers
Member

While some codecs may require that our Packet objects be actual whole packets, others don't require that, and some have another layer of packets underneath that must be in whole units.

I'd say... try just feeding it whatever data you have. If it absolutely needs to go through something to split it up, there is an unbaked and undocumented CodecContext.parse which will split raw bytes into packets. We could take a few minutes to look at that if needed.

@davidavdav
Author

If I send the packets as they come in to a decoder initialized with av.CodecContext.create('opus', 'r') (the codec used by Chrome for audio), I get decoding errors Invalid data found when processing input (16: opus) after a few packets. I think the first data chunk contains some header information (I see "Chrome" and "A_OPUS" in the data), which is probably not dealt with by packet.decode(). av.open(), however, does all of this magically, but requires a file-like input and buffers the input (#445).

@davidavdav
Author

A few minutes would be appreciated... I've been struggling with this for a few weeks now. CodecContext.parse() gives empty lists on the chunks of data that gave errors with packet.decode() but skipping these still results in wrongly decoded data.

I'd like to be able to skip the webm header, but I can't find the definition of the webm header anywhere.
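
For what it's worth, WebM is an EBML-based container, and an EBML document starts with the 4-byte magic 0x1A 0x45 0xDF 0xA3. A quick, hedged sketch for checking whether a chunk begins with a container header rather than bare Opus packets (the function name is made up for illustration):

```python
# The first four bytes of any EBML document (WebM, Matroska).
EBML_MAGIC = b'\x1a\x45\xdf\xa3'

def starts_with_webm_header(chunk: bytes) -> bool:
    """Return True if this chunk begins with a WebM/EBML container header."""
    return chunk.startswith(EBML_MAGIC)
```

Note that stripping the header by hand is fragile, since the EBML header and the following Segment metadata are variable-length; demuxing the stream properly (as done later in this thread) is the safer route.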

@mikeboers
Member

I'm interested in seeing where this goes, so... can you write something that dumps out the chunks that you're getting in a manner that I can recover and simulate, and hand off that data?

So either a sequence of numbered files for each packet, JSON of a number of packets, or something similar.

At least as many packets as cause your stuff to error out, and ideally as many as you think are required for a few frames of output.

@davidavdav
Author

Here are the packets as they come in through the websocket...

packets.tar.gz

I read them using:

    for file in sorted(glob.glob("data/be*.00*")):
        with open(file, "rb") as fd:
            data = fd.read()

@jlaine
Collaborator

jlaine commented Nov 6, 2018

Some of the packets seem to be corrupted, I'd venture something like invalid null byte handling in your transport. I tried the following:

import glob
  
import av

codec_context = av.CodecContext.create('libopus', 'r')
for filename in sorted(glob.glob("data/be*.00*")):
    with open(filename, 'rb') as fd:
        packet = av.packet.Packet(fd.read())
    try:
        for frame in codec_context.decode(packet):
            print(filename, frame)
    except Exception:
        print(filename, 'error')

And the output looks like:

data/be9afe4f-7f52-40c9-82c5-c05b22a84829.0000 <av.AudioFrame 0, pts=None, 5760 samples at 48000Hz, 0 channels, s16 at 0x7f673fd400b8>
data/be9afe4f-7f52-40c9-82c5-c05b22a84829.0001 <av.AudioFrame 1, pts=None, 960 samples at 48000Hz, 0 channels, s16 at 0x7f673fd40518>
Decoding error: corrupted stream
data/be9afe4f-7f52-40c9-82c5-c05b22a84829.0002 error
data/be9afe4f-7f52-40c9-82c5-c05b22a84829.0003 <av.AudioFrame 2, pts=None, 120 samples at 48000Hz, 0 channels, s16 at 0x7f673fd40898>
data/be9afe4f-7f52-40c9-82c5-c05b22a84829.0004 <av.AudioFrame 3, pts=None, 480 samples at 48000Hz, 0 channels, s16 at 0x7f673fd40908>
data/be9afe4f-7f52-40c9-82c5-c05b22a84829.0005 <av.AudioFrame 4, pts=None, 480 samples at 48000Hz, 0 channels, s16 at 0x7f673fd40978>
data/be9afe4f-7f52-40c9-82c5-c05b22a84829.0006 <av.AudioFrame 5, pts=None, 480 samples at 48000Hz, 0 channels, s16 at 0x7f673fd409e8>
data/be9afe4f-7f52-40c9-82c5-c05b22a84829.0007 <av.AudioFrame 6, pts=None, 240 samples at 48000Hz, 0 channels, s16 at 0x7f673fd40a58>
data/be9afe4f-7f52-40c9-82c5-c05b22a84829.0008 <av.AudioFrame 7, pts=None, 120 samples at 48000Hz, 0 channels, s16 at 0x7f673fd40ac8>
data/be9afe4f-7f52-40c9-82c5-c05b22a84829.0009 error

@davidavdav
Author

Yes, I get similar behavior (I tried 'opus' and 'libopus' for the CodecContext; I don't know which is the correct one). Also, the packets that do not produce an error still decode to incorrect sound.

When you cat the data to a file, and open with av.open(), decoding works fine. My assumption is that it is a stream header of some kind, that av.open() can deal with.

@jlaine
Collaborator

jlaine commented Nov 6, 2018

Yeah, this doesn't look like opus packets, but like arbitrary fragments of opus in a WebM container. This reads them fine:

import io
import glob

import av

data = b''
for filename in sorted(glob.glob("data/be*.00*")):
    with open(filename, 'rb') as fd:
        data += fd.read()

container = av.open(io.BytesIO(data), 'r')
for frame in container.decode(audio=0):
    print(frame)

It feels as though you're approaching the problem in a bizarre fashion, though, by inventing your own transport for realtime audio. Unless you have a strong understanding of what you are doing, I would switch to a more standard solution; the canonical way of sending real-time audio from a browser is WebRTC.

@davidavdav
Author

Yes! --- And how can we deal with such webm containers? (The chrome mediarecorder api generates these streams, there is not much we can do about that)

@doender

doender commented Nov 6, 2018

@jlaine The audio was recorded through this API (https://developer.mozilla.org/en-US/docs/Web/API/MediaRecorder) and the blobs received in the MediaRecorder.ondataavailable event handler are directly sent as separate messages through a websocket connection.

@mikeboers
Member

My first attempt was to write a buffer object that PyAV's Python-IO would work with. Then I pushed one of your packets at a time into it, and tried to demux the container.

But it seems like once it hits the end of the buffer, FFmpeg has decided it is the end of the file and never tries again.

I think that the Python-IO would need to be extended so that we can say there isn't data yet, but to keep trying. Don't know how to do that...

@mikeboers
Member

I think that we could do it by implementing the interrupt function. Then, if they ask for more data when we have none, we return EAGAIN, and then trigger the interrupt, which gives us the chance to put more data in the buffer. But I think these would need to be in separate threads or something.

Huh... I wonder if I can do it with a single thread and a queue. Let me try!

@mikeboers
Member

This is not robust in the slightest, but it seems to work (with my local patches so that it doesn't need to be able to seek). -> https://gist.github.com/mikeboers/091669058cd565381ed16c436cd7cc87

@mikeboers
Member

If we can use greenlets, then I think I could make a pretty nice version of this sort of thing which gives an API that you can push data into.

@jlaine
Collaborator

jlaine commented Nov 6, 2018

Here's a naive stab at it:

import glob
import queue
import threading
import time

import av


class QueueFile:
    def __init__(self):
        self.finished = False
        self.queue = queue.Queue()

    def read(self, n):
        print('read(%d)' % n)
        if self.finished:
            return b''

        data = self.queue.get()
        if not data:
            self.finished = True
        assert len(data) <= n
        return data


def receiver_worker(qf):
    for filename in sorted(glob.glob("data/be*.00*")):
        with open(filename, 'rb') as fd:
            qf.queue.put(fd.read())
        time.sleep(1)
    qf.queue.put(b'')


def decoder_worker(qf):
    container = av.open(qf, format='webm')
    for frame in container.decode(audio=0):
        print(frame)


qf = QueueFile()
receiver_thread = threading.Thread(target=receiver_worker, args=(qf,))
receiver_thread.start()

decoder_thread = threading.Thread(target=decoder_worker, args=(qf,))
decoder_thread.start()

receiver_thread.join()
decoder_thread.join()

@mikeboers this reveals there is a bug in PyAV with non-seekable Python I/O. In addition to not setting the "seekable" flag, we should be passing NULL as the last argument of avio_alloc_context. Otherwise pyio_seek is invoked, which bombs.

@davidavdav
Author

Thanks, I think your gist is a nicer version of our named-pipe + subprocess version, but the behavior appears to be very similar: only after 8 packets of 0.25 seconds each, so after 2 seconds of audio, does ffmpeg decide to start outputting frames; see #445. This is unnecessary latency, we believe, hence the original question.

@mikeboers
Member

@jlaine That is exactly what I've done on my local one (re: NULL seek callback).

@mikeboers
Member

I can't get it to probe less than 2048 bytes.

But you can disable the probe with av.open(whatever, format='webm').

Then I get audio frames out of the first 318B packet.

@davidavdav
Author

OK, thanks, specifying av.open(format='webm') solved the latency problem that was the cause of #445 and which inspired this issue.

I must have been confused by the format option example in the documentation, which is used to specify a device.

@jlaine
Collaborator

jlaine commented Nov 6, 2018

@mikeboers I've put a fix for the "seek" handling in #453, if you're OK with merging it I think this issue is done.

@jlaine
Collaborator

jlaine commented Nov 7, 2018

PR #453 has been merged, so I'm closing this issue.

@jlaine jlaine closed this as completed Nov 7, 2018