Skip to content

Commit

Permalink
mavproxy_misc: make and mark as flake8 clean
Browse files Browse the repository at this point in the history
  • Loading branch information
peterbarker committed Jun 16, 2024
1 parent 919a35e commit 3019f93
Showing 1 changed file with 57 additions and 45 deletions.
102 changes: 57 additions & 45 deletions MAVProxy/modules/mavproxy_misc.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
#!/usr/bin/env python
'''miscellaneous commands'''
'''
miscellaneous commands
AP_FLAKE8_CLEAN
'''

import math
import os
import sys
import time

import time, math, sys
from pymavlink import mavutil

from MAVProxy.modules.lib import mp_module
from MAVProxy.modules.lib import mp_util


from os import kill
from signal import signal
from subprocess import PIPE, Popen


class RepeatCommand(object):
'''repeated command object'''
def __init__(self, period, cmd):
Expand All @@ -23,27 +30,26 @@ def __str__(self):
return "Every %.1f seconds: %s" % (self.period, self.cmd)


def run_command(args, cwd = None, shell = False, timeout = None, env = None):
def run_command(args, cwd=None, shell=False, timeout=None, env=None):
'''
Run a shell command with a timeout.
See http://stackoverflow.com/questions/1191374/subprocess-with-timeout
'''
from subprocess import PIPE, Popen
try:
# py2
from StringIO import StringIO
except ImportError:
# py3
from io import StringIO
import fcntl, os, signal
p = Popen(args, shell = shell, cwd = cwd, stdout = PIPE, stderr = PIPE, env = env)
import fcntl
p = Popen(args, shell=shell, cwd=cwd, stdout=PIPE, stderr=PIPE, env=env)
tstart = time.time()
buf = StringIO()

# try to make it non-blocking
try:
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fcntl.fcntl(p.stdout, fcntl.F_GETFL) | os.O_NONBLOCK)
except Exception as ex:
except Exception:
pass

while True:
Expand All @@ -54,7 +60,7 @@ def run_command(args, cwd = None, shell = False, timeout = None, env = None):
if sys.version_info.major >= 3:
s = s.decode('utf-8')
buf.write(s)
except Exception as ex:
except Exception:
pass
if retcode is not None:
break
Expand All @@ -67,6 +73,7 @@ def run_command(args, cwd = None, shell = False, timeout = None, env = None):
p.wait()
return buf.getvalue()


class MiscModule(mp_module.MPModule):
def __init__(self, mpstate):
super(MiscModule, self).__init__(mpstate, "misc", "misc commands", public=True)
Expand Down Expand Up @@ -108,7 +115,7 @@ def __init__(self, mpstate):
self.add_command('canforward', self.cmd_canforward, "enable CAN forwarding")

self.add_command('gear', self.cmd_landing_gear, "landing gear control")

self.repeats = []

def altitude_difference(self, pressure1, pressure2, ground_temp):
Expand Down Expand Up @@ -256,7 +263,7 @@ def cmd_dfu_boot(self, args):
def cmd_deadlock(self, args):
'''trigger a mutex deadlock'''
self.cmd_dosomethingreallynastyto_autopilot(args, 'mutex-deadlock', 100)

def cmd_battery_reset(self, args):
'''reset battery remaining'''
mask = -1
Expand All @@ -267,7 +274,7 @@ def cmd_battery_reset(self, args):
remaining_pct = int(args[1])
self.master.mav.command_long_send(self.settings.target_system, self.settings.target_component,
mavutil.mavlink.MAV_CMD_BATTERY_RESET, 0,
mask, remaining_pct, 0, 0, 0, 0, 0)
mask, remaining_pct, 0, 0, 0, 0, 0)

def cmd_time(self, args):
'''show autopilot time'''
Expand Down Expand Up @@ -306,7 +313,7 @@ def cmd_changealt_abs(self, args):
3, 1, 0, 0, 0, 0,
0, 0, absalt)
print("Sent change altitude command for %.1f meters" % absalt)

