# SQLAlchemy를 통한 Python의 ORM 사용

## 밑준비

In [1]:
%pip install sqlalchemy sqlalchemy-utils pymysql cryptography

Note: you may need to restart the kernel to use updated packages.


In [2]:
import os

# Connection parameters, forwarded as keyword arguments to
# sqlalchemy.engine.URL.create() when the engine is built.
# Every value except the driver can be overridden through an environment
# variable so real credentials do not have to be stored in the notebook;
# the defaults reproduce the original local test setup.
DB_SETTING = {
    "drivername": "mysql+pymysql",
    "host": os.environ.get("SQL_ORM_TEST_DB_HOST", "localhost"),
    # The port is kept as an integer, which is what URL.create() documents.
    "port": int(os.environ.get("SQL_ORM_TEST_DB_PORT", "3808")),
    "username": os.environ.get("SQL_ORM_TEST_DB_USER", "root"),
    "password": os.environ.get("SQL_ORM_TEST_DB_PASSWORD", "123qweasd."),
    "database": os.environ.get("SQL_ORM_TEST_DB_NAME", "sql_orm_test"),
}

## 엔진 생성

In [3]:
import sqlalchemy as db
from sqlalchemy_utils import database_exists, create_database

# The engine manages the connections to the database.
engine = db.create_engine(db.engine.URL.create(**DB_SETTING), future=True)

# Create the target database on the first run; otherwise just check that the
# server can be reached.
if not database_exists(engine.url):
    create_database(engine.url)
else:
    # Open the probe connection in a context manager so it is closed right
    # away instead of relying on garbage collection to release it.
    with engine.connect():
        pass

## Model(테이블) 생성

In [4]:
import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Table, Date, TIMESTAMP

# Import from sqlalchemy.orm: the sqlalchemy.ext.declarative location of
# declarative_base() is deprecated since SQLAlchemy 1.4.
from sqlalchemy.orm import declarative_base

# Parent class of every ORM model.
DeclarativeBase = declarative_base()

# Build a table by using the Table class directly.
address_table = Table(
    "address",                 # table name
    DeclarativeBase.metadata,  # metadata
    Column("id", Integer, autoincrement="auto", primary_key=True),
    Column("email", String(50), unique=True, nullable=False),
    Column(
        "reg_date",
        TIMESTAMP,
        server_default=sqlalchemy.text("CURRENT_TIMESTAMP"),
    ),
)


# Inheriting from DeclarativeBase prepares the class for registration
# automatically.
class User(DeclarativeBase):
    """ORM model mapped to the ``user`` table."""

    __tablename__ = "user"  # table name

    id = Column(Integer, autoincrement="auto", primary_key=True)
    name = Column(String(20), nullable=False)
    age = Column(Integer, nullable=False)
    birthday = Column(Date)
    # Server-side default: now().
    reg_date = Column(TIMESTAMP, server_default=sqlalchemy.func.now())


# Create the models in the database.
DeclarativeBase.metadata.create_all(bind=engine)

In [5]:
# Show the Table object that backs the User model.
User.__table__

Table('user', MetaData(), Column('id', Integer(), table=<user>, primary_key=True, nullable=False), Column('name', String(length=20), table=<user>, nullable=False), Column('age', Integer(), table=<user>, nullable=False), Column('birthday', Date(), table=<user>), Column('reg_date', TIMESTAMP(), table=<user>, server_default=DefaultClause(<sqlalchemy.sql.functions.now at 0x1093ee9a0; now>, for_update=False)), schema=None)

In [6]:
# Show the Table object that was built directly with Table(...).
address_table

Table('address', MetaData(), Column('id', Integer(), table=<address>, primary_key=True, nullable=False), Column('email', String(length=50), table=<address>, nullable=False), Column('reg_date', TIMESTAMP(), table=<address>, server_default=DefaultClause(<sqlalchemy.sql.elements.TextClause object at 0x1093ee550>, for_update=False)), schema=None)

## 세션 생성

In [7]:
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.session import Session

# Factory that produces Session objects bound to the engine.
SessionMaker = sessionmaker(bind=engine)

# The session manages all connections.
session: Session = SessionMaker()

## ORM 스타일 쿼리 사용하기

### 생 쿼리 직접 실행

In [8]:
from sqlalchemy import text

# Raw SQL has to be wrapped in text(): passing a plain string to execute()
# is deprecated in SQLAlchemy 1.4 and rejected in 2.0.
result = session.execute(text("SHOW TABLES"))

# Each row is tuple-like; SHOW TABLES returns one column with the table name.
for row in result:
    print(row)

('address',)
('user',)


### Insert

In [9]:
from faker import Faker
import datetime
# Seed Faker with a fixed value before any data is generated.
Faker.seed(42)

# Fake-data generator configured with the Korean (ko_KR) locale.
faker = Faker(locale='ko_KR')

def get_date_string_from_date(date: datetime.date) -> str:
    """Format *date* as a zero-padded ``YYYY-MM-DD`` string."""
    return f"{date.year:04d}-{date.month:02d}-{date.day:02d}"

