In [None]:
import numpy as np
import os
import pandas as pd
import torch
from torchaudio.functional.filtering import lfilter

In [None]:
import sys
sys.path.append("../")
from kdmc.data.sbasic import get_sbasic_datasets

get_sbasic_datasets(f'../../data', 1024, 0)

In [None]:
data = np.load(os.path.join("../../data/synthetic/signal", "sbasic_big.npz"))
iq = data['iq']
modulation = data['modulation']
y = data['y']
snr = data['snr']
rx_s = data['rx_s']

In [None]:
def rrcosfilter(N, alpha, Ts, gain=None):
    """
    Generates a root raised cosine (RRC) filter (FIR) impulse response.
    Parameters
    ----------
    N : int
        Length of the filter in samples.
    alpha : float
        Roll off factor (Valid values are [0, 1]).
    Ts : float
        Symbol period in seconds.
    Returns
    ---------
    h_rrc : 1-D ndarray of floats
        Impulse response of the root raised cosine filter.
    """
    t_vector = np.arange(N)-(N-1)/2
    h_rrc = np.zeros(N, dtype=np.float32)

    for x, t in enumerate(t_vector):
        if t == 0.0:
            h_rrc[x] = 1.0 - alpha + (4*alpha/np.pi)
        elif alpha != 0 and abs(t) == Ts/(4*alpha):
            h_rrc[x] = (alpha/np.sqrt(2))*(((1+2/np.pi)* \
                    (np.sin(np.pi/(4*alpha)))) + ((1-2/np.pi)*(np.cos(np.pi/(4*alpha)))))
        else:
            h_rrc[x] = (np.sin(np.pi*t*(1-alpha)/Ts) +  \
                    4*alpha*(t/Ts)*np.cos(np.pi*t*(1+alpha)/Ts))/ \
                    (np.pi*t*(1-(4*alpha*t/Ts)*(4*alpha*t/Ts))/Ts)
    if gain is None:
        h_rrc *= (1/np.sqrt(Ts))
    h_rrc = torch.from_numpy(h_rrc)
    return h_rrc

h = rrcosfilter(8 * 8 + 1, alpha=0.35, Ts=8)
h

In [None]:
h = torch.tensor([0.000255413507322333,0.00110422182418781,0.00166527642963825,0.00172728942499136,0.00119672217082238,0.000143118377136952,-0.00119380933122976,-0.00244282808313376,-0.00318217479899278,-0.00304811162353564,-0.00184716549701300,0.000356156769634307,0.00320234275056614,0.00606806822478606,0.00816924442299973,0.00872309340615226,0.00714095795073857,0.00321272897261628,-0.00275948306981454,-0.00991503060643535,-0.0168979866895801,-0.0220315921582807,-0.0235825830725101,-0.0200735499350221,-0.0105878322047461,0.00499197737985832,0.0258626845030293,0.0503282966092729,0.0759828182924085,0.100025226547374,0.119658253120396,0.132505285668751,0.136974268964650,0.132505285668751,0.119658253120396,0.100025226547374,0.0759828182924085,0.0503282966092729,0.0258626845030293,0.00499197737985832,-0.0105878322047461,-0.0200735499350221,-0.0235825830725101,-0.0220315921582807,-0.0168979866895801,-0.00991503060643535,-0.00275948306981454,0.00321272897261628,0.00714095795073857,0.00872309340615226,0.00816924442299973,0.00606806822478606,0.00320234275056614,0.000356156769634307,-0.00184716549701300,-0.00304811162353564,-0.00318217479899278,-0.00244282808313376,-0.00119380933122976,0.000143118377136952,0.00119672217082238,0.00172728942499136,0.00166527642963825,0.00110422182418781,0.000255413507322333], dtype=torch.float32)

In [None]:
iqt = torch.from_numpy(iq)

In [None]:
iq.shape

In [None]:
rx_sc = lfilter(iqt[0], torch.tensor([1] + [0]*(h.size(0) - 1)), h, clamp=False)
rx_sc[..., :160:8]

In [None]:
rx_sc = torch.conv1d(iqt[0:1], h.view(1, 1, -1).repeat(1,2,1), groups=1)
rx_sc[..., :160:8]

In [None]:
from scipy.signal import upfirdn
rx_sc = upfirdn(h.numpy(), iqt[0:1].numpy(), up=1, down=8)

In [None]:
rx_sc[..., :20]

In [None]:
rx_s[0, ..., :20]

In [None]:
rx_sc = np.convolve(iqt[0:1].numpy(), h.numpy())

In [None]:
reversed(h).view(1, 1, -1).repeat(1, 2, 1).shape

In [None]:
iqt[0:1].shape

In [None]:
rx_sc.shape

In [None]:
rx_s[0,:,8:]

In [None]:
def raised_root_cosine(upsample, n_span_symb, alpha, gain=None):
    """
    Root raised cosine (RRC) filter (FIR) impulse response.

    upsample: number of samples per symbol

    num_positive_lobes: number of positive overlaping symbols
    length of filter is 2 * num_positive_lobes + 1 samples

    alpha: roll-off factor
    """
    if gain is None:
        gain = 1 / np.sqrt(upsample)

    N = upsample * n_span_symb + 1
    t = (np.arange(N) - N / 2) / upsample

    # result vector
    h_rrc = np.zeros(t.size, dtype=float)

    # index for special cases
    sample_i = np.zeros(t.size, dtype=bool)

    # deal with special cases
    subi = t == 0
    sample_i = np.bitwise_or(sample_i, subi)
    h_rrc[subi] = 1.0 - alpha + (4 * alpha / np.pi)

    subi = np.abs(t) == 1 / (4 * alpha)
    sample_i = np.bitwise_or(sample_i, subi)
    h_rrc[subi] = (alpha / np.sqrt(2)) \
                * (((1 + 2 / np.pi) * (np.sin(np.pi / (4 * alpha))))
                + ((1 - 2 / np.pi) * (np.cos(np.pi / (4 * alpha)))))

    # base case
    sample_i = np.bitwise_not(sample_i)
    ti = t[sample_i]
    h_rrc[sample_i] = np.sin(np.pi * ti * (1 - alpha)) \
                    + 4 * alpha * ti * np.cos(np.pi * ti * (1 + alpha))
    h_rrc[sample_i] /= (np.pi * ti * (1 - (4 * alpha * ti) ** 2))

    # multiply by gain
    #h_rrc *= gain

    return h_rrc

