Skip to content

Commit

Permalink
Merge pull request #137 from red5pro/feature/two-way-cluster
Browse files Browse the repository at this point in the history
Added a version of the Two Way test for clustering
  • Loading branch information
dheimann committed Nov 8, 2018
2 parents 902fb1e + bd36983 commit fe5c03a
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 0 deletions.
4 changes: 4 additions & 0 deletions R5ProTestbed.xcodeproj/project.pbxproj
Expand Up @@ -51,6 +51,7 @@
926794FF1D06199E004B0BCD /* TwoWayTest.swift in Sources */ = {isa = PBXBuildFile; fileRef = 926794FE1D06199E004B0BCD /* TwoWayTest.swift */; };
9278C9AC2098B09300861C79 /* SubscribeAudioDelayTest.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9278C9AB2098B09300861C79 /* SubscribeAudioDelayTest.swift */; };
9278C9B02098E1BE00861C79 /* PublishBackgroundTest.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9278C9AF2098E1BE00861C79 /* PublishBackgroundTest.swift */; };
927FA28A20EBCD2D00F16228 /* TwoWayStreamManagerTest.swift in Sources */ = {isa = PBXBuildFile; fileRef = 927FA28920EBCD2D00F16228 /* TwoWayStreamManagerTest.swift */; };
928B1CBD1CBC0DC5002C66BA /* SubscribeStreamManagerTest.swift in Sources */ = {isa = PBXBuildFile; fileRef = 928B1CBC1CBC0DC5002C66BA /* SubscribeStreamManagerTest.swift */; };
92A642E3200D40AC00882600 /* PublishAspectTest.swift in Sources */ = {isa = PBXBuildFile; fileRef = 92A642E2200D40AC00882600 /* PublishAspectTest.swift */; };
92A642E5200D416600882600 /* SubscribeBackgroundTest.swift in Sources */ = {isa = PBXBuildFile; fileRef = 92A642E4200D416500882600 /* SubscribeBackgroundTest.swift */; };
Expand Down Expand Up @@ -118,6 +119,7 @@
926794FE1D06199E004B0BCD /* TwoWayTest.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; name = TwoWayTest.swift; path = Tests/TwoWay/TwoWayTest.swift; sourceTree = "<group>"; };
9278C9AB2098B09300861C79 /* SubscribeAudioDelayTest.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; name = SubscribeAudioDelayTest.swift; path = Tests/SubscribeAudioDelay/SubscribeAudioDelayTest.swift; sourceTree = "<group>"; };
9278C9AF2098E1BE00861C79 /* PublishBackgroundTest.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; name = PublishBackgroundTest.swift; path = Tests/PublishBackground/PublishBackgroundTest.swift; sourceTree = "<group>"; };
927FA28920EBCD2D00F16228 /* TwoWayStreamManagerTest.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; name = TwoWayStreamManagerTest.swift; path = Tests/TwoWayStreamManager/TwoWayStreamManagerTest.swift; sourceTree = "<group>"; };
928B1CBC1CBC0DC5002C66BA /* SubscribeStreamManagerTest.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; name = SubscribeStreamManagerTest.swift; path = Tests/SubscribeStreamManager/SubscribeStreamManagerTest.swift; sourceTree = "<group>"; };
92A642E2200D40AC00882600 /* PublishAspectTest.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; name = PublishAspectTest.swift; path = Tests/PublishAspect/PublishAspectTest.swift; sourceTree = "<group>"; };
92A642E4200D416500882600 /* SubscribeBackgroundTest.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; name = SubscribeBackgroundTest.swift; path = Tests/SubscribeBackground/SubscribeBackgroundTest.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -259,6 +261,7 @@
928B1CBC1CBC0DC5002C66BA /* SubscribeStreamManagerTest.swift */,
92CC78081CB6F8DD00CD4352 /* SubscribeTwoStreams.swift */,
926794FE1D06199E004B0BCD /* TwoWayTest.swift */,
927FA28920EBCD2D00F16228 /* TwoWayStreamManagerTest.swift */,
);
name = Tests;
sourceTree = "<group>";
Expand Down Expand Up @@ -375,6 +378,7 @@
92EBEF08212F1B6F003B72FB /* BandwidthDetectionUploadOnlyTest.swift in Sources */,
9278C9AC2098B09300861C79 /* SubscribeAudioDelayTest.swift in Sources */,
926794FB1D061968004B0BCD /* SubscribeCluster.swift in Sources */,
927FA28A20EBCD2D00F16228 /* TwoWayStreamManagerTest.swift in Sources */,
926794F51D061900004B0BCD /* PublishRemoteCallTest.swift in Sources */,
4874ECD01C483FDA002BDA6D /* SubscribeStreamImageTest.swift in Sources */,
485345091C23552F00D409F3 /* CameraSwapTest.swift in Sources */,
Expand Down
71 changes: 71 additions & 0 deletions R5ProTestbed/Tests/TwoWayStreamManager/README.md
@@ -0,0 +1,71 @@
#Two Way with the Stream Manager

