-
Notifications
You must be signed in to change notification settings - Fork 20
/
publish_hue.py
96 lines (74 loc) · 2.61 KB
/
publish_hue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import asyncio
import colorsys
import logging
import os
from signal import SIGINT, SIGTERM
import numpy as np
from livekit import api, rtc
# Dimensions, in pixels, used for the video source and for every frame drawn.
WIDTH = 1280
HEIGHT = 720
# ensure LIVEKIT_URL, LIVEKIT_API_KEY, and LIVEKIT_API_SECRET are set
# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references to tasks, so a task nobody else references may be
# garbage-collected before it finishes.
_background_tasks = set()


def _on_background_task_done(task):
    """Forget a finished background task and log the exception it raised, if any."""
    _background_tasks.discard(task)
    if task.cancelled():
        return
    error = task.exception()
    if error is not None:
        logging.error("background task failed: %s", error)


async def main(room: rtc.Room):
    """Connect to the LiveKit room, publish the "hue" video track and start drawing.

    Builds a join token for room "my-room" with identity "python-publisher",
    connects ``room`` to the server named by the LIVEKIT_URL environment
    variable, publishes a local video track backed by a WIDTH x HEIGHT source
    and starts ``draw_color_cycle`` as a background task.

    NOTE(review): presumably ``api.AccessToken()`` reads LIVEKIT_API_KEY and
    LIVEKIT_API_SECRET from the environment -- confirm against the SDK.

    Args:
        room: The room object to connect and publish through.

    Returns:
        None. Returns early, after logging an error, when LIVEKIT_URL is not
        set or when the connection attempt raises ``rtc.ConnectError``.
    """
    token = (
        api.AccessToken()
        .with_identity("python-publisher")
        .with_name("Python Publisher")
        .with_grants(
            api.VideoGrants(
                room_join=True,
                room="my-room",
            )
        )
        .to_jwt()
    )

    url = os.getenv("LIVEKIT_URL")
    if not url:
        # Fail with a clear message instead of handing None to room.connect().
        logging.error("LIVEKIT_URL is not set")
        return

    logging.info("connecting to %s", url)
    try:
        await room.connect(url, token)
        logging.info("connected to room %s", room.name)
    except rtc.ConnectError as e:
        logging.error("failed to connect to the room: %s", e)
        return

    # publish a track
    source = rtc.VideoSource(WIDTH, HEIGHT)
    track = rtc.LocalVideoTrack.create_video_track("hue", source)
    options = rtc.TrackPublishOptions()
    options.source = rtc.TrackSource.SOURCE_CAMERA
    publication = await room.local_participant.publish_track(track, options)
    logging.info("published track %s", publication.sid)

    # Keep a strong reference until the task is done so it cannot be
    # garbage-collected mid-run, and so a crash in it gets logged.
    draw_task = asyncio.ensure_future(draw_color_cycle(source))
    _background_tasks.add(draw_task)
    draw_task.add_done_callback(_on_background_task_done)
async def draw_color_cycle(source: rtc.VideoSource, fps: float = 30.0):
    """Feed ``source`` an endless stream of solid-colour frames that cycle in hue.

    Every iteration fills one reusable WIDTH x HEIGHT RGBA buffer with a single
    fully saturated, fully opaque colour, passes it to ``source.capture_frame``
    and advances the hue by a third of the frame period, so at the target rate
    a full hue cycle takes about three seconds. The loop has no exit condition;
    it runs until the surrounding task is cancelled or an exception is raised.

    Args:
        source: Video source that receives every generated frame.
        fps: Target frames per second. Defaults to 30, the rate that was
            previously hard-coded.

    Raises:
        ValueError: If ``fps`` is not positive.
    """
    if fps <= 0:
        raise ValueError("fps must be positive")

    frame_period = 1 / fps
    loop = asyncio.get_running_loop()

    # One buffer is reused for every frame; `pixels` is a writable view of it
    # with one row per pixel and one column per channel (R, G, B, A).
    rgba_buffer = bytearray(WIDTH * HEIGHT * 4)
    pixels = np.frombuffer(rgba_buffer, dtype=np.uint8).reshape(-1, 4)

    hue = 0.0
    while True:
        start_time = loop.time()

        red, green, blue = colorsys.hsv_to_rgb(hue, 1.0, 1.0)
        # Broadcast the single colour to every pixel; alpha is fully opaque.
        pixels[:] = np.array([red * 255, green * 255, blue * 255, 255], dtype=np.uint8)

        frame = rtc.VideoFrame(WIDTH, HEIGHT, rtc.VideoBufferType.RGBA, rgba_buffer)
        source.capture_frame(frame)
        hue = (hue + frame_period / 3) % 1.0

        # Sleep only for what is left of this frame's time slot; never pass a
        # negative delay when drawing took longer than one period.
        elapsed = loop.time() - start_time
        await asyncio.sleep(max(0.0, frame_period - elapsed))
if __name__ == "__main__":
    # Script entry point: log to both "publish_hue.log" and the console, run
    # main() on a dedicated event loop, and disconnect from the room when
    # SIGINT or SIGTERM arrives.
    logging.basicConfig(
        level=logging.INFO,
        handlers=[logging.FileHandler("publish_hue.log"), logging.StreamHandler()],
    )

    # asyncio.get_event_loop() is deprecated when no loop is running and raises
    # RuntimeError on Python 3.14+, so create and install the loop explicitly.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    room = rtc.Room(loop=loop)

    async def cleanup():
        """Disconnect from the room, then stop the loop so run_forever() returns."""
        try:
            await room.disconnect()
        finally:
            # Stop even if disconnect() raises; otherwise the process would
            # keep running after the shutdown signal.
            loop.stop()

    # Hold references to the tasks so they cannot be garbage-collected while
    # they are still running.
    main_task = loop.create_task(main(room))
    shutdown_tasks = []

    def request_shutdown():
        """Signal handler: schedule cleanup() on the event loop."""
        shutdown_tasks.append(loop.create_task(cleanup()))

    for sig in (SIGINT, SIGTERM):
        loop.add_signal_handler(sig, request_shutdown)

    try:
        loop.run_forever()
    finally:
        loop.close()