Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup streaming app sample #173

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
322 changes: 182 additions & 140 deletions samples/apps/video-streaming-app/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,180 +19,222 @@
from kubernetes import client, config
import re

camera_frame_queues = []
small_frame_sources = []
class CameraFeed:
def __init__(self, url):
global global_stop_event

self.url = url
self.queue = queue.Queue(1)
self.thread = None
self.stop_event = global_stop_event

def __eq__(self, other):
if other is None:
return False
return self.url == other.url

def StartHandler(self):
jiria marked this conversation as resolved.
Show resolved Hide resolved
self.thread = threading.Thread(target=self.GetFrames)
self.thread.start()

def WaitHandler(self):
if self.thread is not None:
self.thread.join()

# Generator function for video streaming.
def GeneratorFunc(self):
while not self.stop_event.wait(0.01):
frame = self.queue.get(True, None)
yield (b'--frame\r\nContent-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')

# Loops, creating gRPC client and grabing frame from camera serving specified url.
def GetFrames(self):
logging.info("Starting get_frames(%s)" % self.url)
while not self.stop_event.wait(0.01):
try:
client_channel = grpc.insecure_channel(self.url, options=(
('grpc.use_local_subchannel_pool', 1),))
camera_stub = camera_pb2_grpc.CameraStub(client_channel)
frame = camera_stub.GetFrame(camera_pb2.NotifyRequest())
frame = frame.frame
client_channel.close()

frame_received = False
# prevent stale data
if (len(frame) > 0):
if (self.queue.full()):
try:
self.queue.get(False)
except:
pass
self.queue.put(frame, False)
frame_received = True

if (frame_received):
sleep(1)

except:
logging.info("[%s] Exception %s" % (self.url, traceback.format_exc()))
sleep(1)

def get_camera_list(configuration_name):
camera_list = []
class CameraDisplay:
def __init__(self):
self.main_camera = None
self.small_cameras = []
self.mutex = threading.Lock()

def __eq__(self, other):
return self.main_camera == other.main_camera and self.small_cameras == other.small_cameras

def StartHandlers(self):
if self.main_camera is not None:
self.main_camera.StartHandler()
for small_camera in self.small_cameras:
small_camera.StartHandler()

def WaitHandlers(self):
global global_stop_event

global_stop_event.set()
if self.main_camera is not None:
self.main_camera.WaitHandler()
for small_camera in self.small_cameras:
small_camera.WaitHandler()
global_stop_event.clear()

def Merge(self, other):
self.mutex.acquire()
try:
self.WaitHandlers()

self.main_camera = other.main_camera
self.small_cameras = other.small_cameras

self.StartHandlers()
finally:
self.mutex.release()

def Count(self):
self.mutex.acquire()
result = len(self.small_cameras)
if self.main_camera is not None:
result += 1
self.mutex.release()
return result

def HashCode(self):
self.mutex.acquire()
cameras = ",".join([camera.url for camera in self.small_cameras])
if self.main_camera is not None:
cameras = "{0}+{1}".format(self.main_camera.url, cameras)
self.mutex.release()
return cameras

def StreamFrames(self, camera_id):
selected_camera = None
camera_id = int(camera_id)

self.mutex.acquire()
if camera_id == 0:
selected_camera = self.main_camera
elif camera_id - 1 < len(self.small_cameras):
selected_camera = self.small_cameras[camera_id - 1]
self.mutex.release()

if selected_camera is None:
return Response(None, 500)
else:
return Response(selected_camera.GeneratorFunc(), mimetype='multipart/x-mixed-replace; boundary=frame')

def get_camera_display(configuration_name):
camera_display = CameraDisplay()

config.load_incluster_config()
coreV1Api = client.CoreV1Api()

# TODO use labels instead once available
instance_service_name_regex = re.compile(
configuration_name + "-[\da-f]{6}-svc")

ret = coreV1Api.list_service_for_all_namespaces(watch=False)
p = re.compile(configuration_name + "-[\da-f]{6}-svc")
for svc in ret.items:
if not p.match(svc.metadata.name):
continue
grpc_ports = list(filter(lambda port: port.name == "grpc", svc.spec.ports))
if (len(grpc_ports) == 1):
url = "{0}:{1}".format(svc.spec.cluster_ip, grpc_ports[0].port)
camera_list.append(url)
camera_list.sort()
return camera_list
if svc.metadata.name == configuration_name + "-svc":
grpc_ports = list(
filter(lambda port: port.name == "grpc", svc.spec.ports))
if (len(grpc_ports) == 1):
url = "{0}:{1}".format(svc.spec.cluster_ip, grpc_ports[0].port)
camera_display.main_camera = CameraFeed(url)
elif instance_service_name_regex.match(svc.metadata.name):
grpc_ports = list(
filter(lambda port: port.name == "grpc", svc.spec.ports))
if (len(grpc_ports) == 1):
url = "{0}:{1}".format(svc.spec.cluster_ip, grpc_ports[0].port)
camera_display.small_cameras.append(CameraFeed(url))

