In [2]:
import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt
import seaborn as sns
from pprint import pprint
from scipy.optimize import curve_fit
from sklearn.metrics import r2_score
from sklearn.preprocessing import StandardScaler, Normalizer, MinMaxScaler, PowerTransformer, MaxAbsScaler
from sklearn.feature_selection import VarianceThreshold
from sklearn import svm
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV
pprint.pretty = True

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


In [3]:
# Plots 9 sensor df with title 'Material' 
# Can only intake a single trial number per df in dfs
def plotdfs(dfs):
    for df in dfs:
        df_name = df['Material'].iloc[0]
        trial_num = df['Trial'].iloc[0]
        # Drop the 'Time' column
        trial_df = df.drop(columns=['Trial', 'Material'])
        
        # Create a new plot for each trial
        plt.figure()
        
        # Plot the data for the current trial
        for col in trial_df.columns:
            if col != 'Time':
                plt.plot(trial_df['Time'], trial_df[col], label=f'S{col}')
        
        # Add labels and title
        plt.xlabel('Time')
        plt.ylabel('Data')
        plt.title(f'{df_name} Trial {trial_num}')
        # plt.ylim(0, 10)
        plt.xlim(0, 50)
        plt.xticks(np.arange(0, 50, step=2))
        # plt.yscale('log')
        # Add legend
        plt.legend()
        
        # Show the plot for the current trial
        plt.show()

In [4]:

folders = ['PLA-PDMS-EcoFlex_data\\EcoFlex', 'PLA-PDMS-EcoFlex_data\\PDMS', 'PLA-PDMS-EcoFlex_data\\PLA']
data_dfs = []

for folder in folders:
    # List to store DataFrames from individual CSV files
    dfs = []

    # Iterate over all files in the folder
    for file_name in os.listdir(folder):
        if file_name.endswith('.csv'):
            # Read CSV file into a DataFrame
            df = pd.read_csv(os.path.join(folder, file_name))
            
            # Extract trial number from file name
            trial_number = file_name.split('_')[0]  # Assuming file names are in the format 'experiment_trialNumber.csv'
            
            # Add trial number as a new column
            df['Trial'] = int(trial_number)
            df['Material'] = folder.split('\\')[1]
            df.rename(columns={"255": 'Time'}, inplace=True)
            df.drop(columns=['Unnamed: 0'], inplace=True)
            
            # Append DataFrame to the list
            dfs.append(df)

    # Concatenate all DataFrames into one large DataFrame
    data_dfs.append(dfs)

In [5]:
time_cutoffs = [[29.5, 32, 30, 29.5, 30, 31, 31.5, 31.8, 31, 31.9, 31, 31],
                [30.8, 30.5, 32.2, 31, 30, 32, 30, 30, 29.5, 30, 31.5, 30.8],
                [37, 31.8, 35.5, 36.5, 30.5, 30.2, 29, 33, 32.5, 31, 30, 29.6]]
for i in range(len(data_dfs)):
    for j in range(len(data_dfs[i])):
        data_dfs[i][j] = data_dfs[i][j][data_dfs[i][j]['Time'] < time_cutoffs[i][j]]

In [6]:
ma_data_dfs = []
for dfs in data_dfs:
    temp = []
    for df in dfs:
        
        
        columns_to_average = df.columns.difference(['Time', 'Trial', 'Material'])
        window_size = 10  # MA window size
        new_df = pd.DataFrame()
        # Keep Time and Trial num
        new_df['Time'] = df['Time']
        new_df['Trial'] = df['Trial']
        new_df['Material'] = df['Material']

        for col in columns_to_average:
            new_df[f'{col} MA'] = df[col].rolling(window=window_size).mean() * 10e10

        new_df.dropna(inplace=True)
        temp.append(new_df)
    ma_data_dfs.append(temp)



In [None]:
# Trial 1 visualization for each material
dfs_to_plot = [ma_data_dfs[0][0], ma_data_dfs[1][0], ma_data_dfs[2][0]]

plotdfs(dfs_to_plot)