def cmd_land(self, args):
'''auto land commands'''
if len(args) < 1:
Expand Down Expand Up @@ -365,13 +372,13 @@ def cmd_led(self, args):
pattern[0] = int(args[0])
pattern[1] = int(args[1])
pattern[2] = int(args[2])

if len(args) == 4:
plen = 4
pattern[3] = int(args[3])
else:
plen = 3

self.master.mav.led_control_send(self.settings.target_system,
self.settings.target_component,
0, 0, plen, pattern)
Expand All @@ -398,7 +405,7 @@ def cmd_scripting(self, args):
mavutil.mavlink.MAV_CMD_SCRIPTING,
0, 0,
cmd,
0,0,0,0,0,0)
0, 0, 0, 0, 0, 0)

def cmd_formatsdcard(self, args):
'''format SD card'''
Expand All @@ -408,8 +415,8 @@ def cmd_formatsdcard(self, args):
mavutil.mavlink.MAV_CMD_STORAGE_FORMAT,
0, 0,
1, 1,
0,0,0,0,0)
0, 0, 0, 0, 0)

def cmd_oreoled(self, args):
'''send LED pattern as override, using OreoLED conventions'''
if len(args) < 4:
Expand All @@ -425,18 +432,20 @@ def cmd_oreoled(self, args):
pattern[5] = int(args[1])
pattern[6] = int(args[2])
pattern[7] = int(args[3])

self.master.mav.led_control_send(self.settings.target_system,
self.settings.target_component,
lednum, 255, 8, pattern)

def cmd_flashbootloader(self, args):
'''flash bootloader'''
self.master.mav.command_long_send(self.settings.target_system,
0,
mavutil.mavlink.MAV_CMD_FLASH_BOOTLOADER,
0, 0, 0, 0, 0, 290876, 0, 0)

self.master.mav.command_long_send(
self.settings.target_system,
0,
mavutil.mavlink.MAV_CMD_FLASH_BOOTLOADER,
0, 0, 0, 0, 0, 290876, 0, 0
)

def cmd_playtune(self, args):
'''send PLAY_TUNE message'''
if len(args) < 1:
Expand Down Expand Up @@ -604,47 +613,50 @@ def cmd_canforward(self, args):
0,
0,
0)

def cmd_landing_gear(self, args):
usage='''Usage: gear <up|down> [ID]
usage = '''Usage: gear <up|down> [ID]
Alt: gear <extend|retract> [ID]'''
if len(args) == 0 or args[0] not in ['up','down','extend', 'retract']:
if len(args) == 0 or args[0] not in ['up', 'down', 'extend', 'retract']:
print(usage)
return
if args[0] in ['down','extend']:
DesiredState=0
elif args [0] in ['up','retract']:
DesiredState=1
if args[0] in ['down', 'extend']:
DesiredState = 0
elif args[0] in ['up', 'retract']:
DesiredState = 1
else:
print(usage)
return
if len(args)==1:
ID=-1
elif len(args)==2:
if len(args) == 1:
ID = -1
elif len(args) == 2:
try:
ID=int(args[1])
ID = int(args[1])
except ValueError:
print(usage)
return
if ID<-1:
if ID < -1:
print(usage)
return
else:
print(usage)
return
self.master.mav.command_long_send(self.target_system,
self.target_component,
mavutil.mavlink.MAV_CMD_AIRFRAME_CONFIGURATION , 0,
ID,
DesiredState,
0, 0, 0, 0, 0, 0)

self.master.mav.command_long_send(
self.target_system,
self.target_component,
mavutil.mavlink.MAV_CMD_AIRFRAME_CONFIGURATION , 0,
ID,
DesiredState,
0, 0, 0, 0, 0, 0
)

def idle_task(self):
'''called on idle'''
for r in self.repeats:
if r.event.trigger():
self.mpstate.functions.process_stdin(r.cmd, immediate=True)


def init(mpstate):
'''initialise module'''
return MiscModule(mpstate)

0 comments on commit 3019f93

Please sign in to comment.