# Signal processing

In [5]:
from typing import Tuple, List
from scipy.io import wavfile
import scipy.signal as ss
import numpy as np
import matplotlib.pyplot as plt
from random import randint, seed

### Morse code

In [6]:
# Message that the "Main" cell encodes into kontakt.wav and then decodes back.
# NOTE(review): the text looks like a substitution cipher - confirm it is intentional.
SECRET_SENTECE = "KDMDS PC MD PGC PC FCTO TJOLCFB! LBMBSO PC KD MLOUIB"

In [7]:
# Character -> dot/dash sequence.
# Every key is exactly one character, because morse_code_signal looks the
# sentence up one character at a time.
MORSE_CODE_DICT = {
    "A": ".-",
    "B": "-...",
    "C": "-.-.",
    "D": "-..",
    "E": ".",
    "F": "..-.",
    "G": "--.",
    "H": "....",
    "I": "..",
    "J": ".---",
    "K": "-.-",
    "L": ".-..",
    "M": "--",
    "N": "-.",
    "O": "---",
    "P": ".--.",
    "Q": "--.-",
    "R": ".-.",
    "S": "...",
    "T": "-",
    "U": "..-",
    "V": "...-",
    "W": ".--",
    "X": "-..-",
    "Y": "-.--",
    "Z": "--..",
    "1": ".----",
    "2": "..---",
    "3": "...--",
    "4": "....-",
    "5": ".....",
    "6": "-....",
    "7": "--...",
    "8": "---..",
    "9": "----.",
    "0": "-----",
    # was ", " (comma + space), which a single-character lookup can never hit
    ",": "--..--",
    ".": ".-.-.-",
    "?": "..--..",
    "/": "-..-.",
    "-": "-....-",
    "(": "-.--.",
    ")": "-.--.-",
    # a literal space in the sentence gets its own dot/dash sequence
    " ": "---...",
    # "~" is the separator morse_code_signal inserts between characters;
    # it becomes a gap symbol (" ") in the morse string
    "~": " ",
}

# Morse symbol -> small integer used when building the sample sequence.
MORSE_TO_INT = {
    " ": 0,
    ".": 1,
    "-": 2,
}

# Reverse lookup: dot/dash sequence -> character.
MORSE_CODE_TO_CHAR_DICT = {code: char for char, code in MORSE_CODE_DICT.items()}

## WAV file format functions

In [8]:
# Sampling rate (samples per second) used when generating the audio signals
# and when writing them to wav files.
DEFAULT_SAMPLING_RATE = 44100

In [9]:
def save_signal_to_wav(signal: np.ndarray, duration: float, path: str, sample_rate: int = None) -> None:
    """
    Saves the generated signal to a wav file on the given path

    signal: samples to write
    duration: length of the signal in seconds, only used when sample_rate
        is not given
    path: destination file
    sample_rate: samples per second; when omitted it is calculated as
        len(signal) // duration
    """
    if sample_rate is None:
        sample_rate = signal.shape[0] // duration

    # A float duration makes the floor division return a float, and the wav
    # header can only store an integer rate, so always hand over an int.
    wavfile.write(filename=path, rate=int(sample_rate), data=signal)

In [10]:
def read_signal_from_wav(path: str) -> Tuple[int, np.ndarray]:
    """
    Loads a wav file and returns its sample rate together with the samples
    """
    return wavfile.read(path)

## Encoding to digital signal

In [11]:
def plot_digital_signal(signal: np.ndarray) -> None:
    """
    Draws the digital signal over time, one unit-wide slot per sample
    """
    slots = np.arange(signal.shape[0])

    # each sample is drawn at both edges of its slot: 0, 1, 1, 2, 2, 3, ...
    edges = np.column_stack((slots, slots + 1)).ravel()
    levels = np.repeat(signal, 2, axis=0)

    plt.figure(figsize=(15, 5))
    plt.plot(edges, levels, "b-")
    plt.grid(visible=True, which="both", axis="both")
    plt.show()

In [12]:
def binary_digital_signal(sentence: str) -> np.ndarray:
    """
    Builds a binary digital wave (numpy array of 0/1) from the sentence

    The sentence is encoded as UTF-8; every byte contributes its 8 bits,
    most significant bit first. Each bit is one sample (width 1 unit,
    amplitude 1 for a set bit, else 0), so the frequency is constant.
    """
    encoded = sentence.encode("utf-8")
    bit_string = "".join(f"{byte:08b}" for byte in encoded)

    return np.array([int(bit) for bit in bit_string])

In [13]:
def digital_am_signal(sentence: str) -> np.ndarray:
    """
    Builds a digital amplitude-modulated wave from the sentence

    The sentence is encoded as UTF-8 and every byte value becomes the
    amplitude of one sample. Each amplitude is 1 unit wide, so the
    frequency is constant.
    """
    amplitudes = list(sentence.encode("utf-8"))
    return np.array(amplitudes)

In [14]:
def morse_code_signal(sentence: str) -> np.ndarray:
    """
    Translates the sentence to morse code and the morse code to a numpy signal

    Characters are separated with "~" (which maps to a gap), upper-cased and
    looked up in MORSE_CODE_DICT. Every morse symbol then becomes samples:

    .   -> [1, 0]         one high sample plus the closing 0
    -   -> [1, 1, 1, 0]   three high samples plus the closing 0
    gap -> [0, 0]         it always follows a closing 0, giving three 0s
    """
    samples_for = {
        0: [0, 0],
        1: [1, 0],
        2: [1, 1, 1, 0],
    }

    prepared = "~".join(sentence).upper().strip()

    samples = []
    for char in prepared:
        for symbol in MORSE_CODE_DICT[char]:
            samples.extend(samples_for[MORSE_TO_INT[symbol]])

    return np.array(samples)

In [15]:
def binary_digital_signals_with_errors(sentence: str, error_ratio: float = 0.025) -> List[np.ndarray]:
    """
    Generates a list of three copies of the sentence's binary signal with some bits flipped.

    All signals have the same length, so they can be compared bit by bit to
    correct the flipped bits. The number of flips is
    int(error_ratio * number of bits in the signal).

    The random generator is re-seeded with 0 on every call, so the same
    sentence always produces the same errors. Flip positions are drawn
    independently: a bit may be flipped twice (cancelling out) or the same
    position may be hit in more than one copy.
    """
    seed(0)

    signals = [binary_digital_signal(sentence) for _ in range(3)]

    # Count the bits of the encoded signal, not len(sentence) * 8: characters
    # outside ASCII take more than one byte in UTF-8.
    total_num_bits = signals[0].shape[0]
    num_errors = int(error_ratio * total_num_bits)

    for _ in range(num_errors):
        signals[randint(0, 2)][randint(0, total_num_bits - 1)] ^= 1

    return signals

## Decoding from digital signal

In [16]:
def decode_binary_digital_signal(signal: np.ndarray) -> str:
    """
    Decodes a string from digital binary signal

    The signal is cut into groups of 8 bits (most significant bit first);
    each group is converted to its integer value and then to the character
    with that code point.

    Returns an empty string for an empty signal.
    Raises ValueError when the number of bits is not a multiple of 8.
    """
    bits = np.asarray(signal)
    if bits.shape[0] == 0:
        return ""
    if bits.shape[0] % 8 != 0:
        raise ValueError("signal length must be a multiple of 8 bits")

    # Cast first so that float samples (1.0) are rendered as "1", not "1.0".
    groups = bits.astype(int).reshape(-1, 8)

    return "".join(
        chr(int("".join(str(bit) for bit in group), 2))
        for group in groups
    )

In [17]:
def decode_digital_am_signal(signal: np.ndarray) -> str:
    """
    Decodes a string from a digital amplitude modulated signal

    Every amplitude is treated as a character code
    """
    return "".join(map(chr, signal))

In [18]:
def decode_morse_code_signal(signal: np.ndarray) -> str:
    """
    Decodes a string from digital morse code signal

    A run of 1s closed by a 0 is one symbol: a single 1 is a dot, a longer
    run is a dash. A 0 that follows another 0 ends the current character.

    Returns an empty string when the signal holds no complete character.
    Raises ValueError when a dot/dash sequence is not a known morse code.
    """
    past_sig = 0
    curr_morse_ch = ""   # dots and dashes collected for the current character
    curr_code = ""       # the run of 1s of the pulse currently being read
    list_msg = []

    for sig in signal:
        if past_sig == 0 and sig == 0:
            # gap between characters: flush whatever has been collected
            if curr_morse_ch != "":
                list_msg.append(curr_morse_ch)
                curr_morse_ch = ""
                curr_code = ""

        elif past_sig == 1 and sig == 0:
            # falling edge: the pulse is over, classify it by its length
            curr_morse_ch += "." if curr_code == "1" else "-"
            curr_code = ""

        elif sig == 1:
            curr_code += "1"

        past_sig = sig

    # The last character has no trailing gap. Flush it only if there is one,
    # so that empty or gap-terminated signals do not add an empty code.
    if curr_morse_ch != "":
        list_msg.append(curr_morse_ch)

    try:
        return "".join(MORSE_CODE_TO_CHAR_DICT[code] for code in list_msg)
    except KeyError as err:
        raise ValueError(f"unknown morse sequence: {err.args[0]!r}") from None

In [19]:
def correct_errors_in_binary_digital_signal(signals: List[np.ndarray]) -> np.ndarray:
    """
    Corrects errors in binary signal by combining information from multiple same source signals

    Every output bit is the majority vote of the bits at that position: it
    is 1 when more than half of the signals have a 1 there. Works for any
    number of signals; with an even number a tie resolves to 0.

    The input signals are not modified.
    Raises ValueError when no signal is given.
    """
    if len(signals) == 0:
        raise ValueError("at least one signal is required")

    # number of signals that have a 1 at each position
    votes = sum(signals)

    return (2 * votes > len(signals)).astype(votes.dtype)

## Generating WAV file for digital signals

In [20]:
# Run this cell to generate example signals for workshop
#
# Every save call passes the number of samples as the duration, so the
# sample rate computed by save_signal_to_wav (len(signal) // duration) is 1.

# signal_01.wav: sentence as a stream of bits
binary_signal = binary_digital_signal("This is simple binary signal")
save_signal_to_wav(binary_signal, binary_signal.shape[0], "signal_01.wav")

# signal_02.wav: one amplitude per byte of the sentence
am_modulated_signal = digital_am_signal("Here amplitudes have been modulated")
save_signal_to_wav(am_modulated_signal, am_modulated_signal.shape[0], "signal_02.wav")

# signal_03.wav: sentence as morse code pulses
morse_signal = morse_code_signal("Morse code was used in the past")
save_signal_to_wav(morse_signal, morse_signal.shape[0], "signal_03.wav")

# signal_04_0.wav .. signal_04_2.wav: three copies of the same bit stream, each with random bit flips
signals_with_errors = binary_digital_signals_with_errors("Error correction mechanisms play a vital role in ensuring the reliability and integrity of data transmission in digital communication systems. These mechanisms are designed to detect and rectify errors that can occur during the transmission of binary data. One commonly used technique is the addition of redundant information, such as parity bits or checksums, which allow the receiver to identify and correct errors. More advanced error correction codes like Reed-Solomon codes and convolutional codes provide even greater resilience against errors by encoding the data in a mathematically structured way. These techniques are particularly crucial in scenarios where data integrity is paramount, such as in satellite communication, wireless networks, and data storage. Error correction mechanisms help maintain data accuracy, minimize retransmissions, and enhance the overall reliability of modern digital communication systems.")
for i, signal in enumerate(signals_with_errors):
    save_signal_to_wav(signal, signal.shape[0], f"signal_04_{i}.wav")

- signal_01.wav - digital binary signal, message requires reading bits and parsing it using ascii table
- signal_02.wav - amplitudes of digital signal have been modulated
- signal_03.wav - digital signal representing Morse code
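- signal_04_0.wav, signal_04_1.wav, signal_04_2.wav - three copies of the same binary signal, each with different bit errors, used for error correction (ECC)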

## Encoding to analog signal

In [21]:
def plot_analog_signal(time: np.ndarray, signal: np.ndarray) -> None:
    """
    Draws the analog signal against its time axis

    The x axis gets a tick every 0.5 units, starting at the smallest time
    value and stopping before the largest time value + 1.
    """
    plt.figure(figsize=(20, 5))

    first_tick = min(time)
    tick_limit = max(time) + 1
    plt.xticks(np.arange(first_tick, tick_limit, 0.5))

    plt.plot(time, signal)
    plt.grid(visible=True)
    plt.show()

In [22]:
def analog_am_signal(sentence: str) -> Tuple[np.ndarray, np.ndarray]:
    """
    Builds an analog amplitude modulated signal from the sentence

    Every UTF-8 byte of the sentence gets one full sine period sampled at
    100 points; the byte value is the amplitude of that period.

    Returns the signal samples and the matching time axis
    """
    amplitudes = np.array(list(sentence.encode("utf-8")))
    cycle = 2 * np.pi

    # one time window per byte: [0, 2pi], [2pi, 4pi], ...
    windows = []
    for index in np.arange(0, amplitudes.shape[0]):
        windows.append(np.linspace(cycle * index, cycle * (index + 1), 100))

    waves = [
        amplitude * np.sin(window)
        for amplitude, window in zip(amplitudes, windows)
    ]

    return np.concatenate(waves), np.concatenate(windows)

In [23]:
def analog_fm_signal(sentence: str) -> Tuple[np.ndarray, np.ndarray]:
    """
    Builds an analog frequency modulated signal from the sentence

    Every UTF-8 byte of the sentence gets a window of 100 samples over
    [0, 2pi]; the byte value is the frequency of the sine in that window.
    The windows are laid one after another on the returned time axis.

    Returns the signal samples and the matching time axis
    """
    frequencies = np.array(list(sentence.encode("utf-8")))
    period = 2 * np.pi

    # all windows share the same local time axis
    local_time = np.linspace(0, period, 100)

    waves = [np.sin(period * frequency * local_time) for frequency in frequencies]

    time = np.concatenate([
        local_time + position * period
        for position in range(len(waves))
    ])
    signal = np.concatenate(waves)

    return signal, time

In [24]:
def analog_wave_composition(sentence: str) -> Tuple[np.ndarray, np.ndarray]:
    """
    Builds an analog signal by adding up one sine wave per byte of the sentence

    The byte value is used both as the amplitude and as the frequency of
    its wave. All waves span the whole time axis, which has 10 000 samples
    over [0, 2pi * number of bytes].

    Returns the signal samples and the matching time axis
    """
    codes = np.array(list(sentence.encode("utf-8")))
    period = 2 * np.pi

    time = np.linspace(0, period * codes.shape[0], 10_000)

    components = []
    for code in codes:
        components.append(code * np.sin(period * code * time))

    composed = np.sum(np.array(components), axis=0)

    return composed, time

In [25]:
def analog_white_noise_signal() -> Tuple[np.ndarray, np.ndarray]:
    """
    Generates a sine wave with white noise added on top of it

    numpy's global random generator is re-seeded with 0, so the noise is
    the same on every call.

    Returns the noisy signal and the matching time axis
    """
    np.random.seed(0)

    seconds = 3                              # length of the sound
    sample_rate = DEFAULT_SAMPLING_RATE      # must match the rate used when saving to wav
    tone_freq = 400                          # NOTE(review): original comment calls this "note A"; concert A is 440 Hz - confirm
    tone_amp = 1                             # amplitude of the clean sine wave
    noise_amp = 0.2                          # scales the noise down

    time = np.linspace(0, seconds, round(sample_rate * seconds))

    clean = tone_amp * np.sin(2 * np.pi * tone_freq * time)
    noise = noise_amp * np.random.randn(time.shape[0])

    return clean + noise, time

In [26]:
def analog_mixed_notes_signal() -> Tuple[np.ndarray, np.ndarray]:
    """
    Generates an analog signal that is the sum of four sine waves (notes)

    Returns the mixed signal and the matching time axis
    """
    # (amplitude, frequency) of every note in the mix
    notes = [
        (2.0, 220.0),
        (5.0, 440.0),
        (2.0, 880.0),
        (1.0, 1760.0),
    ]

    seconds = 5
    sample_rate = DEFAULT_SAMPLING_RATE
    period = 2 * np.pi

    time = np.linspace(0, seconds, round(seconds * sample_rate))

    waves = [amp * np.sin(period * freq * time) for amp, freq in notes]
    mixed = np.sum(np.array(waves), axis=0)

    return mixed, time

## Decoding from analog signal

In [27]:
def decode_analog_am_signal(signal: np.ndarray) -> str:
    """
    Decodes a sentence from an analog amplitude modulated signal

    The number of characters is half the number of samples that are
    (close to) zero. The signal is split into that many equal parts and the
    rounded maximum of every part is taken as the character code.
    """
    near_zero_count = np.count_nonzero(np.isclose(signal, 0))
    char_count = near_zero_count // 2

    peaks = [np.max(part) for part in np.split(signal, char_count)]

    return "".join(chr(round(peak)) for peak in peaks)

In [28]:
def decode_analog_fm_signal(signal: np.ndarray, time: np.ndarray) -> str:
    """
    Decodes a sentence from a frequency modulated analog signal

    The number of characters is the number of samples that are exactly 0.
    The signal is split into that many equal parts. For every part the
    frequencies 0..254 are tried in order, and the first one whose sine over
    the first time window matches the part (absolute tolerance 0.0001) is
    taken as the character code. A part without a match adds nothing.
    """
    period = 2 * np.pi

    char_count = int(np.count_nonzero(signal == 0))
    parts = np.split(signal, char_count)
    window = np.split(time, char_count)[0]

    decoded = []
    for part in parts:
        for code in range(255):
            candidate = np.sin(code * window * period)
            if np.allclose(candidate, part, rtol=0, atol=0.0001):
                decoded.append(chr(code))
                break

    return "".join(decoded)

In [29]:
def decode_analog_wave_composition(signal: np.ndarray, time: np.ndarray) -> None:
    """
    Uses FFT to transform the signal into the frequency domain and plots its amplitude spectrum

    The function only draws the spectrum and returns nothing; the sentence
    has to be read off the plot (every peak is one component wave).

    signal: samples of the composed wave
    time: time axis of the signal, assumed to be evenly spaced

    Raises ValueError when fewer than two samples are given.
    """
    if signal.size < 2 or time.size < 2:
        raise ValueError("at least two samples are needed to compute a spectrum")

    sample_count = signal.size

    # Single-sided amplitude spectrum: divide by the number of samples (not
    # by the number of rfft bins) and double every bin that has a mirrored
    # negative-frequency twin, so a sine of amplitude A peaks at about A.
    fft_spectrum = np.abs(np.fft.rfft(signal)) / sample_count
    fft_spectrum[1:] *= 2
    if sample_count % 2 == 0:
        # the last bin of an even-length signal is the Nyquist bin, which
        # has no twin and must not be doubled
        fft_spectrum[-1] /= 2

    # time.size samples span time.size - 1 intervals
    sampling_freq = (time.size - 1) / (time[-1] - time[0])

    fft_freq = np.fft.rfftfreq(sample_count, 1 / sampling_freq)

    plt.figure(figsize=(15, 5))
    plt.plot(fft_freq, fft_spectrum)
    plt.xlabel("Frequency")
    plt.ylabel("Amplitude")
    plt.show()

In [30]:
 def filter_analog_white_noise_signal(signal: np.ndarray, filter_size: int = 50) -> np.ndarray:
    """
    Filters white noise from signal by using convolution and simple moving average filter

    Filtered signal is same in size as noisy signal
    """

    filter = np.ones(filter_size) / filter_size

    filtered_signal = np.convolve(signal, filter, "same")

    return filtered_signal

## Generating WAV file for analog signals

In [31]:
# Run this cell to generate example signals for workshop

# signal_05.wav: passing the number of samples as the duration makes
# save_signal_to_wav compute a sample rate of len(signal) // duration = 1
am_signal, time = analog_am_signal("Analog signals can be represented using discrete values")
save_signal_to_wav(am_signal, am_signal.shape[0], "signal_05.wav")

# fm_signal, time = analog_fm_signal("Frequency can also hide secret messages")
# save_signal_to_wav(fm_signal, fm_signal.shape[0], "signal_06.wav")

# comp_signal, time = analog_wave_composition("abcdef")
# save_signal_to_wav(comp_signal, comp_signal.shape[0], "signal_07.wav")

# The two calls below pass the time array in the `duration` position; it is
# not used by save_signal_to_wav because the sample rate is given explicitly.
noisy_signal, time = analog_white_noise_signal()
save_signal_to_wav(noisy_signal, time, "signal_06.wav", DEFAULT_SAMPLING_RATE)

notes_signal, time = analog_mixed_notes_signal()
save_signal_to_wav(notes_signal, time, "signal_07.wav", DEFAULT_SAMPLING_RATE)

- signal_05.wav - amplitude modulated analog signal, message is read from amplitude
- signal_06.wav - sin wave signal with white noise, requires convolution to denoise
- signal_07.wav - analog signal made of several notes (sine waves of different frequencies) mixed together, requires FFT to decode

## Processing and decoding signals received by SDR

In [32]:
def encoding_morse_binary_signal(sentence: str) -> np.ndarray:
    """
    Encodes the sentence as a train of tone bursts, one burst per bit

    The sentence is turned into bits with binary_digital_signal. Every bit
    becomes a silent gap followed by a burst of the 880 Hz tone: a 0 bit is
    a burst as long as the gap, a 1 bit a burst twice as long. A 600 Hz
    leader tone is put in front to mark the beginning of the signal.

    The time axis always spans 60 units no matter how many samples the
    message needs.
    """
    tone_freq = 880
    leader_freq = 600
    gain = 0.2
    span = 60   # seconds

    # length of the gap and of a short burst, in samples
    unit = round(DEFAULT_SAMPLING_RATE / 32)

    # on/off envelope: 0 during gaps, 1 during bursts
    envelope = []
    for bit in binary_digital_signal(sentence):
        envelope.extend([0] * unit)
        envelope.extend([1] * (unit * 2 if bit else unit))

    envelope = np.array(envelope).astype(np.float32)
    time = np.linspace(0, span, len(envelope))

    bursts = envelope * np.sin(2 * np.pi * tone_freq * time) * gain

    # the leader reuses the first 22050 points of the time axis
    leader = np.sin(2 * np.pi * leader_freq * time[:22050]) * gain

    return np.concatenate((leader, bursts))

In [33]:
def decoding_morse_binary_signal(signal: np.ndarray, sampling_rate: int, tolerance: int = 100) -> str:
    """
    Decodes signal that was encoded with encoding_morse_binary_signal function

    signal: samples without the leader tone; silence must be exactly 0
    sampling_rate: not used by the decoding, kept for interface compatibility
    tolerance: runs of at most this many samples are not treated as bursts

    The signal is cut at every zero sample. Runs longer than `tolerance` are
    the tone bursts; a burst longer than the midpoint between the shortest
    and the longest burst is a 1, otherwise a 0. Bits that do not fill a
    whole byte at the end are dropped.

    Returns an empty string when the signal holds less than one byte.
    NOTE: when all bursts have the same length every bit decodes as 0.
    """
    signal = np.asarray(signal)

    # Lengths of the pieces between consecutive zero samples. This equals the
    # lengths np.split(signal, zero_positions) would give, without creating
    # one small array per zero sample.
    zero_positions = np.flatnonzero(signal == 0)
    boundaries = np.concatenate(([0], zero_positions, [signal.shape[0]]))
    run_lengths = np.diff(boundaries)

    burst_lengths = run_lengths[run_lengths > tolerance]
    if burst_lengths.size == 0:
        return ""

    threshold = (burst_lengths.min() + burst_lengths.max()) // 2
    bits = (burst_lengths > threshold).astype(int)

    # keep whole bytes only
    usable = bits.size - bits.size % 8
    if usable == 0:
        return ""

    return decode_binary_digital_signal(bits[:usable])

### Main

In [36]:
# Saving to file
signal = encoding_morse_binary_signal(SECRET_SENTECE)
# NOTE(review): `duration` is not used by the statements below - confirm it can go.
duration = signal.size // DEFAULT_SAMPLING_RATE
# The sample rate is passed explicitly (half the default rate), so the
# duration argument (60) is not used by save_signal_to_wav.
save_signal_to_wav(signal, 60, "kontakt.wav", round(DEFAULT_SAMPLING_RATE / 2))

# Reading from file and decoding
rate, signal = read_signal_from_wav("kontakt.wav")

# Cutting note B (start sound)
# Drops the first `rate` samples; the encoder's leader tone is built from
# the first 22050 time points.
signal = signal[rate:]

decoding_morse_binary_signal(signal, rate)

'KDMDS PC MD PGC PC FCTO TJOLCFB! LBMBSO PC KD MLOUIB'

### Testing with real signal

In [35]:
import os

# Recording to decode; the whole cell is skipped when the file is missing.
sdr_signal_path = 'radio_signal.wav'

if os.path.exists(sdr_signal_path):

    rate, signal = read_signal_from_wav(sdr_signal_path)
    # keep the first column only
    # NOTE(review): assumes the recording is 2-D (e.g. stereo) - confirm
    signal = signal[:, 0]

    duration = signal.size / rate

    time = np.linspace(0, duration, signal.size)

    # keep the part between second 6 and second 47 of the recording
    signal = signal[6*rate:47*rate]
    time = time[6*rate:47*rate]

    # binarise: samples with magnitude up to 10 000 become 0, louder ones 1
    # NOTE(review): if the samples are int16, abs(-32768) overflows - confirm dtype
    signal[abs(signal) <= 10_000] = 0.0
    signal[abs(signal) > 10_000] = 1.0

    # moving average over 50 samples
    # NOTE(review): `filter` shadows the Python builtin of the same name
    filter_size = 50
    filter = np.ones(filter_size) / filter_size

    signal = np.convolve(signal, filter, mode = "same")

    # any sample the average made positive counts as "tone present";
    # presumably this closes the zero crossings inside a tone burst so that
    # every burst is one uninterrupted run - confirm
    signal[signal > 0] = 1.0

    # plot_analog_signal(time, signal)

    print(decoding_morse_binary_signal(signal, rate))