# Benchmark different file transform approaches

This document serves the purpose of improving certain time critical parts of the file creat_cube.py from the CryoCube repository on Github.

### Setup
Importing the necessary modules (run poetry install to use the environment for this)

In [1]:
#python basemodules and jupyter modules
import os
import sys
import multiprocessing as mp
from IPython.display import display

#benchmarking
import time
import cProfile
import pstats
import io

# data handling
import h5py
import xarray as xr
import numpy as np
import pandas as pd
import scipy
from scipy import signal, fft
import dask.array as da
import pyfftw
import pyfftw.interfaces.dask_fft as dafft
import pickle

repo_dir = os.popen('git rev-parse --show-toplevel').read().strip()


Reading the folders and filenames

In [2]:
base="/work/le837wmue-Rhone_download/DAS_2020"
os.chdir(base)
folders=os.listdir()
#folders

In [3]:
os.chdir(f"{base}/{folders[0]}")
files=os.listdir()
print("Number of files in folder 1:", len(files))
files[0:10]

Number of files in folder 1: 1307


['rhone1khz_UTC_20200707_205138.931.h5',
 'rhone1khz_UTC_20200707_154908.931.h5',
 'rhone1khz_UTC_20200707_164408.931.h5',
 'rhone1khz_UTC_20200707_154608.931.h5',
 'rhone1khz_UTC_20200707_235338.931.h5',
 'rhone1khz_UTC_20200707_135038.931.h5',
 'rhone1khz_UTC_20200707_190438.931.h5',
 'rhone1khz_UTC_20200707_182508.931.h5',
 'rhone1khz_UTC_20200707_183838.931.h5',
 'rhone1khz_UTC_20200707_191908.931.h5']

### Benchmarking the file read and conversion to numpy array

1. Using h5py as originally

In [4]:
start=time.time()
f=h5py.File(files[0],  'r')
dset=f['Acoustic']
print(np.array(dset))
end=time.time()
print("Time elapsed:", end-start)

[[  2979  -4623  -8582 ...   3592   2945   3725]
 [  7908  -2019  -6477 ...  -2881  -5727    647]
 [-10515  -3669   2995 ...   2528   1463  -5090]
 ...
 [  6664  15762  12302 ...  -4615  -8789  -5392]
 [   804    389   1578 ...   -908 -12766 -12201]
 [ -4772 -10940  -1687 ...   5943  -1315  -1256]]
Time elapsed: 0.4123396873474121


2. Using xarray. Xarray uses distributed chunked reading, so we can assume that it is faster than h5py.

In [4]:
start=time.time()
xr_h5=xr.open_dataset(files[0], engine='h5netcdf', backend_kwargs={'phony_dims': 'access'}) # we need to pass phony_dims as the file has no xarray readable dimensions
print(xr_h5["Acoustic"].compute().values)
end=time.time()
print("Time elapsed:", end-start)

[[  2979  -4623  -8582 ...   3592   2945   3725]
 [  7908  -2019  -6477 ...  -2881  -5727    647]
 [-10515  -3669   2995 ...   2528   1463  -5090]
 ...
 [  6664  15762  12302 ...  -4615  -8789  -5392]
 [   804    389   1578 ...   -908 -12766 -12201]
 [ -4772 -10940  -1687 ...   5943  -1315  -1256]]
Time elapsed: 0.4220571517944336


Reading a single file, xarray is about 3 hundredths faster than h5py. Let's see if this scales:

In [15]:
start=time.time()
for index,file in enumerate(files[0:40]):
    f=h5py.File(files[index],  'r')
    dset=f['Acoustic']
    np.array(dset)
end=time.time()
print("Time elapsed:", end-start)

Time elapsed: 20.112152338027954


In [16]:
start=time.time()
for index,file in enumerate(files[0:40]):
    xr_h5=xr.open_dataset(files[index], engine='h5netcdf', backend_kwargs={'phony_dims': 'access'})
    xr_h5["Acoustic"].compute().values
end=time.time()
print("Time elapsed:", end-start)

Time elapsed: 3.881666660308838


It does! Reading 40 files with h5py takes 17.9 seconds, with xarray it takes 2.9 seconds.
Can we use multiprocessing to speed up the process?

In [6]:
cpu_count=mp.cpu_count()*2//3 # we tae two thirds so the open file limit is not exceeded
cpu_count

85

In [18]:
def read_file(file):
    with h5py.File(file, 'r') as f: # we need with so it actually closes
        dset = f['Acoustic']
        np.array(dset)

start=time.time()
pool=mp.Pool(cpu_count)
pool.map(read_file, files[0:40])
pool.close()
pool.join()
end=time.time()
print("Time elapsed:", end-start) 

Time elapsed: 3.781881093978882


In [20]:
def read_file(file):
    xr_h5=xr.open_dataset(file, engine='h5netcdf', backend_kwargs={'phony_dims': 'access'})
    xr_h5["Acoustic"].compute().values

start=time.time()
pool=mp.Pool(cpu_count)
pool.map(read_file, files[0:40])
pool.close()
pool.join()
end=time.time()
print("Time elapsed:", end-start) 

Time elapsed: 2.9076411724090576


