## NEP_Active_learning

In [1]:
import math
import matplotlib.pyplot as plt
import numpy as np
import os
import scienceplots
import umap.umap_ as umap
from ase.io import read
from calorine.calculators import CPUNEP
from calorine.nep import get_descriptors
from scipy.spatial.distance import cdist
from sklearn.decomposition import PCA
from wizard.io import dump_xyz

# Global matplotlib styling for every figure produced below; the 'science',
# 'ieee', 'no-latex' and 'bright' styles are registered by the `scienceplots`
# import above.
plt.style.use(['science', 'ieee', 'no-latex', 'bright'])
# Two-colour palette (dark blue, coral). NOTE(review): not referenced anywhere
# else in the visible code - confirm whether it is still needed.
colors = ["#1C3C63", "#F0686C"]

class ForceAnalyzer:
    """Quantify ensemble disagreement on atomic forces for sets of structures.

    Every structure is evaluated with each calculator of the ensemble; the
    spread of the predicted forces is used as an uncertainty indicator and is
    plotted as a relative-frequency distribution per trajectory file.

    Args:
        calculators: Sequence of calculator objects that can be assigned to
            ``atoms.calc`` (one per model of the ensemble).
        frame_paths: Paths of the trajectory files to analyse.
        labels: Legend labels, one per entry of ``frame_paths``.
    """

    def __init__(self, calculators, frame_paths, labels):
        self.calculators = calculators
        self.frame_paths = frame_paths
        self.labels = labels

    def compute_max_deltas(self, frames, minimum, maximum, save_output):
        """Return the maximum per-atom force deviation of every frame.

        For each atom the deviation is the root-mean-square, over the
        ensemble, of the norm of (model force - ensemble-mean force). This is
        the DP-GEN style model deviation; the value reported for a frame is
        the maximum over its atoms.

        Args:
            frames: Iterable of structures exposing ``calc`` and
                ``get_forces()``.
            minimum: Lower bound (exclusive) of the selection window.
            maximum: Upper bound (exclusive) of the selection window.
            save_output: When true, each frame is passed to ``dump_xyz`` with
                'selected_atoms.xyz' if its deviation lies strictly inside the
                window, otherwise with 'unselected_atoms.xyz'.

        Returns:
            List with one deviation value per frame.
        """
        selected_file = 'selected_atoms.xyz'
        unselected_file = 'unselected_atoms.xyz'

        max_deltas = []

        for atoms in frames:
            forces = []
            for calculator in self.calculators:
                atoms.calc = calculator
                forces.append(atoms.get_forces())
            # Stacked per-model forces; indexed as (model, atom, component).
            forces = np.array(forces)
            deviations = forces - np.mean(forces, axis=0)
            squared_norm_deviations = np.sum(deviations ** 2, axis=2)
            # Average the squared deviations over the models *before* taking
            # the square root, so that the result is the ensemble standard
            # deviation per atom rather than the largest single-model error.
            per_atom_sigma = np.sqrt(np.mean(squared_norm_deviations, axis=0))
            max_delta = np.max(per_atom_sigma)
            max_deltas.append(max_delta)

            if save_output:
                target_file = selected_file if minimum < max_delta < maximum else unselected_file
                dump_xyz(target_file, atoms)

        return max_deltas

    @staticmethod
    def compute_frequency_counts(max_deltas, bin_centers):
        """Histogram the deviations and express the counts in percent.

        Args:
            max_deltas: Sequence of deviation values.
            bin_centers: Only its length (number of bins) and its maximum
                (upper edge of the histogram range) are used.

        Returns:
            Tuple ``(actual_bin_centers, relative_frequency)``; the
            frequencies are percentages of ``len(max_deltas)`` and are all
            zero when ``max_deltas`` is empty.
        """
        bin_edges = np.histogram_bin_edges(max_deltas, bins=len(bin_centers), range=(0, max(bin_centers)))
        frequency_counts, _ = np.histogram(max_deltas, bins=bin_edges)

        total = len(max_deltas)
        if total == 0:
            # Avoid a division by zero (NaN curve) for an empty trajectory.
            relative_frequency = np.zeros(len(frequency_counts))
        else:
            relative_frequency = frequency_counts / total * 100

        actual_bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
        return actual_bin_centers, relative_frequency

    def plot_max_force_differences(self, output_image_file, minimum=0.2, maximum=0.4, save_output=False):
        """Plot the deviation distribution of every trajectory and save it.

        Args:
            output_image_file: Path of the image written by ``plt.savefig``.
            minimum: Lower selection threshold, drawn as a dashed line.
            maximum: Upper selection threshold, drawn as a dashed line.
            save_output: Forwarded to :meth:`compute_max_deltas`.
        """
        plt.figure()

        all_max_deltas = []
        for path, label in zip(self.frame_paths, self.labels):
            frames = read(path, index=':')
            max_deltas = self.compute_max_deltas(frames, minimum, maximum, save_output)
            all_max_deltas.append((max_deltas, label))

        # 60 bins covering the plotted range 0 - 0.6.
        bin_centers = np.linspace(0, 0.6, 60)
        for max_deltas, label in all_max_deltas:
            centers, rel_freq = self.compute_frequency_counts(max_deltas, bin_centers)
            plt.plot(centers, rel_freq, label=label, marker='o', markersize=3)

        plt.xlabel(r'$\sigma_f^{max} (eV/Å)$')
        plt.ylabel('Relative Frequency (%)')
        plt.xlim(0, 0.6)
        plt.ylim(0, None)
        plt.axvline(x=minimum, color='k', linestyle='--')
        plt.axvline(x=maximum, color='k', linestyle='--')
        plt.legend(loc="best", ncol=3, fontsize=plt.rcParams['font.size'] - 2)
        plt.tight_layout()
        plt.savefig(output_image_file, bbox_inches='tight')

