## GPUMD训练集筛选及主动学习策略

```
!pip install umap-learn
```

In [1]:
import os
import math
import numpy as np
import matplotlib.pyplot as plt
from scipy.spatial.distance import cdist
from sklearn.decomposition import PCA
import umap.umap_ as umap

from calorine.calculators import CPUNEP
from calorine.nep import get_descriptors
from wizard.io import read_xyz, dump_xyz

# Global matplotlib style shared by every figure produced below.
plt.rcParams.update({
    'font.size': 14,
    "axes.unicode_minus": False,
    'xtick.direction': 'in',
    'ytick.direction': 'in',
})

class ForceAnalyzer:
    """Compare the forces predicted by several calculators on the same frames.

    For every frame the forces of all calculators are evaluated, and the
    largest per-atom deviation from the calculator-averaged force is recorded.
    The distribution of these maxima can be plotted per data set, and frames
    whose maximum falls inside a window can be written to a file.

    Parameters
    ----------
    calculators : sequence
        Calculator objects; each one is assigned to ``atoms.calc`` in turn.
    frame_paths : sequence of str
        Paths handed to ``read_xyz``, one per data set.
    labels : sequence of str
        Legend labels, paired with ``frame_paths`` by position.
    """

    def __init__(self, calculators, frame_paths, labels):
        self.calculators = calculators
        self.frame_paths = frame_paths
        self.labels = labels

    @staticmethod
    def _reset_output(output_file, save_output):
        """Delete a stale output file so a new run does not add to old results."""
        if save_output and output_file and os.path.exists(output_file):
            os.remove(output_file)

    def _max_force_deviation(self, atoms):
        """Return the largest per-atom force deviation among the calculators."""
        forces = []
        for calculator in self.calculators:
            atoms.calc = calculator
            forces.append(atoms.get_forces())
        # Axis 0 indexes the calculators; the norm is taken over axis 2
        # (presumably the Cartesian components -- confirm with get_forces()).
        forces = np.array(forces)
        deviations = forces - np.mean(forces, axis=0)
        return np.max(np.linalg.norm(deviations, axis=2))

    def _collect_max_deltas(self, frames, output_file, save_output, delta_range):
        """Compute the maximum deviation per frame and dump frames inside the window."""
        lower, upper = delta_range
        max_deltas = []
        for atoms in frames:
            max_delta = self._max_force_deviation(atoms)
            max_deltas.append(max_delta)
            if save_output and output_file and lower < max_delta < upper:
                dump_xyz(output_file, atoms)
        return max_deltas

    def compute_max_deltas(self, frames, output_file=None, save_output=False,
                           delta_range=(0.2, 0.3)):
        """Return the maximum force deviation of every frame.

        Parameters
        ----------
        frames : iterable
            Structures exposing ``calc`` and ``get_forces()``.
        output_file : str, optional
            File receiving the selected frames. An existing file is removed
            first when ``save_output`` is true.
        save_output : bool
            Write frames whose deviation lies strictly inside ``delta_range``.
        delta_range : tuple of float
            Open interval ``(lower, upper)`` used for the selection; the
            default reproduces the previously hard-coded 0.2-0.3 window.

        Returns
        -------
        list
            One maximum deviation per frame, in input order.
        """
        self._reset_output(output_file, save_output)
        return self._collect_max_deltas(frames, output_file, save_output, delta_range)

    @staticmethod
    def compute_frequency_counts(max_deltas, bin_centers):
        """Histogram ``max_deltas`` and return bin centers with percentages.

        ``bin_centers`` only supplies the number of bins and the upper end of
        the histogram range (its maximum); the range always starts at 0.
        Values outside the range are not counted in any bin but still
        contribute to the total used for normalisation.

        Returns
        -------
        tuple
            ``(actual_bin_centers, relative_frequency)`` where the frequencies
            are percentages of ``len(max_deltas)``. For empty input the
            frequencies are all zero instead of NaN.
        """
        bin_edges = np.histogram_bin_edges(max_deltas, bins=len(bin_centers), range=(0, max(bin_centers)))
        frequency_counts, _ = np.histogram(max_deltas, bins=bin_edges)

        total = len(max_deltas)
        if total:
            relative_frequency = frequency_counts / total * 100
        else:
            # Nothing to normalise by; avoid a 0/0 division.
            relative_frequency = np.zeros(len(frequency_counts))

        actual_bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
        return actual_bin_centers, relative_frequency

    def plot_max_force_differences(self, output_image_file, selected_output_file=None,
                                   save_output=False, delta_range=(0.2, 0.3)):
        """Plot the distribution of maximum force deviations for every data set.

        Parameters
        ----------
        output_image_file : str
            Path of the saved figure.
        selected_output_file : str, optional
            File collecting the selected frames of all data sets.
        save_output : bool
            Whether selected frames are written to ``selected_output_file``.
        delta_range : tuple of float
            Selection window forwarded to the per-frame selection.
        """
        plt.figure(figsize=(8, 6), dpi=300)

        # Reset the output once for the whole run. Resetting per data set
        # would throw away the frames selected from the earlier data sets.
        self._reset_output(selected_output_file, save_output)

        all_max_deltas = []
        for path, label in zip(self.frame_paths, self.labels):
            frames = read_xyz(path)
            max_deltas = self._collect_max_deltas(frames, selected_output_file, save_output, delta_range)
            all_max_deltas.append((max_deltas, label))

        bin_centers = np.linspace(0, 0.5, 50)
        for max_deltas, label in all_max_deltas:
            centers, rel_freq = self.compute_frequency_counts(max_deltas, bin_centers)
            plt.plot(centers, rel_freq, label=label, marker='o')

        plt.xlabel(r'$\Sigma_f^{max} (eV/Å)$')
        plt.ylabel('Relative Frequency (%)')
        plt.xlim(0, 0.5)
        plt.ylim(0, None)
        # NOTE(review): the guide lines sit at 0.1 and 0.2 while the default
        # selection window is 0.2-0.3 -- confirm which thresholds are intended.
        plt.axvline(x=0.1, color='k', linestyle='--')
        plt.axvline(x=0.2, color='k', linestyle='--')
        plt.legend()
        plt.savefig(output_image_file, bbox_inches='tight')
        plt.close()

