From 2abd9e787bf95d75ee6ae385180141b9b84347b4 Mon Sep 17 00:00:00 2001 From: diamondburned Date: Sat, 8 Oct 2022 09:18:01 -0700 Subject: [PATCH] Add PipeWire support --- catnip.go | 1 + input/pipewire/cli.go | 20 ++++ input/pipewire/dump.go | 188 ++++++++++++++++++++++++++++++ input/pipewire/pipewire.go | 231 +++++++++++++++++++++++++++++++++++++ 4 files changed, 440 insertions(+) create mode 100644 input/pipewire/cli.go create mode 100644 input/pipewire/dump.go create mode 100644 input/pipewire/pipewire.go diff --git a/catnip.go b/catnip.go index 0a6ab3d..d2fc682 100644 --- a/catnip.go +++ b/catnip.go @@ -13,6 +13,7 @@ import ( _ "github.com/noriah/catnip/input/ffmpeg" _ "github.com/noriah/catnip/input/parec" + _ "github.com/noriah/catnip/input/pipewire" "github.com/integrii/flaggy" "github.com/pkg/errors" diff --git a/input/pipewire/cli.go b/input/pipewire/cli.go new file mode 100644 index 0000000..533d8c9 --- /dev/null +++ b/input/pipewire/cli.go @@ -0,0 +1,20 @@ +package pipewire + +import ( + "fmt" + "os/exec" + + "github.com/pkg/errors" +) + +func pwLink(outPortID, inPortID pwObjectID) error { + cmd := exec.Command("pw-link", "-L", fmt.Sprint(outPortID), fmt.Sprint(inPortID)) + if err := cmd.Run(); err != nil { + var exitErr *exec.ExitError + if errors.As(err, &exitErr) { + return errors.Wrapf(err, "failed to run pw-link: %s", exitErr.Stderr) + } + return err + } + return nil +} diff --git a/input/pipewire/dump.go b/input/pipewire/dump.go new file mode 100644 index 0000000..d75245f --- /dev/null +++ b/input/pipewire/dump.go @@ -0,0 +1,188 @@ +package pipewire + +import ( + "encoding/json" + "os/exec" + + "github.com/pkg/errors" +) + +type pwObjectType string + +const ( + pwInterfaceDevice pwObjectType = "PipeWire:Interface:Device" + pwInterfaceNode pwObjectType = "PipeWire:Interface:Node" + pwInterfacePort pwObjectType = "PipeWire:Interface:Port" + pwInterfaceLink pwObjectType = "PipeWire:Interface:Link" +) + +type pwDump []pwObject + +func pwObjects() (pwDump, error) { + dumpOutput, err := exec.Command("pw-dump").Output() + if err != nil { + var execErr *exec.ExitError + if errors.As(err, &execErr) { + return nil, errors.Wrapf(err, "failed to run pw-dump: %s", execErr.Stderr) + } + return nil, errors.Wrap(err, "failed to run pw-dump") + } + + var dump pwDump + if err := json.Unmarshal(dumpOutput, &dump); err != nil { + return nil, errors.Wrap(err, "failed to parse pw-dump output") + } + + return dump, nil +} + +func (d pwDump) Links() pwDump { + return d. + Filter(func(o pwObject) bool { return o.Type == pwInterfaceLink }) +} + +func (d pwDump) AudioSinks() pwDump { + return d.Filter( + func(o pwObject) bool { return o.Type == pwInterfaceNode }, + func(o pwObject) bool { return o.Info.Props.MediaClass == pwAudioSink }, + ) +} + +// Filter filters for the devices that satisfies f. +func (d pwDump) Filter(fns ...func(pwObject) bool) pwDump { + filtered := make(pwDump, 0, len(d)) +loop: + for _, device := range d { + for _, f := range fns { + if !f(device) { + continue loop + } + } + filtered = append(filtered, device) + } + return filtered +} + +// Find returns the first object that satisfies f. +func (d pwDump) Find(f func(pwObject) bool) *pwObject { + for i, device := range d { + if f(device) { + return &d[i] + } + } + return nil +} + +// Object gets the object with the given ID. +func (d pwDump) Object(id pwObjectID) *pwObject { + for i, o := range d { + if o.ID == id { + return &d[i] + } + } + return nil +} + +// TODO: generate a unique ID for catnip, so we can easier look up this stuff. + +type pwPortDirection string + +const ( + pwPortIn = "in" + pwPortOut = "out" +) + +// ResolvePorts returns all PipeWire port objects that belong to the given +// object. +func (d pwDump) ResolvePorts(object *pwObject, dir pwPortDirection) pwDump { + return d.Filter( + func(o pwObject) bool { return o.Type == pwInterfacePort }, + func(o pwObject) bool { + return o.Info.Props.NodeID == object.ID && o.Info.Props.PortDirection == dir + }, + ) +} + +// OutputLinks returns all links that are connected to the given object. +func (d pwDump) OutputLinks(output *pwObject) pwDump { + return d.Filter( + func(o pwObject) bool { return o.Type == pwInterfaceLink }, + func(o pwObject) bool { return o.Info.Props.LinkOutputNode == output.ID }, + ) +} + +// InputLinks returns all links that are connected to the given object. +func (d pwDump) InputLinks(output *pwObject) pwDump { + return d.Filter( + func(o pwObject) bool { return o.Type == pwInterfaceLink }, + func(o pwObject) bool { return o.Info.Props.LinkInputNode == output.ID }, + ) +} + +type pwObjectID int64 + +type pwObject struct { + ID pwObjectID `json:"id"` + Type pwObjectType `json:"type"` + Info struct { + Props pwInfoProps `json:"props"` + } `json:"info"` +} + +type pwInfoProps struct { + pwDeviceProps + pwNodeProps + pwLinkProps + pwPortProps + MediaClass string `json:"media.class"` + + JSON json.RawMessage `json:"-"` +} + +func (p *pwInfoProps) UnmarshalJSON(data []byte) error { + type Alias pwInfoProps + if err := json.Unmarshal(data, (*Alias)(p)); err != nil { + return err + } + p.JSON = append([]byte(nil), data...) + return nil +} + +type pwDeviceProps struct { + DeviceName string `json:"device.name"` +} + +// pwNodeProps is for Audio/Sink only. +type pwNodeProps struct { + NodeName string `json:"node.name"` + NodeNick string `json:"node.nick"` + NodeDescription string `json:"node.description"` +} + +// Constants for MediaClass. +const ( + pwAudioDevice string = "Audio/Device" + pwAudioSink string = "Audio/Sink" +) + +// pwLinkProps is for links only. NodeID is the object ID; PortID is the port +// object's ID. Input would be our catnip. Output would be the sink. +// +// We don't actually need to resolve the PortID to remove it, but we do need to +// find all relevant ports to create new links. We can do this by filtering for +// all Ports with the same NodeID. +type pwLinkProps struct { + LinkOutputNode pwObjectID `json:"link.output.node"` + LinkOutputPort pwObjectID `json:"link.output.port"` + LinkInputNode pwObjectID `json:"link.input.node"` + LinkInputPort pwObjectID `json:"link.input.port"` +} + +type pwPortProps struct { + PortID pwObjectID `json:"port.id"` + PortName string `json:"port.name"` + PortAlias string `json:"port.alias"` + PortDirection pwPortDirection `json:"port.direction"` + NodeID pwObjectID `json:"node.id"` + ObjectPath string `json:"object.path"` +} diff --git a/input/pipewire/pipewire.go b/input/pipewire/pipewire.go new file mode 100644 index 0000000..adcf2a7 --- /dev/null +++ b/input/pipewire/pipewire.go @@ -0,0 +1,231 @@ +package pipewire + +import ( + "context" + "encoding/base64" + "encoding/binary" + "encoding/json" + "fmt" + "os" + "os/exec" + "sync" + "sync/atomic" + "time" + + "github.com/noriah/catnip/input" + "github.com/noriah/catnip/input/common/execread" + "github.com/pkg/errors" +) + +func init() { + input.RegisterBackend("pipewire", Backend{}) +} + +type Backend struct{} + +func (p Backend) Init() (err error) { + return +} + +func (p Backend) Close() error { + return nil +} + +func (p Backend) Devices() ([]input.Device, error) { + pwObjs, err := pwObjects() + if err != nil { + return nil, err + } + + pwSinks := pwObjs.AudioSinks() + + devices := make([]input.Device, len(pwSinks)) + for i, device := range pwSinks { + devices[i] = AudioDevice{device.Info.Props.NodeName} + } + + return devices, nil +} + +func (p Backend) DefaultDevice() (input.Device, error) { + return AudioDevice{"auto"}, nil +} + +func (p Backend) Start(cfg input.SessionConfig) (input.Session, error) { + return NewSession(cfg) +} + +type AudioDevice struct { + name string +} + +func (d AudioDevice) String() string { + return d.name +} + +type catnipProps struct { + ApplicationName string `json:"application.name"` + CatnipID string `json:"catnip.id"` +} + +// Session is a PipeWire session. +type Session struct { + session execread.Session + props catnipProps + targetName string +} + +// NewSession creates a new PipeWire session. +func NewSession(cfg input.SessionConfig) (*Session, error) { + currentProps := catnipProps{ + ApplicationName: "catnip", + CatnipID: generateID(), + } + + propsJSON, err := json.Marshal(currentProps) + if err != nil { + return nil, errors.Wrap(err, "failed to marshal props") + } + + dv, ok := cfg.Device.(AudioDevice) + if !ok { + return nil, fmt.Errorf("invalid device type %T", cfg.Device) + } + + // pw-record --rate 44100 --channels 1 --format f32 --latency 1024 --target 'bluez_card.34_28_40_C6_6D_BD' + args := []string{ + "pw-record", + "--format", "f32", + "--rate", fmt.Sprint(cfg.SampleRate), + "--latency", fmt.Sprint(cfg.SampleSize), + "--channels", fmt.Sprint(cfg.FrameSize), + "--target", "0", // see .relink comment below + "--quality", "0", + "--media-category", "Capture", + "--media-role", "DSP", + "--properties", string(propsJSON), + "-", + } + + session := Session{ + session: *execread.NewSession(args, true, cfg), + props: currentProps, + targetName: dv.name, + } + session.session.OnStart = session.onStart + + return &session, nil +} + +// Start starts the session. It implements input.Session. +func (s *Session) Start(ctx context.Context, dst [][]input.Sample, kickChan chan bool, mu *sync.Mutex) error { + return s.session.Start(ctx, dst, kickChan, mu) +} + +func (s *Session) onStart(ctx context.Context, cmd *exec.Cmd) error { + var err error + for i := 0; i < 20; i++ { + if err = s.relink(ctx, cmd); err == nil { + return nil + } + time.Sleep(100 * time.Millisecond) + } + + return err +} + +// We do a bit of tomfoolery here. Wireplumber actually is pretty incompetent at +// handling target.device, so our --target flag is pretty much useless. We have +// to do the node links ourselves. +// +// Relevant issues: +// +// - https://gitlab.freedesktop.org/pipewire/pipewire/-/issues/2731 +// - https://gitlab.freedesktop.org/pipewire/wireplumber/-/issues/358 +// +func (s *Session) relink(ctx context.Context, cmd *exec.Cmd) error { + objs, err := pwObjects() + if err != nil { + return errors.Wrap(err, "failed to get pw-dump") + } + + // Find the device node. + deviceObj := objs.Find(func(obj pwObject) bool { + return obj.Type == pwInterfaceNode && obj.Info.Props.NodeName == s.targetName + }) + if deviceObj == nil { + return errors.New("failed to find device node in PipeWire") + } + + // Find all of the device's ports. We want the device's output ports. + devicePorts := objs.ResolvePorts(deviceObj, pwPortOut) + if len(devicePorts) == 0 { + return errors.New("failed to find any device port in PipeWire") + } + + // Find the catnip node. + catnipObj := objs.Find(func(obj pwObject) bool { + if obj.Type != pwInterfaceNode { + return false + } + var props catnipProps + err := json.Unmarshal(obj.Info.Props.JSON, &props) + return err == nil && props == s.props + }) + if catnipObj == nil { + return errors.New("failed to find catnip node in PipeWire") + } + + // Find all of catnip's ports. We want catnip's input ports. + catnipPorts := objs.ResolvePorts(catnipObj, pwPortIn) + if len(catnipPorts) == 0 { + return errors.New("failed to find any catnip port in PipeWire") + } + + // log.Printf("device %d ports:", deviceObj.ID) + // for _, port := range devicePorts { + // log.Printf(" %d (%s)", port.ID, port.Info.Props.PortName) + // } + + // log.Printf("catnip %d ports:", catnipObj.ID) + // for _, port := range catnipPorts { + // log.Printf(" %d (%s)", port.ID, port.Info.Props.PortName) + // } + + // Link the catnip node to the device node. + + for i := 0; i < len(devicePorts) && i < len(catnipPorts); i++ { + catnipPort := catnipPorts[i] + devicePort := devicePorts[i] + + // log.Println("linking port", devicePort.ID, "to", catnipPort.ID) + if err := pwLink(devicePort.ID, catnipPort.ID); err != nil { + return errors.Wrapf(err, + "failed to link catnip port %d to device port %d", + catnipPort.ID, devicePort.ID) + } + } + + return nil +} + +var sessionCounter uint64 + +// generateID generates a unique ID for this session. +func generateID() string { + sessionNum := atomic.AddUint64(&sessionCounter, 1) + return fmt.Sprintf( + "%d@%s#%d", + os.Getpid(), + shortEpoch(), + sessionNum, + ) +} + +// shortEpoch generates a small string that is unique to the current epoch. +func shortEpoch() string { + now := time.Now().Unix() + var buf [8]byte + binary.LittleEndian.PutUint64(buf[:], uint64(now)) + return base64.RawURLEncoding.EncodeToString(buf[:]) +}