Xarray with the underlying dask is already using distributed computing. Still, we see that we can improve the reading and conversion to 1.78 seconds. 
However, h5py's conversion time is also highly reduced to only 2.54 seconds. Let's try it with more files:

In [None]:
def read_file(file):
    with h5py.File(file, 'r') as f: # we need with so it actually closes
        dset = f['Acoustic']
        np.array(dset)

start=time.time()
pool=mp.Pool(mp.cpu_count())
pool.map(read_file, files[0:200])
pool.close()
pool.join()
end=time.time()
print("Time elapsed:", end-start) 

In [None]:
def read_file(file):
    xr_h5=xr.open_dataset(file, engine='h5netcdf', backend_kwargs={'phony_dims': 'access'})
    xr_h5["Acoustic"].compute().values

start=time.time()
pool=mp.Pool(cpu_count)
pool.map(read_file, files[0:200])
pool.close()
pool.join()
end=time.time()
print("Time elapsed:", end-start) 

As we can see, this scales:
200 files with xarray still take only 17.2 seconds, but 200 files with h5py on 85 cpus never finish and the kernel crashes, which might be due to files not being closed properly and using too much memory space.

### Fourier transfrom

In [4]:
##########Base settings#########
#granularity of spectrogram
d_f = 1 # frequency resolution in Hz
d_t = 0.1 # time res in seconds

# section
loc_a, loc_e = 0, 9200 # cable section to be processed (in meters) - 0==start
ind_a, ind_e= loc_a//4, loc_e//4 # channel distances (4m each)
nFiles = 5 # number of h5 files processed
nCores = 8 # cpu cores


day= 22
month=7
sec=0
minut=0
hours=0

# Additional parameters:
file_length = 30 # Length of a single h5 file in seconds.
NU = 1000 #Sampling frequency in Hz of the recorded data.
freq_max = 100 # maximum frequency cut off value for the analysis
seg_length=1/d_f #calculate window length corresponding to d_f
N = file_length*NU #number of samples in one file
ind_f = int(seg_length*freq_max+1)
seg_len=int(seg_length*NU) #how many time points should be in one processing window
nseg=int(2*(file_length/seg_length)) #amount of segments for the desired window length
location_coords = np.arange(loc_a, loc_e, 4)
freq_coords=scipy.fft.rfftfreq(int(NU/d_f), 1/NU)[:ind_f]
hop = int(d_t*NU)

#fft input arguments
args = {
    "ind_f" : ind_f,
    "ind_a" : ind_a,
    "ind_e" : ind_e,
    "seg_len" : seg_len,
    "hop" : hop,
    "N" : N
}


#path and name of resulting zarr-formatted data cube.
ZARR_NAME = "cryo_cube.zarr"

1. The original approach 

In [5]:
def channel_fourier_numpy(data, args, taper, positions):
    """
    Applies Fourier Transformation to segments of DAS records to compute spectrograms.

    Args:
        data (ndarray): The raw data from DAS channels.
        args (dict): Contains parameters for Fourier Transform such as segment length and indices.
        taper (ndarray): The taper function to apply before the Fourier transform.
        positions (ndarray): The positions of the segments.

    Returns:
        ndarray: A 3D array containing the Fourier transform for each segment and channel.
    """
    seg_len = args["seg_len"]
    ind_e, ind_a = args["ind_e"], args["ind_a"]
    ind_f = args["ind_f"]



    # data transformation
    segs = ([data[pos:pos+seg_len] for pos in positions]) #dividing the data into segments each consisting of desired amount of data points
    segs = [seg.T[ind_a:ind_e] for seg in segs] #transposing the segments individually to gain time series for each channel
    nseg = positions.shape[0]
    
    # the first loop iterates over all segments (each corresponding to a time point)
    # in the second loop, the fourier transform gets applied on each channel
    Fsegs=np.zeros((nseg, ind_e-ind_a, ind_f))
    for i in range(nseg):
        for channel_number, channel in enumerate(segs[i]):

            # note that modified_log(x)=10*log(x) (conversion to

            fourier_transformed = np.fft.rfft(taper*channel, n=seg_len)
            fourier_transformed = ((10*np.log(np.abs(fourier_transformed)**2)))[0:ind_f]
            fourier_transformed[0]=0
            Fsegs[i][channel_number]=fourier_transformed

    return Fsegs


####### running for only 
file_index=0
f=h5py.File(files[file_index],  'r')
dset=f['Acoustic']
seg_len=args["seg_len"]
hop=args["hop"]
N=args["N"]
data = np.array(dset) # DAS data
taper = signal.windows.tukey(seg_len, 0.25) #taper function - reduce the amplitude of the discontinuities at the boundaries, thereby reducing spectral leakage.


# the windowing function (Tukey window in this case) tapers at the ends, 
#so to avoid losing data at the ends of each file, 
# the end of one file is overlapped with the beginning of the next file.
if file_index!=nFiles-1:

    g = h5py.File(files[file_index+1],'r')
    dset2=g['Acoustic']
    data2= np.array(dset2)
    
    start=time.time()
    data = np.concatenate((data, data2[0:seg_len]), axis=0)
    end=time.time()
    print("Time elapsed to concatenate:", end-start) 

