In [20]:
import sys
import os
sys.path.insert(0, '..')

In [21]:
os.environ["UNIVERSE_DIR"] = ("/home/lennon/workspace/hdd/ecad_database/features/65k/universe_test")

In [22]:
# from identificationWorker.utils import file_utils
import os
from tqdm import tqdm
import deepdish as dd
import glob


def get_unique_by_order(data):
    from collections import OrderedDict

    w = list(OrderedDict.fromkeys(data))
    return w


class DataLoader:
    """
    A class for loading and managing data for the identification engine.

    Args:
        universe_dir (str): The directory path where the universe data is stored.

    """

    def __init__(self, universe_dir):
        self.universe_mapping = []
        self.universe_features = []
        self.universe_context_idx = []
        self.ready = False
        self.universe_dir = universe_dir
        self.timestamp_dataloader = None
        self.universe_selection = []

    def get_status(self) -> bool:
        return self.ready

    def set_status(self, st: bool):
        self.ready = st

    def get_features_path(self, selection: list[str] = None) -> list[str]:
        """
        Retrieves the paths of the .h5 feature files.

        Args:
            selection (list[str], optional): A list of directory names to filter the search. Defaults to None.

        Returns:
            list[str]: A list of paths to the .h5 feature files.

        Raises:
            ValueError: If no .h5 files are found in the specified directory/directories.
        """
        h5_list = []
        if selection is not None:
            for dir_name in selection:
                h5_files = glob.glob(os.path.join(self.universe_dir, dir_name, "*.h5"))

                if not h5_files:
                    raise ValueError(f"No .h5 files found in directory {dir_name}")
                h5_list.extend(h5_files)
        else:
            h5_list = glob.glob(os.path.join(self.universe_dir, "**/*.h5"))
            if not h5_list:
                raise ValueError(
                    f"No .h5 files found in the universe directory {self.universe_dir}"
                )

        return h5_list

    def load_h5_features(self, file: str) -> dict:
        try:
            file_data = dd.io.load(file)

            if "segments" in file_data:
                return file_data["segments"][0]
            else:
                return file_data
        except Exception:
            print(f"Error: {file} not found or corrupt data.")
            return None

    def get_h5data_dict(self, work_dict):
    
        file = os.path.join(self.universe_dir, work_dict["obra"], work_dict["fonograma"] + ".h5")

        features = self.load_h5_features(file)

        return features
    
    def get_h5data(self, work_id, track_id):    
        file = os.path.join(self.universe_dir, work_id, track_id + ".h5")

        features = self.load_h5_features(file)

        return features
    
    def batch_load_h5data(self, list_files):
        pass

    def load_universe(self, selection: list[str] = None) -> bool:
        """
        Loads the universe data by iterating over the provided selection of files.

        Args:
            selection (list[str], optional): A list of file paths to load. Defaults to None.

        Returns:
            bool: True if the universe data is successfully loaded, False otherwise.

        Raises:
            ValueError: If no universe files data is found.
        """
        universe_features_path: list[str] = self.get_features_path(selection)

        if not universe_features_path:
            raise ValueError("No universe files data found.")
        else:
            for file in universe_features_path:
                parts = file.split("/")
                work_id = parts[-2]
                track_id = parts[-1].split(".")[0]
                self.universe_mapping.append({"obra": work_id, "fonograma": track_id})
                self.universe_selection.append(f"{work_id}")
        universe_features_path = []

        return True

    def set_universe(self, selection: list[str] = None) -> None:
        """
        Prepares the data loader by loading the universe data and setting the readiness status.

        Args:
            selection (list[str], optional): List of universe selections. Defaults to None.

        Returns:
            bool: True if the universe features and mapping are loaded successfully, False otherwise.
        """
        self.ready = False
        self.universe_mapping = []
        self.universe_features = []
        self.universe_context_idx = []
        self.universe_selection = []
        if self.load_universe(selection):
            if len(self.universe_mapping) > 0:
                self.ready = True
                # self.universe_selection = selection

    def get_universe(self):
        return self.universe_mapping

    def get_universe_idx(self):
        return range(len(self.universe_mapping))

    def get_universe_mapping(self):
        return self.universe_mapping

    def _search_restricted_universe(self, universe_context=None):
        self.universe_context_idx = []

        for idx, u in enumerate(self.universe_mapping):
            if u["obra"] in universe_context:
                self.universe_context_idx.append(idx)

        return self.universe_context_idx

    def get_restricted_universe(
        self, universe_context_by_id=None, universe_context_by_idx=None
    ):
        if universe_context_by_id is None and universe_context_by_idx is None:
            return (
                self.get_universe_idx(),
                self.universe_mapping,
            )

        self.universe_context_idx = []
        # universe_restricted = []
        universe_restricted_mapping = []

        if universe_context_by_id is not None:
            for idx, u in enumerate(self.universe_mapping):
                if u["obra"] in universe_context_by_id:
                    self.universe_context_idx.append(idx)
                    # universe_restricted.append(self.universe_features[idx])
                    universe_restricted_mapping.append(u)

        else:
            for idx in universe_context_by_idx:
                self.universe_context_idx.append(idx)
                # universe_restricted.append(self.universe_features[idx])
                universe_restricted_mapping.append(self.universe_mapping[idx])

        return (
            self.universe_context_idx,
            # universe_restricted,
            universe_restricted_mapping,
        )

    def get_restricted_universe_idx(self, universe_context=None):
        if universe_context is not None:
            self._search_restricted_universe(universe_context=universe_context)

        if len(self.universe_context_idx) == 0:
            return self.get_universe_idx()

        return self.universe_context_idx

    def get_work_from_idx(self, list_idx):
        worksList = []
        for idx in list_idx:
            worksList.append(self.universe_mapping[idx]["obra"])

        return get_unique_by_order(worksList)

    def get_fonograma_from_idx(self, list_idx):
        fonogramaList = []
        for idx in list_idx:
            fonogramaList.append(self.universe_mapping[idx]["fonograma"])

        return fonogramaList

