In [5]:
import pandas as pd
import numpy as np
import struct
import xml.etree.ElementTree
import os

def xdat2array(xdat_path,little_endian):
    with open(xdat_path,'rb') as f:
        b = f.read()
        num_shorts = int(len(b) / 2)
        if little_endian:
            xdat = struct.unpack("<{}h".format(num_shorts),b)
        else:
            xdat = struct.unpack(">{}h".format(num_shorts),b)
        xdat_array = np.array(xdat,dtype='float32')
        xdat_array = xdat_array.reshape((int(len(xdat_array)/2),2))
        return xdat_array

#calculates the overall number of samples (shorts) of all xdat files in a given directory (for efficient allocation)
def calc_num_xdat_samples(data_dir):
    num_samples = 0
    xdat_data_files =[os.path.join(data_dir,file) for file in os.listdir(data_dir) if file.endswith('.xdat')]
    for file in xdat_data_files:
        num_samples = num_samples + int(os.path.getsize(file)/2) #divide by 2 because short = 2 bytes
    return num_samples

def get_xhdr_scale_factor(xhdr_path):
    e=xml.etree.ElementTree.parse(xhdr_path)
    return float(e.find('captures').find('capture').get('acq_scale_factor'))

#assuming sampling rate is the same in consecutive xdat recordings
def get_xhdr_sample_rate(data_dir):
    xhdr_files = [os.path.join(data_dir,file) for file in os.listdir(data_dir) if file.endswith('.xhdr')]
    xhdr_path = xhdr_files[0]
    e=xml.etree.ElementTree.parse(xhdr_path)
    return int(e.find('captures').find('capture').get('sample_rate'))


def get_xhdr_little_endian(xhdr_path):
    e=xml.etree.ElementTree.parse(xhdr_path)
    return bool(e.find('data_files').find('data').get('little_endian'))


def load_xdat_data(data_files,num_samples):
    xdat = np.zeros((int(num_samples/2) , 2),dtype='float32')
    pos = 0
    for file in data_files:
        xhdr_path = "".join((os.path.splitext(file)[0],'.xhdr'))
        scale_factor = get_xhdr_scale_factor(xhdr_path)
        little_endian = get_xhdr_little_endian(xhdr_path)
        xdat_part = xdat2array(file,little_endian)
        xdat_part = scale_factor * 2**16 * xdat_part
        curr = pos + len(xdat_part)
        xdat[pos : curr,:] = xdat_part
        pos = curr
    return xdat

#Return data,data_type
def load_raw_data(data_dir):
    xdat_data_files =[os.path.join(data_dir,file) for file in os.listdir(data_dir) if file.endswith('.xdat')]
    data = []
    if len(xdat_data_files) > 0:
        num_samples = calc_num_xdat_samples(data_dir)
        data = load_xdat_data(xdat_data_files,num_samples)
    return data

def samples2complex(samples):
    return samples[:,0]+1j*samples[:,1]

def complex2power(complex_data):
    return 20*np.log10(1000*np.absolute(complex_data))

def trim_by_seq_length(data, seq_length):
    num_samples = data.shape[0]
    trim_num_samples = num_samples - num_samples % seq_length
    # trim to the data to fit sequence length
    trim_data = data[:trim_num_samples]
    return trim_data


In [6]:
from numpy import max,min,imag,real

def get_normal_data(filename):
    filename_xdat = filename + ".xdat"
    filename_xhdr = filename + ".xhdr"
    scale = get_xhdr_scale_factor(filename_xhdr)
    fs = get_xhdr_sample_rate(data_dir)
    data = samples2complex(xdat2array(filename_xdat,True))
    ts = 1/fs
    
    return data,fs,ts

In [7]:
def cw(x_len, ts, fc):
    t = np.arange(0,x_len)*ts
    cw = np.exp(1j*2*np.pi*fc*t);
def channel(data):
    R = np.random.rayleigh(size=data.size)
    return data*R.astype('int')

In [8]:
import numpy as np
from scipy.signal import butter, lfilter, freqz, kaiserord, firwin

def butter_lowpass(cutoff, fs, order=5):
    nyq_rate = fs/2
    normal_cutoff = cutoff / nyq_rate
    b, a = butter(order, normal_cutoff, btype='low', analog=False)
    return b, a

def kaiser_lowpass(cutoff, fs, order=5):
    nyq_rate = fs
    width = cutoff*1.1/nyq_rate
    ripple_db = 60.0
    N, beta = kaiserord(ripple_db, width)
    taps = firwin(N, cutoff/nyq_rate, window=('kaiser', beta))
    return taps, 1.0

