MuJoCo environment simulation for body-awareness training

In this notebook, I explore the use of a 3D physics engine to train a robot on proprioception with a recurrent neural network. Proprioception, or the sense of an agent's own body position and movement, is essential for sophisticated, autonomous capabilities. I explore a framework that utilizes feedback from proprioceptive sensors (accelerometers, gyroscopes, and flex sensors) to predict the agent's 3D configuration via deep learning.

Proprioception is the ability to sense and perceive the position, movement, and orientation of the body and its parts in space. It is an essential aspect of body awareness that enables us to navigate and interact with our environment effectively. In robotics, proprioception is also a critical aspect that enables robots to move and interact with their environment autonomously.

Training an RGAN deep learning model on preprocessed sensor data to generate synthetic sensor data can help simulate proprioception in robots. By training the model on real-world sensor data, the generated synthetic sensor data can mimic the same patterns and characteristics observed in the real data. This can enable the robot to perceive its body parts' position, movement, and orientation without relying solely on external sensors or pre-programmed knowledge of its body parts.

Emergent body-awareness can be achieved when the robot is trained on the generated synthetic sensor data, as it learns to recognize the patterns and characteristics in the data and incorporate them into its movement and interaction with the environment. As a result, the robot can gain a sense of its body's position, movement, and orientation in space, similar to how humans have a sense of body awareness.

Body awareness is a critical aspect of consciousness as it enables an organism to perceive and understand its body's relationship with the environment. In humans, body awareness is associated with the sense of self, the ability to distinguish between self and others, and a sense of agency, the feeling of being in control of one's body and actions. By simulating body awareness in robots, we can help bridge the gap between artificial and human intelligence and create more autonomous and intuitive robotic systems.

Training and testing platform:

<img src="img/Robot-back.png" alt="Robot Backside Photo" style="width: 40%;"/>
<img src="img/Robot-Screenshot-3-Unity.png" alt="Robot Screenshot from Unity Simulation" style="width: 40%; float: right;"/>

The following script collects, preprocesses, and saves data from two MPU6050 sensors, one at the base of each hand, and six flex sensors, one at each joint, all connected to a Raspberry Pi on the research platform. The data is saved to a CSV file for further analysis. Rolling-window preprocessing is applied to both the MPU6050 and flex sensor data so that each reading is normalized against the most recent samples.

In [None]:
# v0.0.1-a
import time
import numpy as np
import board
import busio
import adafruit_mpu6050
import RPi.GPIO as GPIO
from adafruit_ads1x15.ads1015 import ADS1015
from adafruit_ads1x15.analog_in import AnalogIn

# column names, written as the header row of the CSV file
HEADER = ['mpu1_ax', 'mpu1_ay', 'mpu1_az', 'mpu1_gx', 'mpu1_gy', 'mpu1_gz',
          'mpu2_ax', 'mpu2_ay', 'mpu2_az', 'mpu2_gx', 'mpu2_gy', 'mpu2_gz',
          'flex1', 'flex2', 'flex3', 'flex4', 'flex5', 'flex6', 'timestamp']


class SensorData:
    def __init__(self, mpu_sample_rate=100, mpu_num_samples=1000, flex_window_size=10):
        # initialize i2c bus and sensors
        i2c = busio.I2C(board.SCL, board.SDA)
        self.mpu1 = adafruit_mpu6050.MPU6050(i2c, address=0x68)
        self.mpu2 = adafruit_mpu6050.MPU6050(i2c, address=0x69)

        # set the GPIO numbering mode; the flex sensors themselves are read
        # as analog voltages through two ADS1015 ADCs on the I2C bus
        GPIO.setmode(GPIO.BCM)
        self.ads1 = ADS1015(i2c)  # default address 0x48
        self.ads2 = ADS1015(i2c, address=0x49)
        # channels are selected by index (0-3) on each ADC: four on ads1, two on ads2
        self.flex_channels = [AnalogIn(self.ads1, pin) for pin in range(4)]
        self.flex_channels += [AnalogIn(self.ads2, pin) for pin in range(2)]

        # define parameters
        self.mpu_sample_rate = mpu_sample_rate  # rolling window length for MPU data (samples)
        self.mpu_num_samples = mpu_num_samples  # number of samples to collect
        self.flex_window_size = flex_window_size  # rolling window length for flex data (samples)

        # initialize data array: 12 MPU channels + 6 flex channels + timestamp
        num_sensors = 18
        self.data = np.empty((self.mpu_num_samples, num_sensors + 1))

    @staticmethod
    def _normalize(samples, window_size):
        """Normalize the latest sample by the mean and std of the last window_size samples."""
        current = np.asarray(samples[-1], dtype=float)
        if len(samples) < window_size:
            return current  # window not full yet: keep the raw sample
        window = np.asarray(samples[-window_size:], dtype=float)
        std = window.std(axis=0)
        std[std == 0] = 1.0  # avoid division by zero on constant channels
        return (current - window.mean(axis=0)) / std

    def collect_data(self):
        # collect and preprocess data
        i = 0
        flex_samples, mpu1_samples, mpu2_samples = [], [], []
        try:
            while i < self.mpu_num_samples:
                # collect raw data from MPU6050 sensors (3 accel + 3 gyro values each)
                mpu1_samples.append(self.mpu1.acceleration + self.mpu1.gyro)
                mpu2_samples.append(self.mpu2.acceleration + self.mpu2.gyro)

                # collect raw data from flex sensors
                flex_samples.append([chan.voltage for chan in self.flex_channels])

                # apply rolling window to the current sample and store it
                self.data[i, 0:6] = self._normalize(mpu1_samples, self.mpu_sample_rate)
                self.data[i, 6:12] = self._normalize(mpu2_samples, self.mpu_sample_rate)
                self.data[i, 12:18] = self._normalize(flex_samples, self.flex_window_size)
                # wall-clock Unix time, as in the example data
                self.data[i, -1] = time.time()
                i += 1
        # stop collecting if user presses Ctrl-C
        except KeyboardInterrupt:
            print('\n\nKeyboard interrupt detected. Stopping program.\n')

        # keep only the rows that were actually filled
        self.data = self.data[:i]

    def save_data(self, filename='data.csv'):
        # save data to file; comments='' keeps the header line free of a '# ' prefix
        np.savetxt(filename, self.data, delimiter=',', fmt='%s',
                   header=','.join(HEADER), comments='')
        print(f'\nData saved to file {filename}.\n')


if __name__ == '__main__':
    sensors = SensorData()
    sensors.collect_data()
    sensors.save_data()

The script collects, preprocesses, and saves data from the two MPU6050 sensors and six flex sensors on the research platform. The MPU6050 sensors measure linear acceleration and angular velocity (gyroscope data), while the flex sensors measure how far each joint is bent.

The first part of the script initializes the I2C bus and sensors and defines the parameters for data collection and preprocessing. Two MPU6050 sensors are connected to the I2C bus at addresses 0x68 and 0x69. The six flex sensors are read through two ADS1015 analog-to-digital converters (ADCs) on the same bus, four channels on the first chip and two on the second. The script also sets the Raspberry Pi's GPIO numbering mode to BCM, although the flex sensors are read as analog voltages through the ADCs rather than as digital signals on GPIO pins.

The data collection and preprocessing part of the script starts by allocating an empty NumPy array to store the collected data. The array has shape mpu_num_samples x (num_sensors + 1), where mpu_num_samples is the number of samples to collect, num_sensors is the total number of sensor channels (18: six per MPU6050 plus six flex sensors), and the extra column holds the timestamp. The header lists the name of each channel, followed by the timestamp column.

The script then enters a loop to collect data from the MPU6050 sensors and flex sensors. For each sample, the acceleration and gyroscope readings from each MPU6050 and the voltage from each flex sensor are appended to their respective lists. Rolling-window preprocessing is then applied to the MPU6050 data: the mean and standard deviation of the last mpu_sample_rate samples are computed and used to normalize the current sample (subtract the mean, then divide by the standard deviation). A rolling window of size flex_window_size is applied to the flex sensor data in the same way.
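
The rolling-window normalization described above can be tried out on its own, without any hardware attached. The following is a minimal sketch rather than the platform's actual routine: the function name `rolling_zscore`, the `eps` guard against a zero standard deviation, and the example readings are all assumptions made for illustration.

```python
import numpy as np


def rolling_zscore(samples, window_size, eps=1e-8):
    """Normalize each sample by the mean and std of the window that ends at it.

    `samples` has shape (n_samples, n_channels). Rows that come before the
    first full window are returned unchanged.
    """
    samples = np.asarray(samples, dtype=float)
    out = samples.copy()
    for i in range(window_size - 1, len(samples)):
        window = samples[i - window_size + 1 : i + 1]
        mean = window.mean(axis=0)
        std = window.std(axis=0)
        # eps keeps the division finite when a channel is constant
        out[i] = (samples[i] - mean) / (std + eps)
    return out


# hypothetical readings: 5 samples from 2 channels (the second one is constant)
readings = np.array([[1.0, 10.0], [2.0, 10.0], [3.0, 10.0], [4.0, 10.0], [5.0, 10.0]])
normalized = rolling_zscore(readings, window_size=3)
print(normalized)
```

Because every output row depends only on the current and earlier samples, the same computation can run sample by sample while data is being collected.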

For each sample, the timestamp is written to the last column of the data array. The loop repeats until the desired number of samples has been collected or the user interrupts it with Ctrl-C. The data is then saved to a CSV file using the np.savetxt function, with the delimiter argument set to a comma and the fmt argument set to '%s' so that each value is written as its string representation.
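
A small round trip shows how a header row and the numeric rows can share one CSV file. This is a sketch under stated assumptions: the shortened column list, the two example rows, and the temporary file path are made up for illustration; only `np.savetxt` and `np.loadtxt` with the arguments shown are taken as given.

```python
import os
import tempfile

import numpy as np

columns = ['flex1', 'flex2', 'timestamp']  # shortened, illustrative header
rows = np.array([[0.5, 0.25, 1.0],
                 [0.75, 0.125, 2.0]])

path = os.path.join(tempfile.mkdtemp(), 'example.csv')

# header= writes the column names as the first line;
# comments='' stops savetxt from prefixing that line with '# '
np.savetxt(path, rows, delimiter=',', fmt='%s',
           header=','.join(columns), comments='')

# read the header line separately, then load the numeric rows
with open(path) as f:
    header_line = f.readline().strip()
loaded = np.loadtxt(path, delimiter=',', skiprows=1)

print(header_line)
print(loaded)
```

Keeping the column names out of the numeric array matters because a float array cannot hold strings; passing them through `header=` avoids that conflict.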

Example of collected data:

In [None]:
mpu1_ax,mpu1_ay,mpu1_az,mpu1_gx,mpu1_gy,mpu1_gz,mpu2_ax,mpu2_ay,mpu2_az,mpu2_gx,mpu2_gy,mpu2_gz,flex1,flex2,flex3,flex4,flex5,flex6,timestamp
-0.034050085,0.015164975,0.090684295,-0.0004788732394366197,-0.0001348314606741573,-0.000208955223880597,-0.0277175,-0.0116785,0.1210115,0.003322815,-0.003735075,0.001119902,1,0,1,0,0,1,1648422037.7668145
0.0027563,0.0187872,0.1015783,-0.00043410852713178295,-0.00012052112676056337,-0.0001780701754385965,-0.0208315,-0.018998,0.129907,0.005028482,0.001199077,-0.00103611,1,0,1,0,0,1,1648422037.771128
0.0123457,0.0222971,0.1007684,-0.0004388719101123595,-0.0001232394366197183,-0.00019294776119402988,-0.017754,-0.019604,0.123924,0.001165215,0.000771492537313433,-0.000882686567164179,1,0,1,0,0,1,1648422037.7754028
0.0240914,0.0125794,0.0905976,-0.0004387957746478873,-0.0001232394366197183,-0.00016134328358208955,-0.017661,0.008106,0.108489,-0.004226269,-0.001270347,-0.0007460298507462687,1,0,1,0,0,1,1648422037.7796712
0.0393459,0.00895775,0.080964,-0.0004059140845070423,-0.00010126760563380283,-0.0001832761194029851,-0.018992,0.0232435,0.112529,-0.002417522,0.001981232,-0.00023856716417910447,1,0,1,0,0,1,1648422037.783938
0.0453197,0.00392372,0.080778,-0.0004646197183098592,-0.0001167605633802817,-0.00018717164179104472,-0.0246605,0.035023,0.1107075,-0.0009518028,0.001007448,-0.0005358731343283583,1,0,1,0,0,1,1648422037.7882073

The following script produces a line chart of the MPU6050 readings and a scatter plot of the flex sensor readings, both plotted against time. These plots help to identify trends and patterns in the data.

In [None]:
# v0.0.1-a
import matplotlib.pyplot as plt
import numpy as np

# Load the data, skipping the header row of column names
data = np.loadtxt('data.csv', delimiter=',', skiprows=1)

# Get the timestamp values (last column)
timestamps = data[:, -1]

# Get the data from the first MPU6050 (columns 0-5)
mpu_data = data[:, :6]

# Get the flex sensor data (columns 12-17)
flex_data = data[:, 12:18]

# Create a line chart of the MPU data, one line per channel
plt.plot(timestamps, mpu_data)
plt.xlabel('Timestamp')
plt.ylabel('Acceleration / Gyroscope (normalized)')
plt.legend(['mpu1_ax', 'mpu1_ay', 'mpu1_az', 'mpu1_gx', 'mpu1_gy', 'mpu1_gz'], loc='upper left')
plt.show()

# Create a scatter plot of the flex sensor data, one series per sensor
flex_labels = ['flex1', 'flex2', 'flex3', 'flex4', 'flex5', 'flex6']
for column, label in zip(flex_data.T, flex_labels):
    plt.scatter(timestamps, column, s=5, label=label)
plt.xlabel('Timestamp')
plt.ylabel('Flex Sensor Value')
plt.legend(loc='upper left')
plt.show()

This code creates a line chart of the data from the first MPU6050 over time, with the x-axis representing the timestamp and the y-axis representing the acceleration and gyroscope values. It also creates a scatter plot of the flex sensor data over time, with the x-axis representing the timestamp and the y-axis representing the flex sensor value. Both plots show how the sensor values change over time and can be used to analyze patterns or trends in the data.

The following script defines and trains a recurrent generative adversarial network (RGAN) on preprocessed sensor data. The script starts by importing the necessary libraries and loading the preprocessed sensor data from a CSV file. It then defines the generator and critic models using Keras, a high-level neural network API. The generator is a recurrent neural network (RNN) that takes in a random noise vector and produces a sequence of sensor data samples. The critic is another RNN that takes in a sequence of sensor data samples and outputs a single unbounded score, which it learns to make higher for real sequences than for generated ones.
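
Because both networks operate on sequences, the flat rows of the CSV file have to be cut into fixed-length windows before training. The sketch below shows one way to do that with NumPy; the helper name `make_sequences`, the `step` parameter, and the toy input are assumptions made for illustration, not part of the original pipeline.

```python
import numpy as np


def make_sequences(samples, seq_len, step=1):
    """Cut a (n_samples, n_channels) array into windows of seq_len rows.

    Consecutive windows start `step` rows apart, so step == seq_len gives
    non-overlapping sequences and step < seq_len gives overlapping ones.
    """
    samples = np.asarray(samples, dtype=float)
    if len(samples) < seq_len:
        raise ValueError('not enough samples for a single sequence')
    starts = range(0, len(samples) - seq_len + 1, step)
    return np.stack([samples[s : s + seq_len] for s in starts])


# toy recording: 10 samples from 2 channels
rows = np.arange(20, dtype=float).reshape(10, 2)
sequences = make_sequences(rows, seq_len=4, step=2)
print(sequences.shape)  # (number of sequences, seq_len, channels)
```

The resulting array has the (batch, timesteps, features) layout that Keras recurrent layers expect.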

The script then sets up a loss function and an optimizer for each model and defines the function that trains the RGAN. This function alternates between updating the critic and the generator. The RGAN is trained on batches of preprocessed sensor data, where each item in a batch is a sequence of consecutive sensor samples. For each batch, the critic is trained to maximize the difference between its scores on real and generated sequences, with its weights clipped to a small range after every update, while the generator is trained to raise the critic's score on generated sequences. The loop runs for a specified number of epochs, after which the generator model is saved to a file.
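
The two objectives can be checked by hand on a few numbers. The sketch below assumes the common Wasserstein formulation in which the loss is the mean of label times score, with label -1 for real sequences and +1 for generated ones; the scores and weights are made-up values, and `wasserstein_loss` and `clip_weights` are illustrative helper names.

```python
import numpy as np


def wasserstein_loss(y_true, y_pred):
    """Mean of label * score; the labels (-1 or +1) only set the sign."""
    return float(np.mean(y_true * y_pred))


def clip_weights(weights, clip_value):
    """Clamp every weight array to [-clip_value, clip_value]."""
    return [np.clip(w, -clip_value, clip_value) for w in weights]


# hypothetical critic scores for three real and three generated sequences
real_scores = np.array([2.0, 1.0, 3.0])
fake_scores = np.array([-1.0, 0.0, -2.0])

# critic: minimizing this pushes real scores up and generated scores down
critic_loss = (wasserstein_loss(-np.ones(3), real_scores)
               + wasserstein_loss(np.ones(3), fake_scores))

# generator: minimizing this raises the critic's score on generated sequences
generator_loss = wasserstein_loss(-np.ones(3), fake_scores)

# weight clipping applied to the critic after each update
clipped = clip_weights([np.array([-0.5, 0.005, 0.2])], clip_value=0.01)

print(critic_loss, generator_loss, clipped)
```

Clipping the critic's weights is the simple constraint used in the original Wasserstein GAN formulation to keep the critic's scores from growing without bound.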

In [None]:
# v0.0.1-a
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, LSTM, RepeatVector, TimeDistributed
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt


# Define the RGAN model
class RGAN():
    def __init__(self, data_shape, latent_dim, generator_output_activation='tanh'):
        self.data_shape = data_shape  # (timesteps, features)
        self.latent_dim = latent_dim
        self.generator_output_activation = generator_output_activation
        # one optimizer per model, so each keeps its own state
        self.g_optimizer = Adam(learning_rate=0.0002, beta_1=0.5)
        self.c_optimizer = Adam(learning_rate=0.0002, beta_1=0.5)

        self.critic = self.build_critic()
        self.generator = self.build_generator()

    def build_generator(self):
        # noise vector -> repeated over time -> LSTM -> one output vector per timestep
        timesteps, features = self.data_shape
        z = Input(shape=(self.latent_dim,))
        x = RepeatVector(timesteps)(z)
        x = LSTM(128, return_sequences=True)(x)
        sequence = TimeDistributed(
            Dense(features, activation=self.generator_output_activation))(x)

        model = Model(inputs=z, outputs=sequence, name='generator')
        model.summary()
        return model

    def build_critic(self):
        # sequence -> LSTM -> dense -> single unbounded score (no sigmoid)
        sequence = Input(shape=self.data_shape)
        x = LSTM(128)(sequence)
        x = Dense(64, activation=tf.nn.leaky_relu)(x)  # leaky ReLU, default slope 0.2
        score = Dense(1)(x)

        model = Model(inputs=sequence, outputs=score, name='critic')
        model.summary()
        return model

    def generate(self, n_samples):
        # draw latent points and run them through the generator
        z = tf.random.normal((n_samples, self.latent_dim))
        return self.generator(z, training=False).numpy()

    def train(self, X_train, batch_size=64, epochs=100, n_critic=5, clip_value=0.01, plot_every=50):
        X_train = np.asarray(X_train, dtype='float32')

        # calculate the number of batches per training epoch
        bat_per_epo = X_train.shape[0] // batch_size

        # manually enumerate epochs
        for i in range(epochs):
            order = np.random.permutation(X_train.shape[0])

            # enumerate batches over the training set
            for j in range(bat_per_epo):
                # get randomly selected 'real' samples
                X_real = X_train[order[j * batch_size:(j + 1) * batch_size]]

                # update critic
                for _ in range(n_critic):
                    # generate 'fake' examples
                    z = tf.random.normal((batch_size, self.latent_dim))
                    X_fake = self.generator(z, training=True)
                    with tf.GradientTape() as tape:
                        # minimizing this raises real scores and lowers fake scores
                        c_loss = (tf.reduce_mean(self.critic(X_fake, training=True))
                                  - tf.reduce_mean(self.critic(X_real, training=True)))
                    c_vars = self.critic.trainable_variables
                    self.c_optimizer.apply_gradients(zip(tape.gradient(c_loss, c_vars), c_vars))
                    # clip the critic weights to [-clip_value, clip_value]
                    for w in c_vars:
                        w.assign(tf.clip_by_value(w, -clip_value, clip_value))

                # update the generator via the critic's score
                z = tf.random.normal((batch_size, self.latent_dim))
                with tf.GradientTape() as tape:
                    X_fake = self.generator(z, training=True)
                    g_loss = -tf.reduce_mean(self.critic(X_fake, training=True))
                g_vars = self.generator.trainable_variables
                self.g_optimizer.apply_gradients(zip(tape.gradient(g_loss, g_vars), g_vars))

            # visualize one generated sequence every 'plot_every' epochs
            if (i + 1) % plot_every == 0:
                plt.plot(self.generate(1)[0])
                plt.title(f'Generated Sample (Epoch {i + 1})')
                plt.show()

        # save the generator model
        self.generator.save('generator_model.h5')


if __name__ == '__main__':
    # load the preprocessed sensor data, dropping the header row and timestamp column
    samples = np.loadtxt('data.csv', delimiter=',', skiprows=1)[:, :-1]

    # scale each channel to [-1, 1] to match the generator's tanh output
    lo, hi = samples.min(axis=0), samples.max(axis=0)
    span = np.where(hi > lo, hi - lo, 1.0)
    samples = 2.0 * (samples - lo) / span - 1.0

    # cut the recording into non-overlapping sequences
    # (seq_len, latent_dim and batch_size below are illustrative values)
    seq_len = 50
    n_seq = len(samples) // seq_len
    X_train = samples[:n_seq * seq_len].reshape(n_seq, seq_len, samples.shape[1])

    rgan = RGAN(data_shape=X_train.shape[1:], latent_dim=100)
    rgan.train(X_train, batch_size=16, epochs=100, n_critic=5, clip_value=0.01)

Training an RGAN on preprocessed sensor data to generate synthetic sensor data can have several purposes, one of which is to simulate body-awareness for a robot. Body-awareness is an important aspect of robotics as it enables a robot to sense and understand its own physical state and movements in relation to its environment. This is essential for robots to perform tasks that require physical interaction with the environment, such as object manipulation, navigation, and human-robot interaction.

By training an RGAN on preprocessed sensor data, we can generate synthetic sensor data that closely resembles the real sensor data collected from the robot's sensors. This synthetic data can be used to train the robot's control policies or perception algorithms, allowing it to better understand its own physical state and movements. Additionally, the synthetic data can be used to augment the real sensor data, increasing the amount of training data available for the robot and improving its performance.
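
Augmenting the real recordings with generated ones amounts to concatenating the two sets and shuffling them. The following is a minimal sketch, assuming both sets are already arrays of shape (sequences, timesteps, channels); the function name `augment`, the placeholder arrays, and the fixed seed are illustrative only.

```python
import numpy as np


def augment(real_sequences, synthetic_sequences, seed=0):
    """Combine real and synthetic sequences into one shuffled training set.

    Returns the combined array and a boolean mask marking which rows are
    synthetic, so their share can be tracked or limited later.
    """
    combined = np.concatenate([real_sequences, synthetic_sequences], axis=0)
    is_synthetic = np.concatenate([
        np.zeros(len(real_sequences), dtype=bool),
        np.ones(len(synthetic_sequences), dtype=bool),
    ])
    order = np.random.default_rng(seed).permutation(len(combined))
    return combined[order], is_synthetic[order]


# placeholder data: 6 real and 3 synthetic sequences of 4 timesteps x 18 channels
real = np.zeros((6, 4, 18))
synthetic = np.ones((3, 4, 18))
mixed, flags = augment(real, synthetic)
print(mixed.shape, int(flags.sum()))
```

Keeping the mask alongside the data makes it possible to evaluate a downstream model on real sequences only, which is a useful check that the synthetic data is helping rather than distorting the training set.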

Overall, training an RGAN on preprocessed sensor data to generate synthetic sensor data can help improve the body-awareness of a robot and enable it to better interact with its environment.

Citation:

Todorov, Emanuel and Erez, Tom and Tassa, Yuval. (2012). MuJoCo: A physics engine for model-based control. IEEE/RSJ International Conference on Intelligent Robots and Systems. pp. 5026-5033. 10.1109/IROS.2012.6386109.

Truby, Ryan & Della Santina, Cosimo & Rus, Daniela. (2020). Distributed Proprioception of 3D Configuration in Soft, Sensorized Robots via Deep Learning. IEEE Robotics and Automation Letters. PP. 1-1. 10.1109/LRA.2020.2976320. 