In [1]:
import os
import re
import glob
import numpy as np
import matplotlib.pyplot as plt
import csv
from collections import defaultdict

In [2]:
# Function to calculate SD1 and SD2
def calculate_sd1_sd2(series):
    n = len(series)
    mean_rr = np.mean(series)
    
    # Calculate SD1
    diff_rr = np.diff(series) / np.sqrt(2)
    sd1 = np.sqrt(np.sum(diff_rr**2) / (n - 1))
    
    # Calculate SD2
    sd2_term = (series[:-1] + series[1:] - 2 * mean_rr) / np.sqrt(2)
    sd2 = np.sqrt(np.sum(sd2_term**2) / (n - 1))
    
    return sd1, sd2

In [32]:
data = np.loadtxt(file_path)
filtered_data = data[data[:, 1] != 0]  # Filter out zero values
time_data = filtered_data[:, 0]
hrv_data = filtered_data[:, 1]
# Calculate statistics
stats = calculate_interval_statistics(time_data, hrv_data)

NameError: name 'file_path' is not defined

In [30]:
# Function to calculate RR mean, SD1, SD2, and standard deviation for each interval
def calculate_interval_statistics(time_data, hrv_data):
    intervals = {
        'Rest': (1, 6),
        'Exercise': (14, 19),
        'Recovery': (25, 30)
    }
    
    stats = {}
    
    for interval, (start, end) in intervals.items():
        mask = (time_data >= start) & (time_data < end)
        interval_data = hrv_data[mask]
        
        if len(interval_data) > 1:  # Ensure there's enough data for SD calculation
            rr_mean = np.mean(interval_data)
            sd1, sd2 = calculate_sd1_sd2(interval_data)
            rr_std = np.std(interval_data)  # Calculate standard deviation
            stats[interval] = {
                'Mean': rr_mean,
                'Standard Deviation': rr_std,
                'SD1': sd1,
                'SD2': sd2
            }
    
    return stats

In [4]:
def plot_data(ax, time_data, hrv_data, filename, stats):
    # Define timeline markers
    Rest = 7
    Exercise = 20
    Recovery = 32

    colors = {'Rest': 'blue', 'Exercise': 'orange', 'Recovery': 'purple'}
    
    # Plot each point with color-coded intervals
    for i in range(len(time_data)):
        if time_data[i] < Rest:
            color = colors['Rest']
        elif time_data[i] < Exercise:
            color = colors['Exercise']
        elif time_data[i] < Recovery:
            color = colors['Recovery']
        else:
            continue
        ax.plot(time_data[i], hrv_data[i], '.', color=color)
    
    # Plot green vertical lines to mark intervals
    ax.axhline(y= hrv_data.min(), color='r', linestyle='-')

    ax.axvline(x=1, color='g', linestyle='-')
    ax.axvline(x=6, color='g', linestyle='-')
    ax.axvline(x=14, color='g', linestyle='--')
    ax.axvline(x=19, color='g', linestyle='--')
    ax.axvline(x=25, color='g', linestyle='-')
    ax.axvline(x=30, color='g', linestyle='-')
    
    # Modify the region texts dynamically based on calculated stats
    region_positions = {'Rest': 3.5,
                        'Exercise': 16.5,
                        'Recovery': 27.5
                       }

    # Loop over regions and stats
    for region, x_pos in region_positions.items():
        if region in stats:  # Check if stats for the region exist
            rr_mean = stats[region]['Mean']
            rr_std = stats[region]['Standard Deviation']
            text_str = f"Mean RR: {rr_mean:.2f} ms\nSTD RR: {rr_std:.2f} ms"
            # Add the text for each region at the corresponding position
            ax.text(x_pos, hrv_data.max() + 10, text_str, fontsize=10, 
                    verticalalignment='top', horizontalalignment='center',
                    bbox=dict(facecolor='white', alpha=0.8))

    # Set axis limits, title, labels, and grid
    ax.set_ylim(hrv_data.min() - 50, hrv_data.max() + 50)
    ax.set_title(f"RR {filename}")  # Include filename in title
    ax.set_xlabel("Time (s)")
    ax.set_ylabel("RR (ms)")
    ax.grid(True)

