# Connect to the Database

In [None]:
!pip install pymysql cryptography

In [None]:
import pymysql

# Open a connection to the local MySQL server using the classicmodels schema.
connection = pymysql.connect(
    host='localhost',
    port=3306,
    user='root',
    password='root',
    database='classicmodels',
    cursorclass=pymysql.cursors.DictCursor,  # return each row as a dict keyed by column name
)
try:
    print(connection)
finally:
    # Every later cell opens its own connection, so release this one immediately.
    connection.close()

# Select First Row

In [None]:
import pymysql

connection = pymysql.connect(
    host='localhost',
    port=3306,
    user='root',
    password='root',
    database='classicmodels',
    cursorclass=pymysql.cursors.DictCursor,  # return each row as a dict keyed by column name
)

try:
    with connection.cursor() as cursor:  # the cursor is closed when the block exits
        # Only one row is displayed, so ask the server for one row instead of
        # transferring the whole table and indexing into it.
        cursor.execute('select * from customers limit 1')
        first_row = cursor.fetchone()  # None when the table is empty
finally:
    # Close the connection even if the query fails.
    connection.close()

if first_row is None:
    print('No rows found in customers')
else:
    print(first_row)  # row 0

# Loop First Row (Dictionary)

In [None]:
import pymysql

connection = pymysql.connect(
    host='localhost',
    port=3306,
    user='root',
    password='root',
    database='classicmodels',
    cursorclass=pymysql.cursors.DictCursor,  # return each row as a dict keyed by column name
)

try:
    with connection.cursor() as cursor:  # the cursor is closed when the block exits
        # Only the first row is iterated, so fetch just that row.
        cursor.execute('select * from customers limit 1')
        first_row = cursor.fetchone()  # None when the table is empty
finally:
    # Close the connection even if the query fails.
    connection.close()

if first_row is None:
    print('No rows found in customers')
else:
    # With DictCursor the row is a dict, so iterate its column/value pairs.
    for key, value in first_row.items():
        print(f"Key = {key}, Value = {value}")

# Loop All Rows

In [None]:
import pymysql

connection = pymysql.connect(
    host='localhost',
    port=3306,
    user='root',
    password='root',
    database='classicmodels',
    cursorclass=pymysql.cursors.DictCursor,  # return each row as a dict keyed by column name
)

try:
    with connection.cursor() as cursor:  # the cursor is closed when the block exits
        cursor.execute('select * from customers')
        results = cursor.fetchall()
finally:
    # Close the connection even if the query fails.
    connection.close()

# Guard the index access: an empty table would otherwise raise IndexError.
if results:
    print(results[0])  # row 0

for row in results:  # print all rows
    for key, value in row.items():
        print(f"Key = {key}, Value = {value}\n")

# Filter rows and access a column by name

In [None]:
import pymysql

connection = pymysql.connect(
    host='localhost',
    port=3306,
    user='root',
    password='root',
    database='classicmodels',
    cursorclass=pymysql.cursors.DictCursor,  # return each row as a dict keyed by column name
)

try:
    with connection.cursor() as cursor:  # the cursor is closed when the block exits
        # The %s placeholders are filled in by the driver, which escapes the values.
        cursor.execute(
            'select * from customers where creditLimit between %s and %s',
            (100000, 110000),
        )
        results = cursor.fetchall()
finally:
    # Close the connection even if the query fails.
    connection.close()

for row in results:  # print the creditLimit column of every matching row
    print(row['creditLimit'])

# Create Database

In [None]:
import pymysql

# No database is selected here: the target schema may not exist yet.
connection = pymysql.connect(
    host='localhost',
    port=3306,
    user='root',
    password='root',
    cursorclass=pymysql.cursors.DictCursor,  # return each row as a dict keyed by column name
)
try:
    with connection.cursor() as cursor:  # the cursor is closed when the block exits
        cursor.execute("CREATE DATABASE IF NOT EXISTS shop_db")
finally:
    # Close the connection even if the statement fails.
    connection.close()

In [None]:
import pymysql

connection = pymysql.connect(
    host='localhost',
    port=3306,
    user='root',
    password='root',
    database='shop_db',
    cursorclass=pymysql.cursors.DictCursor,  # return each row as a dict keyed by column name
)

