In [323]:
import pymongo
import pandas as pd
import os
import csv
import logging as lg
import ast

class loggerClass:
    def __init__(self):
        logger = lg.getLogger()
        fhandler = lg.FileHandler(filename='MongoDB_Operation.log', mode='a')
        formatter = lg.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        fhandler.setFormatter(formatter)
        if (logger.hasHandlers()):
            logger.handlers.clear()
        logger.addHandler(fhandler)
        logger.setLevel(lg.INFO)
     
    def logFunc(self,lg_type,content):
        if lg_type == "INFO":
            lg.info(str(content))
            lg.shutdown()
        elif lg_type == "ERROR":
            lg.error(str(content))
            lg.shutdown()
        else:
            lg.info(str(content))
            lg.shutdown()

In [324]:
class MongoDBOperation:
    def __init__(self,connection_url,dbname):
        self.__connection_url = connection_url
        self.__dbname= dbname
        self.__lg = loggerClass()
        
    def db_Connection(self):
        try:
            client = pymongo.MongoClient(self.__connection_url)
            return client[self.__dbname]
        except Exception as e:
            self.__lg.logFunc('error', "Error while establishing connection with Mongo DB:" + str(e))
            
    def checkExistence_COL(self,collection_name):
        try:
            db = self.db_Connection()
            collection_list = db.list_collection_names()
            if collection_name in collection_list:
                return True
            else:
                return False
        except Exception as e:
            self.__lg.logFunc('error', "Error while finding collection in Mongo DB:" + str(e))    
        
                
    def read_File(self,file_path,file_name,delim):
        """Reads file and returns DataFrame
            Parameters: File Path,File name, Delimiter
            Returns Dataframe"""
        try:
            file_location = file_path + '\\' + file_name
            self.__lg.logFunc("info", f"Opening File {file_location}")
            df = pd.read_csv(file_location, delimiter=delim)
            df.columns = df.columns.str.replace(' ','_')
            self.__lg.logFunc("info", f"File read complete")
            return df

        except Exception as e:
            self.__lg.logFunc('error', f"Error while reading file:{file_path} {file_name}" + str(e))
          
    def insert_Data(self,file_path,file_name,delim,collection_name):
        """Inserts multiple documents for given collection name
            Parameters: File Path,File name, Delimiter,Collection_name"""
        try:
            db = self.db_Connection()
            collec = db[collection_name]
            data = self.read_File(file_path,file_name,delim)
            collec.insert_many(data.to_dict('records'))
            print(f"Inserted {data.shape[0]} records")
            self.__lg.logFunc("info", f"Inserted {data.shape[0]} records")

        except Exception as e:
            self.__lg.logFunc('error', f"Error while inserting data from file:{file_path} {file_name}" + str(e))

    def find_one_Data(self,collection_name):
        """Finds data from given collection name
        Parameters: Collection name"""
        try:
            if(self.checkExistence_COL(collection_name)):
                db = self.db_Connection()
                collec = db[collection_name]
                data = collec.find_one()
                print(data)
                self.__lg.logFunc("info", f"The find_one method on {collection_name} has been performed successfully")
            else:
                self.__lg.logFunc('error', f"Error while finding collection :{collection_name}")
                
        except Exception as e:
            self.__lg.logFunc('error', f"Error while finding data from collection:{collection_name}" + str(e))  
            
    def find_all_Data(self,collection_name):
        """Finds data from given collection name
        Parameters: Collection name"""
        try:
            if(self.checkExistence_COL(collection_name)):
                db = self.db_Connection()
                collec = db[collection_name]
                data = collec.find()
                for idx,record in enumerate(data):
                    print(f"{idx} : {record}")
                self.__lg.logFunc("info", f"The find method on {collection_name} has been performed successfully")
            else:
                self.__lg.logFunc('error', f"Error while finding collection :{collection_name}")
                
        except Exception as e:
            self.__lg.logFunc('error', f"Error while finding data from collection:{collection_name}" + str(e))  
                              
    def filter_all_Data(self,collection_name,**kwargs):
        """Finds data from given collection name
        Parameters: Collection name, Selection Criteria in Key-value pair(Eg: Column name:[[operator, [List of filter values]])"""
        try:
            if(self.checkExistence_COL(collection_name)):
                db = self.db_Connection()
                collec = db[collection_name]
                filter_criteria = '{' + ", ".join({'"'+key+'"'" :{'"+kwargs[key][0]+"':"+str(kwargs[key][1])+"}" for key in kwargs.keys()}) +'}'
                filter_criteria = ast.literal_eval(filter_criteria)
                data = collec.find(filter_criteria)
                count = collec.count_documents(filter_criteria)
                print(f"Found {count} records")
                for doc in data:
                    print(doc)
                self.__lg.logFunc("info", f"The filter criteria {filter_criteria} of {collection_name} has been performed successfully")
                self.__lg.logFunc("info", str(count) + ' records fetched')
                
            else:
                self.__lg.logFunc('error', f"Error while finding collection :{collection_name}")
                
        except Exception as e:
            self.__lg.logFunc('error', f"Error while finding data from collection:{collection_name}" + str(e))      
            
    def update_Data(self,collection_name,filter_criteria,update_criteria):
        """Updates data for given collection name
        Parameters: Collection name, Filter Criteria in dictionary format(Eg: Column name:[[operator, [List of filter values]]),
        Update Criteria in Dictionary format """
        try:
            if(self.checkExistence_COL(collection_name)):
                db = self.db_Connection()
                collec = db[collection_name]
                filter_criteria_ = '{' + ", ".join({'"'+key+'"'" :{'"+filter_criteria[key][0]+"':"+str(filter_criteria[key][1])+"}" for key in filter_criteria.keys()}) +'}'
                filter_criteria_ = ast.literal_eval(filter_criteria_)
                data = collec.find(filter_criteria_)
                count = collec.count_documents(filter_criteria_)
                update_criteria_ = {"$set" : update_criteria}
                collec.update_many(filter_criteria_,update_criteria_)
                print(f"Updated {count} records")
                self.__lg.logFunc("info", f"The Update criteria {update_criteria_} over {collection_name} has been performed successfully")
                self.__lg.logFunc("info", str(count) + ' records updated')

            else:
                self.__lg.logFunc('error', f"Error while finding collection :{collection_name}")
                
        except Exception as e:
            self.__lg.logFunc('error', f"Error while updating data in collection:{collection_name} with criteria {update_criteria_}" + str(e)) 
        
    def delete_many_Data(self,collection_name,**kwargs):
        """Deletes data from given collection name
        Parameters: Collection name, Selection Criteria in Key-value pair(Eg: Column name:[[operator, [List of filter values]])"""
        try:
            if(self.checkExistence_COL(collection_name)):
                db = self.db_Connection()
                collec = db[collection_name]
                filter_criteria = '{' + ", ".join({'"'+key+'"'" :{'"+kwargs[key][0]+"':"+str(kwargs[key][1])+"}" for key in kwargs.keys()}) +'}'
                filter_criteria = ast.literal_eval(filter_criteria)
                count = collec.count_documents(filter_criteria)
                data = collec.delete_many(filter_criteria)
                print(f"Deleted {count} records")
                self.__lg.logFunc("info", f"The Delete criteria {filter_criteria} of {collection_name} has been performed successfully")
                self.__lg.logFunc("info", str(count) + ' records deleted')
                
            else:
                self.__lg.logFunc('error', f"Error while finding collection :{collection_name}")
                
        except Exception as e:
            self.__lg.logFunc('error', f"Error while deleting data from collection:{collection_name}" + str(e))                

    def drop_collection(self,collection_name):
        """Drops given collection name
        Parameters: Collection name"""
        try:
            if(self.checkExistence_COL(collection_name)):
                db = self.db_Connection()
                collec = db[collection_name]
                collec.drop()
                print(f"{collection_name} dropped")
                self.__lg.logFunc("info", f"The collection {collection_name} has been dropped")
                
            else:
                self.__lg.logFunc('error', f"Error while finding collection :{collection_name}")
                            
        except Exception as e:
            self.__lg.logFunc('error', f"Error while dropping collection:{collection_name}" + str(e))                
        