class FarthestPointSample:
    """Greedy farthest-point sampling on top of ``scipy.spatial.distance.cdist``.

    Parameters
    ----------
    min_distance : float
        Default distance threshold; sampling continues while the farthest
        remaining point is further away than this from the reference set.
    metric : str or callable
        Metric forwarded to ``cdist``.
    metric_para : dict, optional
        Extra keyword arguments forwarded to every ``cdist`` call.
    """

    def __init__(self, min_distance=0.1, metric='euclidean', metric_para=None):
        self.min_distance = min_distance
        self.metric = metric
        # Keep a private copy so later changes by the caller do not leak in.
        self.metric_para = dict(metric_para) if metric_para else {}

    def select(self, new_data, now_data=None, min_distance=None, min_select=1, max_select=None):
        """Return indices into ``new_data`` chosen by farthest-point sampling.

        Parameters
        ----------
        new_data : sequence of vectors
            Candidate points.
        now_data : sequence of vectors, optional
            Points that are already part of the reference set. When empty or
            omitted, the first candidate seeds the set and index 0 is
            returned as the first selection. The argument is never modified.
        min_distance : float, optional
            Overrides the instance threshold; an explicit ``0`` is honoured.
        min_select : int
            Keep selecting until at least this many points are chosen, even
            when the remaining points are closer than ``min_distance``.
        max_select : int, optional
            Upper bound on the number of returned indices. ``None`` (or any
            falsy value) means no limit other than ``len(new_data)``.

        Returns
        -------
        list of int
            Selected indices in selection order, without repetitions.
        """
        if min_distance is None:
            min_distance = self.min_distance

        n_new = len(new_data)
        if n_new == 0:
            return []
        # A candidate can be chosen at most once, so cap the limit at n_new.
        max_select = min(max_select or n_new, n_new)

        new_points = np.asarray(new_data)
        available = np.ones(n_new, dtype=bool)
        selected = []

        if now_data is None or len(now_data) == 0:
            selected.append(0)
            available[0] = False
            reference = new_points[:1]
        else:
            reference = np.asarray(now_data)

        # Distance of every candidate to its nearest reference point.
        distances = np.min(
            cdist(new_points, reference, metric=self.metric, **self.metric_para), axis=1
        )

        while len(selected) < max_select and available.any():
            # Mask chosen candidates so they cannot be picked a second time.
            candidates = np.where(available, distances, -np.inf)
            idx = int(np.argmax(candidates))
            if not candidates[idx] > min_distance and len(selected) >= min_select:
                break
            selected.append(idx)
            available[idx] = False
            to_new_point = cdist(
                new_points[idx:idx + 1], new_points, metric=self.metric, **self.metric_para
            )[0]
            distances = np.minimum(distances, to_new_point)
        return selected
    
