# Experimental setup for a coherent optical homodyne (intradyne) transmission

In [None]:
import time, copy

import numpy as np
import scipy.signal as ssignal
import matplotlib.pyplot as plt
import scipy.interpolate as sinterp

try:
    # if you installed skcomm with pypi
    import skcomm  as sk
except:
    # if you like to use skcomm directly from source.
    # Please note: If you want to import skcomm into your file using this snippet, you must ensure that the file is on a directional level below skcomm.
    import sys, os
    current_parent_folder = os.path.abspath('..')
    if not current_parent_folder in sys.path:
        sys.path.append(os.path.join(current_parent_folder))
    import skcomm as skc

In [None]:
symbol_rate = 12.8e9
n_bits = 2**15
modulation_order = 4
roll_off = 0.1
dac_sr = 16e9
upload_samples = False
hold_shot = False
use_predistortion = False
sinc_correction = True

In [None]:

# contruct signal
sig_tx = skc.signal.Signal(n_dims=1)
sig_tx.symbol_rate = symbol_rate

tx_upsampling = dac_sr / sig_tx.symbol_rate[0]

# generate bits
sig_tx.generate_bits(n_bits=n_bits, seed=1)

# set constellation (modulation format)
sig_tx.generate_constellation(format='QAM', order=modulation_order)

# create symbols
sig_tx.mapper()

# upsampling and pulseshaping

sig_tx.pulseshaper(upsampling=tx_upsampling, pulseshape='rrc', roll_off=[roll_off])

# sinc correction
if sinc_correction:
    sig_tx.samples[0] = skc.pre_distortion.dac_sinc_correction(sig_tx.samples[0],
                                                                f_max=1.0)

# pre-equalization of AWG frequency response
if use_predistortion:
    #filtershape = np.load('setup_files/preDistFilter.npy')
    filtershape = np.transpose(np.array([[-16e9, -8e9, -7.8e9, 0, 7.8e9, 8e9, 16e9], [-100, -100, 2, 0 , 2,-100,-100]]))
    sig_tx.samples[0] = skc.filters.filter_arbitrary(sig_tx.samples[0], 
                                                      filtershape, 
                                                      sample_rate=sig_tx.symbol_rate[0]*tx_upsampling)

# format samples so that driver can handle them (range +-1)
maxVal = np.max(np.abs(np.concatenate((np.real(sig_tx.samples), np.imag(sig_tx.samples)))))
samples = np.asarray(sig_tx.samples) / maxVal
samples = np.concatenate((np.real(samples), np.imag(samples)))


Upload signal to arbitrary signal generator (AWG), E/O conversion, transmission, O/E conversion, sampling and download samples fron digital real-time oszilloscope

In [None]:
if upload_samples:                    
    # write samples to AWG        
    skc.instrument_control.write_samples_Tektronix_AWG70002B(samples, ip_address='192.168.1.21', 
                                                    sample_rate=[dac_sr], amp_pp=[0.25, 0.25], 
                                                    channels=[1, 2], log_mode = False)
    time.sleep(2.0)
    
if not hold_shot:
    # get samples from scope
    sr, samples = skc.instrument_control.get_samples_Tektronix_MSO6B(channels=[1, 2], 
                                                                    ip_address='192.168.1.20',
                                                                    number_of_bytes = 1,
                                                                    log_mode = False)
    
    tmp_shot = copy.deepcopy(samples)
else:
    samples = copy.deepcopy(tmp_shot) # see hint at https://github.com/spyder-ide/spyder/issues/11558
    # or, dump to and load from file:
    # see https://stackoverflow.com/questions/4530611/saving-and-loading-objects-and-using-pickle/4531859
    # import pickle
    # with open('tmp_shot.obj', 'wb') as ts:
    #   pickle.dump(tmp_shot, ts)
    # with open('tmp_shot.obj', 'rb') as ts:
    #   tmp_shot = pickle.load(ts)

# # I-Q imbalance correction (for suppressing the DD-term for high SSI-suppression)
# IQ_imbalance = -0.45  # [dB] (a pos. value means channel I is smaller than channel Q after coh. frontend, and v.v.)
# samples[0] = samples[0]*10**(0.5*IQ_imbalance/20)
# samples[1] = samples[1]*10**(-0.5*IQ_imbalance/20)