In [9]:
from numpy import zeros
#from numpy.matlib import repmap
#from scipy.signal import chirp

def sweep(x_len, fs, f0, t1, f1, filtered=True, order=20):
    t0 = 0; #sec
    fs = 1/ts
    b = (f1-f0)/(t1-t0)

    sweep = zeros(x_len,dtype="complex64")
    t = np.arange(t0,t1,ts);
    t_len = len(t)

    for k in np.arange(0,x_len//t_len):
        del_b=0.5*b*np.random.rand();
        sweep[t_len*k:t_len*(k+1)]=np.exp(1j*2*np.pi*(f0+0.5*(b+del_b)*t)*t)

    if (sweep.size < x_len):
        sweep.append(zeros(data_len-sweep.size))
    elif(sweep.size > x_len):
        sweep = sweep[1:x_len]
        
    if(filtered):
        return filter_sweep(sweep,fs,f0,f1)
    else:
        return sweep
        
def filter_sweep(data,fs,f0,f1,order=20):
    f_co = (abs(f0-f1))/2;
    filt = butter_lowpass(f_co,fs, order=order)
    fc = (f0+f1)/2
    t = np.arange(0,data.size)/fs
    data *= np.exp(-1j*2*np.pi*fc*t) # move to bb
    data = lfilter(filt[0],filt[1],data) # filter
    data *= np.exp(1j*2*np.pi*fc*t) # bring back
    
    return data
    # for k in np.arange(0,data_len//t_sw_len):
    #     del_b=0.5*b*np.random.rand();
    #     sweep[t_sw_len*k:t_sw_len*(k+1)] = chirp(t=t_sw,f0=fstart,f1=fstop,t1=t_stop)

    # sweep(1:t_len) = exp(1j*2*pi*(fstart+0.5*b*t).*t);
    # sweep=repmat(sweep,1,(floor((length(record_rec))/(length(sweep))))); 

In [10]:
# from scipy.signal import spectrogram,get_window
# import matplotlib.pyplot as plt

# _,_,spec = spectrogram(x=sweep,fs=fs,window=get_window("blackmanharris",1024),return_onesided=False, mode='complex',nperseg=1024)
# spec = complex2power(spec)

In [11]:
#t_sw_len
# np.argwhere(np.isnan(spec))
# plt.imshow(spec[0:len(spec)//10],aspect='auto')

In [12]:
# len(np.argwhere(np.isnan(sweep)))

In [13]:
def absolute_max(data):
    maxi = max((data.real.max(),data.imag.max()))
    mini = min((data.real.min(),data.imag.min()))
    return max((maxi,abs(mini)))

def scale_dist_to_data(data,disturbance):
    abs_max_data = absolute_max(data)
    abs_max_dist = absolute_max(disturbance)
    
    disturbance = disturbance/abs_max_dist*(np.iinfo(np.int16).max-abs_max_data-1)
    
    return disturbance

# disturbance = disturbance/10**(disturbance_atten_db/10)

In [14]:
from shutil import copyfile
from time import time


def save_signal(filename,data):
    if(absolute_max(data) > np.iinfo(np.int16).max):
        print("Scaling problem. Please attenuate disturbance");
        exit()

    B = np.zeros( (data.size,2) )
    B[:,0] = real(data)
    B[:,1] = imag(data)
    B = np.reshape(B,(-1,)).astype('int16')
    
    filename_xhdr = filename + ".xhdr"
    new_filename = filename + "_new_" + str(int(time()))
    new_filename_xdat = new_filename + ".xdat"
    new_filename_xhdr = new_filename + ".xhdr"
    file = open(new_filename_xdat,'wb');
    file.write(struct.pack('<{}h'.format(len(B)),*B))
    file.close()
    copyfile(filename_xhdr,new_filename_xhdr)

In [15]:
data_dir = "test\\"
filename = "CELL_NORM"
data,fs,ts = get_normal_data(data_dir + filename)

In [16]:
f_start = -5e6
f_stop = 5e6
sweep_time = 100e-6
disturbance = sweep(data.size, ts, f_start, sweep_time, f_stop, filtered=False)
disturbance //= 10
disturbance = channel(disturbance)
disturbance = filter_sweep(disturbance,fs,f_start,f_stop)
disturbance = scale_dist_to_data(data=data,disturbance=disturbance)

In [17]:
total_signal = data + disturbance
save_signal(data_dir + filename,total_signal)

In [18]:
20*np.log10(3.27217e-12*62853120)

-73.73674601302794