With clustering, we need to determine which red5 pro instance the client will use. The other examples used a static configuration ip for streaming endpoints. Basic clustering uses more than one stream endpoint for subscribers. Advanced clustering uses more than one endpoint for publishers also.

With the Stream Manager, our configuration ip will be used similarly for publishers and subscribers. Both publishers and subscribers will call a web service to receive the ip that should be used.

###Example Code
- ***[BaseTest.swift](../BaseTest.swift)***
- ***[TwoWayStreamManagerTest.swift](TwoWayStreamManagerTest.swift)***

###Running the Example
Like the other Two Way example, you need two devices running it, and the second will need to hit `swap streams` in the home settings, so that they're publishing and subscribing to each other. You will also need to have pointed the app to a properly deployed cluster origin server.

###Setup
In order to stream, you first need to connect to the origin server's Stream Manager. The Stream Manager will know which edges are active and provide the one that needs to be published to. For the publisher we add the action `broadcast` to the web call, while we send `subscribe` for the subscribers.

```Swift
let originURI = "http://" + (Testbed.getParameter(param: "host") as! String) + portURI + "/streammanager/api/2.0/event/" +
(Testbed.getParameter(param: "context") as! String) + "/" + streamName + "?action=" + action

NSURLConnection.sendAsynchronousRequest(
NSURLRequest( url: NSURL(string: originURI)! as URL ) as URLRequest,
queue: OperationQueue(),
completionHandler:{ (response: URLResponse?, data: Data?, error: Error?) -> Void in
```
<sup>
[TwoWayStreamManagerTest.swift #30](TwoWayStreamManagerTest.swift#L30)
</sup>

The service returns a json object with the information needed to connect to publish.

```Swift
do{
json = try JSONSerialization.jsonObject(with: data!, options: JSONSerialization.ReadingOptions()) as! [String: AnyObject]
}catch{
print(error)
return
}

if let ip = json["serverAddress"] as? String {
```
<sup>
[TwoWayStreamManagerTest.swift #45](TwoWayStreamManagerTest.swift#L45)
</sup>

###Knowing When to Subscribe
Like with any stream, you can't subscribe to a stream until it's been published. To know what streams are available to subscribe to with clustering, use the `list` function of the Stream Manager api.

```Swift
let url = "http://" + domain + ":5080/streammanager/api/2.0/event/list"
let request = URLRequest.init(url: URL.init(string: url)!)

NSURLConnection.sendAsynchronousRequest(request, queue: OperationQueue.init(), completionHandler: { (response: URLResponse?, data: Data?, error: Error?) -> Void in
```
<sup>
[TwoWayStreamManagerTest.swift #69](TwoWayStreamManagerTest.swift#L69)
</sup>

Like using `streams.jsp` on a solo server, on success this returns a JSON array of dictionaries. For our purposes, the only property we care about in the dictionary is `name` - as we need to compare it against the name we've set up to subscribe to.

```Swift
let list = try JSONSerialization.jsonObject(with: data!) as! Array<Dictionary<String, String>>;

for dict:Dictionary<String, String> in list {
if(dict["name"] == (Testbed.getParameter(param: "stream2") as! String)){
```
<sup>
[TwoWayStreamManagerTest.swift #80](TwoWayStreamManagerTest.swift#L80)
</sup>

For more information on this and other parts of the Stream Manager API, see our dcumentation [here](https://www.red5pro.com/docs/autoscale/streammanagerapi-v2.html)
159 changes: 159 additions & 0 deletions R5ProTestbed/Tests/TwoWayStreamManager/TwoWayStreamManagerTest.swift
@@ -0,0 +1,159 @@
//
// TwoWayStreamManagerTest.swift
// R5ProTestbed
//
// Created by David Heimann on 7/2/18.
// Copyright © 2018 Infrared5. All rights reserved.
//

import UIKit
import R5Streaming

@objc(TwoWayStreamManagerTest)
class TwoWayStreamManagerTest: BaseTest {
var publishView : R5VideoViewController? = nil
var timer : Timer? = nil

override func viewDidAppear(_ animated: Bool) {
super.viewDidAppear(animated)

requestServer(Testbed.getParameter(param: "stream1") as! String, action: "broadcast", resolve: { (url) in
self.publishTo(url: url)
})
callForStreamList()
}

func requestServer(_ streamName: String, action: String, resolve: @escaping (_ ip: String) -> Void) {

let port = (Testbed.getParameter(param: "server_port") as! String)
let portURI = port == "80" ? "" : ":" + port
let originURI = "http://" + (Testbed.getParameter(param: "host") as! String) + portURI + "/streammanager/api/2.0/event/" +
(Testbed.getParameter(param: "context") as! String) + "/" + streamName + "?action=" + action

NSURLConnection.sendAsynchronousRequest(
NSURLRequest( url: NSURL(string: originURI)! as URL ) as URLRequest,
queue: OperationQueue(),
completionHandler:{ (response: URLResponse?, data: Data?, error: Error?) -> Void in

if ((error) != nil) {
print(error!)
return
}

// The string above is in JSON format, we specifically need the serverAddress value
var json: [String: AnyObject]
do{
json = try JSONSerialization.jsonObject(with: data!, options: JSONSerialization.ReadingOptions()) as! [String: AnyObject]
}catch{
print(error)
return
}

if let ip = json["serverAddress"] as? String {
resolve(ip)
}
else if let errorMessage = json["errorMessage"] as? String {
print(AccessError.error(message: errorMessage))
}

})
}

func delayCallForList() {
self.timer = Timer.scheduledTimer(timeInterval: 1, target: self, selector: #selector(callForStreamList), userInfo: nil, repeats: false)
}

func callForStreamList(){

let domain = Testbed.getParameter(param: "host") as! String
let url = "http://" + domain + ":5080/streammanager/api/2.0/event/list"
let request = URLRequest.init(url: URL.init(string: url)!)

NSURLConnection.sendAsynchronousRequest(request, queue: OperationQueue.init(), completionHandler: { (response: URLResponse?, data: Data?, error: Error?) -> Void in

//parse the response
if (error != nil) {
NSLog("Error, %@", error!.localizedDescription)
} else {

do{
let list = try JSONSerialization.jsonObject(with: data!) as! Array<Dictionary<String, String>>;

for dict:Dictionary<String, String> in list {
if(dict["name"] == (Testbed.getParameter(param: "stream2") as! String)){
self.requestServer(Testbed.getParameter(param: "stream2") as! String, action: "subscribe", resolve: { (url) in
self.subscribeTo(url: url)
})
return
}
}


}catch let error as NSError {
print(error)
}
}

self.delayCallForList()
})
}

func publishTo( url: String ){
let config = getConfig()
config.host = url

// Create a new connection using the configuration above
let connection = R5Connection(config: config)

// UI updates must be on the main queue
DispatchQueue.main.async(execute: {
// Create our new stream that will utilize that connection
self.setupPublisher(connection: connection!)

// show preview and debug info
self.publishView!.attach(self.publishStream!)

self.publishStream!.publish(Testbed.getParameter(param: "stream1") as! String, type: R5RecordTypeLive)

let label = UILabel(frame: CGRect(x: 0, y: self.view.frame.height-24, width: self.view.frame.width, height: 24))
label.textAlignment = NSTextAlignment.left
label.backgroundColor = UIColor.lightGray
label.text = "Pub Connected to: " + url
self.view.addSubview(label)
})
}

func subscribeTo( url: String ) {
let config = getConfig()
config.host = url

// Create a new connection using the configuration above
let connection = R5Connection(config: config)

// UI updates must be on the main queue
DispatchQueue.main.async(execute: {
// Create our new stream that will utilize that connection
self.subscribeStream = R5Stream(connection: connection)

// show preview and debug info
self.currentView?.attach(self.subscribeStream!)

self.subscribeStream!.play(Testbed.getParameter(param: "stream2") as! String)

let label = UILabel(frame: CGRect(x: 0, y: 0, width: self.view.frame.width, height: 24))
label.textAlignment = NSTextAlignment.left
label.backgroundColor = UIColor.lightGray
label.text = "Sub Connected to: " + url
self.view.addSubview(label)
})
}

override func closeTest() {

if( self.timer != nil ){
self.timer!.invalidate();
}

super.closeTest()
}
}
11 changes: 11 additions & 0 deletions R5ProTestbed/tests.plist
Expand Up @@ -467,6 +467,17 @@
<key>class</key>
<string>TwoWayTest</string>
</dict>
<key>TwoWay - Stream Manager</key>
<dict>
<key>description</key>
<string>Publish and subscribe simultaneously through a stream manager</string>
<key>LocalProperties</key>
<dict/>
<key>name</key>
<string>Two Way - Stream Manager</string>
<key>class</key>
<string>TwoWayStreamManagerTest</string>
</dict>
</dict>
</dict>
</plist>
3 changes: 3 additions & 0 deletions Readme.md
Expand Up @@ -81,6 +81,9 @@ Once you have modified your settings, you can run the application for simulator
| **[Two Way](R5ProTestbed/Tests/TwoWay)**
| *An example of simultaneously publishing while subscribing - allowing a conversation. Includes stream detection and auto-connection.*
| -
| **[Two Way - Stream Manager](R5ProTestbed/Tests/TwoWayStreamManager)**
| *The two way example, modified to work with a stream manager. Includes stream detection and auto-connection.*
| -
| **[Shared Object](R5ProTestbed/Tests/SharedObject)**
| *An example of sending data and messages between clients through remote shared objects.*
| -
Expand Down

0 comments on commit fe5c03a

Please sign in to comment.