class DescriptorAnalyzer:
    """Collect descriptors of several data sets, project them to 2D and sample them.

    Parameters
    ----------
    model_filename : str
        Model file passed to ``get_descriptors`` for every structure.
    method : str
        Dimensionality reduction used for plotting: ``'pca'`` or ``'umap'``.
    """

    def __init__(self, model_filename, method='pca'):
        self.descriptors = []                 # one descriptor array per frame
        self.structure_indices_per_atom = []  # global frame index of every atom
        self.frames = []                      # all frames, in insertion order
        self.nframes = 0                      # number of frames added so far
        self.natoms = []                      # total atom count per added file
        self.labels = []                      # legend label per added file
        self.method = method
        self.model_filename = model_filename
        self.pca_result = None
        self.umap_result = None

    def add_xyz_file(self, file_path, label):
        """Read ``file_path`` and register its frames and descriptors under ``label``."""
        frames = read_xyz(file_path)
        nframes = 0
        natoms = 0
        descriptors = []
        structure_indices_per_atom = []
        for i, atoms in enumerate(frames):
            descriptors.append(get_descriptors(atoms, model_filename=self.model_filename))
            # Global frame index: offset by the frames of previously added files.
            structure_indices_per_atom.extend([i + self.nframes] * len(atoms))
            nframes += 1
            natoms += len(atoms)
        # The instance is only updated after the whole file was processed.
        self.descriptors.extend(descriptors)
        self.frames.extend(frames)
        self.nframes += nframes
        self.natoms.append(natoms)
        self.labels.append(label)
        self.structure_indices_per_atom.extend(structure_indices_per_atom)

    def perform_dimensionality_reduction(self):
        """Project all per-atom descriptors to two dimensions.

        Returns
        -------
        The 2D projection produced by ``fit_transform`` of the selected
        method; it is also stored in ``pca_result`` or ``umap_result``.

        Raises
        ------
        ValueError
            If ``self.method`` is neither ``'pca'`` nor ``'umap'``.
        """
        if self.method not in ('pca', 'umap'):
            raise ValueError(
                f"Unknown dimensionality reduction method {self.method!r}; expected 'pca' or 'umap'."
            )
        all_descriptors = np.concatenate(self.descriptors, axis=0)
        if self.method == 'pca':
            pca = PCA(n_components=2)
            self.pca_result = pca.fit_transform(all_descriptors)
            return self.pca_result
        umap_model = umap.UMAP(n_components=2)
        self.umap_result = umap_model.fit_transform(all_descriptors)
        return self.umap_result

    def _plot(self, projection, output_file, selected_projection=None):
        """Scatter the projection per data set, optionally highlighting a selection."""
        plt.figure(figsize=(8, 6), dpi=300)

        # Rows of the projection follow the order in which files were added,
        # so consecutive slices of length natoms belong to one label.
        start = 0
        for label, num in zip(self.labels, self.natoms):
            end = start + num
            plt.scatter(projection[start:end, 0], projection[start:end, 1], alpha=0.5, s=2, label=label)
            start = end

        if selected_projection is not None:
            plt.scatter(selected_projection[:, 0], selected_projection[:, 1], c='orange', alpha=0.5, s=2, label='selected')

        plt.xlabel('Dimension 0')
        plt.ylabel('Dimension 1')
        plt.legend(frameon=False)

        plt.tight_layout()
        plt.savefig(output_file, dpi=300, bbox_inches='tight')

    def plot_projection(self, output_file='descriptor_space.png'):
        """Compute the 2D projection and save it as a scatter plot."""
        projection = self.perform_dimensionality_reduction()
        self._plot(projection, output_file)

    def plot_latent_space(self, projection, selected_projection, output_file='latent_space.png'):
        """Plot ``projection`` with ``selected_projection`` drawn on top."""
        self._plot(projection, output_file, selected_projection)

    def perform_latent_analysis(self, min_select=0.8, level='structure', output_file='latent_space.png'):
        """Select frames by farthest-point sampling and write them to disk.

        Parameters
        ----------
        min_select : float
            Fraction of the sampled points (frames or atoms, depending on
            ``level``) that must be selected at least.
        level : str
            ``'structure'`` samples the per-frame mean descriptor,
            ``'atomic'`` samples individual atoms and selects every frame
            that owns a sampled atom.
        output_file : str
            Path of the figure showing the selection.

        Selected frames go to ``./selected.xyz``, the rest to
        ``./unselected.xyz``; both files are replaced on every call.

        Raises
        ------
        ValueError
            If ``level`` is neither ``'structure'`` nor ``'atomic'``.
        """
        if level == 'structure':
            descriptors = np.array([np.mean(d, axis=0) for d in self.descriptors])
        elif level == 'atomic':
            descriptors = np.concatenate(self.descriptors)
        else:
            raise ValueError(f"Unknown level {level!r}; expected 'structure' or 'atomic'.")

        sampler = FarthestPointSample()
        num_select = math.floor(len(descriptors) * min_select)
        selected_indices = sampler.select(descriptors, [], min_select=num_select)

        if level == 'structure':
            selected_set = set(selected_indices)
        else:
            # Map sampled atoms back to the frames that contain them.
            selected_set = {self.structure_indices_per_atom[i] for i in selected_indices}

        unselected_set = set(range(len(self.frames))) - selected_set

        # Single pass over the atoms instead of one full scan per selected frame.
        indices_list = [
            atom_index
            for atom_index, frame_index in enumerate(self.structure_indices_per_atom)
            if frame_index in selected_set
        ]

        projection = self.perform_dimensionality_reduction()
        selected_projection = projection[indices_list]
        self.plot_latent_space(projection, selected_projection, output_file)

        outputs = (
            ('./selected.xyz', sorted(selected_set)),
            ('./unselected.xyz', sorted(unselected_set)),
        )
        for filename, frame_indices in outputs:
            # dump_xyz is called once per frame, so it presumably appends;
            # drop results of earlier runs first (TODO confirm against wizard.io).
            if os.path.exists(filename):
                os.remove(filename)
            # Sorted order keeps the output files deterministic.
            for frame_index in frame_indices:
                dump_xyz(filename, self.frames[frame_index])

