In [3]:
#@title Initialization
import os
import csv
import torch
from matplotlib.pyplot import *
from torch import nn
from math import pi
from time import time
# from google.colab import drive
# drive.mount('/content/drive')
# os.chdir('/content/drive/MyDrive/Аппроксимация средствами PyTorch')
device = 'cpu'


class SinApproximator(nn.Module):
    """Fully connected network mapping `ndims` inputs to a single scalar output.

    Architecture: Linear(ndims, N) -> activation, followed by (L - 1) blocks of
    Linear(N, N) -> activation, followed by Linear(N, 1).

    Args:
        ndims: number of input features.
        N: width of every hidden layer.
        L: number of hidden layers.
        activation: activation module inserted after every hidden linear layer.
            The same module instance is reused at every position.
    """

    def __init__(self, ndims, N, L, activation):
        super().__init__()
        modules = [nn.Linear(ndims, N), activation]
        # Build a *new* Linear for every hidden layer. Multiplying a list that
        # holds one Linear instance would repeat the same object, silently tying
        # the weights of all hidden layers together.
        for _ in range(L - 1):
            modules += [nn.Linear(N, N), activation]
        modules.append(nn.Linear(N, 1))
        self.layers = nn.Sequential(*modules)


    def forward(self, x):
        """Apply the layer stack to `x` and return the result."""
        return self.layers(x)


# Mean-squared-error criterion shared by train() and test().
MSELoss = nn.MSELoss()


def train(model, optimizer, x, y, batch_size=1):
    """Run one pass of mini-batch optimization over (x, y).

    The data is consumed as consecutive slices of `batch_size` samples in the
    order given (no shuffling); the last slice may be shorter. For each slice
    the MSE between the model output and the target is back-propagated and the
    optimizer takes one step.

    Args:
        model: module called on a slice of `x` to produce predictions.
        optimizer: optimizer whose zero_grad()/step() drive the update.
        x: inputs, indexed by sample along the first dimension.
        y: targets aligned with `x` along the first dimension.
        batch_size: number of samples per optimizer step.

    Returns:
        None. The model and optimizer are updated in place.
    """
    num_samples = len(x)
    timestamp = time()
    for i in range(0, num_samples, batch_size):
        pred = model(x[i:i+batch_size])
        loss = MSELoss(pred, y[i:i+batch_size])

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Display progress (at most once per second)
        if time() - timestamp >= 1:
            # Clamp so a shorter final batch never reports more samples than exist.
            processed = min(i + batch_size, num_samples)
            print(f'[{processed}/{num_samples}]', end='\r')
            timestamp = time()
            
            
def test(model, x, y):
    """Evaluate `model` on (x, y) with gradient tracking disabled.

    Returns:
        A pair (loss, RMSE): the value produced by the module-level MSELoss
        criterion for the model output against `y`, and its square root.
    """
    with torch.no_grad():
        mse = MSELoss(model(x), y)
        rmse = mse.sqrt()
    return mse, rmse

In [None]:
# Parameters
# NOTE(review): the directory name says "ndims=1" while ndims is set to 2 below;
# confirm this is the intended output location before running.
directory = 'ndims=1, approximation'

# Model
ndims = 2               # number of input dimensions
N = 512                 # number of nodes per hidden layer
L = 2                   # number of hidden layers
activation = nn.ReLU()  # activation function

# Data
bounds = [-pi, +pi]     # sampling interval applied to every input dimension
num_data = 10000        # number of training points (and of test points)
use_sobol = False       # training points from a Sobol sequence (True) or uniform random (False)

# Optimization
batch_size = 1024       # samples per optimizer step
lr = 1e-2               # learning rate passed to the optimizer
max_time = 30           # training stops once the measured elapsed time reaches this value
load_model = False      # load a saved model (True) or create a new one (False)

# Run identifier: the settings above joined by commas; used as the sub-directory name.
name = ','.join(
    str(setting)
    for setting in (ndims, N, L, activation, num_data, use_sobol, batch_size, lr)
)

# Data
torch.manual_seed(2022)
lower_bound, upper_bound = bounds
# Training inputs: points in the unit cube, rescaled to [lower_bound, upper_bound].
if use_sobol:
    sobolengine = torch.quasirandom.SobolEngine(ndims)
    unit_train = sobolengine.draw(num_data)
else:
    unit_train = torch.rand(num_data, ndims)
x_train = (unit_train*(upper_bound - lower_bound) + lower_bound).to(device)
# Test inputs are always drawn uniformly at random, after the training draw.
unit_test = torch.rand(num_data, ndims)
x_test = (unit_test*(upper_bound - lower_bound) + lower_bound).to(device)
# Target: product of sin over the input dimensions, kept as a column (keepdim=True).
y_train = torch.sin(x_train).prod(dim=1, keepdim=True)
y_test = torch.sin(x_test).prod(dim=1, keepdim=True)

