Interceptors can't be shared between PeerConnections #1749

Closed
Sean-Der opened this issue Apr 7, 2021 · 14 comments

@Sean-Der
Member

Sean-Der commented Apr 7, 2021

Right now an interceptor can't be shared among PeerConnections. This is a problem because we support sharing an API between PeerConnections; this is an API we provided and can't break.

This means that when we close the first PeerConnection, it stops everything. We are also sharing the Interceptors between PeerConnections, which also seems really bad, and I am not even sure what the behavior is.

I think we have two choices here. We can add a copy() to interceptors, or we can support sharing them and do refcounting.

Sharing+refcounting could be good for high performance cases? It could be powerful to have one NACK loop for all your PeerConnections.
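A minimal sketch of the sharing+refcounting option follows. It uses a hypothetical `sharedLoop` type that is not part of pion/interceptor. The first PeerConnection to acquire the loop starts one background goroutine, and that goroutine stops only when the last PeerConnection releases it. Under this scheme, closing the first PeerConnection no longer stops everything.

```go
package main

import (
	"fmt"
	"sync"
)

// sharedLoop is a hypothetical refcounted resource, e.g. a single NACK loop
// serving every PeerConnection that uses the same interceptor.
type sharedLoop struct {
	mu   sync.Mutex
	refs int
	stop chan struct{}
	done chan struct{}
}

// Acquire increments the refcount and starts the loop on the 0 -> 1 transition.
func (l *sharedLoop) Acquire() {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.refs++
	if l.refs == 1 {
		l.stop = make(chan struct{})
		l.done = make(chan struct{})
		go l.run(l.stop, l.done)
	}
}

// Release decrements the refcount and stops the loop when it reaches 0.
func (l *sharedLoop) Release() {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.refs == 0 {
		return // unbalanced Release; nothing to stop
	}
	l.refs--
	if l.refs == 0 {
		close(l.stop)
		<-l.done // wait for the goroutine to exit before returning
	}
}

// Running reports whether the background loop is currently started.
func (l *sharedLoop) Running() bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.refs > 0
}

func (l *sharedLoop) run(stop <-chan struct{}, done chan<- struct{}) {
	defer close(done)
	// A real NACK loop would also select on a ticker here and generate
	// NACKs for the bound streams of every PeerConnection.
	<-stop
}

func main() {
	var loop sharedLoop
	loop.Acquire()              // PeerConnection a
	loop.Acquire()              // PeerConnection b
	loop.Release()              // a closes
	fmt.Println(loop.Running()) // prints "true": b still needs the loop
	loop.Release()              // b closes
	fmt.Println(loop.Running()) // prints "false"
}
```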

@Sean-Der
Member Author

Sean-Der commented Apr 7, 2021

@at-wat @jeremija Sorry, it is late here, so I might be missing some details. I would love your help/thoughts on this one. @jeremija pointed this out the other day, and it is something we should resolve soon.

@Sean-Der
Member Author

@at-wat @jeremija What do you think of an API like this?

  • Refcounted. Internally, each interceptor is in charge of starting and stopping its own work, stopping when its refcount reaches 0.
  • Multiple PeerConnections. When a stream is bound, it also needs to pass an ID that is shared across all streams of its PeerConnection.

Add a SessionID to StreamInfo

type StreamInfo struct {
    ID        string
    SessionID string
    SSRC      uint32
    // ...
}
  • Binding RTCP streams requires the SessionId
  • Close removes the RTCP + Streams associated with SessionId

type Interceptor interface {
    BindRTCPReader(sessionId string, reader RTCPReader) RTCPReader
    BindRTCPWriter(sessionId string, writer RTCPWriter) RTCPWriter

    BindLocalStream(info *StreamInfo, writer RTPWriter) RTPWriter
    UnbindLocalStream(info *StreamInfo)

    BindRemoteStream(info *StreamInfo, reader RTPReader) RTPReader
    UnbindRemoteStream(info *StreamInfo)

    Close(sessionId string)
}
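The sketch below makes the proposed `Close(sessionId)` semantics concrete by showing how an interceptor could track bound streams per session. It assumes a trimmed-down `StreamInfo` with only `SessionID` and `SSRC`, plus a hypothetical `streamRegistry` helper. Keying by the (SessionID, SSRC) pair stops two PeerConnections that pick the same SSRC from colliding. It also lets `Close` drop only the streams of one session.

```go
package main

import (
	"fmt"
	"sync"
)

// StreamInfo is trimmed to the two fields this sketch needs.
type StreamInfo struct {
	SessionID string
	SSRC      uint32
}

// streamKey identifies a stream across all sessions sharing one interceptor.
type streamKey struct {
	sessionID string
	ssrc      uint32
}

// streamRegistry is hypothetical per-interceptor state shared by all sessions.
type streamRegistry struct {
	mu      sync.Mutex
	streams map[streamKey]*StreamInfo
}

func newStreamRegistry() *streamRegistry {
	return &streamRegistry{streams: map[streamKey]*StreamInfo{}}
}

// Bind records a stream; the same SSRC in another session is a distinct entry.
func (r *streamRegistry) Bind(info *StreamInfo) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.streams[streamKey{info.SessionID, info.SSRC}] = info
}

// Unbind removes a single stream.
func (r *streamRegistry) Unbind(info *StreamInfo) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.streams, streamKey{info.SessionID, info.SSRC})
}

// Close removes every stream belonging to sessionID and leaves other sessions intact.
func (r *streamRegistry) Close(sessionID string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for k := range r.streams {
		if k.sessionID == sessionID {
			delete(r.streams, k) // deleting during range is safe in Go
		}
	}
}

// Len returns the number of streams currently bound.
func (r *streamRegistry) Len() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.streams)
}

func main() {
	r := newStreamRegistry()
	r.Bind(&StreamInfo{SessionID: "a", SSRC: 1234})
	r.Bind(&StreamInfo{SessionID: "b", SSRC: 1234}) // same SSRC, different session
	fmt.Println(r.Len())                            // prints 2
	r.Close("a")
	fmt.Println(r.Len()) // prints 1
}
```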

@at-wat
Member

at-wat commented Apr 12, 2021

I think it is better to keep the interceptors' memory separate by default, to prevent accidental interference between sessions (mishandling the sessionId in one interceptor could cause it).

Instead, it would be safer to share data conditionally when each Interceptor instance is created, by introducing an InterceptorFactory.

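type InterceptorFactory interface {
  NewInterceptor(sessionId string) (Interceptor, error) // sessionId, or some other identifier
}

// Example of InterceptorFactory sharing something between Interceptors.
type SharedInterceptorFactory struct {
  mu           sync.Mutex // guards sharedBuffer; NewInterceptor may be called concurrently
  sharedBuffer map[string]*sharedData
}

type sharedData struct{}

func (f *SharedInterceptorFactory) NewInterceptor(sessionId string) (Interceptor, error) {
  f.mu.Lock()
  defer f.mu.Unlock()

  if f.sharedBuffer == nil {
    f.sharedBuffer = map[string]*sharedData{} // lazily initialize so the zero value is usable
  }
  i := &SharedInterceptor{}
  if data, ok := f.sharedBuffer[sessionId]; ok {
    i.data = data // reuse the data already allocated for this session
  } else {
    i.data = &sharedData{}
    f.sharedBuffer[sessionId] = i.data
  }
  return i, nil
}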

type SharedInterceptor struct {
  data *sharedData
  ...
}

// Example of InterceptorFactory not sharing data between Interceptors.
type UnsharedInterceptorFactory struct {}
func (f *UnsharedInterceptorFactory) NewInterceptor(_ string) (Interceptor, error) {
  return &UnsharedInterceptor{}, nil
}

type UnsharedInterceptor struct {
  ...
}

@Sean-Der
Member Author

Fantastic idea! Totally agree with that

Do you have any idea how we can fix the problems people are having today with interceptor re-use? People are creating one API and sharing it between multiple PeerConnections. What do you think of NewPeerConnection doing a copy every time?

I would rather not start a /v4 just for an interceptor API change. It has been a lot of work getting people on /v3 already.

@jeremija
Member

I also had an idea to add some sort of identifier to the StreamInfo, but I think at-wat has a point.

If we decide to move in this direction, does it make sense to unify this Factory and the current interceptor.Registry.Build method somehow? I think having both a Builder and a Factory is not really necessary and might add confusion. WDYT?

@Sean-Der
Member Author

@jeremija 100% agree. I think we just need to fix the re-use problem, and we have two choices:

  • Copy interceptors, so Close doesn't affect others
  • Throw an error on Interceptor re-use.

@at-wat what do you think is better? I would prefer a copy because it means zero API breakage, although it does unfortunately make interceptors more complicated.

jeremija added a commit to pion/interceptor that referenced this issue Apr 22, 2021
This commit addresses the changes discussed [here][1].

I added `SessionID` to all interceptor methods and ensured that the
existing interceptors can be shared by different `PeerConnection`s.

I defined an `interceptor.Factory` following AtWat's suggestion. It
took a while for this to click for me: I didn't realize that the
`interceptor.Registry` contained already prebuilt interceptors. I like
the idea of a `Factory` better than implementing `Copy`. I added the
`interceptor.Registry.AddFactory` method and modified the internals a
little. The new `interceptor.NewChainFromFactories` will now be called
from the `Registry`.

The `interceptor.SharedFactory` allows us to easily keep the current
behavior with existing interceptors.

I left a number of TODOs, main bullet points being:

1. I think binding anything before calling `BindRTCPWriter` should be an
   error, because otherwise it becomes difficult to track state (e.g. if
   we've started a write loop). But I did not want to add an error
   return value to all methods before discussing it.

   If we do decide that calling `BindRTCPWriter` is the first step, then
   I think it would make sense to pass `RTCPWriter` as an argument to
   `interceptor.Registry.Build`?

2. We could modify the interceptors to only have a single `loop`
   goroutine which would go through all SSRCs for all bound sessions at
   once, but I'm not sure what's the best way to concurrently handle
   RTCP writes - I'm sure we wouldn't want a hanging RTCPWrite to slow
   down any other processing or writes.

   I also think that spinning up an unbounded number of goroutines just
   to do these writes is not a good idea, so I'm not sure how to
   proceed. I'm open to suggestions.

   Maybe this shouldn't be a part of this change, but it's something to
   think about.

[1]: pion/webrtc#1749
@jeremija
Member

jeremija commented Apr 22, 2021

@at-wat @Sean-Der I just asked you to review my attempt at fixing this. It took a while for the Factory idea to click for me: I didn't realize that we had interceptor instances stored in the registry.

I left a number of TODOs, which are the only reason the build is failing (linting). I'd be curious to hear your thoughts! More info in the PR:

pion/interceptor#38

@at-wat
Member

at-wat commented May 6, 2021

@jeremija sorry for the late response.

My idea in #1749 (comment) was intended to avoid passing a sessionId argument to the Bind methods.
Instead, SharedInterceptorFactory.NewInterceptor() (which receives the sessionId) allocates shared buffers and/or starts shared goroutines if the sessionId is new, and attaches those shared resources to the returned Interceptor. Then all Interceptors spawned from the same SharedInterceptorFactory with the same sessionId have access to the shared resources.

I'm not entirely sure it actually works, but it should be safer, since in this structure each Interceptor instance doesn't need to care about the sessionId.

@jeremija
Copy link
Member

jeremija commented May 8, 2021

Hi @at-wat, no worries and thanks for taking a look!

I see your point, but after talking to Sean it seemed like he wanted to provide the ability to share the same interceptor resources among multiple peer connections to conserve resources where that made sense, for example in the same conference room (@Sean-Der please correct me if I misunderstood). So that's the main reason why I added the SessionID to the interceptor.Interceptor interface too, otherwise we'd be at a risk of SSRC collisions.

Now, there's a high probability that I over-engineered the whole thing and that the correct thing to do is just to remove the SessionID from the interceptor.Interceptor interface. But then we'd have to create a new interceptor for each new peer connection, unless there's something I'm missing here?

@at-wat
Member

at-wat commented Jul 8, 2021

Sorry for taking so long. I'll take a look again.

@at-wat
Member

at-wat commented Jul 8, 2021

@jeremija sorry for the delayed response.

> So that's the main reason why I added the SessionID to the interceptor.Interceptor interface too, otherwise we'd be at a risk of SSRC collisions.

I still don't understand why Interceptor.BindRTCPWriter() should receive SessionID.
When an Interceptor is created via NewInterceptor(sessionID SessionID), it already has the session ID.
If the Interceptor instance needs the SessionID, it can simply store it.

> I see your point, but after talking to Sean it seemed like he wanted to provide the ability to share the same interceptor resources among multiple peer connections to conserve resources where that made sense, for example in the same conference room (@Sean-Der please correct me if I misunderstood).

To support both globally shared resources and per-session shared resources, and to hold the SessionID in the shared interceptor, the example in #1749 (comment) could become:

// Example of InterceptorFactory sharing something between Interceptors.
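type SharedInterceptorFactory struct {
  mu            sync.Mutex             // guards the fields below; NewInterceptor may be called concurrently
  sessionBuffer map[string]*sharedData // shared data in the session
  globalBuffer  *sharedData            // globally shared data among all Interceptors
}

type sharedData struct{}

func (f *SharedInterceptorFactory) NewInterceptor(sessionId string) (Interceptor, error) {
  f.mu.Lock()
  defer f.mu.Unlock()

  // Lazily initialize so the zero value of the factory is usable.
  if f.sessionBuffer == nil {
    f.sessionBuffer = map[string]*sharedData{}
  }
  if f.globalBuffer == nil {
    f.globalBuffer = &sharedData{}
  }
  i := &SharedInterceptor{
    globalData: f.globalBuffer,
    sessionID:  sessionId,
  }
  if data, ok := f.sessionBuffer[sessionId]; ok {
    i.sessionData = data
  } else {
    i.sessionData = &sharedData{}
    f.sessionBuffer[sessionId] = i.sessionData
  }
  return i, nil
}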

type SharedInterceptor struct {
  sessionData *sharedData
  globalData  *sharedData

  sessionID string // Optional, if sessionID is required in Interceptor's methods
  ...
}

Or, is there any case that SessionID passed to NewInterceptor and BindRTCPReader/BindRTCPWriter are different?

@jeremija
Member

jeremija commented Jul 8, 2021

> Or, is there any case that SessionID passed to NewInterceptor and BindRTCPReader/BindRTCPWriter are different?

This is where I was hoping to get Sean's input: the current implementation of SharedFactory ignores this parameter, because it shares the same interceptor chain across multiple peer connections.

At some point Sean said that he'd want to do the processing for all sessions in a single goroutine, and your idea of a globalBuffer could help us design that single goroutine better. However, if we were to share the globalBuffer across interceptors, we'd have to redesign the way all of our interceptor implementations work (make them aware of this new type). But that's probably out of scope for this change.

We could also defer this decision to the users who need total control, and they could build their own interceptors and factories that are more tightly coupled.

That said, I think your comment makes total sense, and it would simplify implementations quite a bit.

@Sean-Der
Member Author

@at-wat @jeremija Unfortunately, the new design needs to accommodate the way API/Interceptor is used in pion/webrtc. We have people doing this today:

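package main

import (
	"github.com/pion/interceptor"
	"github.com/pion/webrtc/v3"
)

var api *webrtc.API

func init() {
	m := &webrtc.MediaEngine{}
	if err := m.RegisterDefaultCodecs(); err != nil {
		panic(err)
	}

	// A single Registry, shared by every PeerConnection created from api.
	i := &interceptor.Registry{}
	if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil {
		panic(err)
	}

	api = webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i))
}

func main() {
	// Lots of other code

	config := webrtc.Configuration{}
	a, _ := api.NewPeerConnection(config)
	b, _ := api.NewPeerConnection(config)
	c, _ := api.NewPeerConnection(config)

	// Today, closing a also stops the interceptors that b and c are still using.
	_ = a.Close()
	_ = b.Close()
	_ = c.Close()
}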

So my goal/question is: how can we keep this code compiling and make it work? Today, when a, b, or c is closed, it closes the shared interceptors and the user loses everything.
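One way to keep that code compiling is sketched below. It uses hypothetical types modeled on the Factory idea from this thread, not the real `webrtc`/`interceptor` packages. The registry that the user passes to `NewAPI` stores factories instead of built interceptors, and every `NewPeerConnection` call builds a fresh set from them. The user-facing setup stays the same, but closing `a` only closes `a`'s interceptors. The names `Factory`, `Registry.Build(id)`, `closeFlag`, and `closeFlagFactory` are illustrative only.

```go
package main

import "fmt"

// Interceptor is reduced to Close for this sketch.
type Interceptor interface {
	Close() error
}

// Factory creates a new Interceptor for one PeerConnection.
type Factory interface {
	NewInterceptor(id string) (Interceptor, error)
}

// Registry stores factories, not prebuilt interceptors.
type Registry struct {
	factories []Factory
}

// Add registers a factory; the user-facing setup code stays the same.
func (r *Registry) Add(f Factory) {
	r.factories = append(r.factories, f)
}

// Build creates a fresh set of interceptors; called once per NewPeerConnection.
func (r *Registry) Build(id string) ([]Interceptor, error) {
	chain := make([]Interceptor, 0, len(r.factories))
	for _, f := range r.factories {
		i, err := f.NewInterceptor(id)
		if err != nil {
			return nil, err
		}
		chain = append(chain, i)
	}
	return chain, nil
}

// closeFlag is a trivial Interceptor that remembers whether it was closed.
type closeFlag struct {
	closed bool
}

func (c *closeFlag) Close() error {
	c.closed = true
	return nil
}

type closeFlagFactory struct{}

func (closeFlagFactory) NewInterceptor(string) (Interceptor, error) {
	return &closeFlag{}, nil
}

func main() {
	r := &Registry{}
	r.Add(closeFlagFactory{})

	a, _ := r.Build("pc-a") // e.g. inside api.NewPeerConnection
	b, _ := r.Build("pc-b")

	for _, i := range a {
		_ = i.Close() // PeerConnection a is closed
	}
	fmt.Println(b[0].(*closeFlag).closed) // prints "false"
}
```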

@Sean-Der
Member Author

Closing for #1956
