In [1]:
import psycopg2
def get_db_connection():
    """데이터베이스 연결을 반환합니다."""
    try:
        conn = psycopg2.connect(
            host="localhost",
            database="mydb",
            user="admin",
            password="admin123"
        )
        return conn
    except psycopg2.Error as e:
        raise

In [None]:
import psycopg2

def get_all_tables():
    """데이터베이스의 모든 테이블 이름을 조회합니다."""
    conn = None
    try:
        # DB 연결
        conn =get_db_connection()
        cur = conn.cursor()

        # 모든 테이블 이름 조회 (시스템 테이블 제외)
        query = """
        SELECT table_name 
        FROM information_schema.tables 
        WHERE table_schema = 'public' 
        ORDER BY table_name;
        """
        
        cur.execute(query)
        tables = cur.fetchall()

        print("--- 데이터베이스의 모든 테이블 ---")
        for table in tables:
            print(f"테이블명: {table[0]}")

        return [table[0] for table in tables]

    except (Exception, psycopg2.DatabaseError) as error:
        print("오류 발생:", error)
        return []
    finally:
        if conn is not None:
            cur.close()
            conn.close()

# 사용 예시
get_all_tables()

--- 데이터베이스의 모든 테이블 ---
테이블명: products


['products']

In [None]:
import psycopg2
from psycopg2 import sql

def create_table():
    """PostgreSQL 데이터베이스에 새로운 테이블을 생성합니다."""
    conn = None
    try:
        # 데이터베이스 연결
        conn = get_db_connection()

        # 커서(cursor) 생성
        cur = conn.cursor()

        # 실행할 SQL 쿼리 (테이블 생성)
        # IF NOT EXISTS를 사용하여 테이블이 이미 존재하면 에러를 방지합니다.
        create_table_query = """
        CREATE TABLE IF NOT EXISTS products (
            id SERIAL PRIMARY KEY,
            name VARCHAR(255) NOT NULL,
            price NUMERIC(10, 2) NOT NULL,
            created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
        );
        """

        # SQL 쿼리 실행
        cur.execute(create_table_query)

        # 변경사항을 데이터베이스에 커밋(commit)
        conn.commit()

        print("'products' 테이블이 성공적으로 생성되었습니다.")

    except (Exception, psycopg2.DatabaseError) as error:
        print("Error while connecting to PostgreSQL", error)
    finally:
        # 연결 종료
        if conn is not None:
            cur.close()
            conn.close()
            print("PostgreSQL connection is closed.")

create_table()

'products' 테이블이 성공적으로 생성되었습니다.
PostgreSQL connection is closed.


In [None]:
import psycopg2

def insert_multiple_products():
    """products 테이블에 여러 상품 데이터를 한 번에 삽입합니다."""
    conn = None
    try:
        conn = get_db_connection()
        cur = conn.cursor()

        # 1. 여러 행을 삽입할 SQL
        sql = "INSERT INTO products (name, price) VALUES (%s, %s);"

        # 2. 튜플의 리스트 형태로 데이터 준비
        products_to_insert = [
            ('Mechanical Keyboard', 125.50),
            ('4K Monitor', 450.00),
            ('Webcam', 75.25),
            ('USB Hub', 22.99)
        ]

        # 3. executemany()로 한 번에 실행
        cur.executemany(sql, products_to_insert)

        # 4. 변경사항 커밋
        conn.commit()
        print(f"{len(products_to_insert)}개의 상품 데이터가 성공적으로 삽입되었습니다.")

    except (Exception, psycopg2.DatabaseError) as error:
        print("오류 발생:", error)
        if conn:
            conn.rollback()
    finally:
        if conn is not None:
            cur.close()
            conn.close()

insert_multiple_products()

4개의 상품 데이터가 성공적으로 삽입되었습니다.


In [None]:
import psycopg2

def select_all_products():
    """products 테이블의 모든 데이터를 조회합니다."""
    conn = None
    try:
        # DB 연결
        conn = get_db_connection()
        cur = conn.cursor()

        # 1. SELECT 쿼리 실행
        cur.execute("SELECT id, name, price, created_at FROM products ORDER BY id;")

        # 2. 조회된 모든 데이터를 한 번에 가져오기
        products = cur.fetchall()

        # 3. 결과 출력
        print("--- 모든 상품 목록 ---")
        for product in products:
            # product는 (id, name, price, created_at) 형태의 튜플입니다.
            print(f"ID: {product[0]}, 이름: {product[1]}, 가격: ${product[2]:.2f}")

    except (Exception, psycopg2.DatabaseError) as error:
        print("오류 발생:", error)
    finally:
        if conn is not None:
            cur.close()
            conn.close()

select_all_products()

--- 모든 상품 목록 ---
ID: 2, 이름: 4K Monitor, 가격: $450.00
ID: 3, 이름: Webcam, 가격: $75.25
ID: 4, 이름: USB Hub, 가격: $22.99
ID: 5, 이름: Mechanical Keyboard, 가격: $125.50
ID: 6, 이름: 4K Monitor, 가격: $450.00
ID: 7, 이름: Webcam, 가격: $75.25
ID: 8, 이름: USB Hub, 가격: $22.99


In [None]:
import psycopg2

def delete_product_by_id(product_id):
    """특정 ID를 가진 상품을 products 테이블에서 삭제합니다."""
    conn = None
    deleted_rows = 0
    try:
        # DB 연결
        conn = get_db_connection()
        cur = conn.cursor()

        # 1. DELETE 쿼리 실행
        delete_query = "DELETE FROM products WHERE id = %s;"
        cur.execute(delete_query, (product_id,))

        # 2. 삭제된 행 수 확인
        deleted_rows = cur.rowcount

        # 3. 변경사항 커밋
        conn.commit()

        if deleted_rows > 0:
            print(f"ID {product_id}인 상품이 성공적으로 삭제되었습니다.")
        else:
            print(f"ID {product_id}인 상품을 찾을 수 없습니다.")

    except (Exception, psycopg2.DatabaseError) as error:
        print("오류 발생:", error)
        # 오류 발생 시 롤백
        if conn is not None:
            conn.rollback()
    finally:
        if conn is not None:
            cur.close()
            conn.close()
    
    return deleted_rows

# 사용 예시
if __name__ == "__main__":
    # 삭제할 상품 ID 입력
    product_id_to_delete = 2  # 여기에 삭제하려는 상품 ID를 입력하세요
    
    # 상품 삭제 실행
    delete_product_by_id(product_id_to_delete)
    
    # 삭제 후 전체 상품 목록 확인 (선택사항)
    select_all_products()

ID 2인 상품이 성공적으로 삭제되었습니다.
--- 모든 상품 목록 ---
ID: 3, 이름: Webcam, 가격: $75.25
ID: 4, 이름: USB Hub, 가격: $22.99
ID: 5, 이름: Mechanical Keyboard, 가격: $125.50
ID: 6, 이름: 4K Monitor, 가격: $450.00
ID: 7, 이름: Webcam, 가격: $75.25
ID: 8, 이름: USB Hub, 가격: $22.99