class FarthestPointSample:
    """Greedy farthest-point sampling on top of ``scipy`` ``cdist``.

    Args:
        min_distance: Default distance threshold; sampling continues while
            some candidate is farther than this from everything chosen so far.
        metric: Metric name forwarded to ``cdist``.
        metric_para: Optional extra keyword arguments forwarded to ``cdist``
            (for example ``{'p': 1}`` for the 'minkowski' metric).
    """

    def __init__(self, min_distance=0.1, metric='euclidean', metric_para=None):
        self.min_distance = min_distance
        self.metric = metric
        # Copy so that later changes to the caller's dict do not leak in.
        self.metric_para = dict(metric_para) if metric_para else {}

    def select(self, new_data, now_data=None, min_distance=None, min_select=1, max_select=None):
        """Pick indices of ``new_data`` that are far from the reference points.

        Args:
            new_data: Candidate points, one row per point.
            now_data: Points that are already part of the data set. When it is
                empty or omitted, candidate 0 is used as the seed and is part
                of the returned selection. The argument is never modified.
            min_distance: Threshold overriding the instance default; an
                explicit ``0`` is honoured.
            min_select: Keep selecting until at least this many indices have
                been chosen, even below the distance threshold.
            max_select: Upper bound on the number of returned indices; a
                falsy value (``None`` or ``0``) means ``len(new_data)``.

        Returns:
            List of distinct integer indices into ``new_data`` in selection
            order.
        """
        if min_distance is None:
            min_distance = self.min_distance
        n_new = len(new_data)
        if not max_select:
            max_select = n_new

        to_add = []
        if n_new == 0:
            return to_add

        if now_data is None or len(now_data) == 0:
            to_add.append(0)
            reference = [new_data[0]]
        else:
            reference = now_data

        # Distance of every candidate to its nearest reference point.
        distances = np.min(cdist(new_data, reference, metric=self.metric, **self.metric_para), axis=1)
        if to_add:
            # -inf marks candidates that are already selected so that they can
            # never be returned twice.
            distances[0] = -np.inf

        limit = min(max_select, n_new)
        while len(to_add) < limit:
            i = int(np.argmax(distances))
            if distances[i] <= min_distance and len(to_add) >= min_select:
                break
            to_add.append(i)
            to_new_point = cdist([new_data[i]], new_data, metric=self.metric, **self.metric_para)[0]
            distances = np.minimum(distances, to_new_point)
            distances[i] = -np.inf
        return to_add
    
