Skip to content

Look into adding a gstreamer fifo sink #775

@adamcik

Description

@adamcik

This keeps coming up as people really seem to like ncmpcpp's visualizer

Trick for making a reliable FIFO sink is twofold. You need to use non-blocking writes, which means opening the write socket will fail unless there is a reader, hence you need a reader as well. Secondly if the buffer fills up because the external reader falls behind or there is none, your internal reader needs to clear the buffer.

Following code captures the basic gist of this, but still needs some more work with respect to error handling.

import errno
import os
import stat

LINUX_FIFO_BUFFER_SIZE = 65536


class FifoStreamer(object):                                                     
    def __init__(self, location):                                               
        self.location = location                                                
        self.reader = None                                                      
        self.writer = None                                                      

    def create(self):                                                           
        try:                                                                    
            mode = os.stat(self.location).st_mode                               
            if not stat.S_ISFIFO(mode):                                         
                raise Exception('File exists but is not a FIFO')                
        except OSError as e:                                                    
            if e.errno == errno.ENOENT:                                         
                os.mkfifo(self.location)                                        
            else:                                                               
                raise                                                           

        # TODO: wrap in could not open reader / writer?
        self.reader = os.open(self.location, os.O_NONBLOCK | os.O_RDONLY)       
        self.writer = os.open(self.location, os.O_NONBLOCK | os.O_WRONLY)       

    def close(self):                                                            
        # TODO: make closing robust
        os.close(self.writer)                                                   
        os.close(self.reader)                                                   

    def write(self, data):                                                      
        while data:                                                             
            try:                                                                
                written = os.write(self.writer, data)                           
                data = data[written:]                                           
            except OSError as e:                                                
                if e.errno == errno.EINTR:                                      
                    continue                                                    
                elif e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):              
                    self.flush()                                                
                else:                                                           
                    raise                                                       

    def flush(self):                                                            
        while True:                                                             
            try:                                                                    
                if not os.read(self.reader, LINUX_FIFO_BUFFER_SIZE):                             
                    break                                                       
            except OSError as e:                                                
                if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):                
                    break                                                       
                elif e.errno == errno.EINTR:                                    
                    continue                                                    
                else:                                                           
                    raise 

The code above now needs to be integrated with GStreamer do actually do it's thing. Assuming 0.10 this is actually very much doable, reason I am hesitant is that we are planing a move to 1.x and the gir binding are not suited for making elements in python. In the case of this code in 1.x the problem boils down to not being able to create the pad templates that BaseSink expect to find. Creating this as a plain element might be doable, but you would need to re-implement way to much of the handling BaseSink makes sure you get right.

import gobject                                                                  

import pygst                                                                    
pygst.require('0.10')                                                           
import gst                                                             


class FifoSink(gst.BaseSink):                                                   
    __gstdetails__ = (                                                          
        'FifoSink',                                                             
        'Sink',                                                                 
        'Sink designed to handle FIFO output.',                        
        'Mopidy')                                                               

    __gsttemplates__ = (gst.PadTemplate('sink', gst.PAD_SINK, gst.PAD_ALWAYS,   
                                        gst.caps_new_any()),)                   

    # TODO: don't allow changing location in flight, i.e. create getter/setter
    location = gobject.property(type=str)                                       

    def __init__(self):                                                         
        gst.BaseSink.__init__(self)                                             
        self.streamer = None                                                    

    def do_start(self):                                                                                               
        self.streamer = FifoStreamer(self.location)                             
        self.streamer.create()                                                  
        return True                                                             

    def do_stop(self):                                                          
        self.streamer.close()                                                   
        return True                                                             

    def do_render(self, buf):                                                   
        try:                                                                    
            self.streamer.write(bytes(buf))                                     
            return gst.FLOW_OK                                                  
        except OSError as e:                                                    
            self.error("Failed: %s", e)                                         
            return gst.FLOW_ERROR                                               


gobject.type_register(FifoSink)                                                 
gst.element_register(                                                           
    FifoSink, 'fifosink', gst.RANK_MARGINAL)                                    

if __name__ == '__main__':                                                      
    import gobject                                                              
    gobject.threads_init()                                                      

    output = """                                                                
capsfilter caps="audio/x-raw-int,width=16,rate=44100,channels=1" ! 
tee name=t 
t. ! queue ! alsasink                                                           
t. ! queue ! fifosink location=/tmp/test2.fifo                                  
"""                                                                             

    sink = gst.parse_bin_from_description(                                      
        output, ghost_unconnected_pads=True)                                    

    playbin = gst.element_factory_make('playbin2')                              
    playbin.set_property('audio_sink', sink) 

Note that one problem I ran into testing this was actually forgetting to match the audio format expected by ncmpcpp, so make sure mono/stereo do indeed match up.

Metadata

Metadata

Assignees

No one assigned

    Labels

    A-audioArea: Audio layer

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions