출처: https://gungadinn.github.io/data/2019/07/09/ORM/

In [None]:
import sqlalchemy 
from sqlalchemy import create_engine
from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey
from sqlalchemy import select, join, insert, update
from sqlalchemy import and_, or_, not_

# Alchemy Version

In [None]:
print(f'{sqlalchemy.__version__}')

# Connectivity

In [None]:
db = create_engine('postgresql://vkadmin:Asdf!234@noweaver:5432/vk', echo=True)
# for i in range(1,2000):
#     conn = db.connect()
#     #some simple data operations
#     conn.close()
# db.dispose()

In [None]:
print(db)

# Create

In [None]:
metadata = MetaData()

In [None]:
# CREATE TABLE users (
#       id INTEGER NOT NULL
#     , name VARCHAR
#     , fullname VARCHAR
#     , PRIMARY KEY (id)
# )
users = Table('users', metadata,
                 Column('id', Integer, primary_key=True),
                 Column('name', String),
                 Column('fullname', String),
)

In [None]:
# metadata.create_all(db)

In [None]:
# CREATE TABLE addresses (
#     id INTEGER NOT NULL
#     , user_id INTEGER
#     , email_address VARCHAR NOT NULL
#     , PRIMARY KEY (id)
#     , FOREIGN KEY(user_id) REFERENCES users (id)
# )
addresses = Table('addresses', metadata,
                    Column('id', Integer, primary_key=True),
                    Column('user_id', None, ForeignKey('users.id')),
                    Column('email_address', String, nullable=False)
)

In [None]:
metadata.create_all(db)

# Insert

In [None]:
# value 값 없음
insert = users.insert()
print(insert)

insert = users.insert().values(name='kim', fullname='Anonymous, Kim')
print(insert)

insert.compile().params

## Insert 구문 이용

In [None]:
conn = db.connect()

insert.bind = db
str(insert)

result = conn.execute(insert)

result.inserted_primary_key

## Execute의 params 사용

In [None]:
insert = users.insert()
result = conn.execute(insert, name="lee", fullname="Unknown, Lee")
result.inserted_primary_key

## DBAPI의 executemany() 사용

In [None]:
conn.execute(addresses.insert(), [
    {"user_id":1, "email_address":"anonymous.kim@test.com"},
    {"user_id":2, "email_address":"unknown.lee@test.com"}
])

# Select

In [None]:
query = select([users])
result = conn.execute(query)

for row in result:
    print(row)

In [None]:
result = conn.execute(select([users.c.name, users.c.fullname]))

for row in result:
    print(row)

In [None]:
result = conn.execute(query)

row = result.fetchone()
print("id - ", row["id"], ", name - ", row["name"], ", fullname - ", row["fullname"])

row = result.fetchone()
print("id - ", row[0], ", name - ", row[1], ", fullname - ", row[2])

In [None]:
result = conn.execute(query)
rows = result.fetchall()

for row in rows:
    print("id - ", row[0], ", name -", row[1], ", fullname - ", row[2])

result.close()

# Conjunctions

In [None]:
print(users.c.id == addresses.c.user_id)
print(users.c.id == 1)
print((users.c.id == 1).compile().params)
print(or_(users.c.id == addresses.c.user_id, users.c.id == 1))
print(and_(users.c.id == addresses.c.user_id, users.c.id == 1))
print(and_(
    or_(
        users.c.id == addresses.c.user_id,
        users.c.id ==1
   ),
    addresses.c.email_address.like("a%")
    )
)
print((
    (users.c.id == addresses.c.user_id) |
    (users.c.id == 1)
) & (addresses.c.email_address.like("a%")))

In [None]:
#1
result = conn.execute(select([users]).where(users.c.id == 1))
for row in result:
    print(row)

#2
result = conn.execute(select([users, addresses]).where(users.c.id == addresses.c.user_id))
for row in result:
    print(row)
    
#3
result = conn.execute(select([users.c.id, users.c.fullname, addresses.c.email_address]).where(users.c.id == addresses.c.user_id))
for row in result:
    print(row)
    
#4
result = conn.execute(select([users.c.id, users.c.fullname, addresses.c.email_address]).where(users.c.id == addresses.c.user_id).where(addresses.c.email_address.like("Un%")))
for row in result:
    print(row)

# Join

In [None]:
print(users.join(addresses))
print(users.join(addresses, users.c.id == addresses.c.user_id))

In [None]:
query = select([users.c.id, users.c.fullname,
               addresses.c.email_address]).select_from(users.join(addresses))

result = conn.execute(query).fetchall()
for row in result:
    print(row)

In [None]:
# input
metadata.tables

In [None]:
metadata.clear()
metadata.tables

![alchemy-practice](./imgs/alchemy-practice.png)

# Create Tables

In [None]:
artist = Table("Artist", metadata,
              Column("id", Integer, primary_key=True),
              Column("name", String, nullable=False),
              extend_existing=True)

album = Table("Album", metadata,
             Column("id", Integer, primary_key=True),
             Column("title", String, nullable=False),
             Column("artist_id", Integer, ForeignKey("Artist.id")),
             extend_existing=True)

genre = Table("Genre", metadata,
             Column("id", Integer, primary_key=True),
             Column("name", String, nullable=False),
             extend_existing=True)

