<a href="https://colab.research.google.com/github/mtsizh/galaxy-morphology-manifold-learning/blob/main/find_best_reduction_parameters.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

If you performed dataset curation on your own - upload `curated_imgs.zip` and skip to the next step. Otherwise you can run the following code and download the curated dataseet from GitHub.

In [1]:
!wget -q https://raw.githubusercontent.com/mtsizh/galaxy-morphology-manifold-learning/main/curated_dataset/curated_imgs_multipart.zip && echo "HEAD dowloaded" || "ERROR downloading HEAD"

for i in range(1,8):
  !wget -q https://raw.githubusercontent.com/mtsizh/galaxy-morphology-manifold-learning/main/curated_dataset/curated_imgs_multipart.z0{i}  && echo "PART {i} of 7 OK" || "ERROR downloading PART {i}"

print('MERGING PARTS')
!zip -FF curated_imgs_multipart.zip --out curated_imgs.zip > /dev/null && rm curated_imgs_multipart.z* && echo "COMPLETE" || "FAILED"


HEAD dowloaded
PART 1 of 7 OK
PART 2 of 7 OK
PART 3 of 7 OK
PART 4 of 7 OK
PART 5 of 7 OK
PART 6 of 7 OK
PART 7 of 7 OK
MERGING PARTS
COMPLETE


Unzip the curated dataset.

In [2]:
!unzip -q -o curated_imgs.zip && echo "UNZIPPED" || "FAIL"

UNZIPPED


Few libraries are not installed by default. the following code installs `optuna` and `umap-learn`.

In [3]:
!pip install optuna
!pip install umap-learn

try:
  import optuna
  import umap
  from google.colab import output
  output.clear()
except:
  print('ERROR')
finally:
  print('COMPLETE')

COMPLETE


Run the following code to generate a report on different methods. `optuna` is used to get the best parameters for each of the methods: t-SNE, uMap, IsoMap, LLE, PCA. Result is saved in form of a `json` file.

In [None]:
import optuna
import numpy as np
from sklearn.manifold import TSNE, LocallyLinearEmbedding, Isomap
from sklearn.decomposition import PCA
from umap import UMAP
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score, train_test_split
import pandas as pd
import numpy as np
from tqdm.auto import tqdm
from PIL import Image
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
import json
import pprint


# use different class maps to get different estimations
class_map = {1: 'round', 2: 'inbetween', 3: 'cigar'}
#class_map = {4: 'edge on', 5: 'edge off'}
#class_map = {6: 'smooth', 7: 'featured'}
methods = ['t-SNE', 'LLE', 'Isomap', 'PCA', 'uMap']
n_bootstrap_samples = 4000
n_parameter_trials = 50


df = pd.read_parquet('curated_dataset.parquet')
regex_filter = '|'.join(class_map.values())
filtered_df = df[df['class'].str.contains(regex_filter, regex=True)]
bootstrapped_df = filtered_df.sample(n=n_bootstrap_samples, random_state=25)
X = np.zeros((len(bootstrapped_df), 120, 120))
y = np.zeros(len(bootstrapped_df))

for key, val in class_map.items():
  y[bootstrapped_df['class'].str.contains(val, regex=True)] = key

print('Dataset balance:')
for k,v in class_map.items():
  print(f'class {v} has {np.sum(y == k)} items')
print('-----------------------------------------')

print('LOAD IMAGES')
paths = bootstrapped_df['png_loc'].str.replace('dr5', 'curated_imgs')
with tqdm(total=len(paths)) as progress:
  for idx, file_path in enumerate(paths):
    with Image.open(file_path) as img:
      X[idx,:,:] = np.array(img)
      progress.update()
X_flattened = X.reshape(X.shape[0], -1)