j = file_index+1
file_pos = file_index * N

start=time.time()

# If the current file is not the last one
if file_index != nFiles-1:
    # Calculate the starting positions of each segment in the data
    # first segment: (j-1)*N/hop, rounded up
    # last segment: (j*N-1)/hop, rounded down
    positions = np.arange(np.ceil((j-1)*N/hop), np.floor((j*N-1)/hop)+1, dtype=int)*hop - file_pos # scaled by the hop size and offset by the file position
else:
    # If last one, start: (j*N-seg_len)/hop
    # to ensure that the last segment doesn't extend beyond the end of the data
    positions = np.arange(np.ceil((j-1)*N/hop), np.floor((j*N-seg_len)/hop)+1, dtype=int)*hop - file_pos
    
Fsegs = channel_fourier_numpy(data, args, taper, positions)

end=time.time()
print("Time elapsed for numpy fft:", end-start) 

Time elapsed to concatenate: 0.03639101982116699
Time elapsed for numpy fft: 18.33467960357666


2. Benchmarking SciPy NumPy pyFFTW

In [21]:
def channel_fourier(data, args, taper, positions, method='numpy'):
    """
    Applies Fourier Transformation to segments of DAS records using specified method.

    Args:
        data (ndarray): The raw data from DAS channels.
        args (dict): Contains parameters for Fourier Transform such as segment length and indices.
        taper (ndarray): The taper function to apply before the Fourier transform.
        positions (ndarray): The positions of the segments.
        method (str): Method for FFT computation ('numpy', 'scipy', 'pyfftw'). Default is 'numpy'.

    Returns:
        ndarray: A 3D array containing the Fourier transform for each segment and channel.
    """
    seg_len = args["seg_len"]
    ind_e, ind_a = args["ind_e"], args["ind_a"]
    ind_f = args["ind_f"]

    segs = ([data[pos:pos+seg_len] for pos in positions])
    segs = [seg.T[ind_a:ind_e] for seg in segs]

    nseg = positions.shape[0]
    Fsegs = np.zeros((nseg, ind_e-ind_a, ind_f))

    if method == 'numpy':
        for i in range(nseg):
            for channel_number, channel in enumerate(segs[i]):
                fourier_transformed = np.fft.rfft(taper * channel, n=seg_len)
                fourier_transformed = ((10 * np.log(np.abs(fourier_transformed) ** 2)))[0:ind_f]
                fourier_transformed[0] = 0
                Fsegs[i][channel_number] = fourier_transformed

    elif method == 'scipy':
        for i in range(nseg):
            for channel_number, channel in enumerate(segs[i]):
                fourier_transformed = fft.fft(taper * channel, n=seg_len)
                fourier_transformed = ((10 * np.log(np.abs(fourier_transformed) ** 2)))[0:ind_f]
                fourier_transformed[0] = 0
                Fsegs[i][channel_number] = fourier_transformed


    elif method == 'pyfftw':
        
        try:
            with open(f'{repo_dir}/code/notebooks/fftw_wisdom.pkl', 'rb') as f:
                wisdom = pickle.load(f)
                pyfftw.import_wisdom(wisdom)
                print("Found a wisdom file.")
        except FileNotFoundError:
            print("No wisdom file found. Starting without wisdom.")

        # Pre-allocate the input and output arrays for FFTW
        fft_input = pyfftw.empty_aligned(seg_len, dtype='complex128')
        fft_output = pyfftw.empty_aligned(seg_len, dtype='complex128')

        # Create FFTW object
        fft_object = pyfftw.FFTW(fft_input, fft_output)

        for i in range(nseg):
            for channel_number, channel in enumerate(segs[i]):
                fft_input[:] = taper * channel  # Apply taper
                fft_object()  # Execute FFT
                fourier_transformed = ((10 * np.log(np.abs(fft_output) ** 2)))[0:ind_f]
                fourier_transformed[0] = 0
                Fsegs[i][channel_number] = fourier_transformed

        with open(f'{repo_dir}/code/notebooks/fftw_wisdom.pkl', 'wb') as f:
            pickle.dump(pyfftw.export_wisdom(), f)

    else:
        raise ValueError("Invalid method specified. Choose from 'numpy', 'scipy', or 'pyfftw'.")

    return Fsegs

In [None]:
# Example usage for benchmarking
file_index = 0
seg_len = args["seg_len"]
hop = args["hop"]
N = args["N"]

f = h5py.File(files[file_index], 'r')
dset = f['Acoustic']
data = np.array(dset)

taper = signal.windows.tukey(seg_len, 0.25)

if file_index != nFiles - 1:
    g = h5py.File(files[file_index + 1], 'r')
    dset2 = g['Acoustic']
    data2 = np.array(dset2)
    data = np.concatenate((data, data2[0:seg_len]), axis=0)

j = file_index + 1
file_pos = file_index * N

if file_index != nFiles - 1:
    positions = np.arange(np.ceil((j - 1) * N / hop), np.floor((j * N - 1) / hop) + 1, dtype=int) * hop - file_pos