### 基于不确定性的主动学习

类似DPGEN，通过对比不同超参NEP势的预测结果，获得原子受力最大偏差分布，挑选出候选构型。

In [2]:
# Ensemble of NEP models trained with different hyper-parameters.
potential_files = [
    './potentials/nep01.txt',
    './potentials/nep02.txt',
    './potentials/nep03.txt',
    './potentials/nep04.txt',
]
calculators = [CPUNEP(potential_file) for potential_file in potential_files]

# Data sets to compare, with the legend label used for each of them.
frame_paths = [
    './dataset/aimd.xyz',
    './dataset/iter0.xyz',
    './dataset/iter1.xyz',
    './dataset/iter2.xyz',
]
labels = ['aimd', 'iter0', 'iter1', 'iter2']

analyzer = ForceAnalyzer(calculators, frame_paths, labels)
# save_output=False: only the figure is produced, no candidate frames are written.
analyzer.plot_max_force_differences(
    output_image_file='max_force_diff.png',
    selected_output_file='selected_atoms.xyz',
    save_output=False,
)

### 最远点采样

- **PCA**（Principal Component Analysis），即主成分分析，可以有效地找出数据中最“主要”的元素和结构，去除噪音和冗余，将原有的数据降维，揭示隐藏在复杂数据背后的简单结构。

- **UMAP**（Uniform Manifold Approximation and Projection），即统一流形逼近与投影，是一种新的降维流形学习技术。UMAP是建立在黎曼几何和代数拓扑理论框架上的。UMAP是一种非常有效的可视化和可伸缩降维算法。

In [3]:
analyzer = DescriptorAnalyzer(model_filename='./potentials/nep04.txt', method='pca')

# Register every data set together with the label shown in the legend.
for xyz_path, dataset_label in (
    ('./dataset/open_source_GeTe.xyz', 'GeTe'),
    ('./dataset/Ge.xyz', 'Ge'),
    ('./dataset/Te.xyz', 'Te'),
):
    analyzer.add_xyz_file(xyz_path, dataset_label)

analyzer.plot_projection(output_file='descriptor_space_pca.png')
# level='atomic' is the alternative: sample individual atoms instead of frames.
analyzer.perform_latent_analysis(
    min_select=0.8,
    level='structure',
    output_file='latent_space_pca.png',
)