In [8]:
import getpass
import os

import pymysql

# Seconds allowed for establishing the connection and for each socket read/write.
timeout = 10
connection = pymysql.connect(
  charset="utf8mb4",
  connect_timeout=timeout,
  # DictCursor makes every fetched row a dict keyed by column name.
  cursorclass=pymysql.cursors.DictCursor,
  db="defaultdb",
  host="mysql-adcb970-yukichen-8278.g.aivencloud.com",
  # Never keep the password in the notebook source: take it from the
  # MYSQL_PASSWORD environment variable, or prompt for it interactively.
  password=os.environ.get("MYSQL_PASSWORD") or getpass.getpass("MySQL password: "),
  read_timeout=timeout,
  port=12955,
  user="avnadmin",
  write_timeout=timeout,
)

print("Database connection successful!")

Database connection successful!


In [9]:
# Open a cursor on the notebook-level connection; the cells below reuse this
# same `cursor` object for their statements.
cursor = connection.cursor()

In [None]:
# Create the employees table.
# IF NOT EXISTS keeps this cell idempotent: re-running it (e.g. Restart & Run
# All) no longer fails with "Table 'employees' already exists".
create_table_query = """
CREATE TABLE IF NOT EXISTS employees (
    employee_id INT AUTO_INCREMENT PRIMARY KEY,
    first_name VARCHAR(50) NOT NULL,
    last_name VARCHAR(50) NOT NULL,
    email VARCHAR(100) NOT NULL UNIQUE,
    date_of_birth DATE NOT NULL,
    salary DECIMAL(10, 2) NOT NULL
)
"""

cursor.execute(create_table_query)
connection.commit()
print("Table created successfully!")

In [5]:
# Parameterized INSERT: the five %s placeholders are bound from the tuple
# passed to execute(), so values are never concatenated into the SQL text.
# This query string is reused by the bulk-insert cell further down.
insert_query = """
INSERT INTO employees (first_name, last_name, email, date_of_birth, salary) 
VALUES (%s, %s, %s, %s, %s);
"""

# One row, in the same order as the column list in the INSERT statement.
# NOTE: `email` is declared UNIQUE in the table definition, so running this
# cell a second time against the same table will be rejected by the database.
data = ('John', 'Doe', 'john.doe@example.com', '1985-03-25', 50000.00)

cursor.execute(insert_query, data)
connection.commit()  
print("Data inserted successfully!")

Data inserted successfully!


In [6]:
# Several rows at once; each tuple follows the column order expected by
# `insert_query` (first_name, last_name, email, date_of_birth, salary).
data_to_insert = [
    ('Jane', 'Doe', 'jane.doe@example.com', '1990-07-15', 55000.00),
    ('Alice', 'Johnson', 'alice.johnson@example.com', '1982-11-05', 62000.00)
]

# executemany() runs the same parameterized statement once per tuple;
# a single commit afterwards persists all of them.
cursor.executemany(insert_query, data_to_insert)
connection.commit()
print("Multiple data inserted successfully!")

Multiple data inserted successfully!


In [7]:
# Release the cursor first, then the connection it belongs to. After this
# cell, `cursor` and `connection` must be re-created before further queries.
cursor.close()
connection.close()

In [16]:
import pymysql
import pandas as pd
from typing import List, Dict, Any, Optional
import logging

# Set up logger
# basicConfig configures the root logger at INFO level so that the
# logger.info(...) messages emitted through `logger` are displayed.
logging.basicConfig(level=logging.INFO)
# Module-level logger; in a notebook __name__ is "__main__", which is the
# name that appears in the emitted log lines.
logger = logging.getLogger(__name__)