else:
    positions = np.arange(np.ceil((j - 1) * N / hop), np.floor((j * N - seg_len) / hop) + 1, dtype=int) * hop - file_pos

# Benchmarking each method
methods = ['numpy', 'scipy', 'pyfftw']
#methods = ['pyfftw']
#methods = ['scipy']
# methods = ['numpy']
for method in methods:
    start = time.time()
    Fsegs = channel_fourier(data, args, taper, positions, method=method)
    end = time.time()
    print(f"Time elapsed for {method} fft:", end - start)


In [22]:
# Assuming 'files' is a list of file paths and 'nFiles' is the total number of files
nFiles = 5  # Set this to the actual number of files you want to process
#methods = ['numpy', 'scipy', 'pyfftw']  # FFT methods to benchmark
methods = ['pyfftw']
#methods = ['scipy']
# methods = ['numpy']

# Loop over each method first
for method in methods:
    print(f"Benchmarking method: {method}")
    # Then loop over each file for the current method
    for file_index in range(nFiles):
        seg_len = args["seg_len"]
        hop = args["hop"]
        N = args["N"]

        f = h5py.File(files[file_index], 'r')
        dset = f['Acoustic']
        data = np.array(dset)

        taper = signal.windows.tukey(seg_len, 0.25)

        if file_index != nFiles - 1:
            g = h5py.File(files[file_index + 1], 'r')
            dset2 = g['Acoustic']
            data2 = np.array(dset2)
            data = np.concatenate((data, data2[0:seg_len]), axis=0)

        j = file_index + 1
        file_pos = file_index * N

        if file_index != nFiles - 1:
            positions = np.arange(np.ceil((j - 1) * N / hop), np.floor((j * N - 1) / hop) + 1, dtype=int) * hop - file_pos
        else:
            positions = np.arange(np.ceil((j - 1) * N / hop), np.floor((j * N - seg_len) / hop) + 1, dtype=int) * hop - file_pos

        # Benchmark the current method for the current file
        start = time.time()
        Fsegs = channel_fourier(data, args, taper, positions, method=method)
        end = time.time()
        print(f"Time elapsed for {method} fft in file {file_index}:", end - start)

Benchmarking method: pyfftw
No wisdom file found. Starting without wisdom.
Time elapsed for pyfftw fft in file 0: 29.353445529937744
Found a wisdom file.
Time elapsed for pyfftw fft in file 1: 29.174957036972046
Found a wisdom file.
Time elapsed for pyfftw fft in file 2: 29.14138913154602
Found a wisdom file.
Time elapsed for pyfftw fft in file 3: 29.096455097198486
Found a wisdom file.
Time elapsed for pyfftw fft in file 4: 28.372843265533447


### Optimizing pyFFTW

#### Using rfft from the builders module of pyFFTW
With the simple setup above, pyFFTW and scipy.fft both performed worse than the numpy implementation. However, there is room for improvement of pyFFTW, which is given the name by the original algorithm Fastest Fourier Transform in the West from the corresponding researchers at MIT.

Originally RFFT was applied via numpy, we can do the same with the builders model from pyFFTW (a c wrapper for FFTW in python).
The "FFTW_ESTIMATE" flag estimates the best method instead of measuring it - this can bring some performance. In our case however, it doesnt and for more accuracy we take the measured default. Also confguring multithreading with pyfftw.config.NUM_THREADS = 8 actually leads to about a second worse performance than the standard implementation with a single thread. Suprisingly, also the suage of a wsidom file for the FFTW object creation slows the process by 0.5 seconds, so we use the most simple default setup.
Overall we see an improvement of 3 seconds to the original numpy function per file. This of course scales when using a total of 1300 files per folder.

In [5]:
def channel_fourier(data, args, taper, positions):
    """
    Applies Fourier Transformation to segments of DAS records using specified method.

    Args:
        data (ndarray): The raw data from DAS channels.
        args (dict): Contains parameters for Fourier Transform such as segment length and indices.
        taper (ndarray): The taper function to apply before the Fourier transform.
        positions (ndarray): The positions of the segments.
        method (str): Method for FFT computation ('numpy', 'scipy', 'pyfftw'). Default is 'numpy'.

    Returns:
        ndarray: A 3D array containing the Fourier transform for each segment and channel.
    """
    start=time.time()
    seg_len = args["seg_len"]
    ind_e, ind_a = args["ind_e"], args["ind_a"]
    ind_f = args["ind_f"]

    segs = ([data[pos:pos+seg_len] for pos in positions])
    segs = [seg.T[ind_a:ind_e] for seg in segs]

    nseg = positions.shape[0]
    Fsegs = np.zeros((nseg, ind_e-ind_a, ind_f))
    step1=time.time()
    print("Time to create segments: ", step1-start)

    # Pre-allocate the input array for FFTW
    fft_input = pyfftw.empty_aligned(seg_len, dtype='float64')

    # Create FFTW object
    fft_object = pyfftw.builders.rfft(fft_input)#, planner_effort='FFTW_ESTIMATE') #, threads=mp.cpu_count()//2)

    for i in range(nseg):
        for channel_number, channel in enumerate(segs[i]):
            fft_input[:] = taper * channel  # Apply taper
            fft_output = fft_object()  # Execute FFT
            fourier_transformed = ((10 * np.log(np.abs(fft_output) ** 2)))[0:ind_f] # Compute power spectrum
            fourier_transformed[0] = 0 # Remove DC component (average value of the signal)
            Fsegs[i][channel_number] = fourier_transformed

    return Fsegs


