## sqlalchemy core笔记

### 定义和创建表

In [1]:
# 创建sqlalchemy引擎
from sqlalchemy import create_engine
# 连接字符串，mysql代表数据库引擎，pymysql代表数据库连接驱动，pool_recycle是连接回收时间
conn_str = 'mysql+pymysql://root:ying19781216@192.168.99.1/test'
engine = create_engine(conn_str, pool_recycle=3600)

In [2]:
# 建表
from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey
# 获取MetaData对象
metadata = MetaData()
# 创建users表
users = Table('users', metadata,
              Column('id', Integer, primary_key=True),
              Column('name', String(32)), 
              Column('fullname', String(200)), 
              Column('password', String(20)),
              extend_existing=True)
# 创建addresses表
addresses = Table('addresses', metadata, 
                  Column('id', Integer, primary_key=True),
                  Column('user_id', None, ForeignKey('users.id')),
                  Column('email_address', String(200), nullable=False),
                  extend_existing=True)

In [3]:
# 使用engine引擎创建metadata中的所有对象
metadata.create_all(engine)

### Insert表达式

In [4]:
ins = users.insert()
str(ins)

'INSERT INTO users (id, name, fullname, password) VALUES (:id, :name, :fullname, :password)'

In [5]:
ins = users.insert().values(name='jack', fullname='Jack Jones', password='123456')
str(ins)

'INSERT INTO users (name, fullname, password) VALUES (:name, :fullname, :password)'

In [6]:
ins.compile().params

{'fullname': 'Jack Jones', 'name': 'jack', 'password': '123456'}

#### 执行insert命令

In [7]:
# 获得连接对象
conn = engine.connect()

In [8]:
# 执行
result = conn.execute(ins)
result

<sqlalchemy.engine.result.ResultProxy at 0x7fbb935d2550>

In [9]:
ins.bind = engine
str(ins)

'INSERT INTO users (name, fullname, password) VALUES (%(name)s, %(fullname)s, %(password)s)'

In [10]:
# 查看插入的主键值
result.inserted_primary_key

[1]

#### 执行批量插入

In [11]:
ins = users.insert()
result = conn.execute(ins, name='wendy', fullname='Wendy Williams', password='654321')
result.inserted_primary_key

[2]

In [12]:
# 插入多条记录
result = conn.execute(addresses.insert(), [
    {'user_id': 1, 'email_address': 'jack@yahoo.com'},
    {'user_id': 1, 'email_address': 'jack@msn.com'},
    {'user_id': 2, 'email_address': 'www@www.org'},
    {'user_id': 1, 'email_address': 'wendy@aol.com'},
])

result.rowcount

4

### 使用select

In [13]:
from sqlalchemy.sql import select

In [14]:
# 查询users表所有记录
s = select([users])
result = conn.execute(s)

for row in result:
    print(row)

(1, 'jack', 'Jack Jones', '123456')
(2, 'wendy', 'Wendy Williams', '654321')


In [15]:
# 获得下一条记录，使用列名获取值
result = conn.execute(s)
row = result.fetchone()
print(row['name'], row['fullname'])

jack Jack Jones


In [16]:
# 获得下一条记录，使用列序号获取值
row = result.fetchone()
print(row[1], row[2])

wendy Wendy Williams


In [17]:
# 关闭结果集
result.close()

In [18]:
# 使用Table对象的c属性获取表的部分字段
s = select([users.c.name, users.c.password])
result = conn.execute(s)
for row in result:
    print(row)

('jack', '123456')
('wendy', '654321')


In [19]:
# 获得两张表的笛卡尔乘积结果
for row in conn.execute(select([users, addresses])):
    print(row)

(1, 'jack', 'Jack Jones', '123456', 1, 1, 'jack@yahoo.com')
(2, 'wendy', 'Wendy Williams', '654321', 1, 1, 'jack@yahoo.com')
(1, 'jack', 'Jack Jones', '123456', 2, 1, 'jack@msn.com')
(2, 'wendy', 'Wendy Williams', '654321', 2, 1, 'jack@msn.com')
(1, 'jack', 'Jack Jones', '123456', 3, 2, 'www@www.org')
(2, 'wendy', 'Wendy Williams', '654321', 3, 2, 'www@www.org')
(1, 'jack', 'Jack Jones', '123456', 4, 1, 'wendy@aol.com')
(2, 'wendy', 'Wendy Williams', '654321', 4, 1, 'wendy@aol.com')


In [20]:
# 获得两张表内连接结果，使用where
s = select([users, addresses]).where(users.c.id==addresses.c.user_id)
for row in conn.execute(s):
    print(row)