In [5]:
# Poincare plot function with color coding and filename in title
def poincare_plot(ax, hrv_data, time_data, filename):
    x = []
    y = []
    colors = {'Rest': 'blue', 'Exercise': 'orange', 'Recovery': 'purple'}
    color_x = []

    Rest = 7
    Exercise = 20
    Recovery = 32

    for i in range(len(hrv_data) - 1):
        x.append(hrv_data[i])     # RR(n)
        y.append(hrv_data[i + 1]) # RR(n+1)
        # Determine color based on the time of RR(n) and RR(n+1)
        if time_data[i] < Rest:
            color_x.append(colors['Rest'])
        elif time_data[i] < Exercise:
            color_x.append(colors['Exercise'])
        elif time_data[i] < Recovery:
            color_x.append(colors['Recovery'])
        else:
            continue

    ax.scatter(x, y, c=color_x, s=10)  # Use c=color_x directly
    ax.set_title(f"Poincaré {filename}")  # Include filename in title
    ax.set_xlabel("RR(n)")
    ax.set_ylabel("RR(n+1)")
    ax.grid(True)

# Modified main function to include limit on number of images
def process_files(data_folder, output_folder, csv_filename, n=None):
    txt_files = glob.glob(os.path.join(data_folder, "*.txt"))
    
    # List all .txt files in the folder
    filenames = os.listdir(data_folder)
    
    # Group files by prefix up to the first number
    file_groups = defaultdict(list)
    pattern = re.compile(r"([a-zA-Z]+)(\d+)")  # Regex to extract the prefix and number
    
    # Process each filename
    for filename in filenames:
        match = pattern.match(filename)
        if match:
            prefix = match.group(1)
            number = int(match.group(2))  # Extract the number
            file_groups[prefix].append((filename, number))

    # Sort each group by the number part
    sorted_filenames = []
    for prefix, files in file_groups.items():
        sorted_files = sorted(files, key=lambda x: x[1])  # Sort by the number
        sorted_filenames.extend([file[0] for file in sorted_files])  # Add sorted filenames to the list
    
    # Apply the limit on the number of files to process
    if n is not None:
        sorted_filenames = sorted_filenames[:n]  # Limit the number of files processed
    
    # Prepare CSV file for writing
    with open(csv_filename, mode='w', newline='') as csv_file:
        writer = csv.writer(csv_file)
        # Write a single header row
        writer.writerow([
            "File", 
            "Mean_RR_Rest", "STD_RR_Rest",
            "Mean_RR_Exercise","STD_RR_Exercise",
            "Mean_RR_Recovery", "STD_RR_Recovery",
            "SD1_Rest", "SD2_Rest",
            "SD1_Exercise", "SD2_Exercise",
            "SD1_Recovery", "SD2_Recovery"
        ])
        
        # List to track problematic files
        failed_files = []
        
        # Loop through each sorted file and process
        for filename in sorted_filenames:
            file_path = os.path.join(data_folder, filename)  # Get the full path of the file
            try:
                # Try loading the file, catching any errors in the process
                data = np.loadtxt(file_path)
                filtered_data = data[data[:, 1] != 0]  # Filter out zero values
                time_data = filtered_data[:, 0]
                hrv_data = filtered_data[:, 1]
                
                # Calculate statistics
                stats = calculate_interval_statistics(time_data, hrv_data)
                
                # Collect data for all regions (Rest, Exercise, Recovery)
                row = [os.path.basename(file_path)]

                # Append SD1 and SD2 for each region
                for region in ['Rest', 'Exercise', 'Recovery']:
                    if region in stats:
                        row.extend([
                            stats[region]['Mean'],
                            stats[region]['Standard Deviation'],
                            stats[region]['SD1'], 
                            stats[region]['SD2']
                        ])
                    else:
                        row.extend([None, None])  # If not enough data for the region, append Nones
                
                # Write the collected row for this file
                writer.writerow(row)
                
                # Create a subplot for side-by-side plots
                fig, axes = plt.subplots(1, 2, figsize=(30, 8))  # 1 row, 2 columns
                
                # Extract the filename without extension
                filename_without_extension
                
                # Extract the filename without extension
                filename_without_extension = os.path.splitext(filename)[0]
                
                # Plot the data on the left subplot
                plot_data(axes[0], time_data, hrv_data, filename_without_extension, stats)
                
                # Plot the Poincaré plot on the right subplot
                poincare_plot(axes[1], hrv_data, time_data, filename_without_extension)
                
                # Save the figure to the output folder
                output_file_path = os.path.join(output_folder, f"{filename_without_extension}.png")
                plt.savefig(output_file_path)
                
                # Close the figure to free up memory
                plt.close(fig)
            
            except Exception as e:
                # Handle any issues during file processing
                failed_files.append(filename)
                print(f"Error processing {filename}: {e}")
    
    # If there were problematic files, print a summary
    if failed_files:
        print(f"The following files encountered errors: {failed_files}")
    else:
        print("All files processed successfully.")