In [6]:
# Assuming 'files' is a list of file paths and 'nFiles' is the total number of files
nFiles = 5  # Set this to the actual number of files you want to process
#methods = ['numpy', 'scipy', 'pyfftw']  # FFT methods to benchmark
methods = ['pyfftw']
#methods = ['scipy']
# methods = ['numpy']

# try:
#     with open(f'{repo_dir}/code/notebooks/fftw_wisdom.pkl', 'rb') as f:
#         wisdom = pickle.load(f)
#         pyfftw.import_wisdom(wisdom)
#         print("Found a wisdom file.")
# except FileNotFoundError:
#     print("No wisdom file found. Starting without wisdom.")

# Loop over each method first
for method in methods:
    print(f"Benchmarking method: {method}")
    # Then loop over each file for the current method
    for file_index in range(nFiles):
        seg_len = args["seg_len"]
        hop = args["hop"]
        N = args["N"]

        f = h5py.File(files[file_index], 'r')
        dset = f['Acoustic']
        data = np.array(dset)

        taper = signal.windows.tukey(seg_len, 0.25)

        if file_index != nFiles - 1:
            g = h5py.File(files[file_index + 1], 'r')
            dset2 = g['Acoustic']
            data2 = np.array(dset2)
            data = np.concatenate((data, data2[0:seg_len]), axis=0)

        j = file_index + 1
        file_pos = file_index * N

        if file_index != nFiles - 1:
            positions = np.arange(np.ceil((j - 1) * N / hop), np.floor((j * N - 1) / hop) + 1, dtype=int) * hop - file_pos
        else:
            positions = np.arange(np.ceil((j - 1) * N / hop), np.floor((j * N - seg_len) / hop) + 1, dtype=int) * hop - file_pos

        # Benchmark the current method for the current file
        start = time.time()
        Fsegs = channel_fourier(data, args, taper, positions, method=method)
        end = time.time()
        print(f"Time elapsed for {method} rfft in file {file_index}:", end - start)

Benchmarking method: pyfftw
Time to create segments:  0.00024628639221191406
Time elapsed for pyfftw rfft in file 0: 15.545733451843262
Time to create segments:  0.00023055076599121094
Time elapsed for pyfftw rfft in file 1: 15.783392429351807
Time to create segments:  0.0002448558807373047
Time elapsed for pyfftw rfft in file 2: 15.748984575271606
Time to create segments:  0.0002193450927734375
Time elapsed for pyfftw rfft in file 3: 15.702316284179688
Time to create segments:  0.0002334117889404297
Time elapsed for pyfftw rfft in file 4: 15.187469244003296


#### Using dask via pyFFTW
This is a very ineffciient implementation of dask to calculate the fft. The calculation therefore takes minutes to complete. There might however be more promising setups with Dask.

In [4]:
def channel_fourier(data, args, taper, positions):
    
    # Enable the pyfftw cache
    pyfftw.interfaces.cache.enable()
    start=time.time()
    seg_len = args["seg_len"]
    ind_e, ind_a = args["ind_e"], args["ind_a"]
    ind_f = args["ind_f"]

    segs = ([data[pos:pos+seg_len] for pos in positions])
    segs = [seg.T[ind_a:ind_e] for seg in segs]

    nseg = positions.shape[0]
    Fsegs = np.zeros((nseg, ind_e-ind_a, ind_f))
    step1=time.time()
    print("Time to create segments: ", step1-start)
    for i in range(nseg):
        print("time to calculate segment", time.time()-step1)
        for channel_number, channel in enumerate(segs[i]):
            # Convert the channel data to a Dask array
            channel_da = da.from_array(channel, chunks=seg_len)
            
            # Apply taper and compute FFT using pyFFTW
            fft_output = dafft.rfft(taper * channel_da).compute()
            
            # Compute the result
            fourier_transformed = ((10 * np.log(np.abs(fft_output) ** 2)))[0:ind_f]
            fourier_transformed[0] = 0
            Fsegs[i][channel_number] = fourier_transformed
    return Fsegs

In [10]:
# Assuming 'files' is a list of file paths and 'nFiles' is the total number of files
nFiles = 2  # Set this to the actual number of files you want to process
#methods = ['numpy', 'scipy', 'pyfftw']  # FFT methods to benchmark
methods = ['pyfftw']
#methods = ['scipy']
# methods = ['numpy']

