Skip to content

Commit

Permalink
fix bug in FSK modulation: remove spiky jumps (#685)
Browse files Browse the repository at this point in the history
  • Loading branch information
jopohl committed Aug 24, 2019
1 parent ad0e175 commit 30a4d59
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 4 deletions.
31 changes: 29 additions & 2 deletions src/urh/cythonext/signal_functions.pyx
Expand Up @@ -6,6 +6,7 @@ from libcpp cimport bool

from libc.stdint cimport uint8_t, uint16_t, uint32_t, int64_t
from libc.stdio cimport printf
from libc.stdlib cimport malloc, free
from urh.cythonext.util cimport IQ, iq, bit_array_to_number

from cython.parallel import prange
Expand Down Expand Up @@ -62,12 +63,13 @@ cpdef modulate_c(uint8_t[:] bits, uint32_t samples_per_symbol, str modulation_ty
float carrier_amplitude, float carrier_frequency, float carrier_phase, float sample_rate,
uint32_t pause, uint32_t start, iq iq_type,
float gauss_bt=0.5, float filter_width=1.0):
cdef int64_t i = 0, j = 0, index = 0, s_i = 0, num_bits = len(bits)
cdef int64_t i = 0, j = 0, index = 0, prev_index=0, s_i = 0, num_bits = len(bits)
cdef uint32_t total_symbols = int(num_bits // bits_per_symbol)
cdef int64_t total_samples = total_symbols * samples_per_symbol + pause

cdef float a = carrier_amplitude, f = carrier_frequency, phi = carrier_phase

cdef float f_previous = 0, phase_correction = 0
cdef float t = 0, arg = 0

result = np.zeros((total_samples, 2), dtype=get_numpy_dtype(iq_type))
Expand Down Expand Up @@ -97,19 +99,41 @@ cpdef modulate_c(uint8_t[:] bits, uint32_t samples_per_symbol, str modulation_ty
samples_per_symbol, sample_rate, carrier_phase,
start, gauss_bt, filter_width)


cdef float* phase_corrections = NULL
if is_fsk and total_symbols > 0:
phase_corrections = <float*>malloc(total_symbols * sizeof(float))
phase_corrections[0] = 0.0
for s_i in range(1, total_symbols):
# Add phase correction to FSK modulation in order to prevent spiky jumps
index = bit_array_to_number(bits, end=(s_i+1)*bits_per_symbol, start=s_i*bits_per_symbol)
prev_index = bit_array_to_number(bits, end=s_i*bits_per_symbol, start=(s_i-1)*bits_per_symbol)

f = parameters[index]
f_previous = parameters[prev_index]

if f != f_previous:
t = (s_i*samples_per_symbol+start-1) / sample_rate
phase_corrections[s_i] = (phase_corrections[s_i-1] + 2 * M_PI * (f_previous-f) * t) % (2 * M_PI)
else:
phase_corrections[s_i] = phase_corrections[s_i-1]

for s_i in prange(0, total_symbols, schedule="static", nogil=True):
index = bit_array_to_number(bits, end=(s_i+1)*bits_per_symbol, start=s_i*bits_per_symbol)

a = carrier_amplitude
f = carrier_frequency
phi = carrier_phase
phase_correction = 0

if is_ask:
a = parameters[index]
if a == 0:
continue
elif is_fsk:
f = parameters[index]
phase_correction = phase_corrections[s_i]

elif is_psk or is_oqpsk:
phi = parameters[index]

Expand All @@ -118,11 +142,14 @@ cpdef modulate_c(uint8_t[:] bits, uint32_t samples_per_symbol, str modulation_ty
if is_gfsk:
f = gauss_filtered_freqs_phases[i, 0]
phi = gauss_filtered_freqs_phases[i, 1]
arg = 2 * M_PI * f * t + phi
arg = 2 * M_PI * f * t + phi + phase_correction

result_view[i, 0] = <iq>(a * cosf(arg))
result_view[i, 1] = <iq>(a * sinf(arg))

if phase_corrections != NULL:
free(phase_corrections)

return result

cdef uint8_t[:] get_oqpsk_bits(uint8_t[:] original_bits):
Expand Down
2 changes: 0 additions & 2 deletions src/urh/dev/native/lib/usrp.pyx
@@ -1,5 +1,3 @@
import time

from urh.dev.native.lib.cusrp cimport *
import numpy as np
# noinspection PyUnresolvedReferences
Expand Down
21 changes: 21 additions & 0 deletions tests/test_demodulations.py
@@ -1,6 +1,8 @@
import array
import unittest

import numpy as np

from tests.utils_testing import get_path_for_data_file
from urh.cythonext.signal_functions import modulate_c
from urh.signalprocessing.IQArray import IQArray
Expand Down Expand Up @@ -44,6 +46,25 @@ def test_fsk(self):
self.assertEqual(proto_analyzer.plain_bits_str[0],
"101010101010101010101010101010101100011000100110110001100010011011110100110111000001110110011000111011101111011110100100001001111001100110011100110100100011100111010011111100011")

def test_fsk_short_bit_length(self):
bits_str = "101010"
bits = array.array("B", list(map(int, bits_str)))
parameters = array.array("f", [-10e3, 10e3])
result = modulate_c(bits, 8, "FSK", parameters, 1, 1, 40e3, 0, 1e6, 1000, 0, parameters[0])

signal = Signal("")
signal.iq_array = IQArray(result)

# Ensure we have no spikes
self.assertLess(np.max(signal.qad), 1)

signal.qad_center = 0
signal.bit_len = 8

proto_analyzer = ProtocolAnalyzer(signal)
proto_analyzer.get_protocol_from_signal()
self.assertEqual(proto_analyzer.plain_bits_str[0], bits_str)

def test_psk(self):
signal = Signal(get_path_for_data_file("psk_gen_noisy.complex"), "PSK-Test")
signal.modulation_type = "PSK"
Expand Down

0 comments on commit 30a4d59

Please sign in to comment.