In [10]:
def process_files(data_folder, output_folder, csv_filename, n):
    txt_files = glob.glob(os.path.join(data_folder, "*.txt"))
    
    # List all .txt files in the folder
    filenames = os.listdir(data_folder)
    
    # Group files by prefix up to the first number
    file_groups = defaultdict(list)
    pattern = re.compile(r"([a-zA-Z]+)(\d+)")  # Regex to extract the prefix and number
    
    # Process each filename
    for filename in filenames:
        match = pattern.match(filename)
        if match:
            prefix = match.group(1)
            number = int(match.group(2))  # Extract the number
            file_groups[prefix].append((filename, number))

    # Sort each group by the number part
    sorted_filenames = []
    for prefix, files in file_groups.items():
        sorted_files = sorted(files, key=lambda x: x[1])  # Sort by the number
        sorted_filenames.extend([file[0] for file in sorted_files])  # Add sorted filenames to the list
    
    # Prepare CSV file for writing
    with open(csv_filename, mode='w', newline='') as csv_file:
        writer = csv.writer(csv_file)
        # Write a single header row
        writer.writerow([
            "File", 
            "Mean_RR_Rest", "STD_RR_Rest",
            "Mean_RR_Exercise","STD_RR_Exercise",
            "Mean_RR_Recovery", "STD_RR_Recovery",
            "SD1_Rest", "SD2_Rest",
            "SD1_Exercise", "SD2_Exercise",
            "SD1_Recovery", "SD2_Recovery"
        ])
        
        # List to track problematic files
        failed_files = []
        
        # Limit processing to `n` files
        file_count = 0
        
        # Loop through each sorted file and process
        for filename in sorted_filenames:
            file_path = os.path.join(data_folder, filename)  # Get the full path of the file
            filename_without_extension = os.path.splitext(os.path.basename(file_path))[0]  # Extract filename here
            try:
                # Try loading the file, catching any errors in the process
                data = np.loadtxt(file_path)
                filtered_data = data[data[:, 1] != 0]  # Filter out zero values
                time_data = filtered_data[:, 0]
                hrv_data = filtered_data[:, 1]
                
                # Calculate statistics
                stats = calculate_interval_statistics(time_data, hrv_data)
                
                # Collect data for all regions (Rest, Exercise, Recovery)
                row = [os.path.basename(file_path)]

                # Append SD1 and SD2 for each region
                for region in ['Rest', 'Exercise', 'Recovery']:
                    if region in stats:
                        row.extend([
                            stats[region]['Mean'],
                            stats[region]['Standard Deviation'],
                            stats[region]['SD1'], 
                            stats[region]['SD2']
                        ])
                    else:
                        row.extend([None, None])  # If not enough data for the region, append Nones
                
                # Write the collected row for this file
                writer.writerow(row)
                
                # Create a subplot for side-by-side plots
                fig, axes = plt.subplots(1, 2, figsize=(30, 8))  # 1 row, 2 columns
                
                # Plot data and Poincare plot side by side with filename in title
                plot_data(axes[0], time_data, hrv_data, filename_without_extension, stats)  # HRV data on the left
                poincare_plot(axes[1], hrv_data, time_data, filename_without_extension)  # Poincare plot on the right
                
                # Adjust layout and save the plot
                plt.subplots_adjust(wspace=1)
                plot_filename = os.path.join(output_folder, filename_without_extension + '_subplot.png')
                plt.savefig(plot_filename)
                plt.close()

                file_count += 1
                if file_count >= n:
                    break  # Stop after processing `n` files

            except ValueError as ve:
                print(f"ValueError for file {file_path}: {ve}")
                failed_files.append(file_path)  # Add file to failed list
            except Exception as e:
                print(f"Error processing file {file_path}: {e}")
                failed_files.append(file_path)  # Add file to failed list

    # Print the list of files that couldn't be processed
    if failed_files:
        print("\nThe following files encountered errors:")
        for failed_file in failed_files:
            print(failed_file)
    else:
        print("\nAll files processed successfully!")


