# Connect To Google Drive


In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
# Later cells open pickles under /content/drive/MyDrive/..., so this cell
# has to run (and be authorized) before them.
from google.colab import drive
drive.mount('/content/drive')

# Install Packages

In [None]:
!pip install hdbscan
!pip install scikit-learn-extra

In [None]:
import numpy as np
from src.Helpers.Clustering import Clustering
import hdbscan
from sklearn.metrics import f1_score
import torch
from configuration import num_of_labeled_data

# Prepare Data

In [None]:
# Read the human-labeled samples from disk into `data_dict`.
# NOTE: unpickling can execute arbitrary code, so only load a trusted file.
# The structure is assumed to be {relevance_key: [(question, sentence), ...]},
# which is how the next cell consumes it -- TODO confirm against the producer.
import pickle

data_dict = None
labeled_pickle_path = '/content/human_supervision_data.pickle'
with open(labeled_pickle_path, 'rb') as labeled_file:
    data_dict = pickle.load(labeled_file)

In [None]:
# Convert `data_dict` into two parallel lists: prompt strings and integer labels.
# The result is exposed as `labeled_data = (inputs, outputs)`.
prompt_template = "qnli question: qqqqqq sentence: cccccc"

# Relevance key -> integer label. Any key other than 'high' / 'medium' maps to 0,
# exactly as the original conditional expression did.
label_ids = {'high': 2, 'medium': 1}

# Split the template once around the sentence placeholder so that question and
# sentence are substituted independently. Chaining two str.replace calls re-scans
# the already inserted question, so a question that happens to contain the
# literal 'cccccc' would get the sentence injected into it.
template_head, template_tail = prompt_template.split('cccccc')

inputs = []
outputs = []
for key, items in data_dict.items():
  for item in items:
    # item[0] fills the question slot, item[1] fills the sentence slot.
    inputs.append(template_head.replace('qqqqqq', item[0]) + item[1] + template_tail)
    outputs.append(label_ids.get(key, 0))
labeled_data = (inputs , outputs)

In [None]:

# Load the pickled unlabeled training samples from Google Drive.
# `pickle` is imported by an earlier cell; Drive must already be mounted.
# NOTE: unpickling can execute arbitrary code, so only load a trusted file.
unlabeled_pickle_path = '/content/drive/MyDrive/Persian LLM/New Codes/CRAG/unlabeled_data_train_stripped.pickle'
with open(unlabeled_pickle_path, 'rb') as unlabeled_file:
    unlabeled_data = pickle.load(unlabeled_file)

In [None]:
# Create the clustering helper from the raw samples, then replace its `data`
# and `labels` attributes with the cached values stored in the Drive pickle.
clustering = Clustering(unlabeled_data, labeled_data)

train_embeddings_path = '/content/drive/MyDrive/Persian LLM/New Codes/CRAG/embeddings_data_train_stripped.pickle'
with open(train_embeddings_path, 'rb') as embeddings_file:
    out_dict = pickle.load(embeddings_file)

# Both attributes end up as NumPy arrays; later cells slice
# clustering.labels[:num_of_labeled_data].
clustering.data = np.array(out_dict['data'])
clustering.labels = np.array(out_dict['labels'])

## Finding the intrinsic dimension

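The following cell only estimates the intrinsic dimension of the embeddings (using the TwoNN estimator) and prints it. The result is used to choose the latent-space size of the auto-encoder; the cell does not modify the data.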

In [None]:
!pip install skdim
from skdim import id
# Initialize the TwoNN estimator
estimator = id.TwoNN()

# Fit the model and estimate intrinsic dimension
intrinsic_dim = estimator.fit_transform(clustering.data)
print(intrinsic_dim)

## Dimension Reduction

In [None]:
# Train the auto-encoder, then overwrite clustering.data with the output of
# dimensionality_reduction(). The test cells further down reuse
# clustering.autoencoder.encoder to project test embeddings the same way.
# NOTE(review): clustering.data is replaced in place, so re-running this cell
# presumably feeds already-reduced data back in -- reload the embeddings first; confirm.
# Disabled alternative, kept for reference:
# clustering.data = clustering.dimensionality_reduction_use_labeled_data_only()
clustering.train_autoencoder()
clustering.data = clustering.dimensionality_reduction()

# Visualization

In [None]:
# NOTE: this whole cell is commented out and does nothing when executed.
# It draws a 2-D t-SNE scatter of clustering.data, with the labeled samples
# highlighted on top of the unlabeled ones.
# NOTE(review): `num_of_data` (used below) is neither defined nor imported in the
# visible notebook; it must be provided before this cell is re-enabled.
# import matplotlib.pyplot as plt
# from sklearn.manifold import TSNE

# # tsne = TSNE(n_components=2, random_state=42)
# # tsne_data = tsne.fit_transform(clustering.data)
# # tsne_data = clustering.data


# tsne = TSNE(n_components=2, random_state=42)
# tsne_data = tsne.fit_transform(clustering.data)

# # labels = labeled_data[1] Wrong!
# labels = clustering.labels[:num_of_labeled_data].tolist()
# unknown_labels = [clustering.UNK_LABEL] * (num_of_data + num_of_labeled_data - len(labels))
# labels += unknown_labels

# color_map = {clustering.LOW_LABEL: ('gray', 'Low'), clustering.AMBIGOUS_LABEL: ('green', 'Medium'), clustering.HIGH_LABEL: ('red', 'High'), clustering.UNK_LABEL: ('black', 'Unknown')}

# plt.figure(figsize=(10, 10))

# indices = [i for i, x in enumerate(labels) if x == clustering.UNK_LABEL]
# plt.scatter(tsne_data[indices, 0], tsne_data[indices, 1], c=color_map[clustering.UNK_LABEL][0], label=color_map[clustering.UNK_LABEL][1], zorder=1,s=4)

# for label in set(labels):
#    if label != clustering.UNK_LABEL:
#       indices = [i for i, x in enumerate(labels) if x == label]
#       plt.scatter(tsne_data[indices, 0], tsne_data[indices, 1], c=color_map[label][0], label=color_map[label][1], zorder=label+3,s=100)

# plt.legend()
# plt.title('TSNE Plot before clustering method')
# plt.xlabel('X-axis')
# plt.ylabel('Y-axis')
# plt.show()

# Applying Clustering Algorithm

In [None]:
# Select the clustering algorithm: leave exactly one of the assignments below
# uncommented. The chosen string is compared against these same literals by the
# fitting cell and by both test-evaluation cells, so the spelling must match.
# "agg" selects start_agglomerative and "ls" selects start_label_spreading
# in the fitting cell.
# clustering_algorithm = "hdbscan"
clustering_algorithm = "kmeans"
# clustering_algorithm = "agg"
# clustering_algorithm = "ls"
# clustering_algorithm = "kmedoids"

In [None]:
# Fit the selected algorithm and keep the labels it returns in `new_labels`.
# Method names are stored as strings and resolved with getattr so that only the
# selected method is looked up on `clustering`.
fit_method_names = {
  "kmeans": "start_kmeans",
  "agg": "start_agglomerative",
  "hdbscan": "start_hdbscan",
  "ls": "start_label_spreading",
  "kmedoids": "start_kmedoids",
}

# Fail loudly on an unrecognized name: silently skipping every branch would
# leave `new_labels` undefined, or worse, stale from a previous run of this cell.
if clustering_algorithm not in fit_method_names:
  raise ValueError(
    f"Unknown clustering_algorithm {clustering_algorithm!r}; "
    f"expected one of {sorted(fit_method_names)}"
  )

new_labels = getattr(clustering, fit_method_names[clustering_algorithm])()



### Loading and Evaluating Test Data

In [None]:


# Evaluate the fitted model on the test passages.
# NOTE: unpickling can execute arbitrary code, so only load a trusted file.
with open('/content/embeddings_data.pickle', 'rb') as handle:
    out_dict = pickle.load(handle)

# Only the first 189 entries are evaluated.
# NOTE(review): 189 is hard-coded (the stripped test set uses 180) -- confirm.
data = np.array(out_dict['data'][:189])
labels = np.array(out_dict['labels'][:189])

# Project the test embeddings with the trained auto-encoder's encoder.
dimention_reductioned_data = clustering.autoencoder.encoder(torch.tensor(data, dtype=torch.float32)).detach().numpy()

if clustering_algorithm == "kmeans":
  predicted_labels = clustering.kmeans.predict(dimention_reductioned_data)
elif clustering_algorithm == "agg":
  # NOTE(review): fit_predict fits the estimator on the test points instead of
  # reusing the training fit -- confirm this is acceptable for "agg".
  predicted_labels = clustering.agg.fit_predict(dimention_reductioned_data)
elif clustering_algorithm == "hdbscan":
  # Use approximate_predict, as the stripped-test cell does. fit_predict would
  # refit (and overwrite) the trained clusterer on the test data.
  predicted_labels, _ = hdbscan.approximate_predict(clustering.hdbscan, dimention_reductioned_data)
elif clustering_algorithm == "ls":
  predicted_labels = clustering.label_spread.predict(dimention_reductioned_data)
elif clustering_algorithm == "kmedoids":
  predicted_labels = clustering.kmedoids.predict(dimention_reductioned_data)
else:
  # Without this, `predicted_labels` would be undefined or left over from an earlier run.
  raise ValueError(f"Unknown clustering_algorithm {clustering_algorithm!r}")

print("Accuracy on test passages: ",clustering.evaluate_clustering_results(real_labels=labels,predicted_labels=predicted_labels))
# Store the score under its own name. Assigning it to `f1_score` would replace
# the imported function with a float and break any re-run of this cell.
# NOTE(review): the raw predictions are scored against the real labels here;
# confirm the cluster ids already coincide with the label ids.
test_f1 = f1_score(labels, predicted_labels, average='micro')
print("F1 Score: ", test_f1)

In [None]:
# Score the fitted model on the stripped test set (first 180 entries).
# NOTE: unpickling can execute arbitrary code, so only load a trusted file.
with open('/content/embeddings_data_stripped.pickle', 'rb') as stripped_file:
    out_dict = pickle.load(stripped_file)

data = np.array(out_dict['data'][:180])
labels = np.array(out_dict['labels'][:180])

# Push the embeddings through the trained encoder and return to NumPy.
encoder_input = torch.tensor(data, dtype=torch.float32)
dimention_reductioned_data = clustering.autoencoder.encoder(encoder_input).detach().numpy()

# One predictor per algorithm name. The lambdas defer every attribute lookup,
# so only the model of the selected algorithm is touched.
predictors = {
    "kmeans": lambda points: clustering.kmeans.predict(points),
    "agg": lambda points: clustering.agg.fit_predict(points),
    # approximate_predict returns a pair; its first element holds the labels.
    "hdbscan": lambda points: hdbscan.approximate_predict(clustering.hdbscan, points)[0],
    "ls": lambda points: clustering.label_spread.predict(points),
    "kmedoids": lambda points: clustering.kmedoids.predict(points),
}
if clustering_algorithm in predictors:
    predicted_labels = predictors[clustering_algorithm](dimention_reductioned_data)

strip_accuracy = clustering.evaluate_clustering_results(real_labels=labels, predicted_labels=predicted_labels)
print("accuracy on test strips: ", strip_accuracy)

# Evaluation and Save Results


In [None]:
# Pass the known labels of the first `num_of_labeled_data` samples together with
# all labels produced by the clustering run to map_labels; the result is what
# gets saved in the last cell.
# NOTE(review): presumably this translates cluster ids into the real label ids;
# the exact semantics live in Clustering.map_labels -- confirm.
mapped_labels = clustering.map_labels(clustering.labels[:num_of_labeled_data], new_labels)

In [None]:
# Score the clustering on the labeled samples only (the first
# `num_of_labeled_data` entries); the returned value is shown as the cell output.
# NOTE(review): this scores the raw `new_labels`, not `mapped_labels` -- confirm
# that evaluate_clustering_results handles the cluster-to-label mapping itself.
clustering.evaluate_clustering_results(real_labels=clustering.labels[:num_of_labeled_data],predicted_labels=new_labels[:num_of_labeled_data])

In [None]:
# Persist the texts (labeled first, then unlabeled) together with their mapped labels.
# The payload is assembled BEFORE the file is opened: mode 'wb' truncates the
# target immediately, so an error while building the payload (e.g. a missing
# `mapped_labels`) would otherwise leave an empty file over any previous result.
clustering_result = {"texts" : labeled_data[0] + unlabeled_data , "labels" : mapped_labels}
with open('/content/clustering_result.pickle', 'wb') as handle:
    pickle.dump(clustering_result, handle)