(1, 'jack', 'Jack Jones', '123456', 1, 1, 'jack@yahoo.com')
(1, 'jack', 'Jack Jones', '123456', 2, 1, 'jack@msn.com')
(1, 'jack', 'Jack Jones', '123456', 4, 1, 'wendy@aol.com')
(2, 'wendy', 'Wendy Williams', '654321', 3, 2, 'www@www.org')


In [21]:
# 有别于Python的条件表达式，下面式子并不返回True或False，而是一种条件模型
users.c.id==addresses.c.user_id

<sqlalchemy.sql.elements.BinaryExpression object at 0x7fbb92f01908>

In [22]:
# 使用str可以看到，该条件模型将转换为SQL的条件字符串
str(users.c.id==addresses.c.user_id)

'users.id = addresses.user_id'

#### 运算符

In [23]:
# 如果表达式中有常量，将会转换为绑定参数形式
str(users.c.id==1)

'users.id = :id_1'

In [24]:
# 条件表达式编译后获得
(users.c.id==1).compile().params

{'id_1': 1}

In [25]:
# 大部分的Python运算都可以作为条件表达式的运算符
print(users.c.id!=1)
print(users.c.name==None)
print('fred'>users.c.name)

users.id != :id_1
users.name IS NULL
users.name < :name_1


In [26]:
# 其他运算亦一样
str(users.c.id+addresses.c.id)

'users.id + addresses.id'

In [27]:
# 字符串连接
str(users.c.name+users.c.fullname)

'users.name || users.fullname'

In [28]:
# 指定为mysql+pymysql引擎时的字符串连接
print((users.c.name+users.c.fullname).compile(bind=create_engine('mysql+pymysql://')))

concat(users.name, users.fullname)


In [29]:
# 如果SQL中有无法在Python中找到的运算符，可以使用通用op函数定义一个自己的运算符
str(users.c.name.op('my_op')('foo'))

'users.name my_op :name_1'

In [30]:
# 如定义一个按位与运算
str(users.c.id.op('&')(0xff))

'users.id & :id_1'

In [31]:
# 如果运算返回的是布尔值，需要使用bool_op
str(users.c.name.bool_op('->')('foo'))

'users.name -> :name_1'

#### 条件连接

In [32]:
from sqlalchemy.sql import and_, or_, not_

In [33]:
# 使用and_, or_和not_组合条件
print(and_(users.c.name.like('j%'),
          users.c.id == addresses.c.user_id,
          or_(
              addresses.c.email_address=='wendy@aol.com',
              addresses.c.email_address=='jack@yahoo.com'
          ),
          not_(users.c.id>5)))

users.name LIKE :name_1 AND users.id = addresses.user_id AND (addresses.email_address = :email_address_1 OR addresses.email_address = :email_address_2) AND users.id <= :id_1


In [34]:
# 也可以使用Python的位运算符& | ~进行连接
print(users.c.name.like('j%') & (users.c.id==addresses.c.user_id) 
     & ((addresses.c.email_address=='wendy@aol.com') | (addresses.c.email_address=='jack@yahoo.com'))
     & ~(users.c.id>5))

users.name LIKE :name_1 AND users.id = addresses.user_id AND (addresses.email_address = :email_address_1 OR addresses.email_address = :email_address_2) AND users.id <= :id_1


In [35]:
# 集合上面的很多运算符进行查询
conn = engine.connect()
s = select([(users.c.fullname+', '+addresses.c.email_address).label('title')]).where(
and_(users.c.id==addresses.c.user_id, users.c.name.between('a', 'm'), or_(
addresses.c.email_address.like('%@aol.com'), addresses.c.email_address.like('%@msn.com'))))
conn.execute(s).fetchall()

[('Jack Jones, jack@msn.com',), ('Jack Jones, wendy@aol.com',)]

#### 使用SQL字符串查询

In [36]:
from sqlalchemy.sql import text

In [37]:
# 使用SQL文本查询同样的结果
s = text("select concat(u.fullname, ', ', a.email_address) as title " 
         "from users u, addresses a " 
         "where u.id=a.user_id and u.name between :start and :end "
         "and (a.email_address like :e1 or a.email_address like :e2)")
conn.close()
if conn.closed:
    conn = engine.connect()
conn.execute(s, start='a', end='m', e1='%@aol.com', e2='%@msn.com').fetchall()

[('Jack Jones, jack@msn.com',), ('Jack Jones, wendy@aol.com',)]

#### 设置结果字段的行为

In [38]:
# 指定列的类型
s = text("select id, name from users")
s = s.columns(id=Integer, name=String)