create_users_table = """
            CREATE TABLE IF NOT EXISTS users (
                id INT AUTO_INCREMENT PRIMARY KEY,
                name VARCHAR(100) NOT NULL,
                email VARCHAR(100) UNIQUE NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            """

create_products_table = """
            CREATE TABLE IF NOT EXISTS products (
                id INT AUTO_INCREMENT PRIMARY KEY,
                name VARCHAR(200) NOT NULL,
                price DECIMAL(10, 2) NOT NULL,
                stock INT DEFAULT 0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            """

try:
    with connection.cursor() as cursor:  # the cursor is closed when the block exits
        # Run the DDL statements in order: users first, then products.
        for ddl in (create_users_table, create_products_table):
            cursor.execute(ddl)
finally:
    # Close the connection even if a statement fails.
    connection.close()

# Insert Users

In [None]:
import pymysql

connection = pymysql.connect(
    host='localhost',
    port=3306,
    user='root',
    password='root',
    database='shop_db',
    cursorclass=pymysql.cursors.DictCursor,  # return each row as a dict keyed by column name
)

# (name, email) pairs to insert; email is UNIQUE in the users table.
users_data = [
    ('John Smith', 'john.smith@email.com'),
    ('Sarah Johnson', 'sarah.johnson@email.com'),
    ('Mike Davis', 'mike.davis@email.com'),
    ('Emily Brown', 'emily.brown@email.com'),
    ('David Wilson', 'david.wilson@email.com'),
]

sql = "INSERT INTO users (name, email) VALUES (%s, %s)"

try:
    with connection.cursor() as cursor:  # the cursor is closed when the block exits
        # executemany runs the statement once per parameter tuple.
        cursor.executemany(sql, users_data)
    connection.commit()  # commit to save changes
except Exception:
    # Undo any partially applied inserts (e.g. after a duplicate email) before re-raising.
    connection.rollback()
    raise
finally:
    connection.close()

# Update

In [None]:
import pymysql

connection = pymysql.connect(
    host='localhost',
    port=3306,
    user='root',
    password='root',
    database='shop_db',
    cursorclass=pymysql.cursors.DictCursor,  # return each row as a dict keyed by column name
)

sql = "UPDATE users SET name = %s WHERE id = %s"
name = "Krit Chomaitong"

try:
    with connection.cursor() as cursor:  # the cursor is closed when the block exits
        cursor.execute(sql, (name, 5))
    connection.commit()  # commit to save changes
except Exception:
    # Leave the table untouched if the update fails.
    connection.rollback()
    raise
finally:
    connection.close()

# Delete user

In [None]:
import pymysql

connection = pymysql.connect(
    host='localhost',
    port=3306,
    user='root',
    password='root',
    database='shop_db',
    cursorclass=pymysql.cursors.DictCursor,  # return each row as a dict keyed by column name
)

sql = "DELETE FROM users WHERE id = %s"

try:
    with connection.cursor() as cursor:  # the cursor is closed when the block exits
        # Parameters are passed as a one-element tuple; note the trailing comma.
        cursor.execute(sql, (5,))
    connection.commit()  # commit to save changes
except Exception:
    # Leave the table untouched if the delete fails.
    connection.rollback()
    raise
finally:
    connection.close()

# Active Record