app = Flask(__name__)
camera_display.small_cameras.sort(key=lambda camera: camera.url)

@app.route('/')
# Home page for video streaming.
def index():
global camera_frame_queues
return render_template('index.html', camera_count=len(camera_frame_queues)-1)

@app.route('/camera_list')
# Returns the current list of cameras to allow for refresh
def camera_list():
global small_frame_sources
return ",".join(small_frame_sources)

# Generator function for video streaming.
def gen(frame_queue, verbose=False):
while True:
frame = frame_queue.get(True, None)
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
return camera_display

# Gets response and puts it in frame queue.
def response_wrapper(frame_queue):
return Response(gen(frame_queue),
mimetype='multipart/x-mixed-replace; boundary=frame')
def run_webserver():
app.run(host='0.0.0.0', threaded=True)

@app.route('/camera_frame_feed/<camera_id>')
# Gets frame feed for specified camera.
def camera_frame_feed(camera_id=0):
global camera_frame_queues
camera_id = int(camera_id)
if (camera_id <= len(camera_frame_queues)):
logging.info("camera_feed %d" % camera_id)
return response_wrapper(camera_frame_queues[camera_id])
return None

# Updates set of cameras based on set of camera instance services
def refresh_cameras(camera_frame_threads, small_frame_sources, camera_frame_queues, stop_event):
def refresh_cameras():
global global_camera_display
while True:
sleep(1)
camera_list = get_camera_list(os.environ['CONFIGURATION_NAME'])
if camera_list != small_frame_sources:
old_count = len(small_frame_sources)
new_count = len(camera_list)
logging.info("Camera change detected, old: %d, new: %d" % (old_count, new_count))
if old_count != new_count:
if old_count < new_count:
for x in range(new_count - old_count):
camera_frame_queues.append(queue.Queue(1))
small_frame_sources[:] = camera_list
else:
small_frame_sources[:] = camera_list
camera_frame_queues[:] = camera_frame_queues[:(old_count - new_count)]
else:
small_frame_sources[:] = camera_list
logging.info(small_frame_sources)
schedule_get_frames(
camera_frame_threads, small_frame_sources, camera_frame_queues, stop_event)

def run_webserver():
app.run(host='0.0.0.0', threaded=True)
camera_display = get_camera_display(os.environ['CONFIGURATION_NAME'])
if camera_display != global_camera_display:
global_camera_display.Merge(camera_display)

# Loops, creating gRPC client and grabing frame from camera serving specified url.
def get_frames(url, frame_queue, stop_event):
logging.info("Starting get_frames(%s)" % url)
while not stop_event.wait(0.01):
try:
client_channel = grpc.insecure_channel(url, options=(
('grpc.use_local_subchannel_pool', 1),))
camera_stub = camera_pb2_grpc.CameraStub(client_channel)
frame = camera_stub.GetFrame(camera_pb2.NotifyRequest())
frame = frame.frame
client_channel.close()

frame_received = False
# prevent stale data
if (len(frame) > 0):
if (frame_queue.full()):
try:
frame_queue.get(False)
except:
pass
frame_queue.put(frame, False)
frame_received = True

if (frame_received):
sleep(1)
global_stop_event = threading.Event()
global_camera_display = CameraDisplay()

except:
logging.info("[%s] Exception %s" % (url, traceback.format_exc()))
sleep(1)
app = Flask(__name__)

# schedules frame polling threads
def schedule_get_frames(camera_frame_threads, small_frame_sources, camera_frame_queues, stop_event):
if camera_frame_threads:
stop_event.set()
for camera_frame_thread in camera_frame_threads:
camera_frame_thread.join()
stop_event.clear()
camera_frame_threads.clear()
# Home page for video streaming.
@app.route('/')
def index():
global global_camera_display
return render_template('index.html', camera_count=global_camera_display.Count(), camera_list=global_camera_display.HashCode())