# Loop over each method first
for method in methods:
    print(f"Benchmarking method: {method}")
    # Then loop over each file for the current method
    for file_index in range(nFiles):
        seg_len = args["seg_len"]
        hop = args["hop"]
        N = args["N"]

        f = h5py.File(files[file_index], 'r')
        dset = f['Acoustic']
        data = np.array(dset)

        taper = signal.windows.tukey(seg_len, 0.25)

        if file_index != nFiles - 1:
            g = h5py.File(files[file_index + 1], 'r')
            dset2 = g['Acoustic']
            data2 = np.array(dset2)
            data = np.concatenate((data, data2[0:seg_len]), axis=0)

        j = file_index + 1
        file_pos = file_index * N

        if file_index != nFiles - 1:
            positions = np.arange(np.ceil((j - 1) * N / hop), np.floor((j * N - 1) / hop) + 1, dtype=int) * hop - file_pos
        else:
            positions = np.arange(np.ceil((j - 1) * N / hop), np.floor((j * N - seg_len) / hop) + 1, dtype=int) * hop - file_pos

        # Benchmark the current method for the current file
        start = time.time()
        Fsegs = channel_fourier(data, args, taper, positions, method=method)
        end = time.time()
        print(f"Time elapsed for {method} fft in file {file_index}:", end - start)

Benchmarking method: pyfftw
Time to create segments:  0.0002827644348144531
time to calculate segment 9.608268737792969e-05
time to calculate segment 7.6081647872924805
time to calculate segment 15.087376594543457
time to calculate segment 22.52371120452881
time to calculate segment 30.099513053894043
time to calculate segment 37.775598764419556
time to calculate segment 45.30102229118347
time to calculate segment 52.911967515945435
time to calculate segment 60.48925495147705
time to calculate segment 68.05755162239075
time to calculate segment 75.67105388641357
time to calculate segment 83.3500165939331
time to calculate segment 91.00610542297363
time to calculate segment 98.66999650001526
time to calculate segment 106.36303210258484
time to calculate segment 114.03694224357605
time to calculate segment 121.70337414741516


KeyboardInterrupt: 

### Comparison of the improvements to the point
Let's summarize our improvements so far and compare them together with the original setup:
1. Reading with Xarray instead of  h5py
2. Using pyFFTW with the builders module for the rfft

In [5]:
##########Base settings#########
#granularity of spectrogram
freq_res = 1 # frequency resolution in Hz
time_res = 0.1 # time res in seconds

# section
cable_start, cable_end = 0, 9200 # cable section to be processed (in meters) - 0==start
start_channel_index, end_channel_index= cable_start // 4 , cable_end // 4 # channel distances (4m each)
n_files = 5 # number of h5 files processed
n_cores = 8 # cpu cores


day= 22
month=7
sec=0
minut=0
hours=0

# Additional parameters:
file_length = 30 # Length of a single h5 file in seconds.
sample_freq = 1000 #Sampling frequency in Hz of the recorded data.
freq_max = 100 # maximum frequency cut off value for the analysis
seg_length=1/freq_res #calculate window length corresponding to freq_res
n_samples = file_length*sample_freq #number of samples in one file
num_frequency_points = int(seg_length*freq_max+1)
seg_len=int(seg_length*sample_freq) #how many time points should be in one processing window
n_segments=int(2*(file_length/seg_length)) #amount of segments for the desired window length
location_coords = np.arange(cable_start, cable_end, 4)
freq_coords=scipy.fft.rfftfreq(int(sample_freq/freq_res), 1/sample_freq)[:num_frequency_points]
hop = int(time_res*sample_freq)

#fft input arguments
args = {
    "num_frequency_points" : num_frequency_points,
    "start_channel_index" : start_channel_index,
    "end_channel_index" : end_channel_index,
    "seg_len" : seg_len,
    "hop" : hop,
    "n_samples" : n_samples
}


#path and name of resulting zarr-formatted data cube.
ZARR_NAME = "cryo_cube.zarr"

def channel_fourier(data, args, taper, positions, method='numpy'):
    """
    Applies Fourier Transformation to segments of DAS records using specified method.

    Args:
        data (ndarray): The raw data from DAS channels.
        args (dict): Contains parameters for Fourier Transform such as segment length and indices.
        taper (ndarray): The taper function to apply before the Fourier transform.
        positions (ndarray): The positions of the segments.
        method (str): Method for FFT computation ('numpy', 'scipy', 'pyfftw'). Default is 'numpy'.

    Returns:
        ndarray: A 3D array containing the Fourier transform for each segment and channel.
    """
    seg_len = args["seg_len"]
    end_channel_index, start_channel_index = args["end_channel_index"], args["start_channel_index"]
    num_frequency_points = args["num_frequency_points"]

    segments = ([data[pos:pos+seg_len] for pos in positions])
    segments = [seg.T[start_channel_index:end_channel_index] for seg in segments]

    n_segments = positions.shape[0]
    Fsegs = np.zeros((n_segments, end_channel_index-start_channel_index, num_frequency_points))

    if method=="pyfftw":
        # Pre-allocate the input array for FFTW
        fft_input = pyfftw.empty_aligned(seg_len, dtype='float64')
        # Create FFTW object
        fft_object = pyfftw.builders.rfft(fft_input)#, planner_effort='FFTW_ESTIMATE') #, threads=mp.cpu_count()//2)

        for i in range(n_segments):
            for channel_number, channel in enumerate(segments[i]):
                fft_input[:] = taper * channel  # Apply taper
                fft_output = fft_object()  # Execute FFT
                fourier_transformed = ((10 * np.log(np.abs(fft_output) ** 2)))[0:num_frequency_points] # Compute power spectrum
                fourier_transformed[0] = 0 # Remove DC component (average value of the signal)
                Fsegs[i][channel_number] = fourier_transformed
    elif method=="numpy":
        for i in range(n_segments):
            for channel_number, channel in enumerate(segments[i]):

                # note that modified_log(x)=10*log(x) (conversion to

                fourier_transformed = np.fft.rfft(taper*channel, n=seg_len)
                fourier_transformed = ((10*np.log(np.abs(fourier_transformed)**2)))[0:num_frequency_points]
                fourier_transformed[0]=0
                Fsegs[i][channel_number]=fourier_transformed
    else:
        print("method must be one of 'numpy' or 'pyfftw'")
        
    return Fsegs



