In [None]:
import pandas as pd
import numpy as np
from scipy import stats, special
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import FactorAnalysis, PCA

# Прочитаем таблицу класса данных в таблицу
fc_path = r'C:\Projects\geochem_python_lessons\Lessons\5 Correlation and Factor analysis\Lesson_5.gdb\soil_samples_100x20'
fields = sorted(
                [i.name for i in arcpy.ListFields(fc_path) 
                 if i.name not in ['Shape', 'Profile', 'Sample']]
               )
data = [i for i in arcpy.da.SearchCursor(fc_path, fields)]
df = pd.DataFrame(data, columns = [i.replace('_', '') for i in fields]).set_index('OBJECTID')
df

In [None]:
# вычислим коэффициент чувствительности

include_list = [] # список для подходящих переменных

total_rows = len(df) # общее количество переменных
for col in df:
    min_value = min(df[col]) # минимальное значение (предположение, что все элементы имеют замену < LOD)
    sec_min_value = min(df.loc[df[col] != min_value, col]) # второе минимальное значение
    k_sensitivity = (sum(df[col] == min_value) + sum(df[col] == sec_min_value)/2 ) / total_rows # коэффициент чувствительности
    
    # выбор
    if k_sensitivity < 0.8:
        print(f"{col}: {k_sensitivity:.2f}")
        include_list.append(col)
    else:
        print(f"{col}: {k_sensitivity:.2f} ИСКЛЮЧИТЬ")
print(f'\nОбщее количество подходящих переменных: {len(include_list)}\n\nСписок: {include_list}')

In [None]:
# проверим нормальное распределение, нормализуем

df_norm = df.loc[:, include_list].copy() # копия для нормализованных данных

print("p-значения коэффициентов ассиметрии и эксцесса:")
for col in df_norm:
    subset_data = df_norm[col].to_numpy() # выборка
    
    # вычислим коэффициенты асимметрии и эксцесса
    skewness_coef = stats.skew(subset_data)
    kurtosis_coef = stats.kurtosis(subset_data)
    
    # Определим коэффициенты стат.значимости
    p_skewness = 2 * (1 - special.erf(np.abs(skewness_coef) / np.sqrt(2)))
    p_kurtosis = 2 * (1 - special.erf(np.abs(kurtosis_coef - 3) / np.sqrt(24)))
    
    # Сверим
    if p_skewness < 0.05 or p_kurtosis < 0.05:
        print(f"{col}: {p_skewness:.2f}, {p_kurtosis:.2f} Предполагаем ЛОГнормальное распределение")
        df_norm[col] = np.log(subset_data) # логарифмирование для любителей логарифмов
        #df_norm[col], _ = stats.boxcox(subset_data) # преобразование box-cox
    else:
        print(f"{col}: {p_skewness:.2f}, {p_kurtosis:.2f} Предполагаем нормальное распределение")

In [None]:
# График матрицы коэффициентов корреляции (простой)
def custom_corr_matrix(df, method = 'pearson'):
    """
    Функция создаёт график матрицы коэффициентов корреляции
    
    method - str: pearson, kendall, spearman
    """
    
    fig = plt.figure(figsize=(10/2.54, 10/2.54), constrained_layout=True)
    corrMatrix = df.corr(method = method)
    #sns.set_context("paper", rc={"axes.labelsize":12})
    sns.heatmap(corrMatrix,
                fmt='.1f',
                #mask = corrMatrix.abs() < 0.2,
                vmin= -1, vmax=1, center= 0,
                cmap = 'bwr',#sns.diverging_palette(220, 20, n=200),
                linewidths=0.2, linecolor='grey', yticklabels=1, xticklabels=1,
                annot=True, annot_kws={"fontsize":8}, cbar=False)
    
    plt.yticks(rotation=0)
    plt.show()

custom_corr_matrix(df_norm)

In [None]:
# График матрицы коэффициентов корреляции (с кластеризацией)
def custom_corr_matrix_v2(df, method = 'pearson'):
    """
    Функция создаёт график матрицы коэффициентов корреляции c кластеризацией
    
    method - str: pearson, kendall, spearman
    """
    g = sns.clustermap(
                        df.corr(method = method),
                        figsize=(16/2.54, 14/2.54),
                        method = 'complete', 
                        cmap   = 'seismic',
                        annot  = True,
                        fmt='.1f',
                        #mask = df.corr().abs() < 0.2,
                        vmin= -1, vmax=1, center= 0,
                        linewidths=0.2, linecolor='grey', yticklabels=1, xticklabels=1,
                        annot_kws = {'size': 10})
    plt.setp(g.ax_heatmap.get_yticklabels(), rotation=0)

custom_corr_matrix_v2(df_norm)

In [None]:
# Определим количество факторов

pca = PCA() # создаём экземпляр класса PCA с параметрами метода
df_stand = pd.DataFrame( StandardScaler().fit_transform(df_norm), columns = df_norm.columns)
_ = pca.fit_transform(df_stand) # делаем PCA на данных

#  Точка перегиба по второй производной
features_list = np.arange(df_stand.shape[1])
n_factors = np.diff(pca.explained_variance_ratio_,2).argmin()+1

# Построение графика каменистой осыпи
def scree_plot(features_list, pca, n_factors):
    fig = plt.figure(figsize=(16/2.54, 10/2.54), constrained_layout=True)
    plt.plot(features_list+1, pca.explained_variance_, 'ro-', linewidth=2)
    plt.plot(n_factors, pca.explained_variance_[n_factors-1], 'o', markersize=12)
    total_explained_variance = np.sum(pca.explained_variance_ratio_[0:n_factors-1])
    plt.text(n_factors+0.5,
             pca.explained_variance_[n_factors-1]*1.10,
             f"Количество факторов = {n_factors}. σ\u00b2 {total_explained_variance:.2%}"
             )
    plt.axhline(y=1)
    plt.title('График каменистой осыпи')
    plt.xlabel('Главные компоненты')
    plt.ylabel('Объясненная дисперсия')
    plt.show()

