# Importation des bibliothèques

In [1]:
#!/usr/bin/env python3

import sys
sys.path.insert(0, '../pyaf/py_aff3ct/build/lib')
sys.path.insert(0, '../pyaf/build/lib')

import pyaf
import numpy as np
import py_aff3ct as aff3ct
import math
import time
import matplotlib.pyplot as plt
from datetime import timedelta
from tempfile import TemporaryFile

from py_aff3ct.module.py_module import Py_Module
import py_aff3ct.tools.frozenbits_generator as tool_fb
import py_aff3ct.tools.noise as tool_noise
import py_aff3ct.tools.sparse_matrix as tool_sp
import random 

In [2]:
#############
# Main script
#############


########
# Parameters


#N = 576 
#K = 288
# K = 64*2
# N = 3*K
N = 128
K = 64

ebn0_min = 2.7
ebn0_max = 2.71
ebn0_step = 0.2

ebn0 = np.arange(ebn0_min,ebn0_max,ebn0_step)
esn0 = ebn0 + 10 * math.log10(K/N)
sigma_vals = 1/(math.sqrt(2) * 10 ** (esn0 / 20))

delta_x_range = np.arange(-4,4,0.05)
delta_y_range = np.arange(-4,4,0.05)

W = len(delta_x_range)
H = len(delta_y_range)

code_type = "polar"