def create_spectro_segment(file_index, args, filelist, method='h5py', fft_method='numpy'):
    # chunk args
    seg_len=args["seg_len"]
    hop=args["hop"]
    n_samples=args["n_samples"]
    filename=filelist[file_index]
    
    #taper function
    taper = signal.windows.tukey(seg_len, 0.25)  # reduces the amplitude of the discontinuities at the boundaries, thereby reducing spectral leakage.
    
    data=[]
    if method=="xarray":
        xr_h5=xr.open_dataset(filename, engine='h5netcdf', backend_kwargs={'phony_dims': 'access'})
        data=xr_h5["Acoustic"].compute().values
        
        # the windowing function (Tukey window in this case) tapers at the ends, 
        # to avoid losing data at the ends of each file, 
        # the end of one file is overlapped with the beginning of the next file.
        if file_index!=n_files-1:
            xr_h5_2=xr.open_dataset(filelist[file_index+1], engine='h5netcdf', backend_kwargs={'phony_dims': 'access'})
            data_2=xr_h5_2["Acoustic"].compute().values
            data = np.concatenate((data, data_2[0:seg_len]), axis=0)
    elif method=="h5py":
        h5file=h5py.File(filename,  'r')
        dset=h5file['Acoustic']
        data=np.array(dset) # DAS data
        
        if file_index!=n_files-1:
            h5file_2 = h5py.File(files[file_index+1],'r')
            dset_2=h5file_2['Acoustic']
            data_2= np.array(dset_2)
            data = np.concatenate((data, data_2[0:seg_len]), axis=0)
    else:
        print("method must be one of 'numpy' or 'pyfftw'")
    

    next_file_index = file_index+1
    file_pos = file_index * n_samples

    # If the current file is not the last one
    if file_index != n_files-1:
        # Calculate the starting positions of each segment in the data
        # first segment: (next_file_index-1)*n_samples/hop, rounded up
        # last segment: (next_file_index*n_samples-1)/hop, rounded down
        positions = np.arange(np.ceil((file_index)*n_samples/hop), np.floor((next_file_index*n_samples-1)/hop)+1, dtype=int)*hop - file_pos # scaled by the hop size and offset by the file position
    else:
        # If last one, start: (next_file_index*n_samples-seg_len)/hop
        # to ensure that the last segment doesn't extend beyond the end of the data
        positions = np.arange(np.ceil((file_index)*n_samples/hop), np.floor((next_file_index*n_samples-seg_len)/hop)+1, dtype=int)*hop - file_pos

    Fsegs = channel_fourier(data, args, taper, positions, method=fft_method)
    return Fsegs, positions.shape[0]

In [4]:
start=time.time()
# call the original function with h5py and numpy
Fsegs_h5py, n_segments = create_spectro_segment(0, args, files, method='h5py', fft_method='numpy')
print("Time elapsed for h5py with numpy:", time.time()-start)
print("Number of segments:", n_segments)
print("Shape of the resulting array:", Fsegs_h5py.shape)

start=time.time()
# call the original function with xarray and pyfftw
Fsegs_xarray, n_segments = create_spectro_segment(0, args, files, method='xarray', fft_method='pyfftw')
print("Time elapsed for xarray with pyfftw:", time.time()-start)
print("Number of segments:", n_segments)
print("Shape of the resulting array:", Fsegs_xarray.shape)

# calculate the difference between the two arrays
difference = Fsegs_h5py - Fsegs_xarray
print("Total difference between the two arrays:", np.sum(difference))
print("Mean difference between the two arrays:", np.mean(difference))
display(difference[0:2])
display(Fsegs_xarray[0:2])

Time elapsed for h5py with numpy: 18.998287677764893
Number of segments: 300
Shape of the resulting array: (300, 2300, 101)
Time elapsed for xarray with pyfftw: 16.11255121231079
Number of segments: 300
Shape of the resulting array: (300, 2300, 101)
Total difference between the two arrays: 1.2833771878550948e-08
Mean difference between the two arrays: 1.8415514246736905e-16