In [None]:
# Quadratic and linear fitting
temp = dfs_to_plot[0]
temp = temp[temp['Time'] <= 27]
dfs_to_plot[0] = temp
temp = dfs_to_plot[1]
temp = temp[temp['Time'] <= 30]
dfs_to_plot[1] = temp
temp = dfs_to_plot[2]
temp = temp[temp['Time'] <= 36.5]
dfs_to_plot[2] = temp

# fill data_pairs with 3 materials data, one sensor
data_pairs = [(dfs_to_plot[0]['Time'].values, dfs_to_plot[0]['8 MA'].values),
              (dfs_to_plot[1]['Time'].values, dfs_to_plot[1]['8 MA'].values),
              (dfs_to_plot[2]['Time'].values, dfs_to_plot[2]['8 MA'].values)]

# Define curve models
def linear_func(x, a, b):
    return a * x + b

def quadratic_func(x, a, b, c):
    return a * x ** 2 + b * x + c

# Fit data to each curve model
curve_fits = {}
for idx, (x_data, y_data) in enumerate(data_pairs, start=1):
    linear_params, _ = curve_fit(linear_func, x_data, y_data)
    linear_y_pred = linear_func(x_data, *linear_params)
    linear_r_squared = r2_score(y_data, linear_y_pred)
    
    quadratic_params, _ = curve_fit(quadratic_func, x_data, y_data)
    quadratic_y_pred = quadratic_func(x_data, *quadratic_params)
    quadratic_r_squared = r2_score(y_data, quadratic_y_pred)
    
    curve_fits[idx] = {
        'linear': (linear_params, linear_r_squared),
        'quadratic': (quadratic_params, quadratic_r_squared)
    }

# Generate curves using fitted parameters and plot
plt.figure(figsize=(12, 8))
for idx, (x_data, y_data) in enumerate(data_pairs, start=1):
    plt.subplot(2, 2, idx)
    plt.scatter(x_data, y_data, label='Data')
    x_curve = np.linspace(min(x_data), max(x_data), 1000)
    
    linear_params, linear_r_squared = curve_fits[idx]['linear']
    y_curve_linear = linear_func(x_curve, *linear_params)
    plt.plot(x_curve, y_curve_linear, label=f'Linear Fit (R-squared: {linear_r_squared:.2f})')
    print(f"Data Pair {idx} Linear Fit Coefficients: {linear_params}")
    
    quadratic_params, quadratic_r_squared = curve_fits[idx]['quadratic']
    y_curve_quadratic = quadratic_func(x_curve, *quadratic_params)
    plt.plot(x_curve, y_curve_quadratic, label=f'Quadratic Fit (R-squared: {quadratic_r_squared:.2f})')
    print(f"Data Pair {idx} Quadratic Fit Coefficients: {quadratic_params}")
    
    plt.xlabel('X')
    plt.ylabel('Y')
    plt.title(f'Data Pair {idx}')
    plt.legend()

plt.tight_layout()
plt.show()


In [None]:
# Exponential fit for PLA

x_data = dfs_to_plot[2]['Time'].values
y_data = dfs_to_plot[2]['8 MA'].values

# Define exponential function
def exponential_func(x, a, b):
    return a * np.exp(b * x)

# Fit data to exponential function
params, covariance = curve_fit(exponential_func, x_data, y_data)

# Generate fitted curve
x_curve = np.linspace(min(x_data), max(x_data), 1000)
y_curve = exponential_func(x_curve, *params)

# Calculate R-squared value
y_pred = exponential_func(x_data, *params)
r_squared = r2_score(y_data, y_pred)

# Plot data and fitted curve
plt.scatter(x_data, y_data, label='Data')
plt.plot(x_curve, y_curve, color='red', label='Fitted curve')
plt.xlabel('X')
plt.ylabel('Y')
plt.title('Exponential Curve Fitting')
plt.legend()
plt.show()

print(f'R-squared value: {r_squared}')

In [7]:
# Regress all data points and generate coefficients for each sensor in each trial