if(code_type == "turbo"):
	# Build DVBS-RCS2 Turbo encoder.
	enc_n = aff3ct.module.encoder.Encoder_RSC_DB(K,2*K,standard='DVB-RCS1')
	enc_i = aff3ct.module.encoder.Encoder_RSC_DB(K,2*K,standard='DVB-RCS1')

	# Build DVBS-RCS2 Interleaver.
	itl_core = aff3ct.tools.interleaver_core.Interleaver_core_ARP_DVB_RCS1(K//2)
	itl_bit  = aff3ct.module.interleaver.Interleaver_int32(itl_core)
	itl_llr  = aff3ct.module.interleaver.Interleaver_float(itl_core)
	enc = aff3ct.module.encoder.Encoder_turbo_DB(K,N,enc_n,enc_i,itl_bit)

	# Build DVBS-RCS2 Trubo decoder.
	trellis_n = enc_n.get_trellis()
	trellis_i = enc_i.get_trellis()

	dec_n = aff3ct.module.decoder.Decoder_RSC_DB_BCJR_DVB_RCS1(K,trellis_n)
	dec_i = aff3ct.module.decoder.Decoder_RSC_DB_BCJR_DVB_RCS1(K,trellis_i)
	dec   = aff3ct.module.decoder.Decoder_turbo_DB(K,N,8,dec_n,dec_i,itl_llr)

elif(code_type == "polar"): #polar

	fbgen = tool_fb.Frozenbits_generator_GA_Arikan(K, N)
	esn0 = 3.0 + 10 * math.log10(K/N)
	sigma = 1/(math.sqrt(2) * 10 ** (esn0 / 20))
	noise = tool_noise.Sigma(sigma)
	fbgen.set_noise(noise)
	frozen_bits = fbgen.generate()

	enc  = aff3ct.module.encoder.Encoder_polar_sys      (K,N,frozen_bits)     # Build the encoder
	#dec  = aff3ct.module.decoder.Decoder_polar_SC_fast_sys(K,N,frozen_bits)   # Build the decoder
	#dec = aff3ct.module.decoder.Decoder_polar_SCL_MEM_fast_sys(K,N,8,frozen_bits)
	I = 10
	dec = aff3ct.module.decoder.Decoder_polar_SCAN_naive_sys(K, N, I, frozen_bits)
elif(code_type == "ldpc"):
	I = 10
	# H   = tool_sp.alist.read("../py_aff3ct/lib/aff3ct/conf/dec/LDPC/WIMAX_288_576.alist")
	# H   = tool_sp.alist.read("../py_aff3ct/lib/aff3ct/conf/dec/LDPC/10GBPS-ETHERNET_1723_2048.alist")
	H = tool_sp.alist.read("../py_aff3ct/lib/aff3ct/conf/dec/LDPC/CCSDS_64_128.alist")
	

	N   = H.shape[0]
	m   = H.shape[1]
	K   = N - m
	enc  = aff3ct.module.encoder.Encoder_LDPC_from_H    (K, N, H)                                                   # Build the encoder
	dec  = aff3ct.module.decoder.Decoder_LDPC_BP_horizontal_layered_inter_NMS (K, N, I, H, enc.get_info_bits_pos()) # Build the decoder
else:
	K = 106
	N = 127
	t = 3
	poly = aff3ct.tools.BCH_polynomial_generator(N, t)
	enc = aff3ct.module.encoder.Encoder_BCH(K, N, poly)
	dec = aff3ct.module.decoder.Decoder_BCH_std(K, N, poly)

src  = aff3ct.module.source.Source_random_fast(K, 12)
mdm  = aff3ct.module.modem.Modem_BPSK_fast(N)

gen = aff3ct.tools.Gaussian_noise_generator_implem.FAST
chn = aff3ct.module.channel.Channel_AWGN_LLR(N, gen)
mnt = aff3ct.module.monitor.Monitor_BFER_AR(K, 1)

sigma      = np.ndarray(shape = (1,1),  dtype = np.float32)
delta_x    = np.ndarray(shape = (1,1),  dtype = np.float32)
delta_y    = np.ndarray(shape = (1,1),  dtype = np.float32)
ix_x       = np.ndarray(shape = (1,1),  dtype = np.int32  )
ix_y       = np.ndarray(shape = (1,1),  dtype = np.int32  )
noisy_vec  = np.ndarray(shape = (1,N),  dtype = np.float32)
enable     = np.ndarray(shape = (1,1),  dtype = np.int32  )

enable[:] = 0

map = np.zeros( (H,W,3), dtype=np.uint8)
src.reset()

from datetime import datetime

seed = random.randint(0, 123456789)
src.set_seed(seed+1)
chn.set_seed(seed)

# Séquence 1 : génération des vecteur "r_in" et "x"

In [3]:
src["generate   ::U_K  "] = enc["encode       ::U_K "]
enc["encode     ::X_N  "] = mdm["modulate     ::X_N1"]
mdm["modulate   ::X_N2 "] = chn["add_noise    ::X_N "]
chn["add_noise  ::Y_N  "] = mdm["demodulate   ::Y_N1"]
mdm["demodulate ::Y_N2 "] = dec["decode_siho  ::Y_N "]
src["generate   ::U_K  "] = mnt["check_errors ::U   "]
dec["decode_siho::V_K  "] = mnt["check_errors ::V   "]
chn["add_noise  ::CP   "].bind( sigma  )
mdm["demodulate ::CP   "].bind( sigma  )

seq1 = aff3ct.tools.sequence.Sequence(src("generate"),  1)
seq1.export_dot("chaos.dot")

The use of parenthesis for accessing tasks is deprecated, use brackets instead.


# Chargement du fichier

In [4]:
load_file = False
if(load_file == False):
	for i in range(len(sigma_vals)):
		sigma[:] = sigma_vals[i]
		seq1.exec()		
		print("be=",mnt.get_n_be())
		print("fe=",mnt.get_n_fe())
		mnt.reset()
	with open('noisy_cw.npy', 'wb') as f:
		np.save(f, dec['decode_siho::Y_N'][:])
		np.save(f, src['generate::U_K'][:])
else:
	with open('noisy_cw.npy', 'rb') as f:
		mdm["demodulate ::Y_N2 "][:] = np.load(f)
		src['generate :: U_K'][:] = np.load(f)

noisy_vec = dec['decode_siho::Y_N'][:]

be= 5
fe= 1


# Déclaration du module "Conductor".

In [5]:
ix_x[:]   = np.random.randint(N) #idx[0]
ix_y[:]   = (ix_x[:]+5) % N      #idx[1]

cdc = pyaf.conductor.Conductor(noisy_vec[0,:], N, ix_x, ix_y) # Conductor

# Parameters and function for the tests

In [6]:
from statistics import mean

errors          = 0
test_number     = 0

# Number of errors
noisy_vec_error = 0
delta_x_error   = 0
delta_y_error   = 0
ix_x_error      = 0
ix_y_error      = 0
x_error         = 0
y_error         = 0

# Number of tests
noisy_vec_test  = 0
delta_x_test    = 0
delta_y_test    = 0
ix_x_test       = 0
ix_y_test       = 0
x_test          = 0
y_test          = 0

# Listing the errors
l_error_noisy_vec = [0] # Not used because no errors in this object
l_error_delta_x   = [0]
l_error_delta_y   = [0]
l_error_ix_x      = [0] # Not used because no errors in this object
l_error_ix_y      = [0] # Not used because no errors in this object
l_error_x         = [0]
l_error_y         = [0]

# Adding the value of the error
def add_error(val_cpp, val_python) :
    return abs(val_cpp - val_python)

# Testing the C++ "Conductor" module.
##### Issue 1 : float approximation in c++ leads to assertion errors. Needs to count and calculate these errors (average and max).

In [7]:

for x in range(len(delta_x_range)):
    for y in range(len(delta_y_range)):

        delta_x[:] = delta_x_range[x]
        delta_y[:] = delta_y_range[y]
        cdc['generate'].exec()

        if (cdc["generate :: noisy_vec"][:] != noisy_vec[0,:]).all() : 
            errors += 1
            noisy_vec_error += 1
        test_number += 1
        noisy_vec_test += 1
        if cdc["generate :: delta_x"  ][:] != delta_x[:] :
            errors += 1
            delta_x_error += 1
            l_error_delta_x.append(add_error(cdc["generate :: delta_x"  ][0,0], delta_x[0,0]))
        test_number += 1
        delta_x_test += 1
        if cdc["generate :: delta_y"][:] != delta_y[:] :
            errors += 1
            delta_y_error += 1
            l_error_delta_y.append(add_error(cdc["generate :: delta_y"  ][0,0], delta_y[0,0]))
        test_number += 1
        delta_y_test += 1
        if cdc["generate :: ix_x"   ][:] != ix_x[:] :
            errors += 1
            ix_x_error += 1
        test_number += 1
        ix_x_test += 1
        if cdc["generate :: ix_y"   ][:] != ix_y[:]	:
            errors += 1
            ix_y_error += 1
        test_number += 1
        ix_y_test += 1
        if cdc["generate :: x"      ][:] != x :
            errors += 1
            x_error += 1
            l_error_x.append(add_error(cdc["generate :: x"  ][0,0], x))
        test_number += 1
        x_test += 1
        if cdc["generate :: y"      ][:] != y :
            errors += 1
            y_error += 1
            l_error_y.append(add_error(cdc["generate :: y"  ][0,0], y))
        test_number += 1
        y_test += 1
print("Number of tests  : ", test_number)
print("Number of errors : ", errors)
print("Errors rate      : ", (errors / test_number)*100)
print("Success rate     : ", ((test_number - errors) / test_number)*100)
print("\n")
print("Error rate (local)  : (number of errors for this test) / (number of time this test is done).")
print("Error rate (global) : (number of errors for this test) / (number of all tests done).\n")

presentation   = '          |{:>10} | {:>10} | {:>20} | {:>20} | {:>20} | {:>20} |'.format("Tests", "Errors", "Error rate (local)", "Error rate (global)", "Average error", "Max error")
line_noisy_vec = 'noisy_vec |{:>10} | {:>10} | {:>20} | {:>20} | {:>20} | {:>20} |'.format(noisy_vec_test, noisy_vec_error, round((noisy_vec_error / noisy_vec_test) * 100, 1), round((noisy_vec_error / test_number) * 100, 1), 0, 0 )
line_delta_x   = 'delta_x   |{:>10} | {:>10} | {:>20} | {:>20} | {:>20} | {:>20} |'.format(delta_x_test, delta_x_error, round((delta_x_error / delta_x_test) * 100, 1), round((delta_x_error / test_number) * 100, 1), mean(l_error_delta_x), max(l_error_delta_x))
line_delta_y   = 'delta_y   |{:>10} | {:>10} | {:>20} | {:>20} | {:>20} | {:>20} |'.format(delta_y_test, delta_y_error, round((delta_y_error / delta_y_test) * 100, 1), round((delta_y_error / test_number) * 100, 1), mean(l_error_delta_y), max(l_error_delta_y))
line_ix_x      = 'ix_x      |{:>10} | {:>10} | {:>20} | {:>20} | {:>20} | {:>20} |'.format(ix_x_test, ix_x_error, round((ix_x_error / ix_x_test) * 100, 1), round((ix_x_error / test_number) * 100, 1), 0, 0)
line_ix_y      = 'ix_y      |{:>10} | {:>10} | {:>20} | {:>20} | {:>20} | {:>20} |'.format(ix_y_test, ix_y_error, round((ix_y_error / ix_y_test) * 100, 1), round((ix_y_error / test_number) * 100, 1), 0, 0)
line_x         = 'x         |{:>10} | {:>10} | {:>20} | {:>20} | {:>20} | {:>20} |'.format(x_test, x_error, round((x_error / x_test) * 100, 1), round((x_error / test_number) * 100, 1), mean(l_error_x), max(l_error_x))
line_y         = 'y         |{:>10} | {:>10} | {:>20} | {:>20} | {:>20} | {:>20} |'.format(y_test, y_error, round((y_error / y_test) * 100, 1), round((y_error / test_number) * 100, 1), mean(l_error_y), max(l_error_y))
print(presentation  )
print(line_noisy_vec)
print(line_delta_x  )
print(line_delta_y  )
print(line_ix_x     )
print(line_ix_y     )
print(line_x        )
print(line_y        )

# Using float  for "step" in c++ function arange : average error is 1.1763107465867506e-07. 
# Using double for "step" in c++ function arange : average error is 1.4343254435187323e-14.


Number of tests  :  179200
Number of errors :  320
Errors rate      :  0.17857142857142858
Success rate     :  99.82142857142857


Error rate (local)  : (number of errors for this test) / (number of time this test is done).
Error rate (global) : (number of errors for this test) / (number of all tests done).

          |     Tests |     Errors |   Error rate (local) |  Error rate (global) |        Average error |            Max error |
noisy_vec |     25600 |          0 |                  0.0 |                  0.0 |                    0 |                    0 |
delta_x   |     25600 |        160 |                  0.6 |                  0.1 | 1.4343254435187323e-14 | 1.4432899320127035e-14 |
delta_y   |     25600 |        160 |                  0.6 |                  0.1 | 1.4343254435187323e-14 | 1.4432899320127035e-14 |
ix_x      |     25600 |          0 |                  0.0 |                  0.0 |                    0 |                    0 |
ix_y      |     25600 |          0 | 

In [8]:
for x in range(len(delta_x_range)):
	for y in range(len(delta_y_range)):
		delta_x[:] = delta_x_range[x]
		delta_y[:] = delta_y_range[y]
		cdc['generate'].exec()
		assert((cdc["generate :: noisy_vec"][:] == noisy_vec[0,:]).all()	)
		assert( cdc["generate :: delta_x"  ][:] == delta_x[:]		)
		assert( cdc["generate :: delta_y"][:] 	== delta_y[:]		)
		assert( cdc["generate :: ix_x"   ][:] 	== ix_x[:]			)
		assert( cdc["generate :: ix_y"   ][:] 	== ix_y[:]			)
		assert( cdc["generate :: x"      ][:] 	== x				)
		assert( cdc["generate :: y"      ][:] 	== y				)
print("Module de source validé !\n")

AssertionError: 