In [23]:
UNIVERSE_DIR = os.getenv("UNIVERSE_DIR")
dl = DataLoader(UNIVERSE_DIR)

In [24]:
dl.load_universe()

True

In [25]:
features_path = dl.get_features_path()
features_list = [dl.load_h5_features(path) for path in features_path]

In [45]:
features_list[0]["cqtnet"].keys()

dict_keys(['features', 'params'])

In [27]:
def load_features(feature_name):
    features_path = dl.get_features_path()
    features_list = [dl.load_h5_features(path) for path in features_path]
    vectors = [feature[feature_name]["features"] for feature in features_list]
    payloads = dl.get_universe_mapping()
    return vectors, payloads

In [28]:
vectors_cqtnet, payloads_cqtnet = load_features("cqtnet")
vectors_coverhunter, payloads_coverhunter = load_features("coverhunter")
vectors_cqtnet_v1, payloads_cqtnet = load_features("cqtnet_v1")

In [29]:
from db.qdrant import Qdrant
qdrant = Qdrant()

In [30]:
from abc import ABC
from typing import List, TypedDict, overload
import numpy as np
from db.vector_db import Item, VectorDB

class Filter(TypedDict):
    obra: str
    fonograma: str


class FeatureRepository(ABC):
    def __init__(self, vector_db: VectorDB):
        self.vector_db = vector_db
        self.collection_name = None
        self.dim = None

    def try_create_collection(self):
        self.vector_db.try_create_collection(self.collection_name, self.dim)
    
    # TODO: Add basic logic to process the feature
    def process(self, feature: np.ndarray):
        return feature

    def add(self, features: list[np.ndarray], payloads=None):
        for i, feature in enumerate(features):
            vectors = [vector for vector in feature]
            payload = payloads[i] if payloads else None
            if payload:
                payload = [payload for _ in vectors]
            
            self.vector_db.add(self.collection_name, vectors, payload)

    def search(self, queries: np.ndarray, top_k=5, filter: dict = None):
        for feature in queries:
            vectors = [vector for vector in feature]
            yield self.vector_db.search(
                self.collection_name, vectors, top_k=top_k, filter=filter
            )
    
    @overload
    def get(self, id: str) -> List[Item]: ...

    @overload
    def get(self, filter: Filter, top_k=5) -> List[Item]: ...
        
    def get(self, id_or_filter: str = None, top_k=5):
        return self.vector_db.get(self.collection_name, id_or_filter, top_k)

    def delete(self, id: str):
        self.vector_db.delete(self.collection_name, id)

    def delete_collection(self):
        self.vector_db.delete_collection(self.collection_name)


