In [1]:
pip install mysql-connector-python

Note: you may need to restart the kernel to use updated packages.


In [93]:
import mysql.connector
from mysql.connector import Error
from contextlib import contextmanager
import logging
logging.basicConfig(filename = 'logger.log', level = logging.DEBUG, filemode = 'w', format = "%(asctime)s %(levelname)s %(message)s")


class MySQLDatabase:
    def __init__(self,user,password, port,host,database =None):
        self.user = user
        self.password = password
        self.port = port
        self.host = host
        self.database = database
        self.connection = None

    def connect_to_mysql(self):
        """ Establising the connection to the MYSQL server """
        try:
            self.connection = mysql.connector.connect(
                host = self.host,
                port = self.port,
                user = self.user,
                password = self.password
            )
            if self.connection.is_connected():
                logging.info("successfully connected to mysql server")
                print("successfully connected to mysql server")
        except Error as e:
            logging.exception(f"Error: {e} occurred while connecting to mysql. ")
            print (f"Error: {e}")  

    @contextmanager
    def get_cursor(self, dictionary=False):
        """Context manager to get a cursor and ensure it is closed."""
        cursor = self.connection.cursor(dictionary=dictionary)
        try:
            yield cursor
        finally:
            cursor.close()
            
    def create_database(self, database_name):
        """Creates a new database if it does not already exist."""
        try:
            with self.get_cursor() as cursor:
                cursor.execute(f"show databases like '{database_name}'")
                result = cursor.fetchone()
                if result:
                    logging.error(f"Database '{database_name}' already exists")
                    print(f"Database '{database_name}' already exists")
                else:
                    cursor.execute(f"CREATE DATABASE {database_name}")
                    logging.info(f"Database '{database_name}' created successfully.")
                    print(f"Database '{database_name}' created successfully.")
        except Error as e:
            logging.exception(f"Error: {e} occurred during database creation")
            print(f"Error: {e} occurred during database creation")

    
    def use_database (self, database_name):
        """Select the database to use."""
        cursor = None
        try: 
            with self.get_cursor() as cursor:
                logging.info(f"Switched to database '{database_name}'.")
                cursor.execute(f"USE {database_name}")
                print(f"Switched to database '{database_name}'.")
        except Error as e:
            logging.exception(f"Error: {e} occurred during switching database")
            print(f"Error: {e} occurred during switching database")


    def close(self):
        """Closes the connection to the MySQL server."""
        if self.connection and self.connection.is_connected():
            self.connection.close()
            logging.info("Connection closed.")
            print("Connection closed.")

    def create_table(self,table_name,table_definition):
        """Creates a table in the database, distinguishing whether it was newly created or already existed."""
        try:
            with self.get_cursor() as cursor: 
                # Check if the table already exists
                cursor.execute(f"SHOW TABLES LIKE '{table_name}'")
                result = cursor.fetchone()
                if result:
                    logging.info(f"Table '{table_name}' already exists.")
                    print(f"Table '{table_name}' already exists.")
                else:
                    cursor.execute(f"CREATE TABLE {table_name} ({table_definition})")
                    logging.info(f"Table '{table_name} created successfully")
                    print(f"Table '{table_name}' created successfully.")
        except Error as e:
            logging.info(f"Error: {e} occured while creating table")
            print(f"Error: {e} occured while creating table'")
        finally:
            if cursor:
                cursor.close()

    def insert(self, table, columns, values):
        """Inserts data into a table."""
        placeholders = ', '.join(['%s'] * len(values))     # values = ('Alice Johnson', 'Data Analyst',70000.00)
        #When using %s, the database driver automatically handles the conversion of Python data types into the appropriate SQL data types. 
        #For example, a Python int is converted to an SQL INTEGER, and a Python str is converted to an SQL VARCHAR.
        columns_str = ', '.join(columns)  # columns = ['name','position','salary']
        #['%s'] * len(values) creates a list where the string '%s' is repeated as many times as there are elements in values.
        query = f"INSERT INTO {table} ({columns_str}) VALUES ({placeholders})"
        
        try:
            with self.get_cursor() as cursor:
                cursor.execute(query, values)
                self.connection.commit()
                if cursor.rowcount > 0:
                    logging.info(f"Record inserted successfully. Rows affected: {cursor.rowcount}.")
                    print(f"Record inserted successfully. Rows affected: {cursor.rowcount}.")
                else:
                    print("No record was inserted.")
        except Error as e:
            logging.error(f"Error: {e} inserting data into a table.")
            print(f"Error: {e} inserting data into a table.")
            
    def update(self, table, set_values, where_clause):
        """Updates records in a table."""
        set_clause = ', '.join([f"{col} = %s" for col in set_values.keys()])
        query = f"UPDATE {table} SET {set_clause} WHERE {where_clause}"
        
        try:
            with self.get_cursor() as cursor:
                cursor.execute(query, list(set_values.values()))
                self.connection.commit()
                if cursor.rowcount > 0:
                    logging.info(f"Record(s) updated successfully. Rows affected: {cursor.rowcount}.")
                    print(f"Record(s) updated successfully. Rows affected: {cursor.rowcount}.")
                else:
                    print("No records were updated because no matching records were found.")
        except Error as e:
            logging.error(f"Error: {e} while updating table")
            print(f"Error: {e} while updating table")

    def delete(self, table, where_clause):
        """Deletes records from a table."""
        query = f"DELETE FROM {table} WHERE {where_clause}"
        
        try:
            with self.get_cursor() as cursor:
                cursor.execute(query)
                affected_rows = cursor.rowcount  # Number of rows affected by the query
                self.connection.commit()
                
                if affected_rows > 0:
                    logging.info("Record(s) deleted successfully. Rows affected: {affected_rows}.")
                    print(f"Record(s) deleted successfully. Rows affected: {affected_rows}.")
                else:
                    print("No records were deleted because no matching records were found.")
        except Error as e:
            logging.error(f"Error: {e} while deleting table")
            print(f"Error: {e} while deleting table")

    def fetch_all(self, query, params=None):
        """Fetches all records from a query."""
        try:
            with self.get_cursor(dictionary=True) as cursor:
                cursor.execute(query, params)
                result = cursor.fetchall()
                logging.info("fetched all the data based on the queries")
                return result
        except Error as e:
            logging.error(f"Error: {e} fetching all the data based on the queries")
            print(f"Error: {e} fetching all the data based on the queries")
            return []

    def fetch_one(self, query, params=None):
        """Fetches a single row from the result of the query."""
        try:
            with self.get_cursor(dictionary=True) as cursor:
                cursor.execute(query, params)
                result = cursor.fetchone()
                
                # Consume any remaining results to avoid the "Unread result found" error
                while cursor.nextset():
                    cursor.fetchall()
                logging.info("fetched single row based on the queries")
                return result
        except Error as e:
            logging.error(f"Error: {e} fetching a single row record based on the queries")
            print(f"Error: {e} fetching a single row record based on the queries")
            return None
    


