Skip to content

Commit

Permalink
Adding tests for ATC mode and adjusting some pylint stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
jgillula committed Nov 14, 2022
1 parent baa24fb commit d9d922f
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 19 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pylint.yml
Expand Up @@ -18,7 +18,7 @@ jobs:
pip install pylint>=2.12.2
- name: Analysing the code with pylint
id: pylint
run: python3 .github/workflows/run_pylint.py
run: python3 .github/workflows/run_pylint.py >> $GITHUB_OUTPUT
- name: Generating badge
uses: schneegans/dynamic-badges-action@v1.1.0
with:
Expand Down
11 changes: 5 additions & 6 deletions RFM69/radio.py
Expand Up @@ -332,7 +332,7 @@ def get_packets(self):
# return packets
with self._packetLock:
packets = list(self._packets)
self._packets = []
self._packets = [] # pylint: disable=dangerous-default-value
return packets


Expand All @@ -343,7 +343,7 @@ def send_ack(self, toAddress, buff=[]):
toAddress (int): Recipient node's ID
"""
while not self._canSend():
while not self._canSend(): # pragma: no cover
pass #self.has_received_packet()

# Convert buff to list of int if it's a string so the RSSI can be inserted
Expand Down Expand Up @@ -444,7 +444,7 @@ def _canSend(self):
self.begin_receive()
return True
#if signal stronger than -100dBm is detected assume channel activity - removed self.PAYLOADLEN == 0 and
elif self.mode == RF69_MODE_RX and self._readRSSI() < CSMA_LIMIT:
elif self.mode == RF69_MODE_RX and self._readRSSI() < CSMA_LIMIT: # pylint: disable=no-else-return
self._setMode(RF69_MODE_STANDBY)
return True
return False
Expand Down Expand Up @@ -556,7 +556,6 @@ def __str__(self): # pragma: no cover
def __repr__(self): # pragma: no cover
return "Radio()"

# pylint: disable=no-self-use
def _init_log(self):
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -593,8 +592,7 @@ def _interruptHandler(self, pin): # pragma: no cover
with self._spiLock:
payload_length, target_id, sender_id, CTLbyte = self.spi.xfer2([REG_FIFO & 0x7f, 0, 0, 0, 0])[1:]

if payload_length > 66:
payload_length = 66
payload_length = min(payload_length, 66)

if not (self.promiscuousMode or target_id == self.address or target_id == RF69_BROADCAST_ADDR):
self._debug("Ignore Interrupt")
Expand Down Expand Up @@ -658,6 +656,7 @@ def _reinitRadio(self): # pragma: no cover
if self._isHighSpeed:
self._writeReg(REG_LNA, (self._readReg(REG_LNA) & ~0x3) | RF_LNA_GAINSELECT_AUTO)

# pylint: disable=no-else-return
def _getUsForResolution(self, resolution): # pragma: no cover
if resolution == RF_LISTEN1_RESOL_RX_64 or resolution == RF_LISTEN1_RESOL_IDLE_64:
return 64
Expand Down
4 changes: 3 additions & 1 deletion tests/test-node/test-node.ino
Expand Up @@ -50,6 +50,7 @@


RFM69 radio(RF69_SPI_CS, RF69_IRQ_PIN, false);
//RFM69_ATC radio(RF69_SPI_CS, RF69_IRQ_PIN, false);

void setup() {
Serial.begin(SERIAL_BAUD);
Expand Down Expand Up @@ -141,6 +142,7 @@ bool test_transmit(String& failureReason) {
bool result = false;
while (!radio.receiveDone()) delay(1);
getMessage(data, datalen);
delay(25);
if (radio.ACKRequested()) {
radio.sendACK(radio.SENDERID);
char goal_string[6] = {'B', 'a', 'n', 'a', 'n', 'a'};
Expand Down Expand Up @@ -174,7 +176,7 @@ bool test_receive(String& failureReason) {
bool test_txrx(String& failureReason) {
while (!radio.receiveDone()) delay(1);
getMessage(data, datalen);
delay(25);
delay(50);
if (radio.ACKRequested()) radio.sendACK(radio.SENDERID);
char* response = new char[datalen];
for (uint8_t i = 0; i < datalen; i++) {
Expand Down
8 changes: 8 additions & 0 deletions tests/test_config.py
Expand Up @@ -37,3 +37,11 @@

# Uncomment to test ListenMode
# TEST_LISTEN_MODE_SEND_BURST = True

# Uncomment to test ATC mode
# ATC_MODE = True

try:
ATC_MODE
except NameError:
ATC_MODE = False
12 changes: 6 additions & 6 deletions tests/test_radio.py
Expand Up @@ -9,7 +9,7 @@


def test_transmit():
with Radio(FREQUENCY, 1, 99, verbose=True, interruptPin=INTERRUPT_PIN, resetPin=RESET_PIN, spiDevice=SPI_DEVICE, isHighPower=IS_HIGH_POWER, encryptionKey="sampleEncryptKey") as radio:
with Radio(FREQUENCY, 1, 99, verbose=True, interruptPin=INTERRUPT_PIN, resetPin=RESET_PIN, spiDevice=SPI_DEVICE, isHighPower=IS_HIGH_POWER, encryptionKey="sampleEncryptKey", enableATC=ATC_MODE) as radio:
# Test setting the network ID to the value we'll actually test with
radio.set_network(100)
# Try sending to a node that isn't on, and don't require an ack
Expand All @@ -22,7 +22,7 @@ def test_transmit():
assert success is True

def test_receive():
with Radio(FREQUENCY, 1, 100, verbose=True, interruptPin=INTERRUPT_PIN, resetPin=RESET_PIN, spiDevice=SPI_DEVICE, isHighPower=IS_HIGH_POWER, encryptionKey="sampleEncryptKey") as radio:
with Radio(FREQUENCY, 1, 100, verbose=True, interruptPin=INTERRUPT_PIN, resetPin=RESET_PIN, spiDevice=SPI_DEVICE, isHighPower=IS_HIGH_POWER, encryptionKey="sampleEncryptKey", enableATC=ATC_MODE) as radio:
timeout = time.time() + 5
while time.time() < timeout:
if radio.num_packets() > 0:
Expand All @@ -46,7 +46,7 @@ def do_txrx_test(radio):
time.sleep(1.0)

def test_txrx():
with Radio(FREQUENCY, 1, 100, verbose=True, interruptPin=INTERRUPT_PIN, resetPin=RESET_PIN, spiDevice=SPI_DEVICE, isHighPower=IS_HIGH_POWER, encryptionKey="sampleEncryptKey") as radio:
with Radio(FREQUENCY, 1, 100, verbose=True, interruptPin=INTERRUPT_PIN, resetPin=RESET_PIN, spiDevice=SPI_DEVICE, isHighPower=IS_HIGH_POWER, encryptionKey="sampleEncryptKey", enableATC=ATC_MODE) as radio:
do_txrx_test(radio)

def test_bcm_pin_numbers():
Expand All @@ -56,15 +56,15 @@ def test_bcm_pin_numbers():
GPIO.cleanup()
# The format of this dict is (Raspberry Pi pin number: GPIO number)
board_to_bcm_map = {3: 2, 5: 3, 7: 4, 8: 14, 10: 15, 11: 17, 12: 18, 13: 27, 15: 22, 16: 23, 18: 24, 19: 10, 21: 9, 22: 25, 23: 11, 24: 8, 26: 7, 27: 0, 28: 1, 29: 5, 31: 6, 32: 12, 33: 13, 35: 19, 36: 16, 37: 26, 38: 20, 40: 21}
with Radio(FREQUENCY, 1, 100, verbose=True, use_board_pin_numbers=False, interruptPin=board_to_bcm_map[INTERRUPT_PIN], resetPin=board_to_bcm_map[RESET_PIN], spiDevice=SPI_DEVICE, isHighPower=IS_HIGH_POWER, encryptionKey="sampleEncryptKey") as radio:
with Radio(FREQUENCY, 1, 100, verbose=True, use_board_pin_numbers=False, interruptPin=board_to_bcm_map[INTERRUPT_PIN], resetPin=board_to_bcm_map[RESET_PIN], spiDevice=SPI_DEVICE, isHighPower=IS_HIGH_POWER, encryptionKey="sampleEncryptKey", enableATC=ATC_MODE) as radio:
do_txrx_test(radio)
# Since we used BCM pin numbers, we have to clean up all of GPIO again
GPIO.cleanup()

def test_listen_mode_send_burst():
try:
TEST_LISTEN_MODE_SEND_BURST
with Radio(FREQUENCY, 1, 100, verbose=True, interruptPin=INTERRUPT_PIN, resetPin=RESET_PIN, spiDevice=SPI_DEVICE, isHighPower=IS_HIGH_POWER, encryptionKey="sampleEncryptKey") as radio:
with Radio(FREQUENCY, 1, 100, verbose=True, interruptPin=INTERRUPT_PIN, resetPin=RESET_PIN, spiDevice=SPI_DEVICE, isHighPower=IS_HIGH_POWER, encryptionKey="sampleEncryptKey", enableATC=ATC_MODE) as radio:
# For more test coverage, let's try setting the listen mode durations outside the acceptable range, like 70 seconds
radio.listen_mode_set_durations(256, 70000000)
radio.listen_mode_set_durations(70000000, 1000400)
Expand All @@ -84,7 +84,7 @@ def test_listen_mode_send_burst():
pytest.skip("Skipping testing listen_mode_send_burst since it's not set up")

def test_general():
with Radio(FREQUENCY, 1, 100, verbose=True, interruptPin=INTERRUPT_PIN, resetPin=RESET_PIN, spiDevice=SPI_DEVICE, isHighPower=IS_HIGH_POWER, encryptionKey="sampleEncryptKey") as radio:
with Radio(FREQUENCY, 1, 100, verbose=True, interruptPin=INTERRUPT_PIN, resetPin=RESET_PIN, spiDevice=SPI_DEVICE, isHighPower=IS_HIGH_POWER, encryptionKey="sampleEncryptKey", enableATC=ATC_MODE) as radio:
# This is just here for test coverage
radio._readRSSI(True)
radio.read_registers()
Expand Down
2 changes: 1 addition & 1 deletion tests/test_radio_broadcast.py
Expand Up @@ -6,7 +6,7 @@


def test_broadcast_and_promiscuous_mode():
with Radio(FREQUENCY, 1, 100, verbose=True, interruptPin=INTERRUPT_PIN, resetPin=RESET_PIN, spiDevice=SPI_DEVICE, isHighPower=IS_HIGH_POWER, encryptionKey="sampleEncryptKey") as radio:
with Radio(FREQUENCY, 1, 100, verbose=True, interruptPin=INTERRUPT_PIN, resetPin=RESET_PIN, spiDevice=SPI_DEVICE, isHighPower=IS_HIGH_POWER, encryptionKey="sampleEncryptKey", enableATC=ATC_MODE) as radio:
test_message = [random.randint(0, 255) for i in range(RF69_MAX_DATA_LEN)]
# first we send the message out to the world
radio.broadcast(test_message)
Expand Down
8 changes: 4 additions & 4 deletions tests/test_radio_threadsafe.py
Expand Up @@ -8,7 +8,7 @@


def test_transmit_threadsafe():
with Radio(FREQUENCY, 1, 100, verbose=True, interruptPin=INTERRUPT_PIN, resetPin=RESET_PIN, spiDevice=SPI_DEVICE, isHighPower=IS_HIGH_POWER) as radio:
with Radio(FREQUENCY, 1, 100, verbose=True, interruptPin=INTERRUPT_PIN, resetPin=RESET_PIN, spiDevice=SPI_DEVICE, isHighPower=IS_HIGH_POWER, enableATC=ATC_MODE) as radio:
# Test getting a packet when we shouldn't have one
packet = radio.get_packet(block=False)
assert packet is None
Expand All @@ -17,15 +17,15 @@ def test_transmit_threadsafe():
assert success is True

def test_receive_threadsafe():
with Radio(FREQUENCY, 1, 100, verbose=True, interruptPin=INTERRUPT_PIN, resetPin=RESET_PIN, spiDevice=SPI_DEVICE, isHighPower=IS_HIGH_POWER) as radio:
with Radio(FREQUENCY, 1, 100, verbose=True, interruptPin=INTERRUPT_PIN, resetPin=RESET_PIN, spiDevice=SPI_DEVICE, isHighPower=IS_HIGH_POWER, enableATC=ATC_MODE) as radio:
packet = radio.get_packet(timeout=5)
if packet:
assert packet.data == [ord(x) for x in "Apple\0"]
return True
return False

def test_txrx_threadsafe():
with Radio(FREQUENCY, 1, 100, verbose=True, interruptPin=INTERRUPT_PIN, resetPin=RESET_PIN, spiDevice=SPI_DEVICE, isHighPower=IS_HIGH_POWER) as radio:
with Radio(FREQUENCY, 1, 100, verbose=True, interruptPin=INTERRUPT_PIN, resetPin=RESET_PIN, spiDevice=SPI_DEVICE, isHighPower=IS_HIGH_POWER, enableATC=ATC_MODE) as radio:
# We'll try sending too big a packet to get some code coverage
test_message = [random.randint(0, 255) for i in range(RF69_MAX_DATA_LEN+10)]
success = radio.send(2, test_message, attempts=5, waitTime=100)
Expand All @@ -40,7 +40,7 @@ def test_txrx_threadsafe():
def test_listen_mode_send_burst_threadsafe():
try:
TEST_LISTEN_MODE_SEND_BURST
with Radio(FREQUENCY, 1, 100, verbose=True, interruptPin=INTERRUPT_PIN, resetPin=RESET_PIN, spiDevice=SPI_DEVICE, isHighPower=IS_HIGH_POWER) as radio:
with Radio(FREQUENCY, 1, 100, verbose=True, interruptPin=INTERRUPT_PIN, resetPin=RESET_PIN, spiDevice=SPI_DEVICE, isHighPower=IS_HIGH_POWER, enableATC=ATC_MODE) as radio:
# Try sending bytes instead of a string to get more test coverage
test_message = [108, 105, 115, 116, 101, 110, 32, 109, 111, 100, 101, 32, 116, 101, 115, 116] # this corresponds to the string "listen mode test"
radio.listen_mode_send_burst(2, test_message)
Expand Down

0 comments on commit d9d922f

Please sign in to comment.