def gen_fake_user_data():
    """Build one fake user record.

    Returns:
        A dict with ``name``, ``birthday`` (a ``YYYY-MM-DD`` string) and
        ``age`` (completed years as of today).
    """
    # Read the clock once so the birthday range and the age agree even when
    # the call happens around midnight.
    now = datetime.datetime.today()
    today = now.date()

    name = faker.name()
    # Birthdays are drawn from the last 10000 days.
    birthday = faker.date_between(
        start_date=now - datetime.timedelta(days=10000),
        end_date=now,
    )

    # Calendar-based age: one year less while this year's birthday is still
    # ahead. Dividing the day count by 365 ignores leap days and can report
    # one year too many shortly before a birthday.
    had_birthday = (today.month, today.day) >= (birthday.month, birthday.day)
    age = today.year - birthday.year - (0 if had_birthday else 1)

    return {
        "name": name,
        "birthday": get_date_string_from_date(birthday),
        "age": age,
    }

#### 쿼리 문장을 생성하여 Execute하는 스타일

In [10]:
from sqlalchemy import insert

# Generate one fake user and build an INSERT statement from its fields.
user_python_obj = gen_fake_user_data()
stmt = insert(User).values(**user_python_obj)
# Printing the statement shows the SQL with named placeholders.
print (stmt)

INSERT INTO "user" (name, age, birthday) VALUES (:name, :age, :birthday)


In [11]:
# Compile the statement so its bound parameters can be inspected.
compiled = stmt.compile()

# Compare the source dict with the parameters carried by the compiled statement.
print ("파이썬 데이터:", user_python_obj)
print ("생성 문장에 들어가는 데이터:", compiled.params)

파이썬 데이터: {'name': '김수민', 'birthday': '2002-08-17', 'age': 19}
생성 문장에 들어가는 데이터: {'name': '김수민', 'age': 19, 'birthday': '2002-08-17'}


In [12]:
# Execute the INSERT through the session and commit the transaction.
result = session.execute(stmt)
session.commit()
# The result exposes the primary key of the inserted row.
print("추가된 데이터의 Primary Key:", result.inserted_primary_key)

추가된 데이터의 Primary Key: (1,)


In [13]:
# Insert 100 rows at once.
target_insert_data = [gen_fake_user_data() for _ in range(100)]

print ("추가된 데이터 한개 예시:", target_insert_data[0])
# Passing a list of dicts as the second argument runs the same statement
# once per dict.
result = session.execute(
  insert(User), # Statement
  target_insert_data # Data
)
session.commit()

추가된 데이터 한개 예시: {'name': '김정호', 'birthday': '2013-08-03', 'age': 8}


#### ORM의 Object를 이용하는 스타일

In [14]:
user_python_obj = gen_fake_user_data()
user_orm_obj = User(**user_python_obj)

# Stage the new User object in the session.
session.add(user_orm_obj)

# flush() sends the pending INSERT so the generated primary key becomes
# available on the object; the transaction itself is still open.
session.flush()

print("추가됨:", user_python_obj)
print("생성된 데이터의 ID값:", user_orm_obj.id)

# Commit, like the statement-style insert examples do, so the row is
# actually persisted and the transaction does not stay open.
session.commit()

추가됨: {'name': '김경희', 'birthday': '2009-11-14', 'age': 12}
생성된 데이터의 ID값: 102


### Select

#### 쿼리 문장을 생성하여 Execute하는 스타일

In [15]:
from sqlalchemy import select
# Build a SELECT over every column of the User model and show the SQL.
stmt = select(User)
print (stmt)

SELECT "user".id, "user".name, "user".age, "user".birthday, "user".reg_date 
FROM "user"


In [16]:
result = session.execute(stmt)

# scalars() unwraps each single-entity row into the User object itself, and
# first() returns the first one (None for an empty table) and closes the
# result. This replaces the loop-and-break over row[0].
user = result.scalars().first()
if user is not None:
    print(user.__dict__)

{'_sa_instance_state': <sqlalchemy.orm.state.InstanceState object at 0x109820580>, 'id': 1, 'age': 19, 'reg_date': datetime.datetime(2022, 5, 31, 10, 4, 25), 'birthday': datetime.date(2002, 8, 17), 'name': '김수민'}


#### 다양한 조건을 추가

In [17]:
# A single equality condition becomes a WHERE clause with a bound parameter.
print (select(User).where(User.age == 14))

SELECT "user".id, "user".name, "user".age, "user".birthday, "user".reg_date 
FROM "user" 
WHERE "user".age = :age_1


In [18]:
# Several criteria passed to where() are joined with AND.
# NOTE(review): age = 14 and age BETWEEN 10 AND 12 cannot both hold, so this
# statement only illustrates the generated SQL; it would match no rows.
print (select(User).where(User.age == 14, sqlalchemy.between(User.age, 10, 12)))

SELECT "user".id, "user".name, "user".age, "user".birthday, "user".reg_date 
FROM "user" 
WHERE "user".age = :age_1 AND "user".age BETWEEN :age_2 AND :age_3


