# Create a structured database for the Iris dataset

### Common Imports

In [None]:
#%% lib.py
# from json import load, dumps
from pathlib import Path
from datetime import datetime
## Logger setup
import logging
# Grab the logger named 'pymongo' so its verbosity can be capped separately
# from the application logger configured by LoggerManager below.
pymongo_logger = logging.getLogger('pymongo')
# Set its level to WARNING to silence INFO and DEBUG messages
pymongo_logger.setLevel(logging.WARNING)
class LoggerManager:
    """Singleton Logger Manager for consistent logging across the app.

    The first call to :meth:`get_logger` configures logging and creates the
    logger. Every later call returns that same logger and ignores its
    arguments, so the first caller decides the directory, name and level.
    """
    _logger = None

    @classmethod
    def get_logger(cls, log_dir="logs/", name=__name__, level="DEBUG"):
        """Return the shared logger, configuring logging on first use.

        Args:
            log_dir: Directory that receives ``<name>.log``. It is created,
                including missing parent directories, when absent. Note this
                is the FIRST positional parameter; pass ``name=`` by keyword.
            name: Logger name, also used as the log file stem.
            level: Level for the returned logger, either a ``logging``
                constant or a level name in any letter case. Unknown names
                fall back to ``logging.INFO``.

        Returns:
            The single ``logging.Logger`` cached on this class.
        """
        if cls._logger is not None:
            return cls._logger

        log_dir = Path(log_dir)
        # parents=True so a nested directory such as "var/logs" does not fail.
        log_dir.mkdir(parents=True, exist_ok=True)
        log_file = log_dir / f"{name}.log"
        # NOTE: per the logging docs, basicConfig does nothing when the root
        # logger already has handlers (unless force=True is passed).
        logging.basicConfig(
            level=logging.DEBUG,
            # also log the calling function name
            format='%(asctime)s - %(name)s - %(funcName)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file, encoding="utf-8"),
                logging.StreamHandler(),
            ],
        )
        if isinstance(level, str):
            resolved = getattr(logging, level.upper(), None)
            # Only accept genuine numeric levels; anything else becomes INFO.
            level = resolved if isinstance(resolved, int) else logging.INFO
        logger = logging.getLogger(name)
        logger.setLevel(level)
        cls._logger = logger
        return logger