# Initialization
model = SinApproximator(ndims, N, L, activation).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
if load_model:
    # Resume a previous run: restore the weights, the optimizer state and the
    # logged history stored under <directory>/<name>/.
    # map_location remaps stored tensors onto `device`, so a checkpoint written
    # on a different device can still be loaded here.
    model.load_state_dict(torch.load(os.path.join(directory, name, 'model.pth'), map_location=device))
    optimizer.load_state_dict(torch.load(os.path.join(directory, name, 'optimizer.pth'), map_location=device))
    # newline='' lets the csv module handle line endings itself.
    with open(os.path.join(directory, name, 'loss(time).csv'), newline='') as datafile:
        # QUOTE_NONNUMERIC: quoted header cells stay strings, unquoted cells become floats.
        data = list(csv.reader(datafile, quoting=csv.QUOTE_NONNUMERIC))
    del data[0]  # drop the header row
    # Transpose rows into one list per column.
    epoch_data, time_data, loss_data, RMSE_data = [list(x) for x in zip(*data)]
    epoch_data = [int(x) for x in epoch_data]
    epoch = epoch_data[-1]
    elapsed_time = time_data[-1]
else:
    # Fresh run: no history yet.
    epoch = 0
    elapsed_time = 0
    epoch_data, time_data, loss_data, RMSE_data = [], [], [], []

# Training
# Shift the clock origin back by the already-elapsed time so a resumed run
# continues counting from where it stopped.
start_time = time() - elapsed_time
output_timestamp = save_timestamp = time()
print(f'\n{name}\nepoch | time | {"loss":9} | RMSE')
while elapsed_time < max_time:
    epoch += 1
    train(model, optimizer, x_train, y_train, batch_size)
    loss, RMSE = test(model, x_test, y_test)
    # Keep the history as plain Python floats: values restored from the CSV are
    # floats too, and matplotlib cannot convert tensors living on a non-CPU device.
    loss, RMSE = float(loss), float(RMSE)
    elapsed_time = time() - start_time
    time_data.append(elapsed_time)
    loss_data.append(loss)
    RMSE_data.append(RMSE)
    # Print a status line at most once per second.
    if time() - output_timestamp >= 1:
        print(f'{epoch:5} | {elapsed_time:4.0f} | {loss:.3e} | {RMSE:.4f}')
        output_timestamp = time()
    # Checkpoint every 600 seconds and once more when the time budget is used up.
    if time() - save_timestamp >= 600 or elapsed_time >= max_time:
        save_timestamp = time()

        # Saving
        run_dir = os.path.join(directory, name)
        os.makedirs(run_dir, exist_ok=True)
        with open(os.path.join(run_dir, 'loss(time).csv'), 'w') as output:
            output.write('"epoch","time","loss","RMSE"\n')
            output.writelines([f'{i+1},{time_data[i]:.2f},{loss_data[i]:.3e},{RMSE_data[i]:.4f}\n' for i in range(len(RMSE_data))])
        with open('results.txt', 'a') as output:
            output.write(f'{name}\n{len(RMSE_data)},{time_data[-1]:.2f},{loss_data[-1]:.3e},{RMSE_data[-1]:.4f}\n')
        torch.save(model.state_dict(), os.path.join(run_dir, 'model.pth'))
        torch.save(optimizer.state_dict(), os.path.join(run_dir, 'optimizer.pth'))

        # Plot RMSE(time)
        figure()
        semilogy(time_data, RMSE_data)
        xlabel('time [sec]')
        ylabel('RMSE')
        savefig(os.path.join(directory, 'RMSE(time).svg'), bbox_inches='tight')
        close()

        start_time += time() - save_timestamp # subtract time spent on saving from measured time
        save_timestamp = time()

# Plot sin(x)
# Compare target and prediction on the test points; only drawn for 1 or 2 input dimensions.
# The prediction is computed from x_test before it is moved to the CPU.
with torch.no_grad():
    pred = model(x_test).cpu().detach()
x_test, y_test = x_test.cpu().detach(), y_test.cpu().detach()
if ndims in (1, 2):
    if ndims == 1:
        figure(figsize=(10,3))
        plot(x_test, y_test, '.', markersize=1)
        plot(x_test, pred, '.', markersize=1)
        xlabel('x')
        ylabel('sin(x)')
    elif ndims == 2:
        fig = figure(figsize=(20,10))
        ax = fig.add_subplot(1, 2, 1, projection='3d')
        ax.scatter(x_test[:,0], x_test[:,1], y_test, c=y_test, cmap='coolwarm', antialiased=False)
        # '\cdot' must be separated from 'sin': '\cdotsin' is not a known
        # mathtext symbol and makes the figure fail to render.
        title(r'$sin(x_1)\cdot sin(x_2)$, target')
        ax = fig.add_subplot(1, 2, 2, projection='3d')
        ax.scatter(x_test[:,0], x_test[:,1], pred, c=pred, cmap='coolwarm', antialiased=False)
        title(r'$sin(x_1)*sin(x_2)$, approximation')
    savefig(os.path.join(directory, 'sin(x).svg'), bbox_inches='tight')
    close()