In [12]:
# Set your parameters here
n = 5  # Limit for the number of images to process
data_folder = "Active"  # Folder with .txt files
output_folder = "Plots_Wachi"  # Folder to save plots
csv_filename = "Wachi.csv"  # Output CSV file

# Create output folder if it doesn't exist
if not os.path.exists(output_folder):
    os.makedirs(output_folder)

# Run the main file processing function
process_files(data_folder, output_folder, csv_filename, n)



All files processed successfully!


In [13]:
def process_file(data_folder, output_folder, csv_filename, filename):
    file_path = os.path.join(data_folder, filename)  # Get the full path of the file
    filename_without_extension = os.path.splitext(os.path.basename(file_path))[0]  # Extract filename without extension
    
    # Prepare CSV file for writing (or appending)
    with open(csv_filename, mode='w', newline='') as csv_file:
        writer = csv.writer(csv_file)
        
        # Write a single header row (optional: only if the CSV is new)
        writer.writerow([
            "File", 
            "Mean_RR_Rest", "STD_RR_Rest",
            "Mean_RR_Exercise","STD_RR_Exercise",
            "Mean_RR_Recovery", "STD_RR_Recovery",
            "SD1_Rest", "SD2_Rest",
            "SD1_Exercise", "SD2_Exercise",
            "SD1_Recovery", "SD2_Recovery"
        ])
        
        # Try loading the file and processing the data
        try:
            data = np.loadtxt(file_path)
            filtered_data = data[data[:, 1] != 0]  # Filter out zero values
            time_data = filtered_data[:, 0]
            hrv_data = filtered_data[:, 1]
            
            # Calculate statistics
            stats = calculate_interval_statistics(time_data, hrv_data)
            
            # Collect data for all regions (Rest, Exercise, Recovery)
            row = [os.path.basename(file_path)]

            # Append SD1 and SD2 for each region
            for region in ['Rest', 'Exercise', 'Recovery']:
                if region in stats:
                    row.extend([
                        stats[region]['Mean'],
                        stats[region]['Standard Deviation'],
                        stats[region]['SD1'], 
                        stats[region]['SD2']
                    ])
                else:
                    row.extend([None, None])  # If not enough data for the region, append Nones
            
            # Write the collected row for this file
            writer.writerow(row)

            print(f"File {filename} processed successfully!")
            
        except ValueError as ve:
            print(f"ValueError for file {file_path}: {ve}")
        except Exception as e:
            print(f"Error processing file {file_path}: {e}")