class DescriptorAnalyzer:
    """Collect descriptors of structures and explore them in a 2D projection.

    Descriptors are obtained with ``get_descriptors`` for every frame of the
    registered files, projected with PCA or UMAP, and can be sub-sampled with
    ``FarthestPointSample``.

    Args:
        model_filename: Model file forwarded to ``get_descriptors``.
        method: Projection method, 'pca' or 'umap'.
    """

    def __init__(self, model_filename, method='pca'):
        self.descriptors = []  # one descriptor array per frame
        self.structure_indices_per_atom = []  # global frame index of every atom
        self.frames = []  # all frames, in the order the files were added
        self.nframes = 0  # total number of frames added so far
        self.natoms = []  # total atom count of each added file
        self.labels = []  # legend label of each added file
        self.method = method
        self.model_filename = model_filename
        self.pca_result = None
        self.umap_result = None

    def add_xyz_file(self, file_path, label):
        """Read every frame of ``file_path`` and store its descriptors.

        Args:
            file_path: Trajectory file readable by ``ase.io.read``.
            label: Legend label used for the atoms of this file.
        """
        frames = read(file_path, index=':')
        natoms = 0
        descriptors = []
        structure_indices_per_atom = []
        for i, atoms in enumerate(frames):
            descriptors.append(get_descriptors(atoms, model_filename=self.model_filename))
            # Frame indices are global: offset by the frames already stored.
            structure_indices_per_atom.extend([i + self.nframes] * len(atoms))
            natoms += len(atoms)
        self.descriptors.extend(descriptors)
        self.frames.extend(frames)
        self.nframes += len(frames)
        self.natoms.append(natoms)
        self.labels.append(label)
        self.structure_indices_per_atom.extend(structure_indices_per_atom)

    def perform_dimensionality_reduction(self):
        """Project all stored descriptor rows to two dimensions.

        Returns:
            Array with one 2D point per descriptor row (rows of all frames
            concatenated in insertion order).

        Raises:
            ValueError: If ``self.method`` is neither 'pca' nor 'umap'.
        """
        if self.method not in ('pca', 'umap'):
            raise ValueError(f"Unknown method {self.method!r}; expected 'pca' or 'umap'.")

        all_descriptors = np.concatenate(self.descriptors, axis=0)
        if self.method == 'pca':
            pca = PCA(n_components=2)
            self.pca_result = pca.fit_transform(all_descriptors)
            return self.pca_result

        umap_model = umap.UMAP(n_components=2)
        self.umap_result = umap_model.fit_transform(all_descriptors)
        return self.umap_result

    def _plot(self, projection, output_file, selected_projection=None):
        """Scatter the projection per file, optionally overlaying a selection."""
        plt.figure()

        start = 0
        # Rows of `projection` are grouped by file, `num` atoms per file.
        for label, num in zip(self.labels, self.natoms):
            end = start + num
            plt.scatter(projection[start:end, 0], projection[start:end, 1], alpha=0.7, s=1, label=label)
            start = end

        if selected_projection is not None:
            plt.scatter(selected_projection[:, 0], selected_projection[:, 1], c='orange', alpha=0.7, s=1, label='selected')

        plt.xlabel('Dimension 0')
        plt.ylabel('Dimension 1')
        plt.legend(loc="upper right", frameon=False, fontsize=plt.rcParams['font.size'] - 2)

        plt.tight_layout()
        plt.savefig(output_file, bbox_inches='tight')

    def plot_projection(self, output_file='descriptor_space.png'):
        """Compute the 2D projection and save the scatter plot."""
        projection = self.perform_dimensionality_reduction()
        self._plot(projection, output_file)

    def plot_latent_space(self, projection, selected_projection, output_file='latent_space.png'):
        """Save the scatter plot with the selected points highlighted."""
        self._plot(projection, output_file, selected_projection)

    def perform_latent_analysis(self, min_select=0.8, level='structure', output_file='latent_space.png'):
        """Select structures by farthest-point sampling and plot the result.

        Args:
            min_select: Fraction of the descriptor rows (structures or atoms,
                depending on ``level``) that must at least be selected.
            level: 'structure' samples on the per-frame mean descriptor,
                'atomic' samples on individual descriptor rows and selects
                every frame that owns a sampled row.
            output_file: Path of the image that is written.

        Returns:
            Tuple ``(selected_set, unselected_set)`` of global frame indices.

        Raises:
            ValueError: If ``level`` is neither 'structure' nor 'atomic'.
        """
        if level == 'structure':
            descriptors = np.array([np.mean(d, axis=0) for d in self.descriptors])
        elif level == 'atomic':
            descriptors = np.concatenate(self.descriptors)
        else:
            raise ValueError(f"Unknown level {level!r}; expected 'structure' or 'atomic'.")

        sampler = FarthestPointSample()
        num_select = math.floor(len(descriptors) * min_select)
        selected_indices = sampler.select(descriptors, [], min_select=num_select)

        if level == 'structure':
            selected_set = {int(i) for i in selected_indices}
        else:
            # Map sampled rows back to the frames they belong to.
            selected_set = {int(self.structure_indices_per_atom[i]) for i in selected_indices}

        unselected_set = set(range(len(self.frames))) - selected_set

        # Rows of the projection that belong to a selected frame (single pass).
        indices_list = [
            index
            for index, elem in enumerate(self.structure_indices_per_atom)
            if elem in selected_set
        ]

        projection = self.perform_dimensionality_reduction()
        selected_projection = projection[indices_list]
        self.plot_latent_space(projection, selected_projection, output_file)

        return selected_set, unselected_set
    
