Skip to content

Commit

Permalink
Update to MediaMTX 1.8.1
Browse files Browse the repository at this point in the history
  • Loading branch information
mrlt8 committed May 18, 2024
1 parent fb45512 commit 9d41def
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 91 deletions.
2 changes: 1 addition & 1 deletion app/.env
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
VERSION=2.9.2
MTX_TAG=1.1.1
MTX_TAG=1.8.1
IOS_VERSION=17.1.1
APP_VERSION=2.50.6.1
MTX_HLSVARIANT=fmp4
Expand Down
6 changes: 3 additions & 3 deletions app/frontend.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,12 +233,12 @@ def restart_bridge(restart_cmd: str):
"""
if restart_cmd == "cameras":
wb.streams.stop_all()
wb.streams.monitor_streams(wb.rtsp.health_check)
wb.streams.monitor_streams(wb.mtx.health_check)
elif restart_cmd == "rtsp_server":
wb.rtsp.restart()
wb.mtx.restart()
elif restart_cmd == "all":
wb.streams.stop_all()
wb.rtsp.stop()
wb.mtx.stop()
wb.run(fresh_data=True)
restart_cmd = "cameras,rtsp_server"
else:
Expand Down
1 change: 1 addition & 0 deletions app/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ paho-mqtt
pydantic
python-dotenv
requests
pyyaml
xxtea # windows .whl currently only exist for Python 3.8.x
16 changes: 10 additions & 6 deletions app/static/webrtc.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class Receiver {
onOpen() {
const direction = this.whep ? "sendrecv" : "recvonly";

this.pc = new RTCPeerConnection({ iceServers: this.signalJson.servers });
this.pc = new RTCPeerConnection({ iceServers: this.signalJson.servers, sdpSemantics: 'unified-plan' });
this.pc.addTransceiver("video", { direction });
this.pc.addTransceiver("audio", { direction });
this.pc.ontrack = (evt) => this.onTrack(evt);
Expand Down Expand Up @@ -121,18 +121,22 @@ class Receiver {
method: 'PATCH',
headers: {
'Content-Type': 'application/trickle-ice-sdpfrag',
'If-Match': this.eTag,
'If-Match': '*',
},
body: generateSdpFragment(this.offerData, candidates),
})
.then((res) => {
if (res.status !== 204) {
throw new Error('bad status code');
switch (res.status) {
case 204:
break;
case 404:
throw new Error('stream not found');
default:
throw new Error(`bad status code ${res.status}`);
}
})
.catch((err) => {
console.error('error: ' + err);
this.scheduleRestart();
onError(err.toString());
});
}

Expand Down
16 changes: 8 additions & 8 deletions app/wyze_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,18 @@ def __init__(self) -> None:
print(f"\n🚀 DOCKER-WYZE-BRIDGE v{config.VERSION} {config.BUILD_STR}\n")
self.api: WyzeApi = WyzeApi()
self.streams: StreamManager = StreamManager()
self.rtsp: MtxServer = MtxServer(config.BRIDGE_IP)
self.mtx: MtxServer = MtxServer(config.BRIDGE_IP, config.WB_API)

if config.LLHLS:
self.rtsp.setup_llhls(config.TOKEN_PATH, bool(config.HASS_TOKEN))
self.mtx.setup_llhls(config.TOKEN_PATH, bool(config.HASS_TOKEN))

def run(self, fresh_data: bool = False) -> None:
self.api.login(fresh_data=fresh_data)
self.setup_streams()
if self.streams.total < 1:
return signal.raise_signal(signal.SIGINT)
self.rtsp.start()
self.streams.monitor_streams(self.rtsp.health_check)
self.mtx.start()
self.streams.monitor_streams(self.mtx.health_check)

def setup_streams(self):
"""Gather and setup streams for each camera."""
Expand All @@ -54,15 +54,15 @@ def setup_streams(self):
stream = WyzeStream(cam, options)
stream.rtsp_fw_enabled = self.rtsp_fw_proxy(cam, stream)

self.rtsp.add_path(stream.uri, not options.reconnect, config.WB_API)
self.mtx.add_path(stream.uri, not options.reconnect)
self.streams.add(stream)

def rtsp_fw_proxy(self, cam: WyzeCamera, stream: WyzeStream) -> bool:
if rtsp_fw := env_bool("rtsp_fw").lower():
if rtsp_path := stream.check_rtsp_fw(rtsp_fw == "force"):
rtsp_uri = f"{cam.name_uri}-fw"
logger.info(f"Adding /{rtsp_uri} as a source")
self.rtsp.add_source(rtsp_uri, rtsp_path)
self.mtx.add_source(rtsp_uri, rtsp_path)
return True
return False

Expand All @@ -75,7 +75,7 @@ def add_substream(self, cam: WyzeCamera, options: WyzeStreamOptions):
record = bool(env_cam("sub_record", cam.name_uri))
sub_opt = replace(options, substream=True, quality=quality, record=record)
sub = WyzeStream(cam, sub_opt)
self.rtsp.add_path(sub.uri, not options.reconnect, config.WB_API)
self.mtx.add_path(sub.uri, not options.reconnect)
self.streams.add(sub)

def clean_up(self, *_):
Expand All @@ -84,7 +84,7 @@ def clean_up(self, *_):
sys.exit(0)
if self.streams:
self.streams.stop_all()
self.rtsp.stop()
self.mtx.stop()
logger.info("👋 goodbye!")
sys.exit(0)

Expand Down
2 changes: 2 additions & 0 deletions app/wyzebridge/mtx_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ def log_event(self, event_data: str):

if event == "start":
self.streams.get(uri).start()
elif event == "stop":
self.streams.get(uri).stop()
elif event in {"read", "unread"}:
read_event(uri, event)
elif event in {"ready", "notready"}:
Expand Down
176 changes: 103 additions & 73 deletions app/wyzebridge/mtx_server.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,61 @@
from os import environ, getenv
from os import getenv
from pathlib import Path
from signal import SIGINT
from subprocess import DEVNULL, Popen
from typing import Optional, Protocol
from typing import Optional

import yaml
from wyzebridge.logging import logger


class MtxInterface(Protocol):
def set(self, uri: str, path: str, value: str) -> None: ...

def get(self, uri: str, path: str) -> Optional[str]: ...

def set_opt(self, option: str, value: str) -> None: ...

def get_opt(self, option: str) -> Optional[str]: ...


class MtxEnv:
"""Use environment variables to interface with mediamtx."""

def set(self, uri: str, path: str, value: str) -> None:
env = f"MTX_PATHS_{uri}_{path}".upper()
if not getenv(env):
environ[env] = value

def get(self, uri: str, path: str) -> Optional[str]:
env = f"MTX_PATHS_{{}}_{path}".upper()
return getenv(env.format(uri.upper()), getenv(env.format("ALL")))

def set_opt(self, option: str, value: str) -> None:
env = f"MTX_{option}".upper()
if not getenv(env):
environ[env] = value

def get_opt(self, option: str) -> Optional[str]:
return getenv(f"MTX_{option}".upper())
MTX_CONFIG = "/app/mediamtx.yml"


class MtxInterface:
def __init__(self):
self.data = {}
self._modified = False

def __enter__(self):
self._load_config()
return self

def __exit__(self, exc_type, exc_value, traceback):
if self._modified:
self._save_config()

def _load_config(self):
with open(MTX_CONFIG, "r") as f:
self.data = yaml.safe_load(f) or {}

def _save_config(self):
with open(MTX_CONFIG, "w") as f:
yaml.safe_dump(self.data, f)

def get(self, path: str):
keys = path.split(".")
current = self.data
for key in keys:
if current is None:
return None
current = current.get(key)
return current

def set(self, path: str, value):
keys = path.split(".")
current = self.data
for key in keys[:-1]:
current = current.setdefault(key, {})
current[keys[-1]] = value
self._modified = True

def add(self, path: str, value):
current = self.data.get(path)
if isinstance(current, list):
if not isinstance(value, list):
value = [value]
current.extend([item for item in value if item not in current])
else:
self.data[path] = value


class MtxServer:
Expand All @@ -44,34 +64,43 @@ class MtxServer:
__slots__ = "rtsp", "sub_process"

def __init__(
self,
bridge_ip: Optional[str] = None,
mtx_interface: MtxInterface = MtxEnv(),
self, bridge_ip: Optional[str] = None, api_auth: Optional[str] = None
) -> None:
self.rtsp: MtxInterface = mtx_interface
self.sub_process: Optional[Popen] = None
self._setup(api_auth)
if bridge_ip:
self.setup_webrtc(bridge_ip)

def _setup(self, api_auth: Optional[str]):
publisher = [
{
"ips": ["127.0.0.1"],
"permissions": [{"action": "read"}, {"action": "publish"}],
}
]
with MtxInterface() as mtx:
mtx.set("paths", {})
mtx.set("authInternalUsers", publisher)
for event in {"Read", "Unread", "Ready", "NotReady"}:
bash_cmd = f"echo $MTX_PATH,{event}! > /tmp/mtx_event;"
mtx.set(f"pathDefaults.runOn{event}", f"bash -c '{bash_cmd}'")
# mtx.set(f"pathDefaults.runOnDemandStartTimeout", "30s")
# mtx.set(f"pathDefaults.runOnDemandCloseAfter", "60s")
client: dict = {"permissions": [{"action": "read"}]}
if api_auth:
client.update({"user": "wb", "pass": api_auth})
mtx.add("authInternalUsers", client)

def add_path(self, uri: str, on_demand: bool = True, auth: str = ""):
for event in {"Read", "Unread", "Ready", "NotReady"}:
bash_cmd = f"echo $MTX_PATH,{event}! > /tmp/mtx_event;"
self.rtsp.set(uri, f"RunOn{event}", f"bash -c '{bash_cmd}'")
if on_demand:
cmd = "bash -c 'echo $MTX_PATH,start! > /tmp/mtx_event'"
self.rtsp.set(uri, "runOnDemand", cmd)
self.rtsp.set(uri, "runOnDemandStartTimeout", "30s")
self.rtsp.set(uri, "runOnDemandCloseAfter", "60s")
if read_user := self.rtsp.get(uri, "readUser"):
self.rtsp.set(uri, "readUser", read_user)
if read_pass := self.rtsp.get(uri, "readPass"):
self.rtsp.set(uri, "readPass", read_pass)
elif auth:
self.rtsp.set(uri, "readUser", "wb")
self.rtsp.set(uri, "readPass", auth)
with MtxInterface() as mtx:
if on_demand:
cmd = f"bash -c 'echo $MTX_PATH,{{}}! > /tmp/mtx_event'"
mtx.set(f"paths.{uri}.runOnDemand", cmd.format("start"))
mtx.set(f"paths.{uri}.runOnUnDemand", cmd.format("stop"))

def add_source(self, uri: str, value: str):
self.rtsp.set(uri, "source", value)
with MtxInterface() as mtx:
mtx.set(f"paths.{uri}.source", value)

def start(self):
if self.sub_process:
Expand Down Expand Up @@ -102,30 +131,31 @@ def setup_webrtc(self, bridge_ip: str):
if not bridge_ip:
logger.warning("SET WB_IP to allow WEBRTC connections.")
return
logger.debug(f"Using {bridge_ip} for webrtc")
self.rtsp.set_opt("webrtcICEHostNAT1To1IPs", bridge_ip)
if self.sub_process:
self.restart()
ips = bridge_ip.split(",")
logger.debug(f"Using {' and '.join(ips)} for webrtc")
with MtxInterface() as mtx:
mtx.add("webrtcAdditionalHosts", ips)

def setup_llhls(self, token_path: str = "/tokens/", hass: bool = False):
logger.info("Configuring LL-HLS")
self.rtsp.set_opt("hlsVariant", "lowLatency")
self.rtsp.set_opt("hlsEncryption", "yes")
if self.rtsp.get_opt("hlsServerKey"):
return

key = "/ssl/privkey.pem"
cert = "/ssl/fullchain.pem"
if hass and Path(key).is_file() and Path(cert).is_file():
logger.info("🔐 Using existing SSL certificate from Home Assistant")
self.rtsp.set_opt("hlsServerKey", key)
self.rtsp.set_opt("hlsServerCert", cert)
return

cert_path = f"{token_path}hls_server"
generate_certificates(cert_path)
self.rtsp.set_opt("hlsServerKey", f"{cert_path}.key")
self.rtsp.set_opt("hlsServerCert", f"{cert_path}.crt")
with MtxInterface() as mtx:
mtx.set("hlsVariant", "lowLatency")
mtx.set("hlsEncryption", "yes")
if mtx.get("hlsServerKey"):
return

key = "/ssl/privkey.pem"
cert = "/ssl/fullchain.pem"
if hass and Path(key).is_file() and Path(cert).is_file():
logger.info("🔐 Using existing SSL certificate from Home Assistant")
mtx.set("hlsServerKey", key)
mtx.set("hlsServerCert", cert)
return

cert_path = f"{token_path}hls_server"
generate_certificates(cert_path)
mtx.set("hlsServerKey", f"{cert_path}.key")
mtx.set("hlsServerCert", f"{cert_path}.crt")


def mtx_version() -> str:
Expand Down

0 comments on commit 9d41def

Please sign in to comment.