In [19]:
# Sort by name in descending order.
print (select(User).order_by(User.name.desc()))

SELECT "user".id, "user".name, "user".age, "user".birthday, "user".reg_date 
FROM "user" ORDER BY "user".name DESC


In [20]:
# SQL 함수는 다음으로 대체한다.
from sqlalchemy import func

# Count rows per name among users older than five and keep only the names
# that occur more than once.
stmt = (
    select(User.name, func.count(User.id))
    .where(User.age > 5)
    .group_by(User.name)
    .having(func.count(User.id) > 1)
)

print(stmt)

SELECT "user".name, count("user".id) AS count_1 
FROM "user" 
WHERE "user".age > :age_1 GROUP BY "user".name 
HAVING count("user".id) > :count_2


#### ORM을 이용하는 스타일

In [21]:
# Query-style API: start from a Query over User.
user_query = session.query(User)

# Users younger than 15 whose birthday falls in May, ordered by age.
filter_query = user_query.filter(User.age < 15, func.month(User.birthday) == 5).order_by(User.age)
# Printing the Query shows the SQL it would run.
print (filter_query)

SELECT user.id AS user_id, user.name AS user_name, user.age AS user_age, user.birthday AS user_birthday, user.reg_date AS user_reg_date 
FROM user 
WHERE user.age < %(age_1)s AND month(user.birthday) = %(month_1)s ORDER BY user.age


In [22]:
# Iterating the Query yields the matching User objects; print their attributes.
for row in filter_query:
  print (row.__dict__)

{'_sa_instance_state': <sqlalchemy.orm.state.InstanceState object at 0x10986bf10>, 'id': 12, 'age': 1, 'reg_date': datetime.datetime(2022, 5, 31, 10, 4, 25), 'birthday': datetime.date(2021, 5, 3), 'name': '김정남'}
{'_sa_instance_state': <sqlalchemy.orm.state.InstanceState object at 0x10986bc40>, 'id': 4, 'age': 3, 'reg_date': datetime.datetime(2022, 5, 31, 10, 4, 25), 'birthday': datetime.date(2019, 5, 25), 'name': '조영호'}
{'_sa_instance_state': <sqlalchemy.orm.state.InstanceState object at 0x109872280>, 'id': 20, 'age': 8, 'reg_date': datetime.datetime(2022, 5, 31, 10, 4, 25), 'birthday': datetime.date(2014, 5, 1), 'name': '김지훈'}


### Update

#### 쿼리 문장을 생성하여 Execute하는 스타일

In [23]:
from sqlalchemy import update

# Prefix the current name of the user with id 1. The right-hand side is a SQL
# expression built from the column, not a Python string.
stmt = update(User).where(User.id == 1).values(name="변경: " + User.name)

# Show the generated SQL and the parameters bound to it.
print (stmt)
print (stmt.compile().params)

UPDATE "user" SET name=(:name_1 || "user".name) WHERE "user".id = :id_1
{'name_1': '변경: ', 'id_1': 1}


In [24]:
# Run the UPDATE statement and commit it.
result = session.execute(stmt)

session.commit()

# Session.get() is the SQLAlchemy 1.4+/2.0 replacement for the legacy
# Query.get() primary-key lookup.
session.get(User, 1).__dict__

{'_sa_instance_state': <sqlalchemy.orm.state.InstanceState at 0x109820580>,
 'age': 19,
 'id': 1,
 'reg_date': datetime.datetime(2022, 5, 31, 10, 4, 25),
 'name': '변경: 김수민',
 'birthday': datetime.date(2002, 8, 17)}

#### ORM의 Object를 이용하는 스타일

In [25]:
user_query = session.query(User)

# Look the row up by primary key with Session.get(), the SQLAlchemy 1.4+/2.0
# replacement for the legacy Query.get().
first_user = session.get(User, 1)

if first_user is not None:
    # Assigning to a mapped attribute is enough; the change is written to
    # the database by the commit below.
    first_user.name = "ORM"

session.commit()

In [26]:
# Session.get() replaces the legacy Query.get() primary-key lookup.
session.get(User, 1).__dict__

{'_sa_instance_state': <sqlalchemy.orm.state.InstanceState at 0x109820580>,
 'age': 19,
 'id': 1,
 'reg_date': datetime.datetime(2022, 5, 31, 10, 4, 25),
 'name': 'ORM',
 'birthday': datetime.date(2002, 8, 17)}

## 커넥션 스타일로 직접 쿼리 실행

In [27]:
import sqlalchemy

# Run raw SQL on a connection taken directly from the engine; the with-block
# closes the connection on exit.
with engine.connect() as conn:
  # Style 1: fetch every row at once as a list.
  print ("ROW fetch 스타일")
  result = conn.execute(sqlalchemy.text("select 'hello world'"))
  print(result.all())

  # Style 2: iterate over the result row by row.
  print ("ROW for문 순회 스타일")
  result = conn.execute(sqlalchemy.text("select 'hello world'"))
  for row in result:
    print (row)

ROW fetch 스타일
[('hello world',)]
ROW for문 순회 스타일
('hello world',)