def objective(trial, methods):
  dr_method = trial.suggest_categorical('dr_method', methods)

  if dr_method == 't-SNE':
    # for t-SNE # of components should be no greater that # of samples and # of features
    n_components = trial.suggest_int('n_components', 2, np.min([200, X.shape[0]-1, X.shape[1]-1]))
    perplexity = trial.suggest_int('perplexity', 5, min(50, X.shape[0]-1)) # perplexity < samples
    learning_rate = trial.suggest_float('learning_rate', 10, 1000, log=True)
    reducer = TSNE(n_components=n_components, perplexity=perplexity,
                    learning_rate=learning_rate, method='exact')
  elif dr_method == 'LLE':
    n_components = trial.suggest_int('n_components', 2, min(200, X.shape[0]-1)) # components < samples
    n_neighbors = trial.suggest_int('n_neighbors', min(2, n_components),
                                    max(50, n_components)) # neighbors <= samples
    reducer = LocallyLinearEmbedding(n_components=n_components, n_neighbors=n_neighbors)
  elif dr_method == 'Isomap':
    n_components = trial.suggest_int('n_components', 2, min(200, X.shape[0]-1)) # components < samples
    n_neighbors = trial.suggest_int('n_neighbors', 10, min(50, X.shape[0]//2))
    reducer = Isomap(n_components=n_components, n_neighbors=n_neighbors)
  elif dr_method == 'PCA':
    n_components = trial.suggest_int('n_components', 2, np.min([200, X.shape[0]-1, X.shape[1]-1]))
    reducer = PCA(n_components=n_components)
  elif dr_method == 'uMap':
    n_components = trial.suggest_int('n_components', 2, np.min([200, X.shape[0]//2, X.shape[1]-1]))
    n_neighbors = trial.suggest_int('n_neighbors', 5, np.min(50, X[0]-1)) # neighbors < samples
    reducer = UMAP(n_neighbors=n_neighbors, n_components=n_components)


  try: #ISOMAP fails without any reason
    X_reduced = reducer.fit_transform(X_flattened)
  except ValueError as e:
    print(f"Skipping {method} trial due to error: {e}")
    return -np.inf # not to spoil the result

  #n_estimators = trial.suggest_int('n_estimators', 50, 300)
  #clf = RandomForestClassifier(n_estimators=n_estimators)
  clf = make_pipeline(StandardScaler(),
                      LogisticRegression(solver='lbfgs', max_iter=1000, random_state=42))

  return np.mean(cross_val_score(clf, X_reduced, y, cv=5))

# ignore annoying futurewarnings
import warnings
warnings.filterwarnings("ignore", message=".*force_all_finite.*", category=FutureWarning)

result = []
for method in methods:
  print(f'********************************{method}***************************')
  study = optuna.create_study(direction="maximize")
  study.optimize(lambda T: objective(T, [method]),
                n_trials=n_parameter_trials, show_progress_bar=True, n_jobs=2)
  print("Best parameters:", study.best_params, "Best value:", study.best_value)
  result.append(study.best_params)
  result[-1]['best_vavlue'] = study.best_value


pretty_json_str = pprint.pformat(result, compact=True).replace("'",'"')
with open("results.json", "w") as outfile:
    outfile.write(pretty_json_str)

Dataset balance:
class round has 1332 items
class inbetween has 2019 items
class cigar has 649 items
-----------------------------------------
LOAD IMAGES


  0%|          | 0/4000 [00:00<?, ?it/s]

[I 2025-02-16 17:49:11,806] A new study created in memory with name: no-name-2badf700-ae4f-4f70-b86a-b64a90b6067a


********************************t-SNE***************************


  0%|          | 0/50 [00:00<?, ?it/s]

[I 2025-02-16 18:46:12,876] Trial 0 finished with value: 0.7905 and parameters: {'dr_method': 't-SNE', 'n_components': 16, 'perplexity': 34, 'learning_rate': 26.495919529954204}. Best is trial 0 with value: 0.7905.


In [None]:
with open('results.json') as json_file:
  data = json.load(json_file)
data