In [None]:
import pymysql
class Database:
    """Small wrapper around one PyMySQL connection whose cursors return dict rows."""

    def __init__(self, host='localhost', port=3306, user='root', password='root', database='shop_db'):
        """Open the connection.

        Args:
            host: MySQL server host name.
            port: MySQL server port; a falsy value (such as the former ``''``
                default) falls back to 3306.
            user: Login user name.
            password: Login password.
            database: Schema to select after connecting.

        Raises:
            Exception: Whatever ``pymysql.connect`` raises; it is printed and re-raised.
        """
        try:
            self.connection = pymysql.connect(
                host=host,
                port=port or 3306,  # keep accepting '' / None from existing callers
                user=user,
                password=password,
                database=database,
                cursorclass=pymysql.cursors.DictCursor,
            )
            print(f"Connected to database: {database}")
        except Exception as e:
            print(f"Failed to connect to database: {e}")
            raise

    def execute_non_query(self, sql, params=None):
        """Run a data-changing statement and commit it.

        Args:
            sql: Statement text, using ``%s`` placeholders.
            params: Values for the placeholders, or None.

        Returns:
            The value returned by ``cursor.execute`` (the affected-row count).

        Raises:
            Exception: Any error from executing or committing; the transaction
                is rolled back first and the error is re-raised instead of
                being silently swallowed.
        """
        with self.connection.cursor() as cursor:  # the cursor is closed on exit
            try:
                affected_rows = cursor.execute(sql, params)
                self.connection.commit()
            except Exception as e:
                self.connection.rollback()
                print(f"Error executing non-query: {e}")
                raise
            return affected_rows

    def execute_query(self, sql, params=None):
        """Run a query and return every row from ``cursor.fetchall()``."""
        with self.connection.cursor() as cursor:  # the cursor is closed on exit
            cursor.execute(sql, params)
            return cursor.fetchall()

    def execute_single(self, sql, params=None):
        """Run a query and return the first row from ``cursor.fetchone()``.

        Raises:
            Exception: Any error from the query; it is printed and re-raised.
        """
        with self.connection.cursor() as cursor:  # the cursor is closed on exit
            try:
                cursor.execute(sql, params)
                return cursor.fetchone()
            except Exception as e:
                print(f"Error executing single query: {e}")
                raise

    def get_last_insert_id(self):
        """Return ``insert_id()`` as reported by the underlying connection."""
        return self.connection.insert_id()

    def close(self):
        """Close the underlying connection."""
        self.connection.close()


In [None]:
class User:
    """Data-access helper for the ``users`` table, built on a ``Database`` object."""

    # The annotation is a string so this class can be defined before Database is.
    def __init__(self, db: "Database"):
        """Store the database wrapper used by every method."""
        self.db = db

    def add_user(self, name, email):
        """Insert a user and return the result of ``execute_non_query``."""
        affected_rows = self.db.execute_non_query(
            "INSERT INTO users (name, email) VALUES (%s, %s)",
            (name, email),
        )
        return affected_rows

    def update_user(self, id, name, email):
        """Set name and email for the user with the given id; return the affected-row result."""
        affected_rows = self.db.execute_non_query(
            "UPDATE users SET name = %s, email = %s WHERE id = %s",
            (name, email, id),
        )
        return affected_rows

    def delete_user(self, id):
        """Delete the user with the given id; return the affected-row result."""
        # Parameters are passed as a one-element tuple; note the trailing comma.
        affected_rows = self.db.execute_non_query("DELETE FROM users WHERE id = %s", (id,))
        return affected_rows

    def get_user_by_email(self, email):
        """Return the single row whose email equals ``email`` (via ``execute_single``)."""
        user = self.db.execute_single("SELECT * FROM users WHERE email = %s", (email,))
        return user

    def get_user_by_name(self, name):
        """Return all rows whose name contains ``name`` (SQL ``LIKE '%name%'``)."""
        user = self.db.execute_query("SELECT * FROM users WHERE name LIKE %s", (f"%{name}%",))
        return user

    def get_user_by_id(self, id):
        """Return the single row with the given id (via ``execute_single``)."""
        user = self.db.execute_single("SELECT * FROM users WHERE id = %s", (id,))
        return user

    def get_users(self):
        """Return every row of the users table."""
        users = self.db.execute_query("SELECT * FROM users ")
        return users


In [None]:
# Exercise the User helper end to end against the shop_db database.
db = Database()
try:
    user = User(db=db)

    affected_rows = user.add_user("Krit2", "aaacc@gmail.com")
    print(f"Add user = {affected_rows}")

    find_user = user.get_user_by_email("aaacc@gmail.com")
    print(f"Find user = {find_user}")

    find_user = user.get_user_by_name("Krit")
    print(f"Find user = {find_user}")

    users = user.get_users()
    print(f"All users = {users}")  # show the full list, not the previous lookup

    affected_rows = user.update_user(6, name="Krit", email="AABBCC@gmail.com")
    print(f"Update user = {affected_rows}")  # show the update result
finally:
    # Release the connection even if one of the calls above fails.
    db.close()