In [39]:
# 或者使用表实体字段指定
s = text("select id, name from users")
s = s.columns(users.c.id, users.c.name)

In [40]:
# 当TextClause的SQL调用了columns函数之后，将会返回一个TextAsFrom对象，该对象表现像表一样，可以支持c属性以及其他select操作
j = s.join(addresses, s.c.id==addresses.c.user_id)
ns = select([s.c.id, addresses.c.id]).select_from(j).where(s.c.name=='x')
print(ns)

SELECT id, addresses.id 
FROM (select id, name from users) JOIN addresses ON id = addresses.user_id 
WHERE name = :name_1


In [41]:
# 还可以使用columns函数对结果中相同名称的列进行定义
s = text("select users.id, addresses.id, users.id, users.name, addresses.email_address as email "
        "from users join addresses on users.id = addresses.id "
        "where users.id = 1").columns(users.c.id, 
                                     addresses.c.id,
                                     addresses.c.user_id,
                                     users.c.name,
                                     addresses.c.email_address)
def check_and_reconnect(conn):
    if conn.closed:
        conn = engine.connect()
    return conn
conn = check_and_reconnect(conn)
result = conn.execute(s)
for row in result:
    print(row[addresses.c.id])

1


In [42]:
# 也可以在select、where这些函数里面使用text
s = select([text("concat(users.fullname, addresses.email_address) as title")]) \
.where(and_(
                text("users.id=addresses.user_id"),
                text("users.name between 'a' and 'm'"),
                text("(addresses.email_address like :x "
                     "or addresses.email_address like :y)")
            )
      ).select_from(text("users, addresses"))
conn = check_and_reconnect(conn)
conn.execute(s, x='%aol.com', y='%msn.com').fetchall()

[('Jack Jonesjack@msn.com',), ('Jack Joneswendy@aol.com',)]

In [43]:
# 对label的字段进行排序order和分组grouping
from sqlalchemy import func
s = select([addresses.c.user_id, func.count(addresses.c.id).label('num_addresses')]).\
order_by('num_addresses')
conn = check_and_reconnect(conn)
conn.execute(s).fetchall()

[(1, 4)]

In [45]:
# 使用asc()或desc()选择升序或降序
from sqlalchemy import desc
from sqlalchemy import func
s = select([addresses.c.user_id, func.count(addresses.c.id).label('num_addresses')]).\
order_by(desc('num_addresses'))
conn = check_and_reconnect(conn)
conn.execute(s).fetchall()

[(1, 4)]

In [46]:
# 使用表别名
ula, ulb = users.alias(), users.alias()
s = select([ula, ulb]).where(ula.c.name>ulb.c.name).order_by(ula.c.name)
conn = check_and_reconnect(conn)
conn.execute(s).fetchall()

[(2, 'wendy', 'Wendy Williams', '654321', 1, 'jack', 'Jack Jones', '123456')]

### 使用表连接join

In [51]:
# 连接两张有外键的表
print(users.join(addresses))

users JOIN addresses ON users.id = addresses.user_id


In [52]:
# 自己定义表连接的条件
print(users.join(addresses, addresses.c.email_address.like(users.c.name+'%')))

users JOIN addresses ON addresses.email_address LIKE users.name || :name_1


In [53]:
# 放在select中
s = select([users.c.fullname]).select_from(users.join(addresses, 
                                                     addresses.c.email_address.like(users.c.name+'%')))
conn = check_and_reconnect(conn)
conn.execute(s).fetchall()

[('Jack Jones',), ('Jack Jones',), ('Wendy Williams',)]

In [54]:
# 左外连接left outer join
print(select([users.c.fullname]).select_from(users.outerjoin(addresses)))

SELECT users.fullname 
FROM users LEFT OUTER JOIN addresses ON users.id = addresses.user_id


### SQL函数

In [55]:
# 使用func对象生成并调用SQL函数
print(func.now())
print(func.concat('x', 'y'))

now()
concat(:concat_1, :concat_2)


In [56]:
# 生成一个并不存在的函数
print(func.foo_bar_func())

foo_bar_func()


In [57]:
# 如果函数是ANSI标准的SQL函数，SQLAlchemy会认出来，转为大写，并且不带括号
print(func.current_timestamp())

CURRENT_TIMESTAMP


In [58]:
# 查询最大的email地址，给列起别名，指定列的数据类型，然后使用scalar()返回第一行第一列的一个标量
s = select([func.max(addresses.c.email_address, type_=String).label('maxemail')])
conn = check_and_reconnect(conn)
conn.execute(s).scalar()

'www@www.org'