In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import os
from tqdm import tqdm

# 1. Data Cleaning

Replace missing or non-numeric values (NaNs) with 0, then zero-pad every sequence to the length of the longest one so that all files have equal length.

In [2]:
def read_excel(file_path):
    """Load an Excel sheet and return its cells as a flat numeric array.

    All cells of the sheet read by ``pd.read_excel`` are flattened in
    row-major order and converted to numbers. Cells that cannot be parsed
    as numbers, and cells that are missing, are replaced with 0.

    Parameters
    ----------
    file_path : str
        Path of the Excel workbook to read.

    Returns
    -------
    numpy.ndarray
        1-D array of the sheet's values with every NaN replaced by 0.

    Notes
    -----
    NOTE(review): ``pd.read_excel`` treats the first row as a header by
    default, so if the workbooks have no header row their first sample is
    not part of the returned data -- confirm against the raw files.
    """
    flat_cells = pd.read_excel(file_path).to_numpy().ravel()
    # errors='coerce' turns anything unparseable into NaN instead of raising.
    as_numbers = pd.to_numeric(flat_cells, errors='coerce')
    is_missing = np.isnan(as_numbers)
    return np.where(is_missing, 0, as_numbers)

def get_max_length(input_directory):
    """Return the length of the longest sequence among the .xlsx files.

    Every entry of ``input_directory`` whose name ends with '.xlsx' is
    loaded with ``read_excel`` and its number of values is measured. A tqdm
    progress bar tracks the directory listing.

    Parameters
    ----------
    input_directory : str
        Directory containing the Excel workbooks.

    Returns
    -------
    int
        The largest sequence length found, or 0 when the directory holds
        no '.xlsx' files.
    """
    workbook_names = (
        name for name in tqdm(os.listdir(input_directory))
        if name.endswith('.xlsx')
    )
    sequence_lengths = (
        len(read_excel(os.path.join(input_directory, name)))
        for name in workbook_names
    )
    # default=0 covers the case where no workbook was found.
    return max(sequence_lengths, default=0)

def pad_data(data, target_length):
    """Force a sequence to exactly ``target_length`` elements.

    Parameters
    ----------
    data : sequence or numpy.ndarray
        The values to resize.
    target_length : int
        Desired number of elements.

    Returns
    -------
    The first ``target_length`` elements of ``data`` when it is already
    long enough (a plain slice of the input); otherwise a NumPy array made
    of ``data`` followed by trailing zeros up to ``target_length``.
    """
    shortfall = target_length - len(data)
    if shortfall > 0:
        # Too short: append as many zeros as are missing.
        return np.concatenate((data, np.zeros(shortfall)))
    # Long enough (or exact): truncate to the target length.
    return data[:target_length]

def process_and_save_excel_files(input_directory, output_directory):
    """Pad every Excel sequence to a common length and save it as CSV.

    The function first scans ``input_directory`` to find the longest
    sequence, then re-reads each '.xlsx' file, pads (or truncates) it to
    that length with ``pad_data`` and writes it as a single-column CSV
    without index or header. The CSV keeps the workbook's base name with
    the extension changed to '.csv'.

    Parameters
    ----------
    input_directory : str
        Directory containing the source '.xlsx' files.
    output_directory : str
        Directory that receives the '.csv' files; created if missing.

    Returns
    -------
    None
    """
    # exist_ok=True replaces the separate exists() check and avoids failing
    # if the directory appears between the check and the creation.
    os.makedirs(output_directory, exist_ok=True)

    # NOTE(review): every workbook is read twice (once here to find the
    # maximum length, once below to convert it). Caching the arrays would
    # halve the run time at the cost of holding all sequences in memory.
    target_length = get_max_length(input_directory)
    print("target_length: ", target_length)

    for file_name in tqdm(os.listdir(input_directory)):
        if not file_name.endswith('.xlsx'):
            continue

        file_path = os.path.join(input_directory, file_name)
        padded_data = pad_data(read_excel(file_path), target_length)

        # splitext swaps only the trailing extension; str.replace would also
        # rewrite any '.xlsx' occurring in the middle of the file name.
        base_name, _ = os.path.splitext(file_name)
        output_file_path = os.path.join(output_directory, base_name + '.csv')
        pd.DataFrame(padded_data).to_csv(output_file_path, index=False, header=False)

    print("Processing and saving completed.")


In [3]:
# Build the paths with os.path.join so the notebook also runs on systems
# whose path separator is not a backslash (identical result on Windows).
input_directory = os.path.join("..", "0. data", "0. PCM_Raw_data")
output_directory = os.path.join("..", "0. data", "1. PCM_Equal_length")

# Pad all raw Excel sequences to equal length and store them as CSV files.
process_and_save_excel_files(input_directory, output_directory)

100%|████████████████████████████████████████████████████████████████████████████████| 300/300 [04:59<00:00,  1.00it/s]


target_length:  72193


100%|████████████████████████████████████████████████████████████████████████████████| 300/300 [04:58<00:00,  1.00it/s]

Processing and saving completed.





# 2. Convert to .wav files
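Convert the equal-length CSV sequences to standard .wav (Waveform Audio File Format) files, written here as 16-bit mono PCM at a 16 kHz sampling rate. WAV is a widely used uncompressed audio format, so the samples are stored without lossy compression for later analysis.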

    PDM (Pulse Density Modulation): a system for representing a sampled signal as a stream of single bits. 
    PCM (Pulse Code Modulation): a system for representing a sampled signal as a series of multi-bit words.




In [4]:
import wave
import csv
import struct
import os
from tqdm import tqdm  # Progress bar for iteration

# Parameter settings
# Paths are built with os.path.join so they resolve on any OS
# (the result is identical to the backslash form on Windows).
input_folder = os.path.join('..', '0. data', '1. PCM_Equal_length')  # Folder containing sample CSV files
output_folder = os.path.join('..', '0. data', '2. Data_wav')         # Folder for output WAV files

sample_rate = 16000              # Sampling rate
channels = 1                     # Mono channel
sample_width = 2                 # 16-bit = 2 bytes
max_amplitude = 32767            # Largest value of a signed 16-bit sample
min_amplitude = -32768           # Smallest value of a signed 16-bit sample
gain = 1                         # (Gain factor to amplify volume if needed)

# Ensure output folder exists (no error if it is already there)
os.makedirs(output_folder, exist_ok=True)

# Iterate through all CSV files in the input folder
for filename in tqdm(os.listdir(input_folder)):
    if not filename.endswith('.csv'):
        continue

    input_csv = os.path.join(input_folder, filename)
    output_wav = os.path.join(output_folder, os.path.splitext(filename)[0] + '.wav')

    # Read the first column of every non-empty row as one sample.
    # newline='' is the mode the csv module documents for reader input.
    with open(input_csv, 'r', newline='') as f:
        samples = [float(row[0]) for row in csv.reader(f) if row]

    # Apply gain, round to integer, and clip to the full signed 16-bit range.
    # The lower bound is -32768 (not -32767) so that a valid minimum-value
    # PCM sample passes through unchanged.
    int_samples = [
        max(min_amplitude, min(max_amplitude, round(sample * gain)))
        for sample in samples
    ]

    # Create WAV file: little-endian signed 16-bit frames
    with wave.open(output_wav, 'wb') as wf:
        wf.setnchannels(channels)
        wf.setsampwidth(sample_width)
        wf.setframerate(sample_rate)
        frames = struct.pack('<%dh' % len(int_samples), *int_samples)
        wf.writeframes(frames)


100%|████████████████████████████████████████████████████████████████████████████████| 300/300 [00:24<00:00, 12.37it/s]
