-
-
Notifications
You must be signed in to change notification settings - Fork 151
/
rtsp_event.py
147 lines (130 loc) · 5.61 KB
/
rtsp_event.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import sys
try:
import os
import signal
import time
from datetime import datetime as dt
except ImportError:
sys.exit(1)
class RtspEvent:
"""Handle events from rtsp-simple-server."""
def __init__(self) -> None:
"""Run the script until it receives a termination signal."""
for sig in ("SIGQUIT", "SIGTERM", "SIGINT"):
signal.signal(getattr(signal, sig), lambda n, f: self.clean_up())
self.uri: str
self.type: str
self.__dict__.update(
dict(zip(["uri", "type", "mac", "model", "firmware"], sys.argv[1:]))
)
self.mqtt_connect()
def write_log(self, txt: str) -> None:
"""Format and print logging messages to stdout."""
date = dt.now().strftime("%Y/%m/%d %X")
print(date, f"[RTSP][{self.uri.upper()}] {txt}")
def pub_start(self) -> None:
"""Handle a 'READY' event when publishing a stream to rtsp-simple-server."""
self.write_log(f"✅ '/{self.uri}' stream is UP! (3/3)")
img_file = os.getenv("IMG_PATH", "/img/") + self.uri + ".jpg"
env_snap = os.getenv("SNAPSHOT", "NA").ljust(5, "0").upper()
self.send_mqtt("image", None)
# if env_bool("HASS"):
# import json
# if (host := env_bool("HOSTNAME", "localhost")) == "homeassistant":
# host += ".local"
# self.send_mqtt(
# "attributes",
# json.dumps(
# {
# "stream": f"rtsp://{host}:8554/{self.uri}",
# "image": f"http://{host}:8123/local/{self.uri}.jpg",
# }
# ),
# )
if rtsp := (env_snap[:4] == "RTSP"):
from subprocess import Popen, TimeoutExpired
rtsp_addr = "127.0.0.1:8554"
if env_bool(f"RTSP_PATHS_{self.uri.upper()}_READUSER"):
rtsp_addr = (
env_bool(f"RTSP_PATHS_{self.uri.upper()}_READUSER")
+ ":"
+ env_bool(f"RTSP_PATHS_{self.uri.upper()}_READPASS")
+ f"@{rtsp_addr}"
)
ffmpeg_cmd = f"ffmpeg -loglevel fatal -skip_frame nokey -rtsp_transport tcp -i rtsp://{rtsp_addr}/{self.uri} -vframes 1 -y {img_file}"
while True:
self.send_mqtt("state", "online")
if rtsp:
ffmpeg_sub = Popen(ffmpeg_cmd.split())
try:
ffmpeg_sub.wait(25)
except TimeoutExpired:
ffmpeg_sub.kill()
self.write_log("snapshot timed out")
continue
if os.path.exists(img_file) and os.path.getsize(img_file) > 1:
with open(img_file, "rb") as img:
self.send_mqtt("image", img.read())
time.sleep(int(env_snap[4:]) if env_snap[4:].isdigit() else 180)
def read_start(self) -> None:
"""Handle 'READ' events when a client starts consuming a stream fromrtsp-simple-server."""
self.write_log("📖 New client reading ")
self.send_mqtt(f"clients/{os.getpid()}", "reading")
signal.pause()
def mqtt_connect(self) -> None:
"""Connect to an MQTT if the env option is enabled."""
if not env_bool("MQTT_HOST"):
self.mqtt_connected = False
return
try:
import paho.mqtt.client as mqtt
except ImportError:
return
self.base = f"wyzebridge/{self.uri}/"
if env_bool("MQTT_TOPIC"):
self.base = f'{env_bool("MQTT_TOPIC")}/{self.base}'
host = os.getenv("MQTT_HOST").split(":")
self.mqtt = mqtt.Client()
if env_bool("MQTT_AUTH"):
auth = os.getenv("MQTT_AUTH").split(":")
self.mqtt.username_pw_set(auth[0], auth[1] if len(auth) > 1 else None)
if self.type == "READY":
self.mqtt.will_set(self.base + "state", "disconnected", 0, True)
if self.type == "READ":
self.mqtt.will_set(self.base + f"clients/{os.getpid()}", None, 0, True)
try:
self.mqtt.connect(host[0], int(host[1] if len(host) > 1 else 1883), 60)
self.mqtt.loop_start()
self.mqtt_connected = True
except Exception as ex:
self.write_log(f"[MQTT] {ex}")
def send_mqtt(self, topic: str, message: str) -> None:
"""Publish a message to the MQTT server."""
if self.mqtt_connected:
if message:
self.mqtt.publish(self.base + topic, message)
else:
self.mqtt.publish(self.base + topic, None, 0, True)
def clean_up(self) -> None:
"""Update the log and MQTT status when a termination signal is received."""
if self.type == "READY":
self.write_log(f"❌ '/{self.uri}' stream is down")
self.send_mqtt("state", "offline")
self.send_mqtt("attributes", None)
self.send_mqtt(f"clients/{os.getpid()}", None)
elif self.type == "READ":
self.write_log("📕 Client stopped reading")
self.send_mqtt(f"clients/{os.getpid()}", None)
if self.mqtt_connected:
self.mqtt.disconnect()
self.mqtt.loop_stop()
sys.exit(0)
def env_bool(env: str, false: str = "") -> str:
"""Return env variable or empty string if the variable contains 'false' or is empty."""
return os.getenv(env.upper(), "").lower().replace("false", "") or false
if __name__ == "__main__" and len(sys.argv) > 2:
rtsp = RtspEvent()
if rtsp.type == "READY":
rtsp.pub_start()
elif rtsp.type == "READ":
rtsp.read_start()