def read_xyz(file_path):
    """Read a multi-frame XYZ file as raw text records.

    Blank lines between frames and at the end of the file are skipped. A
    final frame that is cut short is returned with the atom lines that are
    present; a frame whose comment line is missing is dropped.

    Args:
        file_path: Path of the XYZ file.

    Returns:
        List of ``(num_atoms, comment, atom_lines)`` tuples where ``comment``
        and every entry of ``atom_lines`` are stripped strings.

    Raises:
        ValueError: If a non-blank atom-count line is not an integer.
    """
    from itertools import islice

    frames = []
    with open(file_path, 'r') as file:
        lines = iter(file)
        for header in lines:
            # Tolerate empty separator / trailing lines instead of failing
            # on int('').
            if not header.strip():
                continue
            num_atoms = int(header.strip())
            comment_line = next(lines, None)
            if comment_line is None:
                break
            atoms = [line.strip() for line in islice(lines, num_atoms)]
            frames.append((num_atoms, comment_line.strip(), atoms))
    return frames

def filter_frames(frames, selected_set):
    """Return the frames whose indices are in ``selected_set``.

    The indices are visited in ascending order so that the result keeps the
    order of ``frames`` and does not depend on set iteration order.

    Args:
        frames: Indexable sequence of frames.
        selected_set: Iterable of integer indices into ``frames``.

    Returns:
        List of the selected frames in ascending index order.
    """
    return [frames[i] for i in sorted(selected_set)]

def write_xyz(input_file, output_file, selected_set):
    """Copy the frames picked by ``selected_set`` from one XYZ file to another.

    The input is parsed with ``read_xyz``, reduced with ``filter_frames`` and
    written back as atom count, comment line and atom lines, each terminated
    by a newline.

    Args:
        input_file: Path of the XYZ file to read.
        output_file: Path of the XYZ file to create or overwrite.
        selected_set: Frame indices forwarded to ``filter_frames``.
    """
    chosen = filter_frames(read_xyz(input_file), selected_set)
    with open(output_file, 'w') as out:
        for count, title, atom_lines in chosen:
            out.write(f"{count}\n")
            out.write(f"{title}\n")
            out.writelines(f"{entry}\n" for entry in atom_lines)

### 基于不确定性的主动学习

与 DP-GEN 的思路类似：用多个不同超参数训练得到的 NEP 势对同一构型分别预测原子受力，统计各构型中原子受力在模型间的最大偏差及其分布，并将最大偏差落在设定阈值区间（下限与上限之间）内的构型挑选为候选构型。

In [2]:
# Ensemble of four NEP models; each file is loaded into its own CPUNEP
# calculator.
calculators = [CPUNEP('./1.txt'),
               CPUNEP('./2.txt'),
               CPUNEP('./3.txt'),
               CPUNEP('./4.txt'),]
# Trajectories to analyse and the legend label used for each of them.
frame_paths = ['./XYZ_file/300.xyz', './XYZ_file/500.xyz', './XYZ_file/700.xyz', './XYZ_file/900.xyz']
labels = ['300K', '500K', '700K', '900K']
analyzer = ForceAnalyzer(calculators, frame_paths, labels)
# Plot only (save_output=False): no selected/unselected frames are written;
# 0.2 and 0.4 are drawn as the dashed threshold lines.
analyzer.plot_max_force_differences(output_image_file='max_force_diff.png', minimum=0.2, maximum=0.4, save_output=False)

### 最远点采样

- **PCA**（Principal Component Analysis），即主成分分析，可以有效地找出数据中最“主要”的成分和结构，去除噪声和冗余，对原有数据进行降维，从而揭示隐藏在复杂数据背后的简单结构。

- **UMAP**（Uniform Manifold Approximation and Projection），即统一流形逼近与投影，是一种新的降维流形学习技术。UMAP是建立在黎曼几何和代数拓扑理论框架上的。UMAP是一种非常有效的可视化和可伸缩降维算法。

In [3]:
# Note: this rebinds `analyzer`, replacing the ForceAnalyzer created above.
analyzer = DescriptorAnalyzer(model_filename='../nep.txt', method='pca')
# Frames receive global indices in the order the files are added here.
analyzer.add_xyz_file('./mlmd.xyz', 'mlmd')
analyzer.add_xyz_file('./XYZ_file/300.xyz', '300K')
analyzer.add_xyz_file('./XYZ_file/500.xyz', '500K')
analyzer.add_xyz_file('./XYZ_file/700.xyz', '700K')
analyzer.add_xyz_file('./XYZ_file/900.xyz', '900K')

analyzer.plot_projection(output_file='descriptor_space.png')
# min_select=0.1: at least 10% of the structures are selected (floor of the
# count); the returned sets hold global frame indices of the files added above.
selected_set, unselected_set = analyzer.perform_latent_analysis(min_select=0.1, level='structure', output_file='pca.png')
# NOTE(review): the indices refer to the frames loaded through add_xyz_file,
# while write_xyz applies them to the frames of ./train.xyz - confirm that
# train.xyz contains the same frames in the same order.
write_xyz('./train.xyz', './selected.xyz', selected_set)
write_xyz('./train.xyz', './unselected.xyz', unselected_set)