Skip to content

Commit

Permalink
Merge pull request #13 from pimoroni/patch-py39
Browse files Browse the repository at this point in the history
Fix Python 3.9 support for #11
  • Loading branch information
Gadgetoid committed Feb 9, 2022
2 parents 5525c58 + 33a9443 commit 38b2a72
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 4 deletions.
50 changes: 50 additions & 0 deletions examples/cmd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#!/usr/bin/env python3
import time
import sys

from pa1010d import PA1010D


"""
Run raw commands against the PA1010D GPS and return the respones.
Eg:
PMTK605 = Query Firmware Release Info
PMTK430 = Query Datum
PMTK414 = Query NMEA Output Frequency
PMTK400 = Query Update Rate
PMTK225,<1 or 0> = Enable/Disable PPS
"""

def timeout(err=None, timeout=5.0):
if err is None:
err = "Timed out!"
t_start = time.time()
while time.time() - t_start < timeout:
yield
raise TimeoutError(err)


responses = {
}

if len(sys.argv) < 2:
print(f"Usage: {sys.argv[0]} <command>")
sys.exit()

command = sys.argv[1]
response = responses.get(command, f"$PMTK{int(command[4:]) + 100}")

gps = PA1010D()

gps.update()

gps.send_command(sys.argv[1])

if response:
print(f"Waiting for {response}...")
for t in timeout("Timed out waiting for command response."):
message = gps.read_sentence()
if message.startswith(response):
print(message)
break
21 changes: 21 additions & 0 deletions examples/pps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/usr/bin/env python3
import time
import sys

from pa1010d import PA1010D


if len(sys.argv) < 2:
print(f"Usage: {sys.argv[0]} <on/off>")
sys.exit()

gps = PA1010D()

if sys.argv[1] == "on":
gps.send_command("PMTK255,1")
else:
gps.send_command("PMTK255,0")

result = gps.update()

print("OK" if result else "Uh oh!")
11 changes: 7 additions & 4 deletions library/pa1010d/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,21 @@ def _write_sentence(self, bytestring):
"""
for char_index in bytestring:
self._i2c.write_byte(self._i2c_addr, ord(char_index))
self._i2c.write_byte(self._i2c_addr, char_index)

def send_command(self, command, add_checksum=True):
"""Send a command string to the PA1010D.
If add_checksum is True (the default) a NMEA checksum will automatically be computed and added.
"""
if type(command) is not bytes:
command = command.encode("ascii")

# TODO replace with pynmea2 functionality
if command.startswith("$"):
if command[0] == b"$":
command = command[1:]
if command.endswith("*"):
if command[-1] == b"*":
command = command[:-1]

buf = bytearray()
Expand Down Expand Up @@ -112,7 +115,7 @@ def read_sentence(self, timeout=5):
# Should be a full \r\n since the GPS emits spurious newlines
if buf[-2:] == [ord("\r"), ord("\n")]:
# Remove line ending and spurious newlines from the sentence
return bytearray(buf).decode("ascii").strip().replace("\n","")
return bytearray(buf).decode("ascii").strip().replace("\n", "")

raise TimeoutError("Timeout waiting for readline")

Expand Down
19 changes: 19 additions & 0 deletions library/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,28 @@
import sys


class SMBus:
def __init__(self, bus):
self.data = "$PMTK011,MTKGPS*08\r\n".encode("ascii")
self.ptr = 0

def read_byte_data(self, address, register):
if register == 0x00:
result = self.data[self.ptr]
self.ptr += 1
self.ptr %= len(self.data)
return result
else:
return 0

def write_byte(self, address, data):
pass


@pytest.fixture(scope='function', autouse=False)
def smbus():
smbus = mock.MagicMock()
smbus.SMBus = SMBus
sys.modules["smbus"] = smbus
yield smbus
del sys.modules["smbus"]
16 changes: 16 additions & 0 deletions library/tests/test_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,19 @@ def test_setup(smbus):

gps = pa1010d.PA1010D()
del gps


def test_send_command(smbus):
import pa1010d

gps = pa1010d.PA1010D()
gps.send_command("$TEST")
gps.send_command("$TEST*")
gps.send_command("$TEST*".encode("ascii"))


def test_recv_command(smbus):
import pa1010d

gps = pa1010d.PA1010D()
assert gps.update() is True

0 comments on commit 38b2a72

Please sign in to comment.