In [None]:
# !pip install ffmpeg
# !pip install opencv-python
# !pip install opencv-contrib-python --user
# !pip install plotly tqdm
# !pip install kaleido

In [None]:
import numpy as np
import cv2
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import plotly.express as px
import scipy.stats as stats

from tqdm.notebook import tqdm
from tqdm import tqdm

In [None]:
# Set the parameters of the video you want to analyse
fps: int = 500
freq: int = 303
amplified: bool = True

In [None]:
# describe paths to the regular and amplified video respectively
if amplified == False:
    cap = cv2.VideoCapture(f'04_juni_set/bewerkt/V3/V3_{freq}hz_{fps}fps_Part1.avi')
elif amplified == True: 
    cap = cv2.VideoCapture(f'04_juni_set/amplified/phase_based//V3/V3_{freq}hz_{fps}fps_amplified.avi')

In [None]:
# Function to plot with dynamic x-axis
def plot_displacement(df, point_id, x_axis='time'):
    filtered_df = df[df['id'] == point_id]
    fig = px.line(filtered_df, x=x_axis, y=['dy'],
                  labels={'value': 'Displacement', 'variable': 'Axis'},
                  title=f'Movement of Point {int(point_id)} Over Time')
    fig.update_layout(xaxis_title=x_axis.capitalize(), yaxis_title='Displacement (pixels)', showlegend=True)
    fig.show()

In [None]:
# Set up parameters for ShiTomasi corner detection
feature_params = dict(maxCorners=1, qualityLevel=0.3, minDistance=7, blockSize=7)

# Set up parameters for Lucas-Kanade optical flow
lk_params = dict(winSize=(15, 15), maxLevel=2, criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

# Read the first frame
ret, old_frame = cap.read()
old_gray = cv2.cvtColor(old_frame, cv2.COLOR_BGR2GRAY)

# Select ROI for tracking
bbox = cv2.selectROI(old_frame, False)
cv2.destroyAllWindows()
x, y, w, h = bbox

# if you want to manually set the ROI, comment the lines above and paste the ROI below
# x, y, w, h = 0, 0, 0, 0

roi_mask = np.zeros_like(old_gray)
roi_mask[y:y+h, x:x+w] = 255

# Detect initial points in ROI
p0 = cv2.goodFeaturesToTrack(old_gray, mask=roi_mask, **feature_params)

# Check if any points were detected
if p0 is None:
    print("Unable to determine tracking features, please adjust bounding box.")
    cap.release()  # Release the video capture object
    sys.exit()  # Exit the program

# List to store the displacement of points
displacements = []

# Process video
iteration = 0
with tqdm(total=int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), desc="Tracking") as pbar:
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Calculate optical flow
        p1, st, err = cv2.calcOpticalFlowPyrLK(old_gray, frame_gray, p0, None, **lk_params)

        # Ensure optical flow was successful
        if p1 is not None and len(p1) > 0:
            # Select good points
            good_new = p1[st == 1]
            good_old = p0[st == 1]

            # Store points' displacements
            for i, (new, old) in enumerate(zip(good_new, good_old)):
                a, b = new.ravel()
                c, d = old.ravel()
                displacements.append({'iteration': iteration, 'time': pbar.n / fps, 'id': i, 'x': a, 'y': b, 'dx': a - c, 'dy': b - d})

            # Update the previous frame and points
            old_gray = frame_gray.copy()
            p0 = good_new.reshape(-1, 1, 2)
        else:
            print("Unable to determine tracking features, please adjust bounding box.")
            break  # Break out of the loop if tracking fails

        pbar.update(1)
        iteration += 1

cap.release()

# Convert the list of displacements to a DataFrame
df_spoke = pd.DataFrame(displacements)

# Plot the displacement in the ROI
for point_id in df_spoke['id'].unique():
    plot_displacement(df_spoke, point_id, x_axis='iteration')  # Change x_axis to 'iteration' to plot against iterations

In [None]:
# You can copy paste the output of this cell to not lose the ROI 
print(x, y, w, h)