In [95]:
abc = MySQLDatabase(host = '127.0.0.1',port = 3306,user = 'root',password = 'Abhi@1972',database = 'savdb')

In [97]:
abc.connect_to_mysql()

successfully connected to mysql server


In [98]:
abc.create_database('savdb')

Database 'savdb' already exists


In [101]:
# abc.close()

In [103]:
abc.create_database("new_db_from_jupyter")

Database 'new_db_from_jupyter' already exists


In [105]:
abc.use_database("new_db_from_jupyter")

Switched to database 'new_db_from_jupyter'.


In [107]:
table_name = "employees"
table_definition = """
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100),
position VARCHAR(100),
salary DECIMAL(10,2)
"""
abc.create_table(table_name,table_definition)

Table 'employees' already exists.


In [109]:
table_name = "employees"
columns = ['name','position','salary']
values = ('Alice Johnson', 'Data Analyst',70000.00)

In [111]:
abc.insert(table_name,columns,values)

Record inserted successfully. Rows affected: 1.


In [113]:
QUERY = "SELECT * FROM EMPLOYEES"
abc.fetch_all(QUERY)

[{'id': 1,
  'name': 'Alice Johnson',
  'position': 'Data Analyst',
  'salary': Decimal('70000.00')},
 {'id': 2,
  'name': 'John Giblson',
  'position': 'Senior Data Analyst',
  'salary': Decimal('829829.88')},
 {'id': 3,
  'name': 'Alice Johnson',
  'position': 'Data Analyst',
  'salary': Decimal('70000.00')},
 {'id': 4,
  'name': 'Alice Johnson',
  'position': 'Data Analyst',
  'salary': Decimal('70000.00')},
 {'id': 5,
  'name': 'Alice Johnson',
  'position': 'Data Analyst',
  'salary': Decimal('70000.00')},
 {'id': 7,
  'name': 'Alice Johnson',
  'position': 'Data Analyst',
  'salary': Decimal('70000.00')},
 {'id': 8,
  'name': 'Alice Johnson',
  'position': 'Data Analyst',
  'salary': Decimal('70000.00')},
 {'id': 10,
  'name': 'Alice Johnson',
  'position': 'Data Analyst',
  'salary': Decimal('70000.00')},
 {'id': 11,
  'name': 'Alice Johnson',
  'position': 'Data Analyst',
  'salary': Decimal('70000.00')}]

