Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
4 changed files
with
92 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,32 @@ | ||
import argparse | ||
from runner import Runner | ||
from drone_command_requester import DroneCommandRequester | ||
from drone_state_receiver import DroneStateReceiver | ||
from drone_video_receiver import DroneVideoReceiver | ||
from dashboard import Dashboard | ||
from stub_command_requester import StubCommandRequester | ||
from stub_state_receiver import StubStateReceiver | ||
from stub_video_receiver import StubVideoReceiver | ||
|
||
parser = argparse.ArgumentParser() | ||
parser.add_argument( | ||
"-s", "--stub", help="use stubs instead of a drone", action="store_true" | ||
) | ||
parser.add_argument( | ||
"-w", "--webcam", help="webcam device id (for stub)", type=int, default=0 | ||
) | ||
args = parser.parse_args() | ||
|
||
if args.stub: | ||
startables = [ | ||
StubCommandRequester(), | ||
StubStateReceiver(), | ||
StubVideoReceiver(args.webcam), | ||
] | ||
else: | ||
startables = [DroneCommandRequester(), DroneStateReceiver(), DroneVideoReceiver()] | ||
|
||
dashboard = Dashboard() | ||
startables = [DroneCommandRequester(), DroneStateReceiver(), DroneVideoReceiver()] | ||
runner = Runner() | ||
|
||
runner.run(startables, dashboard) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
import time | ||
|
||
from info import Info | ||
from logger import get_logger | ||
from startable import Startable | ||
|
||
|
||
class StubCommandRequester(Startable): | ||
def start(self, info: Info) -> None: | ||
logger = get_logger(__name__) | ||
logger.info("start") | ||
|
||
while info.is_active(): | ||
time.sleep(0.1) | ||
msg = info.pick_command() | ||
if msg == "": | ||
continue | ||
|
||
info.set_sent_command(msg) | ||
time.sleep(1) | ||
info.set_command_result("OK") | ||
|
||
logger.info("done") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
from __future__ import annotations | ||
import time | ||
|
||
from info import Info | ||
from logger import get_logger | ||
from startable import Startable | ||
|
||
|
||
class StubStateReceiver(Startable): | ||
def start(self, info: Info) -> None: | ||
logger = get_logger(__name__) | ||
logger.info("start") | ||
|
||
while info.is_active(): | ||
time.sleep(5) | ||
info.set_states({"bat": 38.0}) | ||
|
||
logger.info("done") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
import cv2 | ||
|
||
from info import Info | ||
from logger import get_logger | ||
from startable import Startable | ||
|
||
|
||
class StubVideoReceiver(Startable): | ||
device_id: int | ||
|
||
def __init__(self, device_id: int = 0) -> None: | ||
super().__init__() | ||
self.device_id = device_id | ||
|
||
def start(self, info: Info) -> None: | ||
logger = get_logger(__name__) | ||
logger.info("start") | ||
|
||
cap = cv2.VideoCapture(self.device_id) | ||
|
||
while info.is_active(): | ||
success, image = cap.read() | ||
if not success: | ||
continue | ||
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) | ||
info.set_image(image) | ||
|
||
cap.release() | ||
logger.info("done") |