In [None]:
start_iteration = df_spoke["iteration"].iloc[0] # start iteration
end_iteration = df_spoke["iteration"].iloc[-1]  # end iteration
phase = 'spoke'       

# Filter the DataFrame to get only the data between the specified iterations
df_spoke = df_spoke.iloc[start_iteration:end_iteration+1].copy()

# Add a new column 'phase' with the specified phase value using .loc
df_spoke.loc[:, 'phase'] = phase

#calculate z-score of 'dy' column
df_spoke['z_score'] = stats.zscore(df_spoke["dy"])
df_spoke['z_score'] = np.abs(df_spoke['z_score'])  # Use absolute value 

#determine viable displacement for subbing outliers
replacement_upper = np.max(df_spoke.loc[df_spoke['z_score'] <= 2].dy)
replacement_lower = np.min(df_spoke.loc[df_spoke['z_score'] <= 2].dy)

# Change outlier displacements regular displacements
df_spoke.loc[(df_spoke['z_score'] > 2) & (df_spoke['dy'] >= 0), 'dy'] = replacement_upper
df_spoke.loc[(df_spoke['z_score'] > 2) & (df_spoke['dy'] < 0), 'dy'] = replacement_lower

# Define the output filename based on the 'phase' variable and the iteration range
if amplified == False:
    output_filename = f"df_{freq}hz_{fps}fps_{phase}_{start_iteration}_{end_iteration}.csv"
elif amplified == True:
    output_filename = f"df_{freq}hz_{fps}fps_{phase}_{start_iteration}_{end_iteration}_amplified.csv"

# Save the filtered data to a new CSV file, dropping the 'id' column
df_spoke.to_csv(output_filename, index=False)

# Read the newly created CSV file and check its shape
df_spoke

In [None]:
# Plot the capped displacements in the ROI
for point_id in df_spoke['id'].unique():
    plot_displacement(df_spoke, point_id, x_axis='iteration')

In [None]:
# List of file names
file_names = [output_filename]  # Add more file names as needed

# Load and concatenate the datasets
df_list = [pd.read_csv(file_name) for file_name in file_names]
df_combined = pd.concat(df_list)

# Sort the combined dataset by 'iteration'
df_combined.sort_values(by='iteration', inplace=True)

# Reset the index of the combined dataset
df_combined.reset_index(drop=True, inplace=True)

# Save the combined dataset
df_combined.to_csv('__', index=False)

# Print the shape of the combined dataset
df_combined

In [None]:
# Function to perform FFT and return the positive frequency part of the spectrum
def fft_positive_frequencies(signal, fps):
    # Zero-pad het signaal tot de gewenste lengte
    signal = np.pad(signal, (0, len(signal)*9), 'constant') # enlarge the signal x10
    fft_vals = np.fft.fft(signal)
    fft_freq = np.fft.fftfreq(len(signal), d=1/(fps))
    # Take only the positive part of the spectrum
    fft_vals_positive = fft_vals[:len(signal)//2]
    fft_freq_positive = fft_freq[:len(signal)//2]
    return fft_freq_positive, np.abs(fft_vals_positive)

In [None]:
spoke_data = df_combined[df_combined['phase']=='spoke']

# Perform FFT for 'dy' for 'spoke'
freqs_spoke, fft_spoke_dy = fft_positive_frequencies(spoke_data['dy'], fps)

# Create a figure with trace for spoke dy displacement
fig = go.Figure()

fig.add_trace(go.Scatter(
    x=freqs_spoke,
    y=fft_spoke_dy,
    mode='lines',
    name='Spoke dy Displacement',
    line=dict(color='blue')
))


# Update the layout of the plot
fig.update_layout(
    title='Frequency Spectrum for Spoke Phases',
    xaxis_title='Frequency (Hz)',
    yaxis_title='Amplitude',
    showlegend=True
)

# Display the plot
fig.show()
if amplified == False:
    fig.write_image(f"frequency_spoke_{freq}hz_{fps}fps.png")
elif amplified == True:
    fig.write_image(f"frequency_spoke_{freq}hz_{fps}fps_amplified.png")