track = Table("Track", metadata,
              Column("id", Integer, primary_key=True),
              Column("title",String,nullable=False),
              Column("length", Integer, nullable=False),
              Column("rating", Integer, nullable=False),
              Column("count", Integer, nullable=False),
              Column("album_id", Integer,ForeignKey("Album.id")),
              Column("genre_id", Integer,ForeignKey("Genre.id")),
              extend_existing=True)

In [None]:
metadata.create_all(db)

## Show tables

In [None]:
tables = metadata.tables
for table in tables:
    print(table)

In [None]:
# 데이터베이스에 있는 테이블
for table in db.table_names():
    print(table)

# Insert

In [None]:
conn = db.connect()

conn.execute(artist.insert(), [
    {"name":"Led Zepplin"},
    {"name":"AC/DC"}
])

conn.execute(album.insert(), [
    {"title":"IV" ,"artist_id":1},
    {"title":"Who Made Who" ,"artist_id":2}
])

conn.execute(genre.insert(), [
    {"name":"Rock"},
    {"name":"Metal"}
])

conn.execute(track.insert(),[
    {"title":"Black Dog", "rating":5, "length":297, "count":0, "album_id":1, "genre_id":1},
    {"title":"Stairway", "rating":5, "length":482, "count":0, "album_id":1, "genre_id":1},
    {"title":"About to rock", "rating":5, "length":313, "count":0, "album_id":2, "genre_id":2},
    {"title":"Who Made Who", "rating":5, "length":297, "count":0, "album_id":2, "genre_id":2}
])

# Select

In [None]:
artistResult = conn.execute(artist.select())
for row in artistResult:
    print(row)
    
albumResult = conn.execute(album.select())
for row in albumResult:
    print(row)
    
genreResult = conn.execute(genre.select())
for row in genreResult:
    print(row)
    
trackResult = conn.execute(track.select())
for row in trackResult:
    print(row)

# Where

In [None]:
trackResult = conn.execute(select([track]).\
                           where(and_(track.c.album_id == 1, track.c.genre_id == 1)))
for row in trackResult:
    print(row)

# Update

In [None]:
conn.execute(track.update().values(genre_id=2).where(track.c.id==2))
conn.execute(track.update().values(genre_id=1).where(track.c.id==3))

trackResult = conn.execute(select([track]).where(and_(track.c.album_id == 1, or_(track.c.genre_id == 1, track.c.genre_id == 2,))))
for row in trackResult:
    print(row)
    

# Join

In [None]:
print(track.join(album))

In [None]:
result = conn.execute(track.select().select_from(track.join(album)))

for row in result.fetchall():
    print(row)

In [None]:
result = conn.execute(track.select().select_from(track.join(album)).where(album.c.id==1))

for row in result.fetchall():
    print(row)

# Multiple Join

In [None]:
print(track.join(album))
print(track.join(album).join(genre))
print(track.join(album).join(artist))
print(track.join(album).join(genre).join(artist))

In [None]:
result = conn.execute(select([track.c.title, album.c.title, genre.c.name, artist.c.name]).select_from(track.join(album).join(genre).join(artist)))

for row in result.fetchall():
    print(row)

In [None]:
result = conn.execute(track.select().select_from(track.join(album)\
                                  .join(genre).join(artist))
                     .where(
                         and_(
                             genre.c.id==1,
                             artist.c.id==1,
                         )
                     )
                     )
for row in result.fetchall():
    print(row)

# Open/Close

In [None]:
db = create_engine('postgresql://vkadmin:Asdf!234@noweaver:5432/vk', echo=True)
conn = db.connect()

metadata = MetaData(bind=db, reflect=True)
metadata.reflect(bind=db)

for row in metadata.tables:
    print(row)

In [None]:
tables = metadata.tables
for table in tables:
    print(table)
    
    
# album
track = metadata.tables["Track"]
for row in conn.execute(track.select()).fetchall():
    print(row)
    
conn.close()
metadata.clear()

# ORM

In [None]:
from sqlalchemy.ext.declarative import declarative_base

In [None]:
base = declarative_base()

## Create

In [None]:
# 아직은 메모리에
class User(base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True)
    name = Column(String)
    fullname = Column(String)
    password = Column("passwd", String)
    
    def __repr__(self):
        return "<T'User(name='%s', fullname='%s', password='%s')>"%(self.name, self.fullname, self.password)

In [None]:
# schema
User.__table__

In [None]:
# create table
base.metadata.create_all(db)

In [None]:
# create instance
kim = User(name="kim", fullname="anonymous, Kim", password="kimbap heaven")

print(kim)
print(kim.id)

## Session

In [None]:
from sqlalchemy.orm import sessionmaker

Session = sessionmaker(bind=db)
session = Session()

## Insert

In [None]:
session.add(kim)

In [None]:
session.add_all([
    User(name="lee", fullname="unknown, Lee", password="123456789a"),
    User(name="park", fullname="nobody, Park", password="Parking in Park")
])

## Update

In [None]:
kim.password = "password"

session.dirty

session.is_modified(kim)

In [None]:
for row in session.query(User):
    print(type(row))
    print(row.id, row.name, row.fullname, row.password)

In [None]:
for row in session.query(User.id, User.fullname).filter(User.name == "lee"):
    print(type(row))
    print(row.id, row.fullname)

In [None]:
for row in session.query(User.id, User.fullname).filter_by(name = "lee"):
    print(type(row))
    print(row.id, row.fullname)