# # remove mean of signal
# samples = samples - np.mean(samples)

# construct complex signal
samples = samples[0] + 1j * samples[1]
samples = samples - np.mean(samples)


## Receiver

Resampling

In [None]:
# contruct rx signal structure
sig_rx = copy.deepcopy(sig_tx)
sig_rx.samples = samples
sig_rx.sample_rate = sr

sig_rx.plot_spectrum(tit='spectrum from scope',fNum=1)

# From here: "standard" coherent complex baseband signal processing ############
# resample to 2 sps
sps_new = 2
sps = sig_rx.sample_rate[0]/sig_rx.symbol_rate[0]
new_length = int(sig_rx.samples[0].size/sps*sps_new)
sig_rx.samples = ssignal.resample(sig_rx.samples[0], new_length, window='boxcar')
sig_rx.sample_rate = sps_new*sig_rx.symbol_rate[0]

sig_rx.plot_spectrum(tit='spectrum after resampling',fNum=2)


Estimate SNR

In [None]:
# estimate SNR
sig_range = np.asarray([-1, 1])*sig_rx.symbol_rate[0]/2*(1+roll_off)
noise_range = np.asarray([-1.1, -1.05, 1.05, 1.1]) * sig_rx.symbol_rate[0]/2 * (1+roll_off)

fft_temp = np.fft.fft(sig_rx.samples[0])/sig_rx.samples[0].size
RBW = sig_rx.sample_rate[0]/sig_rx.samples[0].size
spec = (np.abs(np.fft.fftshift(fft_temp))**2)/RBW # -> [W/Hz]
freq = np.fft.fftshift(np.fft.fftfreq(sig_rx.samples[0].size, 1/sig_rx.sample_rate[0]))
snr = skc.utils.estimate_snr_spectrum(freq, spec, sig_range=sig_range, 
                                       noise_range=noise_range, order=1, 
                                       noise_bw=sig_rx.symbol_rate[0], 
                                       scaling='lin', plotting=True, fNum=3)["snr_dB"]

print('est. SNR (from spectrum): {:.1f} dB'.format(snr))

Frequency offset estimation / correction

In [None]:
# frequency offset estimation / correction
results_foe = skc.rx.frequency_offset_correction(sig_rx.samples[0], 
                                                  sample_rate=sig_rx.sample_rate[0],
                                                  symbol_rate=sig_rx.symbol_rate[0])

sig_rx.samples[0] = results_foe['samples_corrected']
print('estimated frequency offset: {:.0f} MHz'.format(results_foe['estimated_fo']/1e6))


# normalize samples to mean magnitude of original constellation
mag_const = np.mean(abs(sig_rx.constellation[0]))
mag_samples= np.mean(abs(sig_rx.samples[0]))
sig_rx.samples = sig_rx.samples[0] * mag_const / mag_samples

sig_rx.plot_constellation(hist=True, tit='constellation before EQ', fNum=4)


Equalizer / matched filter

In [None]:

adaptive_filter = True
# either blind adaptive filter....
if adaptive_filter == True:    
    results = skc.rx.blind_adaptive_equalizer(sig_rx, n_taps=51, mu_cma=1e-4, 
                                               mu_rde=1e-5, mu_dde=0.5, decimate=True, 
                                               return_info=True, stop_adapting=-1, 
                                               start_rde=5000*0, start_dde=5000*0,
                                               exec_mode='auto')
    
    sig_rx = results['sig']
    h = results['h'][0]
    eps = results['eps'][0]
    # plot error evolution
    f = plt.figure(5)
    f.clear()
    plt.plot(np.abs(eps))
    plt.title('evolution of equalizer error')
    plt.xlabel('time / symbols')
    plt.ylabel('error /a.u.')
    plt.grid(visible=True)
    plt.show()       
    # plot evolution of filters frequency response
    f = plt.figure(6)
    f.clear()
    ax = plt.subplot(projection='3d')
    f = np.fft.fftshift(np.fft.fftfreq(h[0,:].size, d=1/sig_rx.sample_rate[0]))
    outsymbs = [0, 1000, 5000, 10000, 20000, 30000, h[:,0].size-1]    
    for outsymb in outsymbs:
        plt.plot(f, np.ones(f.size)*outsymb, np.abs(np.fft.fftshift(np.fft.fft(h[int(outsymb),:]))))
    plt.title('evolution of equalizer frequency response')
    plt.xlabel('frequency / Hz')
    plt.ylabel('time / symbols')  
    plt.grid(visible=True)
    plt.show()
     # plot last filter frequency response
    f = plt.figure(7)
    f.clear()
    f = np.fft.fftshift(np.fft.fftfreq(h[0,:].size, d=1/sig_rx.sample_rate[0]))
    plt.plot(f, np.abs(np.fft.fftshift(np.fft.fft(h[-1,:]))))
    plt.title('last equalizer frequency response')
    plt.xlabel('frequency / Hz')
    plt.ylabel('amplitude a.u.')
    plt.grid(visible=True)
    plt.show()               
        
    # cut away init symbols
    sps = int(sig_rx.sample_rate[0]/sig_rx.symbol_rate[0])
    cut = 10000
    sig_rx.samples = sig_rx.samples[0][int(cut)*sps:]

# ... or matched filtering
else:
    # Rx matched filter
    sig_rx.raised_cosine_filter(roll_off=roll_off,root_raised=True) 
    
    # crop samples here, if necessary
    sps = int(sig_rx.sample_rate[0] / sig_rx.symbol_rate[0])
    crop = 10*sps
    if crop != 0:
        sig_rx.samples = sig_rx.samples[0][crop:-crop]
    else:
        sig_rx.samples = sig_rx.samples[0]
    
    # sampling phase / clock adjustment
    BLOCK_SIZE = -1 # size of one block in SYMBOLS... -1 for only one block
    sig_rx.sampling_clock_adjustment(BLOCK_SIZE)
    


Decimation

In [None]:
# sampling (if necessary)
START_SAMPLE = 0
sps = sig_rx.sample_rate[0] / sig_rx.symbol_rate[0] # CHECK FOR INTEGER SPS!!!
sig_rx.samples = sig_rx.samples[0][START_SAMPLE::int(sps)]
sig_rx.plot_constellation(0, hist=True, tit='constellation after EQ',fNum=8)


Carrier phase estimation / recovery

In [None]:

# CPE
viterbi = True
# ...either VV
if viterbi:
    cpe_results = skc.rx.carrier_phase_estimation_VV(sig_rx.samples[0], n_taps=31, 
                                                      filter_shape='wiener', mth_power=4, 
                                                      rho=.001)
    sig_rx.samples = cpe_results['rec_symbols']
    est_phase = cpe_results['phi_est'].real
# ...or BPS
else:
    cpe_results = skc.rx.carrier_phase_estimation_bps(sig_rx.samples[0], sig_rx.constellation[0], 
                                               n_taps=15, n_test_phases=45, const_symmetry=np.pi/2,
                                               exec_mode='auto')
    sig_rx.samples = cpe_results['samples_corrected']
    est_phase = cpe_results['est_phase_noise']

f = plt.figure(9)
f.clear()
plt.plot(est_phase)
plt.title('estimated phase noise')
plt.xlabel('time / symbols')
plt.ylabel('phase / rad')
plt.grid(visible=True)
plt.show()

sig_rx.plot_constellation(hist=True, tit='constellation after CPE',fNum=10)


Synchronization and estimation of quality metric

In [None]:

# delay and phase ambiguity estimation and compensation
sig_rx = skc.rx.symbol_sequence_sync(sig_rx, dimension=-1)["sig"]
    
# calc EVM
evm = skc.utils.calc_evm(sig_rx, norm='max')
print("EVM: {:2.2%}".format(evm[0]))

# estimate SNR
snr = skc.utils.estimate_SNR_evm(sig_rx, norm='rms', method='data_aided', opt=False)
print("est. SNR: {:.2f} dB".format(snr[0]))

# decision and demapper
sig_rx.decision()
sig_rx.demapper()

# BER counting
ber_res = skc.rx.count_errors(sig_rx.bits[0], sig_rx.samples[0])
print('BER = {:.2e}'.format(ber_res['ber']))