In [31]:
class CQTNetRepository(FeatureRepository):
    def __init__(self, vector_db: VectorDB):
        self.vector_db = vector_db
        self.collection_name = "cqtnet_test" # TODO: Mudar para o nome correto
        self.dim = 300

class CQTNetV1Repository(FeatureRepository):
    def __init__(self, vector_db: VectorDB):
        self.vector_db = vector_db
        self.collection_name = "cqtnet_v1_test" # TODO: Mudar para o nome correto
        self.dim = 300

class FusionRepository(FeatureRepository):
    def __init__(self, vector_db: VectorDB):
        self.vector_db = vector_db
        self.collection_name = "fusion_test" # TODO: Mudar para o nome correto
        self.dim = 300

    def add(self, features_1: np.ndarray, features_2: np.ndarray, payloads=None):
        for i, (f1, f2) in enumerate(zip(features_1, features_2)):
            vectors = [(v1 + v2) / 2 for v1, v2 in zip(f1, f2)]
            payload = payloads[i] if payloads else None
            if payload:
                payload = [payload for _ in vectors]
            self.vector_db.add(self.collection_name, vectors, payload)

class CoverhunterRepository(FeatureRepository):
    def __init__(self, vector_db: VectorDB):
        self.vector_db = vector_db
        self.collection_name = "coverhunter_test" # TODO: Mudar para o nome correto
        self.dim = 128

In [32]:
cqtnet_repository = CQTNetRepository(qdrant)
coverhunter_repository = CoverhunterRepository(qdrant)
cqtnetv1_repository = CQTNetV1Repository(qdrant)
fusion_repository = FusionRepository(qdrant)


In [14]:
cqtnet_repository.try_create_collection()
coverhunter_repository.try_create_collection()
cqtnetv1_repository.try_create_collection()
fusion_repository.try_create_collection()



In [15]:
cqtnet_repository.add(vectors_cqtnet[20:35], payloads=payloads_cqtnet[20:35])
coverhunter_repository.add(vectors_coverhunter[20:35], payloads=payloads_coverhunter[20:35])
fusion_repository.add(vectors_cqtnet[20:35], vectors_cqtnet_v1[20:35], payloads=payloads_coverhunter[20:35])

In [16]:
results_cqtnet = cqtnet_repository.search(vectors_cqtnet[23:26])
results_coverhunter = coverhunter_repository.search(vectors_coverhunter[23:26])
results_fusion = fusion_repository.search(vectors_cqtnet[23:26])

In [17]:
for result in results_cqtnet:
    print(result)

print("----------------------")

for result in results_coverhunter:
    print(result)

print("----------------------")

for result in results_fusion:
    print(result)