In [325]:
mdb = MongoDBOperation("mongodb+srv://mongodb:mongodb@cluster0.lrz7y.mongodb.net/myFirstDatabase?retryWrites=true&w=majority",'carbon_db')

In [326]:
mdb.insert_Data("D:\python_1","carbon_nanotubes.csv",";","carbon_collection")

Inserted 10721 records


In [327]:
mdb.checkExistence_COL('carbon_collection')

True

In [328]:
mdb.find_one_Data('carbon_collection')

{'_id': ObjectId('6216b4a685eeb4bf2dbbd720'), 'Chiral_indice_n': 2, 'Chiral_indice_m': 1, 'Initial_atomic_coordinate_u': '0,679005', 'Initial_atomic_coordinate_v': '0,701318', 'Initial_atomic_coordinate_w': '0,017033', "Calculated_atomic_coordinates_u'": '0,721039', "Calculated_atomic_coordinates_v'": '0,730232', "Calculated_atomic_coordinates_w'": '0,017014'}


In [329]:
mdb.find_all_Data('carbon_collection')

0 : {'_id': ObjectId('6216b4a685eeb4bf2dbbd720'), 'Chiral_indice_n': 2, 'Chiral_indice_m': 1, 'Initial_atomic_coordinate_u': '0,679005', 'Initial_atomic_coordinate_v': '0,701318', 'Initial_atomic_coordinate_w': '0,017033', "Calculated_atomic_coordinates_u'": '0,721039', "Calculated_atomic_coordinates_v'": '0,730232', "Calculated_atomic_coordinates_w'": '0,017014'}
1 : {'_id': ObjectId('6216b4a685eeb4bf2dbbd721'), 'Chiral_indice_n': 2, 'Chiral_indice_m': 1, 'Initial_atomic_coordinate_u': '0,717298', 'Initial_atomic_coordinate_v': '0,642129', 'Initial_atomic_coordinate_w': '0,231319', "Calculated_atomic_coordinates_u'": '0,738414', "Calculated_atomic_coordinates_v'": '0,65675', "Calculated_atomic_coordinates_w'": '0,232369'}
2 : {'_id': ObjectId('6216b4a685eeb4bf2dbbd722'), 'Chiral_indice_n': 2, 'Chiral_indice_m': 1, 'Initial_atomic_coordinate_u': '0,489336', 'Initial_atomic_coordinate_v': '0,303751', 'Initial_atomic_coordinate_w': '0,088462', "Calculated_atomic_coordinates_u'": '0,47767

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



In [330]:
mdb.filter_all_Data("carbon_collection",Chiral_indice_n = ['$in',[2]],Chiral_indice_m=['$in',[1,2]])

Found 28 records
{'_id': ObjectId('6216b4a685eeb4bf2dbbd720'), 'Chiral_indice_n': 2, 'Chiral_indice_m': 1, 'Initial_atomic_coordinate_u': '0,679005', 'Initial_atomic_coordinate_v': '0,701318', 'Initial_atomic_coordinate_w': '0,017033', "Calculated_atomic_coordinates_u'": '0,721039', "Calculated_atomic_coordinates_v'": '0,730232', "Calculated_atomic_coordinates_w'": '0,017014'}
{'_id': ObjectId('6216b4a685eeb4bf2dbbd721'), 'Chiral_indice_n': 2, 'Chiral_indice_m': 1, 'Initial_atomic_coordinate_u': '0,717298', 'Initial_atomic_coordinate_v': '0,642129', 'Initial_atomic_coordinate_w': '0,231319', "Calculated_atomic_coordinates_u'": '0,738414', "Calculated_atomic_coordinates_v'": '0,65675', "Calculated_atomic_coordinates_w'": '0,232369'}
{'_id': ObjectId('6216b4a685eeb4bf2dbbd722'), 'Chiral_indice_n': 2, 'Chiral_indice_m': 1, 'Initial_atomic_coordinate_u': '0,489336', 'Initial_atomic_coordinate_v': '0,303751', 'Initial_atomic_coordinate_w': '0,088462', "Calculated_atomic_coordinates_u'": '0,

In [331]:
mdb.update_Data("carbon_collection",{"Chiral_indice_n" : ['$in',[2]],"Chiral_indice_m":['$in',[1,2]]},{"Chiral_indice_n":33})

Updated 28 records


In [332]:
mdb.filter_all_Data("carbon_collection",Chiral_indice_n = ['$in',[2]],Chiral_indice_m=['$in',[1,2]])
#No records found since records were updated

Found 0 records


In [333]:
mdb.drop_collection("carbon_collection")

carbon_collection dropped