In [None]:
def rcosfilter(N, beta, Ts, Fs):
    t = (np.arange(N) - N / 2) / Fs
    return np.where(np.abs(2*t) == Ts / beta,
        np.pi / 4 * np.sinc(t/Ts),
        np.sinc(t/Ts) * np.cos(np.pi*beta*t/Ts) / (1 - (2*beta*t/Ts) ** 2))
h = rcosfilter(8 * 8, 0.35, Ts=8, Fs=1)
h

In [None]:
h = raised_root_cosine(8, 8, 0.35)
print(len(h))
h

In [None]:
def compute_ml(self, x, snr) -> torch.Tensor:
        """Constructs the likelihood function for the given SNR and samples per symbol.
        Only valid for the supported modulations.

        Args:
            x (torch.Tensor): signal received. Shape: (batch_size, IQ, time_samples)
            snr (float): SNR in dB.
        
        Returns:
            torch.Tensor: likelihood values ()
        """
        N0 = 10 ** (-snr/20)
        sigma = N0 / torch.sqrt(2)
        K = torch.log(2 * torch.pi * sigma ** 2)

        likelihood = torch.zeros(x.shape[0], len(self.states_dict))
        for j, mod_states in enumerate(self.states_dict.values()):
            M = len(mod_states)
            distances = torch.zeros((x.shape[0], x.shape[2], M))
            for i, state in enumerate(mod_states):
                distances[..., i] = torch.sum((x - state) ** 2, dim=1) / (2 * sigma ** 2)
            likelihood_sample = torch.logsumexp(-distances, dim=-1) + K
            likelihood[:, j] = torch.sum(likelihood_sample, dim=-1)
        return F.softmax(likelihood, dim=-1)

In [None]:
# Without tx and rx filters
(yml_nf.argmax(1) == y.argmax(1)).mean()

In [None]:
# With "root" rx and tx filter
(yml.argmax(1) == y.argmax(1)).mean()

In [None]:
from kdmc.plot import plot_acc_vs_snr, plot_acc_vs_snr_by_class

df = pd.DataFrame({'snr': snr, 'acc': (yml.argmax(1) == y.argmax(1)), 'y': y.argmax(1), 'y_ml': yml.argmax(1)})
df_snr = df.groupby('snr', as_index=False)['acc'].mean().sort_values('snr')
fig = plot_acc_vs_snr(df_snr.acc, df_snr.snr, title="Accuracy vs SNR")

In [None]:
from kdmc.plot import plot_acc_vs_snr_by_class
from kdmc.data.sbasic import SBasic

df_snr_class = df.groupby(['snr', 'y'], as_index=False)['acc'].mean()
fig = plot_acc_vs_snr_by_class(df_snr_class, labels=SBasic.classes, title="Accuracy vs SNR")

In [None]:
df = pd.DataFrame({'snr': snr, 'acc': (yml_nf.argmax(1) == y.argmax(1)), 'y': y.argmax(1), 'y_ml': yml_nf.argmax(1)})
df_snr = df.groupby('snr', as_index=False)['acc'].mean().sort_values('snr')
fig = plot_acc_vs_snr(df_snr.acc, df_snr.snr, title="Accuracy vs SNR")

In [None]:
df_snr_class = df.groupby(['snr', 'y'], as_index=False)['acc'].mean()
fig = plot_acc_vs_snr_by_class(df_snr_class, labels=SBasic.classes, title="Accuracy vs SNR")

In [None]:
import h5py
import pandas as pd

mods = []
snrs = []
Ls = []
Ms = []
keys = []
data = {}
with h5py.File('../../data/synthetic/1024/dataset1024.mat') as f:
    for key in f['ds'].keys():
        keys.append(key)
        splits = key.split('rer')
        mods.append(splits[0])
        snrs.append(splits[1])
        Ls.append(splits[2])
        Ms.append(splits[4])
        data[key] = f['ds'][key][:]


In [None]:
df = pd.DataFrame({'mod': mods, 'snr': snrs, 'L': Ls, 'M': Ms, 'key': keys})
df['L'] = df['L'].astype(int)
df['M'] = df['M'].astype(int)
df['snr'] = df.snr.apply(lambda x: x.replace('neg', '-'))
df['snr'] = df['snr'].astype(float)
df.to_csv('../../data/synthetic/1024/dataset1024_keys.csv', index=False)

In [None]:
sps8_keys = df.loc[(df.L == 8) & (df.M == 1), 'key'].values
list_data = []
for key in sps8_keys:
    list_data.append(data[key][:,:2])

In [None]:
import numpy as np

dataset = np.concatenate(list_data, axis=2)
dataset = dataset.reshape(dataset.shape[2], dataset.shape[1], dataset.shape[0])

In [None]:
dataset.shape