In [14]:
data_folder = 'All'
output_folder = 'Run'
csv_filename = 'Run.csv'
filename = 'VPS50W.txt'  # The file you want to process

process_file(data_folder, output_folder, csv_filename, filename)


File VPS50W.txt processed successfully!


In [22]:
def process_file(data_folder, output_folder, csv_filename, filename):
    # Construct full file path
    file_path = os.path.join(data_folder, filename)  # Get the full path of the file
    filename_without_extension = os.path.splitext(os.path.basename(file_path))[0]  # Extract filename without extension
    return file_path,filename_without_extension

In [23]:
process_file(data_folder, output_folder, csv_filename, filename)

('All/VPS50W.txt', 'VPS50W')

In [24]:
    # Prepare CSV file for writing (or appending)
    with open(csv_filename, mode='w', newline='') as csv_file:
        writer = csv.writer(csv_file)
        
        # Write a single header row (optional: only if the CSV is new)
        writer.writerow([
            "File", 
            "Mean_RR_Rest", "STD_RR_Rest",
            "Mean_RR_Exercise","STD_RR_Exercise",
            "Mean_RR_Recovery", "STD_RR_Recovery",
            "SD1_Rest", "SD2_Rest",
            "SD1_Exercise", "SD2_Exercise",
            "SD1_Recovery", "SD2_Recovery"
        ])


In [25]:
csv_file

<_io.TextIOWrapper name='Run.csv' mode='w' encoding='UTF-8'>

In [26]:
import os
import csv
import numpy as np

def process_file(data_folder, output_folder, csv_filename, filename):
    # Step 1: Construct full file path and extract filename without extension
    file_path = os.path.join(data_folder, filename)
    filename_without_extension = os.path.splitext(os.path.basename(file_path))[0]
    
    print(f"Processing file: {file_path}")
    
    # Step 2: Prepare CSV file for writing
    with open(csv_filename, mode='w', newline='') as csv_file:
        writer = csv.writer(csv_file)
        
        # Write the header row
        header = [
            "File", 
            "Mean_RR_Rest", "STD_RR_Rest",
            "Mean_RR_Exercise", "STD_RR_Exercise",
            "Mean_RR_Recovery", "STD_RR_Recovery",
            "SD1_Rest", "SD2_Rest",
            "SD1_Exercise", "SD2_Exercise",
            "SD1_Recovery", "SD2_Recovery"
        ]
        print(f"Writing header to CSV: {header}")
        writer.writerow(header)
        
        # Step 3: Load the data from the file
        try:
            print(f"Loading data from: {file_path}")
            data = np.loadtxt(file_path)
            print(f"Raw data shape: {data.shape}")
            
            # Step 4: Filter out rows where the second column (HRV data) is zero
            filtered_data = data[data[:, 1] != 0]
            print(f"Filtered data shape: {filtered_data.shape}")
            
            # Split the data into time and HRV values
            time_data = filtered_data[:, 0]
            hrv_data = filtered_data[:, 1]
            print(f"Time data: {time_data[:5]}... (showing first 5 entries)")
            print(f"HRV data: {hrv_data[:5]}... (showing first 5 entries)")
            
            # Step 5: Calculate statistics
            stats = calculate_interval_statistics(time_data, hrv_data)
            print(f"Calculated statistics: {stats}")
            
            # Step 6: Build the CSV row
            row = [os.path.basename(file_path)]
            for region in ['Rest', 'Exercise', 'Recovery']:
                if region in stats:
                    row.extend([
                        stats[region]['Mean'],
                        stats[region]['Standard Deviation'],
                        stats[region]['SD1'], 
                        stats[region]['SD2']
                    ])
                else:
                    row.extend([None, None, None, None])
            
            print(f"Constructed row for CSV: {row}")
            writer.writerow(row)

            print(f"File {filename} processed successfully and row written to CSV!")

        # Step 7: Error Handling
        except ValueError as ve:
            print(f"ValueError for file {file_path}: {ve}")
        except Exception as e:
            print(f"Error processing file {file_path}: {e}")