In [115]:
QUERY = "SELECT * FROM employees WHERE id = %s"
PARAMS = (1,)
RESULT = abc.fetch_one(QUERY,PARAMS)
print(RESULT)

{'id': 1, 'name': 'Alice Johnson', 'position': 'Data Analyst', 'salary': Decimal('70000.00')}


In [117]:
QUERY = "SELECT * FROM employees"
result = abc.fetch_one(QUERY)
print(result)

{'id': 1, 'name': 'Alice Johnson', 'position': 'Data Analyst', 'salary': Decimal('70000.00')}


In [119]:
table = "employees"
set_values = {'name' : 'John Giblson', 'position' : 'Senior Data Analyst','salary' : '829829.88'}
where_clause = "id = '2'"

abc.update(table,set_values,where_clause)

No records were updated because no matching records were found.


In [121]:
table = "employees"
set_values = {'name' : 'John Giblson', 'position' : 'Senior Data Analyst','salary' : '829829.88'}
where_clause = "id = '3'"

abc.update(table,set_values,where_clause)

Record(s) updated successfully. Rows affected: 1.


In [123]:
QUERY = "SELECT * FROM EMPLOYEES"
abc.fetch_all(QUERY)

[{'id': 1,
  'name': 'Alice Johnson',
  'position': 'Data Analyst',
  'salary': Decimal('70000.00')},
 {'id': 2,
  'name': 'John Giblson',
  'position': 'Senior Data Analyst',
  'salary': Decimal('829829.88')},
 {'id': 3,
  'name': 'John Giblson',
  'position': 'Senior Data Analyst',
  'salary': Decimal('829829.88')},
 {'id': 4,
  'name': 'Alice Johnson',
  'position': 'Data Analyst',
  'salary': Decimal('70000.00')},
 {'id': 5,
  'name': 'Alice Johnson',
  'position': 'Data Analyst',
  'salary': Decimal('70000.00')},
 {'id': 7,
  'name': 'Alice Johnson',
  'position': 'Data Analyst',
  'salary': Decimal('70000.00')},
 {'id': 8,
  'name': 'Alice Johnson',
  'position': 'Data Analyst',
  'salary': Decimal('70000.00')},
 {'id': 10,
  'name': 'Alice Johnson',
  'position': 'Data Analyst',
  'salary': Decimal('70000.00')},
 {'id': 11,
  'name': 'Alice Johnson',
  'position': 'Data Analyst',
  'salary': Decimal('70000.00')}]

In [127]:
table = "employees"
where_clause = "id = '11'"
abc.delete(table,where_clause)

Record(s) deleted successfully. Rows affected: 1.


In [129]:
table = "employees"
where_clause = "id = '11'"
abc.delete(table,where_clause)

No records were deleted because no matching records were found.


In [131]:
QUERY = "SELECT * FROM EMPLOYEES"
abc.fetch_all(QUERY)

[{'id': 1,
  'name': 'Alice Johnson',
  'position': 'Data Analyst',
  'salary': Decimal('70000.00')},
 {'id': 2,
  'name': 'John Giblson',
  'position': 'Senior Data Analyst',
  'salary': Decimal('829829.88')},
 {'id': 3,
  'name': 'John Giblson',
  'position': 'Senior Data Analyst',
  'salary': Decimal('829829.88')},
 {'id': 4,
  'name': 'Alice Johnson',
  'position': 'Data Analyst',
  'salary': Decimal('70000.00')},
 {'id': 5,
  'name': 'Alice Johnson',
  'position': 'Data Analyst',
  'salary': Decimal('70000.00')},
 {'id': 7,
  'name': 'Alice Johnson',
  'position': 'Data Analyst',
  'salary': Decimal('70000.00')},
 {'id': 8,
  'name': 'Alice Johnson',
  'position': 'Data Analyst',
  'salary': Decimal('70000.00')},
 {'id': 10,
  'name': 'Alice Johnson',
  'position': 'Data Analyst',
  'salary': Decimal('70000.00')}]