coefficents = []
for mat in ma_data_dfs:
    # plotdfs(mat)
    material_coefs = []
    for df in mat:
        x_data = df['Time'].values
        temp = []
        for i in range(9):
            y_data = df[f'{i} MA'].values
            try:
                coef = np.polyfit(x_data, y_data, 2)
                a, b, c = coef
                temp.append(tuple((a, b, c)))
            except:
                print(df['Material'].iloc[0], df['Trial'].iloc[0])
                # plotdfs([df])
        material_coefs.append(temp)
    coefficents.append(material_coefs)
            
coefficents = np.array(coefficents)


In [8]:
# Create augmented data and reshape data to prepare for training

coef_x_data = []
coef_y_data = []

for material_num in range(len(coefficents)):
    coef_augmented_data = []

    for coef_num in range(3):
        coefs = []

        for trial_num in range(len(coefficents[material_num])):
            for sensor in range(len(coefficents[material_num][trial_num])):
                coefs.append(coefficents[material_num][trial_num][sensor][coef_num])

        mu = np.average(coefs)
        stdev = np.std(coefs)

        values = np.random.default_rng().standard_t(107, 300)
        values = (values * (stdev/np.sqrt(108))) + mu
        coef_augmented_data.append(values)

    coefs_paired = [(coef_augmented_data[0][i], coef_augmented_data[1][i], coef_augmented_data[2][i]) for i in range(len(coef_augmented_data[0]))]
    
    for e in coefs_paired:
        coef_x_data.append(e)
        coef_y_data.append(material_num)


In [9]:
# Training sklearn classifier

# parameters = {
#     'scaler': [StandardScaler(), MinMaxScaler(), Normalizer(), MaxAbsScaler()],
#     'selector__threshold': [0, 0.001, 0.01],
#     'classifier__C': [1, 10, 100, 1000]
# }
# pipe = Pipeline([
#     ('scaler', StandardScaler()),
#     ('selector', VarianceThreshold()),
#     ('classifier', svm.SVC(kernel='linear', C=100))
# ])
clf = svm.SVC(kernel='linear', C=100)
clf.fit(coef_x_data, coef_y_data)
# pipe.fit(coef_x_data, coef_y_data)

# grid = GridSearchCV(pipe, parameters, cv=2).fit(coef_x_data, coef_y_data)

count = 0
total = 0
for mat in range(len(coefficents)):
    for trial in coefficents[mat]:
        for sensor in trial:
            total += 1
            val = clf.predict(sensor.reshape(1, -1))
            if (mat != val):
                print(f'Actual: {mat}, Predicted: {val}')
                count += 1

# print(grid.score(coef_x_data, coef_y_data))
print(count, total, (1-(count/total)))


Actual: 1, Predicted: [0]
Actual: 1, Predicted: [0]
Actual: 1, Predicted: [0]
Actual: 1, Predicted: [0]
Actual: 1, Predicted: [0]
Actual: 1, Predicted: [0]
Actual: 1, Predicted: [0]
Actual: 1, Predicted: [0]
Actual: 2, Predicted: [1]
Actual: 2, Predicted: [1]
Actual: 2, Predicted: [1]
Actual: 2, Predicted: [1]
12 324 0.962962962962963


In [10]:
# Saving classifier
import joblib

joblib.dump(clf, 'saved_models/SVC-96.3.pkl')

# to load:
# clf = joblib.load('saved_models\SVC-96.3.pkl')

['saved_models/SVC-96.3.pkl']

In [None]:
"""
Notes for 2/19

- Classifier is done, .963 accuracy
- To predict on new data:
    - Scale new data time axis to the same scale as input data - need to figure this out
    - Should we retrain with fast pressure application to be able to downsample slow grasps?
        - would have to see if faster presses still have as good performance on classifier

- Object detection:
    - paper had ~ 110,000 data "sets". Each set is a collection of 4*10 data from different types of sensors
    - acquired data from 13 object * 20 grasps per object * ~400 sets of data from each grasp
    - NO TIME FACTOR USED
    - only using grasp data past a certain point
    - used data from different grasp orientations and materials for each object

    - Can we mount sensors on a hand?
    - if this works, we don't need the camera for ground truth - groud truth is just the class of object in the trial

"""