[[{id: d335d04d-73bd-4773-9185-6140da4ab98d, score: 0.99999994, vector: None, payload: {'obra': '1673194', 'fonograma': '1727267'}}, {id: 9419bffd-f0ec-4a51-8164-cdd8824cd55b, score: 0.99999994, vector: None, payload: {'obra': '1673194', 'fonograma': '1727267'}}, {id: 084af5c6-2d7c-470d-9a73-ba22358459ab, score: 0.9517004, vector: None, payload: {'obra': '1673194', 'fonograma': '1727267'}}, {id: cabb26fc-c780-4146-9126-6e88a9d2310d, score: 0.9517004, vector: None, payload: {'obra': '1673194', 'fonograma': '1727267'}}, {id: 9559f39a-1f13-460e-a471-c9acca524823, score: 0.8960926, vector: None, payload: {'obra': '1673194', 'fonograma': '1727267'}}], [{id: 084af5c6-2d7c-470d-9a73-ba22358459ab, score: 1.0, vector: None, payload: {'obra': '1673194', 'fonograma': '1727267'}}, {id: cabb26fc-c780-4146-9126-6e88a9d2310d, score: 1.0, vector: None, payload: {'obra': '1673194', 'fonograma': '1727267'}}, {id: 9419bffd-f0ec-4a51-8164-cdd8824cd55b, score: 0.9517004, vector: None, payload: {'obra': '16

In [18]:
id = result[0][0].id

In [19]:
cqtnet_repository.get(id)

In [20]:
payload:dict = payloads_cqtnet[22]
cqtnet_repository.get(payload)

[{id: 027c4ecd-bc45-4268-b2b1-4deebf9d61c0, score: None, vector: [ 2.83927200e-02  4.74516340e-02  6.27961900e-02  3.83075250e-02
  -3.18318350e-03 -2.35605870e-02  1.20053840e-01  4.80813770e-03
   4.68056100e-02 -8.09722800e-02 -9.22862100e-03 -2.40580100e-02
   5.06423900e-02 -1.07475850e-01 -1.08289026e-01 -8.76876500e-03
   5.62435950e-02  1.79245800e-02 -4.55424970e-02 -1.39570200e-01
   1.65617930e-02 -6.49826930e-03  5.60289030e-02  6.47199750e-02
  -8.59785400e-02 -6.15384060e-02 -2.83355200e-02 -1.11086994e-01
  -3.44939600e-02 -1.84389870e-02  1.79240100e-02  1.30923780e-02
  -1.56682190e-02 -1.39404510e-02 -1.64514560e-02 -1.40307370e-01
  -5.17731870e-02 -4.82486520e-02 -1.25329720e-02 -6.09695460e-02
   2.15154630e-02 -3.35599500e-02  3.72267480e-02 -1.78018030e-02
   6.06334770e-03  1.92668000e-02 -1.21579490e-02 -2.26939770e-02
  -2.53826510e-02 -4.53910860e-02 -5.18394520e-02  9.07475400e-03
   2.48702150e-03  2.08420500e-02 -2.81664230e-02 -8.42049800e-02
   4.9270216

## TODO: FeaturesDbLoader

In [73]:
import copy
from sklearn.preprocessing import normalize
from typing_extensions import Unpack


class PreprocessOptions(TypedDict):
    granularity: str
    with_confidency: bool


class FeaturesDbLoader:
    def __init__(
        self,
        data_loader: DataLoader,
        cqtnet_repository: CQTNetRepository,
        cqtnetv1_repository: CQTNetV1Repository,
        coverhunter_repository: CoverhunterRepository,
        fusion_repository: FusionRepository,
    ):
        self.data_loader = data_loader
        self.cqtnet_repository = cqtnet_repository
        self.cqtnetv1_repository = cqtnetv1_repository

        self.coverhunter_repository = coverhunter_repository
        self.fusion_repository = fusion_repository

    def get_feature_vector(
        self, feature: dict, feature_name: str, **pre_process_options: Unpack[PreprocessOptions]
    ) -> np.ndarray:
        if pre_process_options:
            return self.pre_process_features(
                feature,
                feature_name,
                pre_process_options.get("granularity", "all"),
                pre_process_options.get("with_confidency", True),
            )
        return feature[feature_name]["features"]

    def load_features(self, *features: str, **pre_process_options: Unpack[PreprocessOptions]):
        features_path = self.data_loader.get_features_path()
        features_list = [
            self.data_loader.load_h5_features(path) for path in features_path
        ]
        results = []
        for feature_name in features:
            if feature_name == "fusion":
                vectors_cqtnet = [
                    self.get_feature_vector(feature, "cqtnet", **pre_process_options)
                    for feature in features_list
                ]
                vectors_cqtnet_v1 = [
                    self.get_feature_vector(feature, "cqtnet_v1", **pre_process_options)
                    for feature in features_list
                ]
                vectors = [
                    (v1 + v2) / 2 for v1, v2 in zip(vectors_cqtnet, vectors_cqtnet_v1)
                ]
            else:
                vectors = [
                    self.get_feature_vector(feature, feature_name, **pre_process_options)
                    for feature in features_list
                ]
            results.append(vectors)

        results.append(dl.get_universe_mapping())

        return tuple(results)

    def add_features(self, *features: str, **pre_process_options: Unpack[PreprocessOptions]):
        features_payload = self.load_features(*features, **pre_process_options)
        vectors = features_payload[: len(features) - 1]
        payloads = features_payload[-1]
        feature_repository_map: dict[str, FeatureRepository] = {
            "cqtnet": cqtnet_repository,
            "cqtnet_v1": cqtnetv1_repository,
            "coverhunter": coverhunter_repository,
            "fusion": fusion_repository,
        }

        for f, v in zip(features, vectors):
            feature_repository_map[f].add(v, payloads)

    def pre_process_features(
        self, feature: dict, feature_name: str, granularity: str, with_confidence=True
    ):
        chroma_feat = feature_name

        if with_confidence:
            if "features" in feature["confidence"]:
                if granularity == "all":
                    vectors = feature[chroma_feat]["features"]
                    confidence = self._ret_confidence(feature["confidence"]["features"])
                elif granularity == "only_all_song":
                    vectors = feature[chroma_feat]["features"][None, -1, :]
                    confidence = 1.0
                else:
                    vectors = feature[chroma_feat]["features"][0:-1, :]
                    confidence = self._ret_confidence(
                        feature["confidence"]["features"][0:-1, :]
                    )
            elif "confidence" in feature:
                if granularity == "all":
                    vectors = feature[chroma_feat]
                    confidence = self._ret_confidence(feature["confidence"])
                elif granularity == "only_all_song":
                    vectors = feature[chroma_feat][None, -1, :]
                    confidence = 1.0
                else:
                    vectors = feature[chroma_feat][0:-1, :]
                    confidence = self._ret_confidence(feature["confidence"][0:-1, :])
            else:
                if granularity == "all":
                    vectors = feature[chroma_feat]
                elif granularity == "only_all_song":
                    vectors = feature[chroma_feat][None, -1, :]
                else:
                    vectors = feature[chroma_feat][0:-1, :]

                confidence = 1.0

        else:
            if "features" in feature[chroma_feat]:
                if feature[chroma_feat]["features"].shape[0] > 1:
                    if granularity == "all":
                        vectors = feature[chroma_feat]["features"]
                    elif granularity == "only_all_song":
                        vectors = feature[chroma_feat]["features"][None, -1, :]
                    else:
                        vectors = feature[chroma_feat]["features"][0:-1, :]
                else:
                    vectors = feature[chroma_feat]["features"]
            else:
                if feature[chroma_feat].shape[0] > 1:
                    if granularity == "all":
                        vectors = feature[chroma_feat]
                    elif granularity == "only_all_song":
                        vectors = feature[chroma_feat][None, -1, :]
                    else:
                        vectors = feature[chroma_feat][0:-1, :]
                else:
                    vectors = feature[chroma_feat]

        vectors = normalize(vectors, norm="l2")

        if with_confidence:
            size = min(vectors.shape[0], len(confidence))
            vectors = (vectors[:size, :] * confidence[:size, :]).astype(np.float32)
            # vectors = (vectors * confidence).astype("float32")
        else:
            vectors = vectors.astype("float32")

        return vectors

    def _ret_confidence(self, confidence, th=0.6):
        conf = copy.deepcopy(confidence)
        conf[confidence > th] = 1.0
        return conf

In [55]:
def test(*p1, **p2):
    return p1, p2

test("a", "b", t1="a", t2="b")

(('a', 'b'), {'t1': 'a', 't2': 'b'})

In [78]:
fdbl = FeaturesDbLoader(
    data_loader=dl,
    cqtnet_repository=cqtnet_repository,
    cqtnetv1_repository=cqtnetv1_repository,
    coverhunter_repository=coverhunter_repository,
    fusion_repository=fusion_repository,
)

In [88]:
features = fdbl.load_features("cqtnet", "coverhunter", "fusion")
print(features)

([array([[ 0.01541745,  0.07506393,  0.01054523, ..., -0.02282887,
        -0.01075536,  0.03090297],
       [ 0.04096205,  0.04962189,  0.0201225 , ..., -0.00987714,
        -0.01539594,  0.02178608],
       [ 0.01038085,  0.02389989,  0.04939166, ..., -0.03447903,
         0.00838493,  0.01228601],
       ...,
       [ 0.01557735,  0.04114617,  0.03753144, ...,  0.04654374,
         0.06647179,  0.04737635],
       [ 0.04983202,  0.04720256,  0.04035247, ...,  0.05469109,
         0.06633258,  0.03521673],
       [ 0.05755227,  0.02419389,  0.0326692 , ...,  0.03170856,
         0.05515858,  0.07723529]]), array([[-0.05073863,  0.00055979, -0.00645618, ..., -0.02517009,
         0.01256078, -0.01730108],
       [-0.05978299, -0.02398369,  0.00820911, ..., -0.03262146,
         0.00724259, -0.02820942],
       [-0.0640671 ,  0.00290098, -0.01521912, ..., -0.03640061,
         0.02998048, -0.02115773],
       ...,
       [-0.00319899, -0.02229429,  0.0400708 , ..., -0.00486986,
       

In [89]:
features_pre_processed = fdbl.load_features("cqtnet", "coverhunter", "fusion", granularity="all", with_confidency=True)
print(features_pre_processed)

([array([[ 0.01541745,  0.07506393,  0.01054523, ..., -0.02282887,
        -0.01075536,  0.03090297],
       [ 0.04096205,  0.04962188,  0.0201225 , ..., -0.00987714,
        -0.01539594,  0.02178608],
       [ 0.01038085,  0.02389989,  0.04939166, ..., -0.03447903,
         0.00838493,  0.01228601],
       ...,
       [ 0.00778867,  0.02057308,  0.01876572, ...,  0.02327187,
         0.03323589,  0.02368817],
       [ 0.02491601,  0.02360128,  0.02017623, ...,  0.02734555,
         0.03316629,  0.01760837],
       [ 0.05755227,  0.02419389,  0.0326692 , ...,  0.03170856,
         0.05515857,  0.07723529]], dtype=float32), array([[-0.05073863,  0.00055979, -0.00645618, ..., -0.02517009,
         0.01256078, -0.01730108],
       [-0.05978299, -0.02398369,  0.00820911, ..., -0.03262146,
         0.00724259, -0.02820942],
       [-0.0640671 ,  0.00290098, -0.01521912, ..., -0.0364006 ,
         0.02998048, -0.02115773],
       ...,
       [-0.00319899, -0.02229429,  0.04007079, ..., -0.00

In [23]:
fdbl.add_features("cqtnet", "coverhunter", "fusion")

In [24]:
cqtnet_repository.delete_collection()
coverhunter_repository.delete_collection()
cqtnetv1_repository.delete_collection()
fusion_repository.delete_collection()