array([[[ 0.00000000e+00,  0.00000000e+00,  0.00000000e+00, ...,
          0.00000000e+00,  0.00000000e+00,  0.00000000e+00],
        [ 0.00000000e+00, -2.84217094e-14,  0.00000000e+00, ...,
          0.00000000e+00, -5.68434189e-14,  0.00000000e+00],
        [ 0.00000000e+00,  0.00000000e+00,  0.00000000e+00, ...,
          0.00000000e+00,  0.00000000e+00, -2.84217094e-14],
        ...,
        [ 0.00000000e+00, -2.84217094e-14, -5.68434189e-14, ...,
          1.42108547e-14,  0.00000000e+00,  0.00000000e+00],
        [ 0.00000000e+00,  0.00000000e+00,  2.84217094e-14, ...,
         -2.84217094e-14,  0.00000000e+00,  0.00000000e+00],
        [ 0.00000000e+00,  0.00000000e+00,  0.00000000e+00, ...,
          0.00000000e+00,  0.00000000e+00,  2.84217094e-14]],

       [[ 0.00000000e+00, -5.68434189e-14,  0.00000000e+00, ...,
          0.00000000e+00,  0.00000000e+00,  5.68434189e-14],
        [ 0.00000000e+00,  0.00000000e+00,  0.00000000e+00, ...,
          0.00000000e+00,  0.00000000e

array([[[  0.        , 249.65643724, 240.73256376, ..., 240.30308431,
         243.85412415, 251.23577255],
        [  0.        , 227.37218713, 236.30810147, ..., 236.49816194,
         197.78252012, 247.87986118],
        [  0.        , 235.13375845, 248.53002275, ..., 259.01518913,
         250.36339076, 244.76471376],
        ...,
        [  0.        , 134.12702641, 123.63783194, ..., 111.90591666,
         128.43574406, 135.38000592],
        [  0.        , 144.41724364, 139.1639038 , ..., 150.49915646,
         122.99808398, 155.72721874],
        [  0.        , 143.32092329, 151.27369456, ..., 161.21458254,
         150.07218085, 154.09740348]],

       [[  0.        , 236.73555304, 239.86390695, ..., 239.25272624,
         250.59239433, 242.20177736],
        [  0.        , 216.32228499, 223.47404618, ..., 241.7678368 ,
         224.12507733, 243.22383103],
        [  0.        , 244.42657378, 239.07318284, ..., 260.66833352,
         238.80873288, 242.20595459],
        ...,


### Profile the code to see further bottlenecks

In [16]:
def profile_function(method, fft_method):
    # This function will be profiled
    Fsegs, n_segments = create_spectro_segment(0, args, files, method=method, fft_method=fft_method)
    print("Method:".format(method, fft_method))
    print("Number of segments:", n_segments)
    print("Shape of the resulting array:", Fsegs.shape)


def run_profiler():
    profiler = cProfile.Profile()
    
    profiler.enable()
    # Assuming profile_function is correctly defined and called
    profile_function('h5py', 'numpy')
    profile_function('xarray', 'pyfftw')
    profiler.disable()

    s = io.StringIO()
    ps = pstats.Stats(profiler, stream=s).sort_stats('cumulative')
    ps.print_stats()
    s.flush()  # Ensure buffer is flushed
    
    return s.getvalue()

stats = run_profiler()

Method:
Number of segments: 300
Shape of the resulting array: (300, 2300, 101)
Method:
Number of segments: 300
Shape of the resulting array: (300, 2300, 101)


In [23]:
# Adjusted parsing logic
output_lines = stats.split('\n')
rows = []
for line in output_lines:
    # Use a more sophisticated method to split lines into columns, considering potential spaces in the last column
    cols = line.split(maxsplit=5)  # This assumes the first 5 columns do not contain spaces that should be preserved
    if len(cols) > 5 and cols[0].replace('.', '', 1).isdigit():
        rows.append(cols)

# Update column names based on actual data structure and inspection of the rows
df_columns = ['ncalls', 'tottime', 'percall', 'cumtime', 'percall_again', 'filename:lineno(function)']
# Ensure the number of columns in df_columns matches the actual data
df = pd.DataFrame(rows, columns=df_columns)
df = df.iloc[1:]
pd.set_option('display.max_rows', 200)
display(df)

Unnamed: 0,ncalls,tottime,percall,cumtime,percall_again,filename:lineno(function)
1,2,0.004,0.002,36.667,18.333,/tmp/ipykernel_1570349/845927988.py:1(profile_...
2,2,0.099,0.049,36.661,18.331,/tmp/ipykernel_1570349/1999376382.py:100(creat...
3,2,30.883,15.441,36.165,18.083,/tmp/ipykernel_1570349/1999376382.py:46(channe...
4,690000,0.416,0.000,5.204,0.000,/home/sc.uni-leipzig.de/ju554xqou/.cache/pypoe...
5,690000,0.479,0.000,4.628,0.000,/home/sc.uni-leipzig.de/ju554xqou/.cache/pypoe...
...,...,...,...,...,...,...
526,2,0.000,0.000,0.000,0.000,{method 'setdefault' of 'dict' objects}
527,2,0.000,0.000,0.000,0.000,{method 'copy' of 'set' objects}
528,1,0.000,0.000,0.000,0.000,/home/sc.uni-leipzig.de/ju554xqou/.cache/pypoe...
529,2,0.000,0.000,0.000,0.000,/home/sc.uni-leipzig.de/ju554xqou/.cache/pypoe...