class mysql_crud_operations:
    """Small CRUD helper around pymysql.

    Every public operation opens its own connection through
    ``create_connection`` and closes it before returning, so an instance only
    holds connection settings, never a live connection.

    NOTE: table and column names are interpolated into the SQL text as-is;
    only *values* are sent as bound parameters. Pass identifiers from trusted
    code only, never from user input.
    """

    def __init__(self, host: str, user: str, password: str, database: str, port: int, use_ssl: bool = False, ssl_ca: Optional[str] = None):
        """Store the connection settings; no connection is opened here.

        Args:
            host: Server host name.
            user: Account name.
            password: Account password.
            database: Schema to select on connect.
            port: Server TCP port.
            use_ssl: When True, an ``ssl`` dict is passed to pymysql.
            ssl_ca: Path to the CA certificate placed in that ``ssl`` dict.
        """
        self.host = host
        self.user = user
        self.password = password
        self.database = database
        self.port = port
        self.use_ssl = use_ssl
        self.ssl_ca = ssl_ca

    def create_connection(self):
        """Open a new pymysql connection from the stored settings.

        Returns:
            The open connection (configured with DictCursor), or None when
            pymysql raises a MySQLError; the error is logged, not propagated.
        """
        # The ssl dict is only built when SSL was requested; otherwise ssl=None is passed.
        ssl_settings = {'ca': self.ssl_ca} if self.use_ssl else None
        try:
            connection = pymysql.connect(
                host=self.host,
                user=self.user,
                # `password`/`database` are the documented names; `passwd`/`db`
                # are deprecated aliases.
                password=self.password,
                database=self.database,
                port=self.port,
                charset='utf8mb4',
                cursorclass=pymysql.cursors.DictCursor,
                ssl=ssl_settings
            )
            logger.info("Database connection successful!")
            return connection
        except pymysql.MySQLError as e:
            logger.error(f"Error connecting to the database: {e}")
            return None

    @staticmethod
    def _equality_clause(columns, separator: str) -> str:
        """Join one ``column = %s`` fragment per column with ``separator``."""
        return separator.join(f"{column} = %s" for column in columns)

    @staticmethod
    def _load_records(datafile: str) -> List[Dict[str, Any]]:
        """Read a .csv or .xlsx file into a list of row dicts.

        Missing cells (NaN/NaT) are converted to None so they are sent to the
        database as NULL.

        Raises:
            ValueError: If the file extension is neither .csv nor .xlsx.
        """
        lowered = datafile.lower()
        if lowered.endswith(".csv"):
            frame = pd.read_csv(datafile)
        elif lowered.endswith(".xlsx"):
            frame = pd.read_excel(datafile)
        else:
            raise ValueError(
                f"Unsupported file type for bulk insert: {datafile!r} (expected .csv or .xlsx)"
            )
        return [
            {column: (None if pd.isna(value) else value) for column, value in row.items()}
            for row in frame.to_dict(orient='records')
        ]

    def _log_query_rows(self, query: str, params: Optional[Any] = None) -> None:
        """Run a row-returning query on a fresh connection and log each row at INFO."""
        connection = self.create_connection()
        if connection is None:
            return
        try:
            # The cursor context manager closes the cursor even if execute() fails.
            with connection.cursor() as cursor:
                cursor.execute(query, params)
                for record in cursor.fetchall():
                    logger.info(record)
        finally:
            connection.close()

    def execute_query(self, query: str, params: Optional[List[Any]] = None) -> None:
        """Execute one statement and commit it.

        A MySQLError is logged and swallowed. Nothing is executed when the
        connection could not be opened.
        """
        connection = self.create_connection()
        if connection is None:
            return
        try:
            # Creating the cursor inside `with` means a failure in cursor()
            # can no longer trigger a NameError in cleanup code.
            with connection.cursor() as cursor:
                cursor.execute(query, params)
            connection.commit()
            logger.info(f"Query executed: {query}")
        except pymysql.MySQLError as e:
            logger.error(f"An error occurred: {e}")
        finally:
            connection.close()

    def insert_record(self, table_name: str, data: Dict[str, Any]) -> None:
        """Insert one row; ``data`` maps column names to values."""
        keys = ", ".join(data.keys())
        values = tuple(data.values())
        placeholders = ", ".join(["%s"] * len(data))
        query = f"INSERT INTO {table_name} ({keys}) VALUES ({placeholders})"
        self.execute_query(query, values)

    def find_all_records(self, table_name: str) -> None:
        """Log every row of ``table_name`` at INFO level.

        Database errors raised while querying propagate to the caller.
        """
        self._log_query_rows(f"SELECT * FROM {table_name}")

    def find_records(self, table_name: str, conditions: Dict[str, Any]) -> None:
        """Log every row whose columns equal all values in ``conditions``.

        Rows are fetched and logged at INFO level, like ``find_all_records``.
        A MySQLError is logged and swallowed.
        """
        condition_strings = self._equality_clause(conditions.keys(), " AND ")
        values = tuple(conditions.values())
        query = f"SELECT * FROM {table_name} WHERE {condition_strings}"
        try:
            self._log_query_rows(query, values)
        except pymysql.MySQLError as e:
            logger.error(f"An error occurred: {e}")

    def update_records(self, table_name: str, data: Dict[str, Any], conditions: Dict[str, Any]) -> None:
        """Set the columns in ``data`` on every row matching ``conditions``."""
        set_clause = self._equality_clause(data.keys(), ", ")
        condition_clause = self._equality_clause(conditions.keys(), " AND ")
        query = f"UPDATE {table_name} SET {set_clause} WHERE {condition_clause}"
        # SET values come first, then WHERE values, matching placeholder order.
        values = list(data.values()) + list(conditions.values())
        self.execute_query(query, values)

    def delete_records(self, table_name: str, conditions: Dict[str, Any]) -> None:
        """Delete every row matching all of ``conditions``."""
        condition_clause = self._equality_clause(conditions.keys(), " AND ")
        query = f"DELETE FROM {table_name} WHERE {condition_clause}"
        values = tuple(conditions.values())
        self.execute_query(query, values)

    def insert_in_bulk(self, datafile: str, table_name: str) -> None:
        """Insert every row of a .csv/.xlsx file into ``table_name``.

        The file's column headers are used as the table's column names.
        A file with no data rows is logged and skipped.

        Raises:
            ValueError: If ``datafile`` is neither .csv nor .xlsx.
        """
        records = self._load_records(datafile)
        if not records:
            logger.warning(f"No records found in {datafile}; nothing to insert.")
            return

        columns = list(records[0].keys())
        column_list = ", ".join(map(str, columns))
        placeholders = ", ".join(["%s"] * len(columns))
        query = f"INSERT INTO {table_name} ({column_list}) VALUES ({placeholders})"
        values = [tuple(record.values()) for record in records]

        connection = self.create_connection()
        if connection is None:
            return
        try:
            with connection.cursor() as cursor:
                cursor.executemany(query, values)
            connection.commit()
            logger.info(f"Bulk insert completed for {len(records)} records.")
        finally:
            connection.close()


In [19]:
# Connection settings consumed by the CRUD helper in the next cell.
host="mysql-adcb970-yukichen-8278.g.aivencloud.com"
user="avnadmin"
# Never keep the password in the notebook source: take it from the
# MYSQL_PASSWORD environment variable, or prompt for it interactively.
password=os.environ.get("MYSQL_PASSWORD") or getpass.getpass("MySQL password: ")
database="defaultdb"
port=12955

In [25]:
# Build the CRUD helper from the connection settings defined in the cell above.
crud = mysql_crud_operations(
    host=host,
    user=user,
    password=password,
    database=database,
    port=port,
)

# Row to add, keyed by column name of the employees table.
employee_data = dict(
    first_name='Max',
    last_name='Min',
    email='max.min@example.com',
    date_of_birth='1985-03-28',
    salary='50000.00',
)

# Persist the row through the helper (one connection is opened and closed for it).
crud.insert_record('employees', employee_data)


INFO:__main__:Database connection successful!
INFO:__main__:Query executed: INSERT INTO employees (first_name, last_name, email, date_of_birth, salary) VALUES (%s, %s, %s, %s, %s)
