In [33]:

import acoular as ac #type: ignore
import numpy as np #type: ignore
import matplotlib.pyplot as plt #type: ignore
from scipy.signal import welch #type: ignore
from config import uma16_index, ConfigUMA
import h5py

In [34]:
device_index = uma16_index()
t = 1 # seconds

source = ac.SoundDeviceSamplesGenerator(device=device_index, numchannels=16) # object, responsible for recording the time data 
source.numsamples = int(t * source.sample_freq)

uma_config = ConfigUMA()
# microphone geometry
mics = uma_config.create_mics()

# calculate the CSM from the incoming data
freq_data = ac.PowerSpectra(
    time_data=source, 
    block_size=128, 
    window='Hanning'
    )



UMA-16 device: nanoSHARC micArray16 UAC2.0: USB Audio (hw:3,0) at index 5



In [35]:
folder = "uma16_example_recordings/"
hdf5_filename = folder + "450_and_700_1s_h5py.h5"

with h5py.File(hdf5_filename, 'r') as hdf5_file:
    signal = hdf5_file['data'][:]
    sample_freq = hdf5_file.attrs['sample_freq']
    channels = hdf5_file.attrs['channels']

print("Loaded signal shape:", signal.shape)
print("Sample freq:", sample_freq)
print("Number of channels:", channels)

fs = sample_freq #Abtastrate
t = np.arange(signal.shape[0]) / fs
df = 1/fs #Zeitabstand zwischen den Samples

Loaded signal shape: (44100, 16)
Sample freq: 44100.0
Number of channels: 16


In [36]:
# aus Acoular kopiert

class CSM():
    def __init__(self, signal, sample_freq, block_size, calib=None, num_blocks=1, precision=np.float32):
        self.signal = signal
        self.sample_freq = sample_freq
        self.block_size = block_size
        self.window_ = np.hanning
        self.calib = calib
        self.num_blocks = num_blocks
        self.precision = precision

    def get_source_data(self):
        num_samples = self.signal.shape[0]
        num_channels = self.signal.shape[1]

        for start in range(0, num_samples, self.block_size):
            end = start + self.block_size
            if end > num_samples:
                break 
            yield self.signal[start:end, :]

    def calc_csm(self):
        """Berechnung der Cross-Spectral Matrix (CSM)."""
        t = self.signal
        wind = self.window_(self.block_size)
        weight = np.dot(wind, wind)
        wind = wind[np.newaxis, :].swapaxes(0, 1)
        numfreq = int(self.block_size / 2 + 1)
        csm_shape = (numfreq, t.shape[1], t.shape[1])
        csm_upper = np.zeros(csm_shape, dtype=self.precision)

        if self.calib and self.calib.num_mics > 0:
            if self.calib.num_mics == t.shape[1]:
                wind = wind * self.calib.data[np.newaxis, :]
            else:
                raise ValueError('Calibration data not compatible: %i, %i' % (self.calib.num_mics, t.shape[1]))

        for data in self.get_source_data():
            ft = np.fft.rfft(data * wind, axis=0).astype(self.precision)
            for freq_idx in range(numfreq):
                csm_upper[freq_idx] += np.outer(ft[freq_idx], np.conj(ft[freq_idx]))
        
        csm_lower = csm_upper.conj().transpose(0, 2, 1)
        [np.fill_diagonal(csm_lower[cntFreq, :, :], 0) for cntFreq in range(csm_lower.shape[0])]
        csm = csm_lower + csm_upper
        
        return csm * (2.0 / self.block_size / weight / self.num_blocks)


csm_calculator = CSM(signal=signal, sample_freq=sample_freq, block_size=128)
csm_result = csm_calculator.calc_csm()

print("CSM shape:", csm_result.shape)


CSM shape: (65, 16, 16)


  ft = np.fft.rfft(data * wind, axis=0).astype(self.precision)


In [37]:
# für eine Frequenz
FREQ = 5000 #Hz



In [39]:
#Sehr unsicher, ob das richtig ist

def calculate_octave_band_csm(csm_result, freqs, center_freqs):
    band_csm = []
    factor = 2**(1/6) 
    
    for center_freq in center_freqs:
        lower_freq = center_freq / factor
        upper_freq = center_freq * factor
        
        freq_indices = np.where((freqs >= lower_freq) & (freqs <= upper_freq))[0]
        

        band_csm_mean = np.mean(csm_result[freq_indices], axis=0)
        band_csm.append(band_csm_mean)

    return band_csm

numfreq = csm_result.shape[0]
freqs = np.fft.rfftfreq(csm_calculator.block_size, d=1/sample_freq)

center_frequencies = [250, 315, 400, 500, 630, 800, 1000, 1250, 1600, 2000, 2500, 3150, 4000, 5000, 6300, 8000]

band_csms = calculate_octave_band_csm(csm_result, freqs, center_frequencies)

selected_csm = band_csms[0] 

print("Shape of selected CSM:", selected_csm.shape)

Shape of selected CSM: (16, 16)


In [8]:
# So würde ChatGPT das berechnen (nicht getestet)

def calculate_csm_welch(signal, fs, nperseg=None, noverlap=None):
    freqs, Pxx = welch(signal, fs=fs, nperseg=nperseg, noverlap=noverlap, axis=0)
    csm = np.einsum('ij,ik->ijk', Pxx, np.conjugate(Pxx))
    return csm, freqs

fs = 44100  # Beispielhafte Abtastrate
nperseg = 1024  # Anzahl der Samples pro Segment
noverlap = 512  # Anzahl der überlappenden Samples

csm, freqs = calculate_csm_welch(signal, fs, nperseg, noverlap)
csm.shape

(513, 16, 16)

In [9]:
def calculate_csm(signal):
    num_of_samples, num_of_channels = signal.shape
    fft_signal = np.fft.fft(signal, axis=0)
    csm = np.einsum('ij,ik->jk', fft_signal, np.conjugate(fft_signal)) / num_of_samples
    
    return csm

csm = calculate_csm(signal)
csm.shape

(16, 16)