import shutil
class FileManager:
    """Singleton File Manager for consistent file operations."""
    _instance = None
    # get_logger's first positional parameter is the log directory, so the
    # logger name has to be passed by keyword.
    lg = LoggerManager.get_logger(name=__name__)

    def __new__(cls):
        """Create the single instance on first use and return it afterwards."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def read_creds(self, filename="mongo_creds.txt"):
        """Read MongoDB credentials from a ``user:passwd`` text file.

        Args:
            filename: Path of the credentials file.

        Returns:
            A ``(user, passwd)`` tuple of strings.

        Raises:
            FileNotFoundError: If ``filename`` does not exist.
            ValueError: If the file content has no ``:`` separator.
        """
        creds_file = Path(filename)
        if not creds_file.exists():
            _ERROR_MSG = f"Credentials file {filename} does not exist.Please create a mongo_creds.txt with user:passwd"
            self.lg.error(_ERROR_MSG)
            raise FileNotFoundError(_ERROR_MSG)
        # Split on the first colon only, so a password may itself contain ':'.
        user, sep, passwd = creds_file.read_text().strip().partition(":")
        if not sep:
            _ERROR_MSG = f"Credentials file {filename} must contain user:passwd"
            self.lg.error(_ERROR_MSG)
            raise ValueError(_ERROR_MSG)
        self.lg.debug("MongoDB credentials loaded successfully.")
        return user, passwd

    def ensure_exists(self, path):
        """Create directory ``path`` (and missing parents) if nothing exists there.

        Args:
            path: Directory path as ``str`` or ``Path``.
        """
        path = Path(path)
        if not path.exists():
            path.mkdir(parents=True, exist_ok=True)
        self.lg.debug(f"Ensured directory exists: {path}")

    def copy_file(self, src, dest, overwrite=False):
        """Copy ``src`` to ``dest`` with metadata, creating the target folder.

        Args:
            src: Source file path.
            dest: Destination file path (``str`` or path-like).
            overwrite: When False (default) an existing ``dest`` is left
                untouched and the copy is skipped.
        """
        # Normalise every path-like input, not just str.
        dest = Path(dest)
        if not overwrite and dest.exists():
            self.lg.info(f"File {dest} already exists. Skipping copy.")
            return
        self.ensure_exists(dest.parent)
        shutil.copy2(src, dest)
        self.lg.info(f"Copied file from {src} to {dest}")



#%% Testing
# lg.WARNING
# import LoggerManager as lm
# logging.WARNING
# %%


In [None]:
# import sys
from pathlib import Path
# from lib import LoggerManager as lm, FileManager as fm, logging
# Shared helper handles used by the cells that follow.
lm, fm = LoggerManager(), FileManager()
# TODO: move common imports to vars
# sys.path.append(str(Path(__file__).parent))
# from vars import *

from pymongo import MongoClient, ASCENDING
from pymongo.collection import Collection

# Root folder of the structured iris database; created when missing.
DB_BASE_ = Path("~/datasets/iris_db/").expanduser()
fm.ensure_exists(DB_BASE_)

### Copying to structured folder and adding to mongodb as well

In [None]:
# Identifier of the dataset; also used as its folder name under DB_BASE_.
DS_ID = "CASIA_v1"
DS_NAME_ = Path(DS_ID)

# Location of the untouched download that images are read from.
ORIG_DB_BASE_ = Path(
    "~/datasets/iris_datasets/CASIA/V1/CASIA-IrisV1/CASIA-IrisV1/CASIA Iris Image Database (version 1.0)"
).expanduser()

# Structured destination folder for this dataset; created when missing.
DS_BASE_ = DB_BASE_.joinpath(DS_NAME_)
fm.ensure_exists(DS_BASE_)

In [None]:

# insert into db

# Cell-local import: the notebook's common imports only bring in `datetime`.
from datetime import timezone

# Metadata document describing the CASIA-IrisV1 dataset, keyed by `ds_id`.
metadata_casia_v1 = {
    "ds_id": DS_ID,
    "name": "CASIA-IrisV1",
    "db_info": {
        "desc": """CASIA Iris Image Database Version 1.0 (CASIA-IrisV1) includes 756 iris images
        from 108 eyes. For each eye, 7 images are captured in two sessions with our
        self-developed device CASIA close-up iris camera (Fig.1), where three samples are
        collected in the first session (Fig.2(a)) and four in the second session (Fig.2(b)). All
        images are stored as BMP format with resolution 320*280
        In order to protect our IPR in the design of our iris camera (especially the NIR
        illumination scheme), the pupil regions of all iris images in CASIA-IrisV1 were
        automatically detected and replaced with a circular region of constant intensity to
        mask out the specular reflections from the NIR illuminators. Such editing
        clearly makes iris boundary detection much easier but has minimal or no effects on
        other components of an iris recognition system, such as feature extraction and
        classifier design.
        It is suggested that you compare two samples from the same eye taken in different
        sessions when you want to compute the within-class variability. For example, the iris
        images in the first session can be employed as training dataset and those from the
        second session are used for testing.
        """,
        "capture_device": "CASIA close-up iris camera",
        "environment": "Indoor, controlled lighting",
        "type":"NIR",
        "notes":"",
        "periocular":False
    },
    # "db_specs":{
    "num_images": 756,
    "num_people": 108,
    "num_eyes": 108,
    "num_eyes_per_person": 1,
    "num_samples_per_eye": 7,
    "num_sessions": 2,
    # },
    'img_specs':{
        "ext": ".bmp",
        "res": "320x280",
        "width": 320,
        "height": 280
    },
    'paths':{
        'orig_base_': str(ORIG_DB_BASE_),
        'base_': str(DS_BASE_),
    },
    # Timezone-aware on purpose: PyMongo stores naive datetimes as if they
    # were already UTC, which would shift a local-time timestamp.
    'injested_at': datetime.now(timezone.utc)
}


In [None]:
# create a unique index on ds_id
# coll.create_index([("ds_id", ASCENDING)], unique=True)

In [None]:
# class IRIS_DB:
#     def __init__(self, mongo_db_name=MONGO_DB_NAME, mongo_colleciton=META_COLL):
#         self.conn = MongoClient(DB_IP, username="admin", password="<REDACTED - load from mongo_creds.txt>", authSource="admin")[mongo_db_name][db_name]

#     def insert_metadata(self, metadata):
#         self.conn.insert_one(metadata)

#     def find_metadata(self, ds_id):
#         return self.conn.find_one({"ds_id": ds_id})


In [None]:


# class IrisMeta:
#     """Class to handle operations related to the meta collection."""
#     def __init__(self, iris_db):
#         if not isinstance(iris_db, IrisDB):
#             raise ValueError("iris_db must be an instance of IrisDB")
#         self.iris_db = iris_db
#         self.coll = iris_db.get_meta_coll()

#     def get_all_metadata(self):
#         """Fetch all metadata documents from the meta collection."""
#         return list(self.coll.find({}, {'_id': 0}))

#     def get_metadata(self, ds_id):
#         """Fetch metadata for a specific dataset ID."""
#         return self.coll.find_one({'ds_id': ds_id}, {'_id': 0})

#     def update_metadata(self, metadata):
#         """Update or insert metadata document based on ds_id."""
#         if 'ds_id' not in metadata:
#             raise ValueError("Metadata must contain 'ds_id' field.")
#         self.iris_db.update_one(metadata, key='ds_id', collection=self.coll)
#         lg.info(f"Metadata for ds_id '{metadata['ds_id']}' updated/inserted.")
#         return True
    


In [None]:
# ## USAGE of IrisMeta

# # ```python
# # with IrisDB() as iris_db:
# #     iris_meta = IrisMeta(iris_db)
# #     all_metadata = iris_meta.get_all_metadata()
# #     specific_metadata = iris_meta.get_metadata("some_ds_id")
# #     iris_meta.update_metadata({"ds_id": "some_ds_id", "new_field": "value"})
# # ```


## IrisDB Class

In [None]:
# Setup logging
from functools import lru_cache
from difflib import get_close_matches

from time import sleep
# logger = LoggerManager.get_logger(name=Path(__file__).stem)
# get_logger's first positional parameter is `log_dir`, so the logger name is
# passed by keyword. If LoggerManager already holds a logger, that cached
# logger is returned and these arguments are ignored.
lg = lm.get_logger(name=__name__, level="DEBUG")
# lg = lm.get_logger(name=__name__, level="INFO")
# set log level to info
# lg.setLevel(logging.DEBUG)

class IrisDB:
    """Convenience wrapper around one MongoDB database of iris datasets.

    The database holds a metadata collection (``meta`` by default) with one
    document per dataset, plus one collection per dataset named after its
    ``ds_id``. The client, database handle, meta collection and dataset list
    are created lazily on first access and cached on the instance.

    Args:
        ds_id: Optional dataset name to connect to right away (fuzzy matched,
            see :meth:`connect`).
        db_ip: MongoDB host; defaults to ``DB_IP``.
        mongo_db_name: Database name; defaults to ``DB_NAME``.
        meta_coll_name: Metadata collection name; defaults to
            ``META_COLL_NAME``.

    Can be used as a context manager; the client is closed on exit.
    """
    DB_IP = 'localhost'
    DB_NAME = 'iris_db'
    META_COLL_NAME = 'meta'

    # Class-level defaults keep these attributes readable on an instance whose
    # __init__ failed part-way or that has not connected to a dataset yet.
    closing = False
    ds_id = None
    coll = None
    _mongo_client = None
    _mongo_db = None
    _meta_coll = None
    _avail_ds = None

    def __init__(
        self,
        ds_id=None,
        db_ip=None,
        mongo_db_name=None,
        meta_coll_name=None
        ) -> None:
        self.db_ip = self.DB_IP if db_ip is None else db_ip
        self.mongo_db_name = self.DB_NAME if mongo_db_name is None else mongo_db_name
        self.meta_coll_name = self.META_COLL_NAME if meta_coll_name is None else meta_coll_name
        # Credentials come from the mongo_creds.txt file as user:passwd.
        user, passwd = fm.read_creds()
        self._mongo_admin_user = user
        self._mongo_admin_password = passwd
        self.closing = False
        if ds_id is not None:
            self.connect(ds_id)

    # ------------------------------------------------------------------
    # Lazily created handles. Cached per instance instead of through
    # functools.lru_cache, which would keep every instance alive forever.
    # ------------------------------------------------------------------
    @property
    def mongo_client(self):
        """Lazily create and return the MongoClient (None once closed)."""
        if self._mongo_client is None:
            if self.closing:
                return None
            lg.debug("MongoDB client is not initialized. Creating client...")
            self._mongo_client = MongoClient(
                self.db_ip,
                username=self._mongo_admin_user,
                password=self._mongo_admin_password,
                authSource="admin"
            )
            lg.debug("MongoDB client created successfully.")
        return self._mongo_client

    @property
    def mongo_db(self):
        """Lazily create and return the Database object (None once closed)."""
        if self._mongo_db is None:
            client = self.mongo_client
            if client is None:
                return None
            lg.debug("Establishing MongoDB database connection...")
            self._mongo_db = client[self.mongo_db_name]
            lg.debug("MongoDB connection established successfully.")
        return self._mongo_db

    def _require_db(self):
        """Return the database handle or raise if the instance was closed."""
        db = self.mongo_db
        if db is None:
            raise RuntimeError("IrisDB is closed; create a new instance to reconnect.")
        return db

    @property
    def meta_coll(self):
        """Lazily create and return the metadata collection."""
        if self._meta_coll is None:
            self._meta_coll = self._require_db()[self.meta_coll_name]
            lg.debug("Returned meta collection.")
        return self._meta_coll

    @property
    def avail_ds(self) -> set:
        """Set of ``ds_id`` values found in the meta collection.

        The meta collection's own name is always included. The result is
        cached; call :meth:`refresh_avail_ds` after adding a dataset.
        """
        if self._avail_ds is None:
            lg.debug("Connecting to meta db to find avail ds")
            try:
                # Documents without a ds_id come back as {} and are skipped.
                found = {
                    doc['ds_id']
                    for doc in self.meta_coll.find({}, {'_id': 0, 'ds_id': 1})
                    if 'ds_id' in doc
                }
            except Exception as e:
                lg.error(f"Error fetching available datasets: {e}")
                raise
            found.add(self.meta_coll_name)
            lg.debug(f"Fetched List of avail databases -> {found}")
            self._avail_ds = found
        return self._avail_ds

    def refresh_avail_ds(self) -> set:
        """Drop the cached dataset list and re-read it from the meta collection."""
        self._avail_ds = None
        return self.avail_ds

    def get_avail_ds(self) -> set:
        """Print and return the set of available IRIS ds_id values."""
        print(f"Avail Datasets: {self.avail_ds}")
        return self.avail_ds
    get_datasets = get_ds = list_ds = get_avail_ds

    def find_ds(self, ds_id, avail_ds=None, acc=0.4, count=1) -> "str | set | None":
        """Fuzzy-match ``ds_id`` case-insensitively against the known datasets.

        Args:
            ds_id: Name to look up.
            avail_ds: Candidate names; defaults to :attr:`avail_ds`.
            acc: ``cutoff`` passed to ``difflib.get_close_matches``.
            count: Maximum number of matches (``n`` of ``get_close_matches``).

        Returns:
            The best matching original name when ``count`` is 1, a set of
            names when ``count`` > 1, or None when nothing matches.
        """
        # Map lowercase name -> original name so matching ignores case.
        mapping = {db.lower(): db for db in (avail_ds or self.avail_ds)}
        matches = get_close_matches(ds_id.lower(), list(mapping), n=count, cutoff=acc)
        if not matches:
            return None
        if count > 1:
            res = {mapping[match] for match in matches}
        else:
            res = mapping[matches[0]]
        msg = f"Found matches for {ds_id}: {res}"
        print(msg)
        lg.debug(msg)
        return res

    def set_meta_primary(self):
        """Make the meta collection the active collection and return it."""
        lg.debug("Accessing meta collection...")
        self.ds_id = self.meta_coll_name
        self.coll = self.meta_coll
        return self.meta_coll
    meta_connect = set_meta_primary

    def connect(self, ds_id, acc=0.4) -> "Collection | None":
        """Select the collection whose name best matches ``ds_id``.

        On success sets ``self.ds_id`` and ``self.coll`` and returns the
        collection. When no dataset matches, logs an error, leaves the
        current selection untouched and returns None.
        """
        avail_ds = self.avail_ds
        closest_match = self.find_ds(ds_id=ds_id, avail_ds=avail_ds, acc=acc)
        if not closest_match:
            lg.error(f"{ds_id} Collection Not found in available datasets. List: {avail_ds}")
            return None
        self.ds_id = closest_match
        lg.info(f"Connecting to {self.ds_id} Collection")
        self.coll = self._require_db()[self.ds_id]
        return self.coll
    get_coll = connect

    def _resolve_coll(self, coll=None):
        """Return ``coll`` or the active collection; raise if there is none."""
        # Compare with None explicitly: PyMongo collections reject bool().
        target = self.coll if coll is None else coll
        if target is None:
            raise RuntimeError("No collection connected. Please call connect() first.")
        return target

    def update(self, doc, key=None, coll=None):
        """Update one existing document with the fields of ``doc`` (no upsert).

        Args:
            doc: Fields to ``$set``; must contain the lookup key.
            key: Field used to find the document. ``'_id'`` is always used
                when present in ``doc``; otherwise defaults to ``'ds_id'``.
            coll: Target collection; defaults to the active collection.

        Returns:
            The ``update_one`` result object.

        Raises:
            KeyError: If the lookup key is missing from ``doc``.
            RuntimeError: If no collection is given or connected.
        """
        if '_id' in doc:
            key = '_id'
        elif key is None:
            key = 'ds_id'
        if key not in doc:
            raise KeyError(f"Key '{key}' not found in document.")
        coll = self._resolve_coll(coll)
        res = coll.update_one({key: doc[key]}, {'$set': doc}, upsert=False)
        # Only claim success when a document actually matched the filter.
        if res.acknowledged and res.matched_count == 0:
            lg.warning(f"No document with {key}={doc[key]!r} in {self.ds_id} collection; nothing updated.")
        else:
            lg.info(f"Updated document in {self.ds_id} collection.")
        return res

    def insert(self, docs, coll=None):
        """Insert one document (dict) or several (list/tuple).

        Database errors (for example duplicate keys) are logged and None is
        returned, so a bulk ingestion can carry on.

        Raises:
            TypeError: If ``docs`` is not a dict, list or tuple.
            RuntimeError: If no collection is given or connected.
        """
        if isinstance(docs, dict):
            many = False
        elif isinstance(docs, (list, tuple)):
            many = True
        else:
            raise TypeError(f"docs must be a dict or a list of dicts, got {type(docs).__name__}")
        coll = self._resolve_coll(coll)
        try:
            if many:
                # ordered=False: keep inserting after an individual failure.
                res = coll.insert_many(list(docs), ordered=False)
            else:
                res = coll.insert_one(docs)
        except Exception as e:
            lg.error(f"Error inserting document into {self.ds_id} collection: {e}")
            return None
        lg.info(f"Inserted document(s) into {self.ds_id} collection.")
        return res

    def __getitem__(self, coll_name):
        """``db[name]`` is shorthand for ``db.connect(name)``."""
        return self.connect(coll_name)

    def __enter__(self):
        """Called when entering the 'with' statement."""
        lg.debug("Entering context...")
        return self

    def __exit__(self, *args):
        """Close the client on leaving the 'with' block; exceptions propagate."""
        self.close()

    def close(self):
        """Explicitly close the mongo client connection (safe to call twice)."""
        if self.closing:
            return
        self.closing = True
        client = self._mongo_client
        if client is None:
            lg.debug("MongoDB client was not initialized; no connection to close.")
            return
        try:
            lg.debug("checking for mongoclient")
            client.close()
            lg.info("MongoDB client connection closed successfully.")
        except Exception as e:
            lg.error(f"Error closing MongoDB client connection: {e}")

    def __del__(self):
        # Finalizers can run during interpreter shutdown, when module globals
        # such as the logger may already be gone, so nothing may escape here.
        try:
            if self.closing:
                return
            self.close()
            lg.debug("MongoDB connection closed successfully.")
        except Exception:
            pass
         

In [None]:
   
# %% TESTING
# db=IrisDB()
# # print(db.find_ds('casia-v1'))
# db.list_ds()
# db["meta"]
# # db.update(
# #     {
# #         'ds_id': db.find_ds('casia_v1'),
# #         'specs': {
# #             'num_eyes_per_person': 1
# #         }
# #     }
# # )
# # print(db.connect('casiav3'))
# # sleep(1)
# db.close()

In [None]:
# Bare expression: shows the metadata document as the notebook cell output.
metadata_casia_v1

In [None]:
# from datetime import datetime
# Scratch cell: open a connection, select the meta collection and list the
# known datasets. The commented calls below are earlier experiments.
with IrisDB() as db:
    # Use the db object to interact with the database
    # db['meta'] goes through IrisDB.__getitem__, i.e. connect('meta').
    meta=db['meta']
    # list_ds() already prints the set itself, so print() shows it again.
    print(db.list_ds())
    # db.update(metadata_casia_v1)
    # db.insert(metadata_casia_v1)
    # print(db.update({
    #     'ds_id': db.find_ds('casia-v1'),
    #     'injested_at': datetime.now()
    # }))
    # meta.update_one({'ds_id': db.find_ds('casia-v1')},
    #     {
    #         '$set':{
    #         'ds_id': db.find_ds('casia-v1'),
    #         'injested_at': datetime.now()
    #         }
    #     }
    # )
    # print(db.find_ds('cas'))
    # meta.create_index([('ds_id', 1)], unique=True)
    # db.update_data(
    #     [{
    #         "ds_id": db.find_ds('casia-v1'),
    #         'orig_base': ORIG_DB_BASE_.as_posix(),
    #         'base': DS_BASE_.as_posix()
    #     }],
    #     key='ds_id'
    # )
    # print(db.mongo_conn[db.meta_coll].find({}, {'_id': 0, 'ds_id': 1}))

    pass

In [None]:
#%% Working on uploading CASIA_v1 data to CASIA_v1 collection as well as making a copy of each image in the BASE_ path inside BASE_/orig/
from pathlib import Path


In [None]:

#%% 
# Dataset identifier, reused as the folder name inside DB_BASE_.
DS_ID = "CASIA_v1"
DS_NAME_ = Path(DS_ID)

# Structured destination folder for this dataset; created when missing.
DS_BASE_ = DB_BASE_.joinpath(DS_NAME_)
fm.ensure_exists(DS_BASE_)

# Untouched download that the source images are read from.
ORIG_DB_BASE_ = Path(
    "~/datasets/iris_datasets/CASIA/V1/CASIA-IrisV1/CASIA-IrisV1/CASIA Iris Image Database (version 1.0)"
).expanduser()

# Every .bmp file below the original dataset folder, searched recursively.
images = [*ORIG_DB_BASE_.rglob("*.bmp")]


In [None]:
# `test` is the list the ingestion loop iterates over; here it is simply
# another name for the full `images` list.
test=images

In [None]:
# Cell-local import: the notebook's common imports only bring in `datetime`.
from datetime import timezone

# Ingest CASIA_v1: one MongoDB document per image plus a renamed copy of the
# image under DS_BASE_/orig/<eye_id>/.
docs = []  # only needed for the optional batch insert (commented out at the end)

# The context manager closes the client even if ingestion fails part-way.
with IrisDB('casia-v1') as db:
    ds_id = db.find_ds('casia-v1')
    # getattr: `coll` may be missing entirely when the connection attempt
    # in the constructor found no matching dataset.
    if ds_id is None or getattr(db, 'coll', None) is None:
        raise RuntimeError("Dataset 'casia-v1' is not registered in the meta collection; insert its metadata first.")
    meta_doc = db.meta_coll.find_one({'ds_id': ds_id})
    if meta_doc is None or 'img_specs' not in meta_doc:
        raise RuntimeError(f"Meta document for {ds_id} is missing or has no 'img_specs'.")

    try:
        db.coll.create_index([("image_id", ASCENDING)], unique=True)
        db.coll.create_index([("person_id", ASCENDING)])
        db.coll.create_index([("eye_id", ASCENDING)])
        # index status which can be one of 'orig','norm','segm'
        # db.coll.create_index([("status",ASCENDING)])
        # index path as well
        db.coll.create_index([("paths.rel_path_", ASCENDING)], unique=True)
        # db.coll.create_index([("paths.common_path_",ASCENDING)],unique=True)
        # db.coll.create_index([("paths.full_path_",ASCENDING)],unique=True)
        # db.coll.create_index([("person_id",ASCENDING),("person_sample_id",ASCENDING)],unique=True)
        # db.coll.create_index([("eye_id",ASCENDING),("sample_id",ASCENDING)],unique=True)
    except Exception as e:
        lg.error(f"Error creating indexes for {ds_id} collection: {e}")

    base_ = DS_BASE_
    eye = "L"  # only one eye per person in this dataset
    for img in test:
        # File stems are expected to look like <person>_<session>_<sample>.
        try:
            person_id, session_id, sample_id = map(int, img.stem.split("_"))
        except ValueError:
            lg.warning(f"Skipping {img}: file name is not <person>_<session>_<sample>")
            continue
        eye_id = f"{person_id}_{eye}"
        ext = img.suffix
        # renaming the 2nd session images into continuous id_s
        if session_id == 2:
            sample_id = 3 + sample_id
        person_sample_id = sample_id  # in this case
        new_filename_ = Path(f"{eye_id}_{sample_id}{ext}")
        rel_path_ = Path(eye_id) / new_filename_

        paths = {
            'base_': str(base_),
            'rel_path_': str(rel_path_),
        }

        orig_paths = {
            'base_': str(ORIG_DB_BASE_),
            'rel_path_': str(img.relative_to(ORIG_DB_BASE_)),
            'orig_path_': str(img),
        }

        doc = {
            'ds_id': ds_id,
            'person_id': str(person_id),
            'eye_id': eye_id,
            'sample_id': str(sample_id),
            'person_sample_id': str(person_sample_id),
            'image_id': str(new_filename_.stem),
            'file_name': str(new_filename_),
            'session_id': str(session_id),
            # 'status': status,
            'eye': eye,
            'img_specs': meta_doc['img_specs'],
            'paths': paths,
            'orig_paths': orig_paths,
            # Timezone-aware: PyMongo treats naive datetimes as already UTC.
            'injested_at': datetime.now(timezone.utc)
        }
        # docs.append(doc)
        db.insert(doc)
        # Copy to the location recorded in paths.rel_path_ (below 'orig'), so
        # the path stored in the document resolves to the file on disk.
        fm.copy_file(img, base_ / 'orig' / rel_path_)

    # db.insert(docs)