# Dummy function to simulate statistics calculation
def calculate_interval_statistics(time_data, hrv_data):
    return {
        "Rest": {"Mean": np.mean(hrv_data[:10]), "Standard Deviation": np.std(hrv_data[:10]), "SD1": 1, "SD2": 2},
        "Exercise": {"Mean": np.mean(hrv_data[10:20]), "Standard Deviation": np.std(hrv_data[10:20]), "SD1": 1, "SD2": 2},
        "Recovery": {"Mean": np.mean(hrv_data[20:]), "Standard Deviation": np.std(hrv_data[20:]), "SD1": 1, "SD2": 2}
    }

In [27]:
process_file(data_folder, output_folder, csv_filename, filename)

Processing file: All/VPS50W.txt
Writing header to CSV: ['File', 'Mean_RR_Rest', 'STD_RR_Rest', 'Mean_RR_Exercise', 'STD_RR_Exercise', 'Mean_RR_Recovery', 'STD_RR_Recovery', 'SD1_Rest', 'SD2_Rest', 'SD1_Exercise', 'SD2_Exercise', 'SD1_Recovery', 'SD2_Recovery']
Loading data from: All/VPS50W.txt
Raw data shape: (2129, 2)
Filtered data shape: (2099, 2)
Time data: [0.01616667 0.03431667 0.05105    0.06906667 0.08668333]... (showing first 5 entries)
HRV data: [ 970. 1089. 1004. 1081. 1057.]... (showing first 5 entries)
Calculated statistics: {'Rest': {'Mean': 1055.2, 'Standard Deviation': 43.924480645762905, 'SD1': 1, 'SD2': 2}, 'Exercise': {'Mean': 1023.2, 'Standard Deviation': 110.27764959410406, 'SD1': 1, 'SD2': 2}, 'Recovery': {'Mean': 891.97113997114, 'Standard Deviation': 120.08061719717381, 'SD1': 1, 'SD2': 2}}
Constructed row for CSV: ['VPS50W.txt', 1055.2, 43.924480645762905, 1, 2, 1023.2, 110.27764959410406, 1, 2, 891.97113997114, 120.08061719717381, 1, 2]
File VPS50W.txt processe

In [28]:
# Function to calculate RR mean, SD1, SD2, and standard deviation for each interval
def calculate_interval_statistics(time_data, hrv_data):
    intervals = {
        'Rest': (1, 6),
        'Exercise': (14, 19),
        'Recovery': (25, 30)
    }
    
    stats = {}
    
    for interval, (start, end) in intervals.items():
        mask = (time_data >= start) & (time_data < end)
        interval_data = hrv_data[mask]
        
        if len(interval_data) > 1:  # Ensure there's enough data for SD calculation
            rr_mean = np.mean(interval_data)
            sd1, sd2 = calculate_sd1_sd2(interval_data)
            rr_std = np.std(interval_data)  # Calculate standard deviation
            
            # Print statistics for the current interval
            print(f"Interval: {interval}")
            print(f"  Start: {start}, End: {end}")
            print(f"  Mean: {rr_mean}")
            print(f"  Standard Deviation: {rr_std}")
            print(f"  SD1: {sd1}")
            print(f"  SD2: {sd2}")
            print("")

            stats[interval] = {
                'Mean': rr_mean,
                'Standard Deviation': rr_std,
                'SD1': sd1,
                'SD2': sd2
            }
        else:
            print(f"Not enough data for {interval} (available data points: {len(interval_data)})")
            print("")
    
    return stats

# Dummy function to simulate SD1 and SD2 calculation
def calculate_sd1_sd2(data):
    # Placeholder for actual SD1 and SD2 calculations; using fixed values for demonstration
    return 1, 2  # Replace with actual calculations



In [29]:
calculate_sd1_sd2(data)

NameError: name 'data' is not defined

In [None]:
calculate_interval_statistics(time_data, hrv_data)