cameras_frame_thread = threading.Thread(target=get_frames, args=(main_frame_source, camera_frame_queues[0], stop_event))
cameras_frame_thread.start()
camera_frame_threads.append(cameras_frame_thread)
# Returns the current list of cameras to allow for refresh
@app.route('/camera_list')
def camera_list():
global global_camera_display
logging.info("Expected cameras: %s" % global_camera_display.HashCode())
return global_camera_display.HashCode()

for camera_id in range(1, len(small_frame_sources) + 1):
camera_frame_thread = threading.Thread(target=get_frames, args=(small_frame_sources[camera_id - 1], camera_frame_queues[camera_id], stop_event))
camera_frame_thread.start()
camera_frame_threads.append(camera_frame_thread)
# Gets frame feed for specified camera.
@app.route('/camera_frame_feed/<camera_id>')
def camera_frame_feed(camera_id=0):
global global_camera_display
return global_camera_display.StreamFrames(camera_id)

print("Starting...", flush=True)
logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO, datefmt="%H:%M:%S")

main_frame_source = ""

if 'CONFIGURATION_NAME' in os.environ:
# Expecting source service ports to be named grpc

configuration_name = os.environ['CONFIGURATION_NAME']
camera_display = get_camera_display(configuration_name)
global_camera_display.Merge(camera_display)

config.load_incluster_config()
coreV1Api = client.CoreV1Api()
ret = coreV1Api.list_service_for_all_namespaces(watch=False)
for svc in ret.items:
if svc.metadata.name == configuration_name + "-svc":
grpc_ports = list(
filter(lambda port: port.name == "grpc", svc.spec.ports))
if (len(grpc_ports) == 1):
main_frame_source = "{0}:{1}".format(
svc.spec.cluster_ip, grpc_ports[0].port)

small_frame_sources = get_camera_list(configuration_name)
camera_count = len(small_frame_sources)
refresh_thread = threading.Thread(target=refresh_cameras)
refresh_thread.start()
else:
camera_count = int(os.environ['CAMERA_COUNT'])
main_frame_source = "{0}:80".format(os.environ['CAMERAS_SOURCE_SVC'])
main_camera_url = "{0}:80".format(os.environ['CAMERAS_SOURCE_SVC'])
global_camera_display.main_camera = CameraFeed(main_camera_url)
for camera_id in range(1, camera_count + 1):
url = "{0}:80".format(
os.environ['CAMERA{0}_SOURCE_SVC'.format(camera_id)])
small_frame_sources.append(url)

for camera_id in range(camera_count + 1):
camera_frame_queues.append(queue.Queue(1))
global_camera_display.small_cameras.append(CameraFeed(url))
global_camera_display.StartHandlers()

webserver_thread = threading.Thread(target=run_webserver)
webserver_thread.start()

stop_event = threading.Event()
camera_frame_threads = []
schedule_get_frames(camera_frame_threads, small_frame_sources, camera_frame_queues, stop_event)

if 'CONFIGURATION_NAME' in os.environ:
refresh_thread = threading.Thread(target=refresh_cameras, args=(camera_frame_threads, small_frame_sources, camera_frame_queues, stop_event))
refresh_thread.start()

print("Started", flush=True)
webserver_thread.join()
print("Done", flush=True)
8 changes: 5 additions & 3 deletions samples/apps/video-streaming-app/templates/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,24 @@
<body>
<div style="max-width: 800px; margin: auto;text-align:center">
<h1>Akri Demo</h1>
{%if camera_count > 0 %}
<div style="display: inline-block;clear:both;margin-bottom:30px">
<img src="{{ url_for('camera_frame_feed', camera_id = 0) }}" style="width:480px">
</div>
{%endif%}
<ul style="display: block;list-style-type: none;padding:0;">
{%for camera_id in range(1, camera_count + 1)%}
{%for camera_id in range(1, camera_count)%}
<li style="display: inline-block; padding: 0 25">
<img src="{{ url_for('camera_frame_feed', camera_id = camera_id) }}" style="width:200px">
</li>
{%endfor%}
</ul>
</div>
<script>
var last_camera_list = ""
var last_camera_list = "{{ camera_list }}"
function refresh_on_device_change() {
fetch('/camera_list').then(resp => resp.text()).then(new_camera_list => {
if (new_camera_list != last_camera_list && last_camera_list != "") {
if (new_camera_list != last_camera_list) {
window.location.reload(false);
} else {
last_camera_list = new_camera_list;
Expand Down