-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.py
61 lines (50 loc) · 2.11 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from typing import List
from stream.stream_model import StreamModel
from stream.stream_repository import StreamRepository
from common.utilities import logger, crate_redis_connection, RedisDb
from common.data.heartbeat_repository import HeartbeatRepository
from core.source_reader import start
# def get_id():
# val = shortuuid.uuid()[:11]
# return val
def check_streams() -> List[StreamModel]:
# user = 'admin'
# pwd = 'a12345678'
# ip = '192.168.0.108'
# total_camera_count = 5
# for j in range(total_camera_count):
# rep.add(DahuaDvrRtspModel(get_id(), j + 1, user, pwd, ip))
connection = crate_redis_connection(RedisDb.MAIN)
rep = StreamRepository(connection)
# # rep.flush_db()
#
# source = Source(get_id(), 'Concord IPC', 'concord', ConcordIpcRtspModel(get_id(), 'concord', 'admin', 'admin123456',
# '192.168.0.19').create_rtsp_address())
# rep.add(source)
#
# source = Source(get_id(), 'Anker Eufy Security 2K', 'anker eufy',
# AnkerEufyModel(get_id(), 'anker eufy', 'Admin1', 'Admin1', '192.168.0.15').create_rtsp_address())
# rep.add(source)
#
# camera_count = 5
# ip_start = 26
# for j in range(camera_count):
# source = Source(get_id(), 'Tapo C200 1080P', f'tapo 200 ({j + 1})',
# TapoC200(get_id(), f'tapo 200 ({j + 1})', 'admin12', 'admin12',
# f'192.168.0.{ip_start + j}').create_rtsp_address())
# rep.add(source)
streams = rep.get_all()
for stream in streams:
if stream.is_opencv_persistent_snapshot_enabled():
print(stream.address)
return streams
def main():
service_name = 'cv2_read_service'
heartbeat = HeartbeatRepository(crate_redis_connection(RedisDb.MAIN), service_name)
heartbeat.start()
if len(check_streams()) == 0:
logger.warning(f'No stream has been found for {service_name}, which is now is exiting...')
logger.info(f'{service_name} will start soon')
start()
if __name__ == '__main__':
main()