scree_plot(features_list, pca, n_factors)

In [None]:
n_factors = 5 # если нужно в ручную изменить количество факторов
scree_plot(features_list, pca, n_factors)

In [None]:
# Собственно факторный анализ
fa = FactorAnalysis(n_components=n_factors, rotation='quartimax') # 'varimax', 'quartimax'
fa.fit(df_stand)

# Проверка общностей
communalities_calc = np.sum(np.transpose(fa.components_)**2,axis=1)
communalities = pd.DataFrame(communalities_calc, index=df_stand.columns)
features_comm = list(communalities[communalities[0] > 0.2].index)
features_low =  list(communalities[communalities[0] < 0.2].index)
print('Количество переменных с общностью >0.2: {}'.format(len(features_comm)))
print(f'Элементы описанные удовлетворительно: {features_comm}')
print(f'Элементы описанные неудовлетворительно: {features_low}')

# Таблица нагрузок
fa_loading_matrix = pd.DataFrame(
                                np.transpose(fa.components_),
                                columns = [f'FA{i}' for i in range(1, n_factors+1)],
                                index= df_stand.columns
                                )
factor_names = fa_loading_matrix.columns
fa_loading_matrix['Высшая_нагрузка'] = fa_loading_matrix.abs().idxmax(axis=1)
fa_loading_matrix = fa_loading_matrix.sort_values('Высшая_нагрузка')
# Исключим переменные с низкой общностью
fa_loading_matrix = fa_loading_matrix.drop(features_low)

# Таблица факторных нагрузок
def plot_factor_scores(fa_loading_matrix):
    fig = plt.figure(figsize=(16/2.54, 10/2.54), constrained_layout=True)
    
    map_mask = fa_loading_matrix.drop('Высшая_нагрузка', axis=1).abs() < 0.32

    plot = sns.heatmap(
                        fa_loading_matrix.drop('Высшая_нагрузка', axis=1),
                        fmt = '.2f', 
                        mask = map_mask,
                        vmin = -1, vmax = 1, center = 0, 
                        cmap = sns.diverging_palette(220, 20, n=200), 
                        linewidths = 0.2, linecolor = 'grey', yticklabels = 1,
                        annot = True, annot_kws = {"fontsize":11}, cbar = False
                        )

    plot.set_yticklabels(
                        plot.get_yticklabels(), 
                        rotation = 0, 
                        horizontalalignment = 'right',
                        fontweight = 'light',
                        fontsize = 'large'
                        )
    plt.show()

plot_factor_scores(fa_loading_matrix)

In [None]:
# Факторные значения
fa_scores = pd.DataFrame(fa.transform(df_stand), columns = factor_names)
fa_scores = fa_scores.set_index(df.index)
fa_scores

In [None]:
# Получим полные названия факторов

def get_sub(string):
    """Функция перевода текста в нижний регистр"""
    
    normal = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+-=()"
    sub_s = "ₐ₈CDₑբGₕᵢⱼₖₗₘₙₒₚQᵣₛₜᵤᵥwₓᵧZₐ♭꜀ᑯₑբ₉ₕᵢⱼₖₗₘₙₒₚ૧ᵣₛₜᵤᵥwₓᵧ₂₀₁₂₃₄₅₆₇₈₉₊₋₌₍₎"
    res = string.maketrans(''.join(normal), ''.join(sub_s))
    return string.translate(res)

def get_full_factor_name(col, loadings):
    """Функция для составления полного названия фактора из номера фактора и нагрузок"""
    
    # значимые нагрузки > 0.32
    idx = loadings.abs() > 0.32
    loadings_sub = loadings.loc[idx]
    
    # упорядочим строки по абсолютным значениям нагрузок
    desired_order = loadings_sub.abs().sort_values(ascending=False).index 
    
    # переведём нагрузки в строки
    loadings_subset = loadings_sub.reindex(desired_order).round(2).astype(str)
    
    # объединим элементы и нагрузки
    strings_list = [name+get_sub(value) for name, value in loadings_subset.items()]
    
    # составим полное название фактора
    factor_long_name = col + ' ' + ', '.join(strings_list)
    
    return factor_long_name

# проверим полные названия факторов
for col in fa_scores:
    print(get_full_factor_name(col, fa_loading_matrix[col]))

In [None]:
# Обновим класс данных

# Добавим столбцы в таблицу
for col in fa_scores:    
    
    # получим полное название фактора
    alias_name = get_full_factor_name(col, fa_loading_matrix[col])
    
    # Проверми если столбец существует
    current_fields = [i.name for i in arcpy.ListFields(fc_path)]
    if col not in current_fields:
        # Добавим новый столбец
        arcpy.management.AddField(in_table =  fc_path, 
                                  field_name = col, 
                                  field_type = 'FLOAT',
                                  field_scale = 3, 
                                  field_alias = alias_name
                                  )
        print(f'{col} - {alias_name} поле было добавлено')
    else:
        # изменим псевдоним
        arcpy.management.AlterField(in_table = fc_path, field = col, new_field_alias = alias_name)
        print(f'{col} - {alias_name} псевдоним поля был изменён')
    
# Добавим значения факторов в таблицу
fields = ['OID@'] + list(fa_scores.columns)
with arcpy.da.UpdateCursor(fc_path, fields) as cur:
    for row in cur:
        row[1:] = fa_scores.loc[row[0], :].to_list() 
        cur.updateRow(row)