In [5]:
import re

## 关于这份noteook的说明

SQLAlchemy包含SQLAlchemy Core和SQLAlchemy ORM两部分, 这里只介绍SQLAlchemy Core的内容。

SQLAlchemy中需要通过engine和目标数据库建立连接，如果要运行文中的代码,请安装postgresql数据库并且建立测试用户和测试数据库,用户名,数据库名,密码都是test。如果使用了其它的用户名,密码和数据库名，请相应的对下面建立engine的代码做出修改。

In [4]:
from IPython.display import display
from sqlalchemy import create_engine

user = 'test'
password = 'test'
port = '5432'
dbname = 'test'

engine = create_engine('postgresql://{user}:{password}@localhost:{port}/{dbname}'.format(**locals()))

helper.py文件可以在本notebook的文件夹中可以找到。它包含了后面的实验需要的测试数据

In [2]:
from helper import users_data, addresses_data

users表和user是一一对应关系,它包含的测试数据

In [3]:
users_data

Unnamed: 0,id,name,fullname
0,1,jack,Jack Jones
1,2,wendy,Wendy Williams


addresses表和user是一对多的关系,它包含的测试数据

In [4]:
addresses_data

Unnamed: 0,user_id,email_address
0,1,jack@yahoo.com
1,1,jack@msn.com
2,2,www@www.org
3,2,wendy@aol.com


reset_db函数的功能是在postgresql数据库中重置这两张表的数据,并且返回指向两张表的sqlalchemy.sql.schema.Table对象，有了这两个object记录了表中有哪些列，列的数据类型是什么等信息，它们是用SQLAlchemy构建SQL逻辑的基础object。如果看到这里不明白的话不用在意，后面会有详细的解说。

In [5]:
from helper import reset_db

## SQLAlchemy初印象
SQLAlchemy Core的显著特征是用sqlalchemy中提供object层层组合,去表达SQL逻辑。例如下面这段SQL逻辑

In [45]:
sql = '''
    select 
        users.id, users.fullname
    from
    (
        users join addresses
        on
            users.id = addresses.user_id
    )
    group by
        users.id
    having
        count(addresses.email_address)>1
    order by users.fullname
'''

在SQLAlchemy Core中是这样表达的

In [46]:
from sqlalchemy import select, func

users, addresses = reset_db(engine)

s = (
    select(
        [
            users.c.id,
            users.c.fullname    
        ]
    ).select_from(
        users.join(
            addresses,
            users.c.id==addresses.c.user_id    
        )
    ).group_by(users.c.id)
    .having(func.count(addresses.c.email_address)>1)
)

其中所有的SQL逻辑基本要素例如表,列,条件,join语句,以及组成的整体SQL逻辑都是SQLAlchemy中的object

In [47]:
l = [
    users,
    users.c.id,
    users.c.id==addresses.c.user_id,
    s
]

for obj in l:
    print(type(obj))

<class 'sqlalchemy.sql.schema.Table'>
<class 'sqlalchemy.sql.schema.Column'>
<class 'sqlalchemy.sql.elements.BinaryExpression'>
<class 'sqlalchemy.sql.selectable.Select'>


由于因此使用SQLAlchemy Core表达SQL逻辑的时候，是一个从代表简单SQL逻辑的object组装成复杂object的过程。这样做有几个好处。

### 分而治之
当SQL逻辑复杂的时候，可以分阶段的构造。先构造简单的SQL逻辑模块，再组装成复杂的SQL逻辑。相比一次性构造完成的复杂SQL逻辑相比，头脑的负担更低，也不容易出错。

In [48]:
from sqlalchemy import select, func

columns = [users.c.id, users.c.fullname]

join_clause = users.join(
    addresses,
    users.c.id==addresses.c.user_id    
)

condition = func.count(addresses.c.email_address)>1


s = (
    select(columns)
    .select_from(join_clause)
    .group_by(users.c.id)
    .having(condition)
)

### 容易复用
由于使用SQLAlchemy Core去表达SQL，本质上是使用python语言写代码。　因此我们可以利用python提供的一切工具和手段将重复出现的SQL逻辑转化成可复用的python代码。

### 处理数据库差异

在用SQLAlchemy Core表达SQL逻辑的时候，只是表达了用户的意图，并不是最终的SQL语句。

同样的SQL逻辑，在不同的数据库中语法可能会有变化，而SQLAlchemy Core会根据database的种类，编译出和这个database兼容的SQL语句。这样用户写一次SQLAlchemy Core组织一次SQL逻辑，就可以在多个数据库中复用。

例如我们engine指向的是postgresql数据库，我们可以用下面的语句观察针对postgresql编译后的SQL语句。

In [49]:
print((s.compile(engine, compile_kwargs={"literal_binds": True})))

SELECT test.users.id, test.users.fullname 
FROM test.users JOIN test.addresses ON test.users.id = test.addresses.user_id GROUP BY test.users.id 
HAVING count(test.addresses.email_address) > 1


上面的语句有点长,我们写一个函数方便后面实验

In [50]:
def print_sql(engine, obj, compile_kwargs = None):
    compile_kwargs = compile_kwargs or {}
    print('Compiled SQL')
    print('=============================')
    print((obj.compile(engine, compile_kwargs=compile_kwargs)))

print_sql(engine,s)

Compiled SQL
SELECT test.users.id, test.users.fullname 
FROM test.users JOIN test.addresses ON test.users.id = test.addresses.user_id GROUP BY test.users.id 
HAVING count(test.addresses.email_address) > %(count_1)s


## SQLAlchemy的使用

### schema操作

#### create schema

In [51]:
from sqlalchemy.schema import CreateSchema

schema_name = 'test2'
obj = CreateSchema(schema_name)
engine.execute(obj)

print_sql(engine,obj)

Compiled SQL
CREATE SCHEMA test2


如果新建已经存在的schema,会出现ProgrammingError的错误。

In [52]:
try:
    engine.execute(obj)
except Exception as e:
    print(type(e),e)

<class 'sqlalchemy.exc.ProgrammingError'> (psycopg2.ProgrammingError) schema "test2" already exists
 [SQL: 'CREATE SCHEMA test2']


### drop schema

In [53]:
from sqlalchemy.schema import DropSchema

schema_name = 'test2'
obj = DropSchema(schema_name, cascade = True)
engine.execute(obj)

print_sql(engine,obj)

Compiled SQL
DROP SCHEMA test2 CASCADE


同样, 如果删除已经不存在的schema,会报ProgrammingError

In [54]:
try:
    engine.execute(obj)
except Exception as e:
    print(type(e),e)

<class 'sqlalchemy.exc.ProgrammingError'> (psycopg2.ProgrammingError) schema "test2" does not exist
 [SQL: 'DROP SCHEMA test2 CASCADE']


### table操作

#### define ｔａble
定义SQLAlchemy的数据表，主要参数是提供table名,schema名以及相关的column的名称,类型，是否是primary_key等信息。

In [55]:
from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey, Sequence
from sqlalchemy.schema import CreateTable

schema='test'
table_name = 'users'

metadata = MetaData()

table = Table(
    table_name,
    metadata,
    Column('id',Integer,primary_key=True),
    Column('name',String), 
    Column('fullname',String,     
    schema = schema
)

SyntaxError: unexpected EOF while parsing (<ipython-input-55-5605527d8371>, line 16)

如果是数据库中已经存在的表,可以直接使用autoload功能从数据库中读取表的列信息，可以免去很多麻烦。

In [56]:
table = Table(table_name, metadata, schema = schema, autoload=True, autoload_with=engine)

NameError: name 'Table' is not defined

In [57]:
table.c.values()

NameError: name 'table' is not defined

**注意**

如果table中定义了foreign key信息,autoload得到的表无法用于join是自动匹配foreign key，暂时没找到解决方法。(见join章节的演示)

#### create table
如果是数据库中不存在的表，在定义了table后，需要需要执行下面的语句才能在数据库中建立这张表。

先清空数据库

In [58]:
from helper import clear_db
clear_db(engine)

新建表

In [59]:
obj = CreateTable(table)
engine.execute(obj)

print_sql(engine,obj)

NameError: name 'CreateTable' is not defined

SQLAlchemy会根据数据库的类型，将String等列类型信息转化成数据库中对应的信息，例如Oracle中的VARCHAR2。

注意，不同的数据库对于configs的要求会不同。例如，postgresql只需要写String,不需要指定长度；而Oracle在定义时，必须指定长度，得改成类似下面的设置才会生效。
```
    Column('id',Integer,primary_key=True),
    Column('name',String(20)), 
    Column('fullname',String(20)),     
```


同样, 如果尝试新建已经存在的表，会出错

In [60]:
try:
    engine.execute(obj)    
except Exception as e:
    print(type(e),e)

<class 'sqlalchemy.exc.ProgrammingError'> (psycopg2.ProgrammingError) schema "test2" does not exist
 [SQL: 'DROP SCHEMA test2 CASCADE']


创建表是一个常见的操作,上面的代码比较麻烦而且如果新建已经存在的表会报错，我们可以定义一个函数方面以后使用。这个函数接受table name, schema name, 和column的相关配置信息就能在数据库中建表并返回表对应的sqlalchemy object。且能妥善处理表已经存在的情况。

In [71]:
def create_table(engine,table_name,column_configs, schema = schema, metadata = None):
    from sqlalchemy.exc import ProgrammingError
    try:
        metadata = metadata or MetaData()

        columns = [
            Column(*args,**kwargs) for
            args, kwargs in column_configs
        ]

        table = Table(
            table_name,
            metadata,
            *columns,
            schema = schema
        )
        
        engine.execute(CreateTable(table))
    except ProgrammingError as e:
        if 'already exists' in str(e):
            print('Table already exists')
        else:
            raise e

使用一下刚才的函数

In [72]:
table_name = 'users'
schema = 'test'
# put mandatory parameters in [] and optional parameters in {}
column_configs = [
    (['id',Integer],{'primary_key':True}),
    (['name',String(20)],{}),
    (['fullname',String(20)],{})
]        

create_table(engine,table_name,column_configs, schema = 'test')

NameError: name 'Integer' is not defined

## drop table

drop table的处理方法和create table类似

In [None]:
from sqlalchemy.schema import DropTable

table = Table(
        table_name,
        metadata,
        schema = schema
    )

obj = DropTable(table)

print_sql(engine,obj)

不过运行的话会报错

In [None]:
users, addresses = reset_db(engine)

try:
    engine.execute(obj)
except Exception as e:
    print(type(e),e)

这是由于在定义addresses表的时候,定义了addresses的user_id是users表的foreign key,因此foreign key依赖于users表，只有Drop时指定CASCADE选项才能顺利的删除这张表。(它会删除所有依赖于users表的foreign_key),遗憾的是，我并没有在sqlalchemy中找到相关的选项启动CASCADE。

不过SQLAlchemy的一个好处是，它完全可以接受原生的SQL语句去对数据库进行操作。我们在语句中加上CASCADE和IF EXISTS来进行drop table的操作。

In [None]:
table_name = 'users'
schema = 'test'
sql = "DROP TABLE IF EXISTS {schema}.{table_name} CASCADE".format(table_name = table_name, schema = schema)

engine.execute(sql)

**提示**

根据自己的场景选择合适自己的工具即可。在构建复杂的SQL逻辑的时候,SQLAlchemy会比文本形式的SQL更容易构建和复用。但是堆drop table这样的操作,原生的SQL已经足够简单，不一定非要用SQLAlchemy。

### insert data


构造object

In [None]:
users, addresses = reset_db(engine)

ins = users.insert().values(name='jack', fullname='Jack Jones')
print_sql(engine, ins)

由于不同的sql数据库,对应的sql语句会有区别,上面的函数执行后只是在ins object中记录了待绑定的数据。　如果要预览生成的sql语句，需要传入engine或者指定dialect明确是哪种数据库。

如果希望编译后的语句把传入的数值写入SQL语句中的话，可以添加```"literal_binds": True```的参数

In [None]:
print_sql(engine,ins,compile_kwargs = {"literal_binds": True})

In [None]:
result = engine.execute(ins)

可以用result.insered_primary_key很方便的找到插入记录的id

In [None]:
result.inserted_primary_key

先用helper中的read_table验证一下数据是否被顺利插入

In [None]:
from helper import read_table
read_table(engine, users)

注意也可以在engine.execute中传入数据

In [None]:
users, addresses = reset_db(engine)

ins = users.insert()
print_sql(engine, ins)

engine.execute(ins,name='jack', fullname='Jack Jones')

read_table(engine, users)

#### insert batch data
例如从DataFrame插入数据

In [None]:
from pandas import DataFrame
df = DataFrame({'name':['Junjie','Xu'],'fullname':['Cai','Zhang']})
df

转成dict格式后插入

In [None]:
df.to_dict(orient='records')

In [None]:
ins = users.insert().values(df.to_dict(orient='records'))

engine.execute(ins)
read_table(engine,users)

注意如果要插入dict list,sqlalchemy会以list中第一条记录的key为准

In [None]:
data = [
    {'name':'Name1'},
    {'name':'Name2','fullname':'FULLNAME2'}
]
ins = users.insert().values(data)

engine.execute(ins)
read_table(engine, users)

如果第一行包含了所有的key,后面的记录key缺失的话，会直接报错。

In [None]:
try:
    data = [
        {'name':'Name3','fullname':'FULLNAME3'},
        {'name':'Name4'},    
    ]
    ins = users.insert().values(data)


    engine.execute(ins)

    print_sql(engine,ins)

    read_table(engine,users)
except Exception as e:
    print(type(e),e)

如果插入数据时会使用所有的列,那么可以简化成直接用tuple list插入数据。　但是这是就不能利用自动编号id，而是要传入时指定id。

In [None]:
data = [
    (10,'Cai','Junjie'),
    (11,'Zhang','Xu')
]
ins = users.insert().values(data)

engine.execute(ins)

read_table(engine,users)

但是传入id时指定id的话，似乎自动id的状态并不会做出相应的调整,而是继续从上次终止的地方开始，不会跳过用上面方式插入的id, 导致报错。因此这种方式适合一次性将所有数据导入数据库,之后不需要追加数据的情况。

此外还要注意一点,df.to_records()的结果尽管看上去像是list of tuple。

In [None]:
data = df.to_records()
list(data)

但是里面的数据类型是numpy.record，　SQLAlchemy并不识别

In [None]:
type(data[0])

In [None]:
try:
    ins = users.insert().values(data)
    engine.execute(ins)
except Exception as e:
    print(type(e),e)

即使修复了这个问题，依然会遇到不识别的numpy数据类型

In [None]:
data = [tuple(r) for r in data]
type(data[0])

In [None]:
try:
    ins = users.insert().values(data)
    engine.execute(ins)
except Exception as e:
    print(type(e),e)

所以推荐使用df.to_dict(orient = 'records')的方式传入数据

### select

### select all
选择全部的列。这里用helper中的read_select函数帮助显示结果。

In [94]:
from helper import read_select

users, addresses = reset_db(engine)

s1 = users.select()

print_sql(engine,s1)

read_select(engine,s1)

Compiled SQL
SELECT test.users.id, test.users.name, test.users.fullname 
FROM test.users


Unnamed: 0,id,name,fullname
0,1,jack,Jack Jones
1,2,wendy,Wendy Williams


注意可以用下面方法得到选择后的列名

In [95]:
s1.c.keys()

['id', 'name', 'fullname']

另一种稍繁琐但是更通用的方法

In [96]:
from sqlalchemy import select

s2 = select([users])

print_sql(engine,s2)

read_select(engine,s2)

Compiled SQL
SELECT test.users.id, test.users.name, test.users.fullname 
FROM test.users


Unnamed: 0,id,name,fullname
0,1,jack,Jack Jones
1,2,wendy,Wendy Williams


### select columns
可以在select的list中指定需要的列

In [None]:
s = select(
    [
        users.c.id,
        users.c.fullname,
    ]
)
print_sql(engine,s)

read_select(engine,s)

### limit and offset

例如用offset跳过1行，并且用limit只显示2行

In [109]:
s = (
    select(
        [
            addresses.c.id,
            addresses.c.email_address,
        ]
    ).offset(1)
    .limit(2)
)


print_sql(engine,s, compile_kwargs={"literal_binds": True})

read_select(engine,s)

Compiled SQL
SELECT test.addresses.id, test.addresses.email_address 
FROM test.addresses 
 LIMIT 2 OFFSET 1


Unnamed: 0,id,email_address
0,2,jack@msn.com
1,3,www@www.org


### select as scalar

如果select的结果是1行1列,可以将这个select指定成scalar类型,然后就可以像普通的Column一样用于其他的select语句。

In [None]:
scalar = (
    select(
        [
            func.count(addresses.c.id)
        ]
    ).
    where(users.c.id == addresses.c.user_id).
    as_scalar() #！！
)

print_sql(engine,s1)

s = select(
    [
        users.c.id,
        scalar #!!
    ]
)

print_sql(engine,s)

read_select(engine, s)

不过上面的构造方法中SQLAlchemy采用的列名非常不友好

In [None]:
s.c.keys()

只需要将scalar替换成label就能解决这个问题

In [None]:
scalar = (
    select(
        [
            func.count(addresses.c.id)
        ]
    ).
    where(users.c.id == addresses.c.user_id)
    .label('scalar') #！！
)


s = select(
    [
        users.c.id,
        scalar #!!
    ]
)

print_sql(engine,s)

read_select(engine, s)

如果选择出的结果不是1行1列，上面的方法会报错

In [None]:
scalar = (
    select(
        [
            addresses.c.id
        ]
    ).
    as_scalar() #！！
)

print_sql(engine,s1)

s = select(
    [
        users.c.id,
        scalar #!!
    ]
)

print_sql(engine,s)

try:
    read_select(engine, s)
except Exception as e:
    print(type(e),e)

这里可以注意到,尽管定义s1的时候，定义了label,但是没有被SQLAlchemy正确的识别。可以在应用一次label解决这个问题。

In [None]:
s = select(
    [
        users.c.id,
        s1.label('scalar') #!!
    ]
)


print_sql(engine,s)

read_select(engine,s)


s2作为Column使用的话，尽管不会报错，但是编译出的语句是错误的。

In [None]:
s = select(
    [
        users.c.id,
        s2 #!!
    ]
)

print_sql(engine,s)

read_select(engine,s)


**提醒**

上面的多个例子都显示，两个Object编译出的SQL一样，并不代表两个Object的行为会一致。因此出现意外结果的时候，不能光检查编译出的SQL语句是否合理。

## Correlated query

上面的例子中，读者可能会疑问,最终编译后出现的SQL语句中,子查询中并没有出现users,为何能够顺利的被执行?

```
SELECT 
    test.users.id, 
    (
        SELECT 
            count(test.addresses.id) AS scalar 
        FROM
            test.addresses #From 语句并没有出现test.users
        WHERE
            test.users.id = test.addresses.user_id #where中出现了test.users
    ) AS scalar 
FROM test.users
```

In [76]:
scalar = (
    select(
        [
            func.count(addresses.c.id)
        ]
    ).
    where(users.c.id == addresses.c.user_id)
    .label('scalar') #！！
)

s = select(
    [
        users.c.id,
        scalar #!!
    ]
)

print_sql(engine,s)

read_select(engine, s)

Compiled SQL
SELECT test.users.id, (SELECT count(test.addresses.id) AS count_1 
FROM test.addresses 
WHERE test.users.id = test.addresses.user_id) AS scalar 
FROM test.users


Unnamed: 0,id,scalar
0,1,2
1,2,2
2,3,0


这里的关键在于,如果子sub query中找不到相关的table，那么就会自动去和外层select中的表去关联。在这里例子里，如果外层的user是id为1的user的话,sub query中的user也是id为1的user,sub query起到了找出id为1的user有多少个email_address的效果。因此，这个查询实际上相当于起到了对所有user进行遍历，并选出这些user的id和email_address个数的效果。为了方便理解，可以想象有一个接受user id,返回user对应email_address个数的函数。上面的语句相当于。

```
    select 
        user.id,
        count_email(user.id) #例子中的sub query等价接受user.id，返回邮件个数的函数
    from 
        users
```

可以通过correlate, correlate_except指定或排除sub query中需要关联的外层的table范围。例如通过```correlate(None)```或者```correlate_except(users)```禁止sub query关联外层的users

In [78]:
scalar = (
    select(
        [
            func.count(addresses.c.id)
        ]
    ).
    where(users.c.id == addresses.c.user_id)
    .correlate(None) #.correlate_except(users)在这个例子里也能起到同样的效果
    .label('scalar') #！！
)

s = select(
    [
        users.c.id,
        scalar #!!
    ]
)

print_sql(engine,s)

read_select(engine, s)

Compiled SQL
SELECT test.users.id, (SELECT count(test.addresses.id) AS count_1 
FROM test.addresses, test.users 
WHERE test.users.id = test.addresses.user_id) AS scalar 
FROM test.users


Unnamed: 0,id,scalar
0,1,4
1,2,4
2,3,4


可以看到上面的例子里，sub query中自动在From语句中补充了users，成为完成的select语句，结果也变成了固定的4

## join lateral

In [93]:
from sqlalchemy import true

sub_query = (
    select(
        [
            func.count(addresses.c.id).label('count')
        ]
    ).
    where(users.c.id == addresses.c.user_id)
    .lateral('sub_query') #！！用lateral()代替常规的label()
)

s = select(
    [
        users.c.id,
        sub_query.c.count
    ]
).select_from(
    users.join(
        sub_query,
        true() #用这个代替常规的on条件
    )
)

print_sql(engine,s)

read_select(engine, s)

Compiled SQL
SELECT test.users.id, sub_query.count 
FROM test.users JOIN LATERAL (SELECT count(test.addresses.id) AS count 
FROM test.addresses 
WHERE test.users.id = test.addresses.user_id) AS sub_query ON true


Unnamed: 0,id,count
0,1,2
1,2,2
2,3,0


join lateral语句实际上也起到了遍历users,取出id和对应email_address个数的功能。

```
    SELECT 
        test.users.id, sub_query.count 
    FROM 
        test.users #被sub query关联的表
            JOIN LATERAL 
        (
            SELECT
                count(test.addresses.id) AS count 
            FROM
                test.addresses
            WHERE
                test.users.id = test.addresses.user_id 
                #users关联了join lateral左边的test.users
        ) AS sub_query ON true
```
这段代码等价于

```
    SELECT 
        sub_query.id, sub_query.count 
    FROM
        (
            select
                users.id as id,
                email_count(user.id) as count #email_count是假想的函数
            from users
        ) AS sub_query
```



## rename column
用label实现

In [77]:
s = select(
    [
        users.c.id.label('user_id'),
        users.c.name.label('user_name'),
    ]
)

print_sql(engine,s)

read_select(engine,s)

Compiled SQL
SELECT test.users.id AS user_id, test.users.name AS user_name 
FROM test.users


Unnamed: 0,user_id,user_name
0,1,jack
1,2,wendy
2,3,jack


### add constant in selection

In [None]:
from sqlalchemy import literal, text,literal_column

s = select(
    [
        users.c.id.label('user_id'),
        literal('AAAAAA').label('constant'),
        literal(None).label('null')        
    ]
)

print_sql(engine,s)

read_select(engine,s)

### function
可以使用func.func_name的形式应用函数,使用的时候只需要导入func模块,接上数据库中的函数名即可

In [None]:
from sqlalchemy import func

s = select(
    [
        users.c.id.label('user_id'),
        func.upper(users.c.name).label('user_name'),
    ]
)

print_sql(engine,s)

read_select(engine,s)

注意应用函数的时候，label要放在在函数之外使用，否则是无效的，这是一个容易犯的错误。可以看到下面的例子里，name列采用了自动命名。

In [None]:
from sqlalchemy import func

s = select(
    [
        users.c.id.label('user_id'),
        func.upper(users.c.name.label('user_name')),
    ]
)

print_sql(engine,s)

read_select(engine,s)

In [None]:
from sqlalchemy import func

s = select(
    [
        func.coalesce(users.c.id, 0).label('test')
    ]
)

print_sql(engine,s)

read_select(engine,s)

In [None]:
from sqlalchemy import func

s = select(
    [
        func.substr(users.c.fullname, 2,5).label('test')
    ]
)

print_sql(engine,s)

read_select(engine,s)

### window function

只需要添加over即可

In [None]:
s = select([
        users.c.id,
        users.c.name,
        func.row_number().over(
                order_by=users.c.name,
                ).label('num')
    ])

print_sql(engine,s)

read_select(engine,s)

In [None]:
from sqlalchemy import case

s = select(
    [
        case(
            [
                (users.c.id == 1, 'A'),
                (users.c.id == 3, 'C'),
            ],
            else_='B'
        ).label('case_test')
    ]
)

print_sql(engine,s)

read_select(engine,s)

### Order by

默认是升序

In [None]:
s = select([users.c.name]).order_by(users.c.name)
print_sql(engine,s)
read_select(engine,s)

如果要控制升序降序的话

In [None]:
s = select([users.c.name]).order_by(users.c.name.desc())
print_sql(engine,s)
read_select(engine,s)

In [None]:
from sqlalchemy import desc
s = select([users.c.name]).order_by(desc(users.c.name))
print_sql(engine,s)
read_select(engine,s)

### Group by

In [None]:
s = (
    select(
            [
                users.c.name,
                func.count(addresses.c.id).label('count')
            ]
        ) 
        .select_from(users.join(addresses))
        .group_by(users.c.name)      

)
print_sql(engine,s)
read_select(engine,s)

# having

In [None]:
s = (
    select(
            [
                users.c.name,
                func.count(addresses.c.id).label('count')
            ]
        )
        .select_from(users.join(addresses))
        .group_by(users.c.name)
        .having(func.count(addresses.c.id)>1)
)
print_sql(engine,s)
read_select(engine,s)

## Set operations

In [None]:
from sqlalchemy.sql import union, union_all, except_, except_all, intersect,intersect_all

s = union_all(
    addresses.select().where(addresses.c.email_address == 'foo@bar.com').alias(),
    addresses.select().where(addresses.c.email_address.like('%@yahoo.com')).alias(),
)

print_sql(engine,s)
read_select(engine,s)

```union_all, except_, except_all, intersect,intersect_all```同理，这里不再演示了

### where

注意下面的select中,table和column混合使用。

In [None]:
s = select(
    [
        users,
        addresses.c.user_id,
        addresses.c.email_address,        
    ]
).where(users.c.id == addresses.c.user_id)

print_sql(engine,s)
read_select(engine,s)

条件表达式本身也是object

In [None]:
s = users.c.id == addresses.c.user_id

可以观察转成string以后的效果

In [None]:
print_sql(engine,s)

也可以利用其他的条件。这里实际上sqlalchemy将object中的```__eq__```特殊函数覆盖掉了,因此==的含义发生了改变

其他常用的条件表达方式

In [None]:
print_sql(engine,users.c.id > addresses.c.user_id)

In [None]:
print_sql(engine,users.c.id != addresses.c.user_id)

In [None]:
print_sql(engine,users.c.id == None)

有些条件需要用object method实现，例如表现SQL中的IN的时候，这样是不行的

In [None]:
(users.c.id in [1,2,3])

而是应该用object本身提供的函数in_

In [99]:
s = users.c.id.in_([1,2,3])
print_sql(engine,s,compile_kwargs={"literal_binds": True})

Compiled SQL
test.users.id IN (1, 2, 3)


类似的

In [None]:
s = users.c.id.between(1,3)
print_sql(engine,s,compile_kwargs={"literal_binds": True})

字符串匹配like

In [None]:
s = users.c.name.like('C%')
print_sql(engine,s,compile_kwargs={"literal_binds": True})

### concatenate string

In [None]:
s = users.c.name+users.c.fullname
print_sql(engine,s,compile_kwargs={"literal_binds": True})

### math add

In [None]:
s = users.c.id+addresses.c.id
print_sql(engine,s,compile_kwargs={"literal_binds": True})

### special operator

如果有一些非常规的operator，总是可以用.op和字符串去实现

In [None]:
s = users.c.id.op('special_operator')('foo')
print_sql(engine,s,compile_kwargs={"literal_binds": True})

### logic conjunction

In [None]:
from sqlalchemy.sql import and_, or_, not_

s = and_(
    users.c.name.like('j%'),
    users.c.id == addresses.c.user_id,
    or_(
        addresses.c.email_address == 'wendy@aol.com',
        addresses.c.email_address == 'jack@yahoo.com'
    ),
    not_(users.c.id > 5)
)

print_sql(engine,s,compile_kwargs={"literal_binds": True})

连续多个where连用也可以起到and的效果

In [None]:
s=(select([users])
.where(users.c.name.like('j%'))
.where(users.c.id == addresses.c.user_id)
)

print_sql(engine,s,compile_kwargs={"literal_binds": True})

也可以用python的&,|,~等逻辑连接符号代替and_(), or_(), not_, 不过如果是要实现比较复杂的条件,这种方法从可读性上并没有什么优势

In [None]:
s = (
    users.c.name.like('j%') &
    users.c.id == addresses.c.user_id &
    (
        (addresses.c.email_address == 'wendy@aol.com') |
        (addresses.c.email_address == 'jack@yahoo.com')
    ) &
    (~(users.c.id > 5))
)
print_sql(engine,s,compile_kwargs={"literal_binds": True})

## join

如果定义表的时候，定义了users.id是addresses.user_id的外键,可以省略on部分

In [73]:
s = users.join(addresses)
print_sql(engine,s,compile_kwargs={"literal_binds": True})

Compiled SQL
test.users JOIN test.addresses ON test.users.id = test.addresses.user_id


**注意**

用metadata的create_all()方式新建表才能正确建立foreign key, auto_load得到的表无法找到正确的foreign_key

In [74]:
metadata = MetaData()
addresses = Table('addresses', metadata, schema = 'test', autoload=True, autoload_with=engine)
addresses.c.values()

NameError: name 'MetaData' is not defined

看上去定义了foreign key的信息,但是运行的话会出错

In [None]:
try:
    s = users.join(addresses)
except Exception as e:
    print(type(e),e)

如果要将join语句用于实际的选择的话，可以用

In [None]:
users, addresses = reset_db(engine)

In [None]:
s = (
    select(
        [
            users.c.id, 
            addresses.c.email_address
        ]
    )
    .select_from(users.join(addresses))
    .where(users.c.id==1)    
)

print_sql(engine,s,compile_kwargs={"literal_binds": True})

read_select(engine,s)

如果要使用left outer join，把join换成outerjoin

In [None]:
s = (
    select(
        [
            users.c.id, 
            addresses.c.email_address
        ]
    )
    .where(users.c.id==1)
    .select_from(users.outerjoin(addresses))
)

print_sql(engine,s,compile_kwargs={"literal_binds": True})

read_select(engine,s)

如果要使用outer join，把join换成outerjoin,并且加上参数```full = True```

In [None]:
s = (
    select(
        [
            users.c.id, 
            addresses.c.email_address
        ]
    )
    .where(users.c.id==1)
    .select_from(
        users.outerjoin(addresses,full = True)
    )
)

print_sql(engine,s,compile_kwargs={"literal_binds": True})

read_select(engine,s)

要把一个子查询结果像一张表那样被使用于join中时,需要使用alias()给子查询命名。

In [None]:
email_count = (
    select(
        [
            addresses.c.email_address,
            func.count(addresses.c.email_address).label('count')            
        ]
    ).group_by(
        addresses.c.email_address
    )
).alias('email_count')

print_sql(engine, email_count)


s = (
    select(
        [
            addresses.c.email_address,
            email_count.c.count
        ]
    )
    .select_from(
        email_count.outerjoin(addresses,email_count.c.email_address == addresses.c.email_address)
    )
)

print_sql(engine,s,compile_kwargs={"literal_binds": True})

read_select(engine,s)

使用SQLAlchemy的时候，由于可以通过python变量名来找到正确的查询，因此并不一定要去指定命名，SQLAlchemy会自动添加命名。下面的例子里，SQLAlchemy采用了自动命名anon_1

In [None]:
email_count = (
    select(
        [
            addresses.c.email_address,
            func.count(addresses.c.email_address).label('count')            
        ]
    ).group_by(
        addresses.c.email_address
    )
).alias()

print_sql(engine, email_count)


s = (
    select(
        [
            addresses.c.email_address,
            email_count.c.count
        ]
    )
    .select_from(
        email_count.outerjoin(addresses,email_count.c.email_address == addresses.c.email_address)
    )
)

print_sql(engine,s,compile_kwargs={"literal_binds": True})

read_select(engine,s)

## Update

最基本的update，不指定任何条件，相当于对所有的row做了遍历。

In [113]:
stmt = (
    users.update().
            values(fullname="Fullname: " + users.c.name)
    )

print_sql(engine, stmt, compile_kwargs={"literal_binds": True})

engine.execute(stmt)

s = users.select()

read_select(engine,s)

Compiled SQL
UPDATE test.users SET fullname=('Fullname: ' || test.users.name)


Unnamed: 0,id,name,fullname
0,1,jack,Fullname: jack
1,2,wendy,Fullname: wendy


选择单行并进行遍历

In [114]:
stmt = (
    users.update().
    where(users.c.name == 'jack').
    values(name='ed')
)


print_sql(engine, stmt, compile_kwargs={"literal_binds": True})

engine.execute(stmt)

s = users.select()

read_select(engine,s)

Compiled SQL
UPDATE test.users SET name='ed' WHERE test.users.name = 'jack'


Unnamed: 0,id,name,fullname
0,2,wendy,Fullname: wendy
1,1,ed,Fullname: jack


### 带参数的SQL逻辑

如果希望生成的SQL逻辑支持参数,有两种实现方式。
第一种是用函数生成SQL逻辑，用函数参数实现参数效果，例如

In [None]:
def selector(user_id):
    return users.select().where(users.c.id == user_id)

print_sql(engine,selector(1),compile_kwargs={'literal_binds':True})
print_sql(engine,selector(2),compile_kwargs={'literal_binds':True})

上面这种方式每次运行函数的时候都会构建新的SQLAlchemy object。

另一种方式是构建SQLAlchemy object时,用bindparam指定参数部分。 然后用```.params```绑定数值。n语言自带的参数就可以了

In [None]:
from sqlalchemy.sql import bindparam
s = users.select().where(users.c.id == bindparam('user_id'))

s = s.params({'user_id':1})

print_sql(engine,s,compile_kwargs={'literal_binds':True})

read_select(engine,s)

或者在```engine.execute```时再传入数值。

In [None]:
from sqlalchemy.sql import bindparam
s = users.select().where(users.c.id == bindparam('user_id'))

print_sql(engine,s)


read_select(engine,s,user_id=1) #read_select会将user_id=1传给engine.execute

上面这种方式, obj只会生成一次, 在性能上会有一些优势。

有些场景下，需要指定变量类型，帮助sqlalchemy正确的编译语句。下面的例子里,即使绑定了string类型的数据，```+```依然没能正确的编译

In [None]:
from sqlalchemy import text

s = users.select(users.c.name.like(bindparam('username') + text("'%'")))

s = s.params({'username':'jack'})

print_sql(engine,s,compile_kwargs={'literal_binds':True})


try:
    read_select(engine, s)
except Exception as e:
    print(type(e),e)

这时候，需要主动在bindparam中通过type_指定数据类型，帮助SQLAlchemy正确的编译

In [None]:
from sqlalchemy import text,String

s = users.select(users.c.name.like(bindparam('username',type_=String) + text("'%'")))

s = s.params({'username':'jack'})

print_sql(engine,s,compile_kwargs={'literal_binds':True})

read_select(engine, s)

利用这个技巧可以只写对于单个row的update逻辑，但是传入参数list实现多行的update。(不确定是否是atomic operation)

In [124]:
from sqlalchemy import bindparam

users, addresses = reset_db(engine)


stmt = (
    users.update().
    where(users.c.name == bindparam('oldname')).
    values(name=bindparam('newname'))
)

data =  [
    {'oldname':'jack', 'newname':'ed'},
    {'oldname':'wendy', 'newname':'mary'},
    {'oldname':'jim', 'newname':'jake'}
]   

engine.execute(stmt,data)


print_sql(engine, stmt)

engine.execute(stmt,data)

s = users.select()

read_select(engine,s)

Compiled SQL
UPDATE test.users SET name=%(newname)s WHERE test.users.name = %(oldname)s


Unnamed: 0,id,name,fullname
0,1,ed,Jack Jones
1,2,mary,Wendy Williams


不过update语句的参数数值只能在execute阶段传入，不能使用params()传入

In [143]:
from sqlalchemy import bindparam

users, addresses = reset_db(engine)


stmt = (
    users.update().
    where(users.c.name == bindparam('oldname')).
    values(name=bindparam('newname'))
)

data =  {'oldname':'jack', 'newname':'ed'}

try:
    stmt.params(data)
except Exception as e:
    print(type(e),e)

<class 'NotImplementedError'> params() is not supported for INSERT/UPDATE/DELETE statements. To set the values for an INSERT or UPDATE statement, use stmt.values(**parameters).


用其它的sub query的数据更新数据。这里需要添加一个where条件去定义原来的两张表之间的行是怎么对应。在下面的例子里，我们用每个user的lexicographical order靠前的email_addresses去替换用户的name

In [184]:
from sqlalchemy import bindparam,func

users, addresses = reset_db(engine)

first_addresses = (
    select(
        [
            addresses.c.user_id.label('user_id'),
            func.min(addresses.c.email_address).label('name')
        ]
    ).group_by(
        addresses.c.user_id
    )
).alias('first_address')


s = (
    users
    .update()
    .values(
        {
            users.c.name:first_addresses.c.name
        }

    ).where(
        first_addresses.c.user_id == users.c.id
    )
)

engine.execute(s)

print_sql(engine, s)

s1 = users.select()

read_select(engine,s1)

Compiled SQL
UPDATE test.users SET name=first_address.name FROM (SELECT test.addresses.user_id AS user_id, min(test.addresses.email_address) AS name 
FROM test.addresses GROUP BY test.addresses.user_id) AS first_address WHERE first_address.user_id = test.users.id


Unnamed: 0,id,name,fullname
0,1,jack@msn.com,Jack Jones
1,2,wendy@aol.com,Wendy Williams


也可以使用correlated的方式取进行update。

In [175]:
from sqlalchemy import bindparam,func

users, addresses = reset_db(engine)

first_address = (
    select(
        [
            func.min(addresses.c.email_address).label('name')
        ]
    ).group_by(
        addresses.c.user_id
    ).where(
        addresses.c.user_id == users.c.id
    )
)


s = (
    users
    .update()
    .values(   
        {
            users.c.name:first_address
        }
    )
)

engine.execute(s)

print_sql(engine, s)

s1 = users.select()

read_select(engine,s1)

Compiled SQL
UPDATE test.users SET name=(SELECT min(test.addresses.email_address) AS name 
FROM test.addresses 
WHERE test.addresses.user_id = test.users.id GROUP BY test.addresses.user_id)


Unnamed: 0,id,name,fullname
0,1,jack@msn.com,Jack Jones
1,2,wendy@aol.com,Wendy Williams


不过这里不需要添加.scalar（）

用correlated subquery进行update的时候,如果返回多行，sql会报错。

In [181]:
from sqlalchemy import bindparam,func

users, addresses = reset_db(engine)

first_address = (
    select(
        [
            addresses.c.email_address
        ]
    ).where(
        addresses.c.user_id == users.c.id
    )
)


s = (
    users
    .update()
    .values(   
        {
            users.c.name:first_address
        }
    )
)


try:
    engine.execute(s)
except Exception as e:
    print(type(e),e)

<class 'sqlalchemy.exc.ProgrammingError'> (psycopg2.ProgrammingError) more than one row returned by a subquery used as an expression
 [SQL: 'UPDATE test.users SET name=(SELECT test.addresses.email_address \nFROM test.addresses \nWHERE test.addresses.user_id = test.users.id)']


如果sub query返回的结果不止一条的话，也不会出错。选出了每个id的第一行email作为实际使用的值。

In [149]:
from sqlalchemy import bindparam,func

users, addresses = reset_db(engine)

s = (
    users
    .update()
    .values(
        {
            users.c.name:addresses.c.email_address
        }

    ).where(
        addresses.c.user_id == users.c.id
    )
)

engine.execute(s)

print_sql(engine, s)

s1 = users.select()

read_select(engine,s1)

Compiled SQL
UPDATE test.users SET name=test.addresses.email_address FROM test.addresses WHERE test.addresses.user_id = test.users.id


Unnamed: 0,id,name,fullname
0,1,jack@yahoo.com,Jack Jones
1,2,www@www.org,Wendy Williams


## 使用文本SQL
sqlalchemy中一个sql逻辑的每一个组件都是用python object来定义的, 如果有时候我们希望用用文本形式的sql直接转化成SQLAlchemy的obj。例如下面两种场景。

* 已经存在现成的sql代码片段，不想用SQLAlchemy重写
* 遇到SQLAlchemy无法表达，只有原生的SQL能表达的场景

如果有完整的SQL语句,直接传入SQL语句即可。

In [None]:
s = 'select users.id, users.name, users.fullname from test.users'

engine.execute(s).fetchall()

如果有待定参数的SQL语句,直接传入执行的话会失败。例如下面的语句是SQLAlchemy可识别的带参数的语句```:id```是名为id的参数。在传入实际的参数前,这个语句是不完整的，如果直接传入engine.execute的话，会出错。

In [None]:
s = 'select users.id, users.name, users.fullname from test.users where users.id=:user_id'

try:
    engine.execute(s).fetchall()
except Exception as e:
    print(type(e),e)

这时可以用text处理并且用bindparams函数绑定数据

In [None]:
s = 'select users.id, users.name, users.fullname from test.users where users.id=:user_id'

s = text(s).bindparams(user_id=1)


print_sql(engine,s,compile_kwargs={'literal_binds':True})
engine.execute(s).fetchall()


也可以在execute阶段传入数据

In [None]:
s = 'select users.id, users.name, users.fullname from test.users where users.id=:user_id'

s = text(s)


print_sql(engine,s)
engine.execute(s,user_id = 1).fetchall()


除了用文本定义大段的SQL逻辑外，也可以用文本SQL的片段去定义部分的SQLAlchemy逻辑。注意最外层的结构是sqlalchemy.sql.selectable.Select,绑定数据的时候用调用的method是params而不是bindparams

In [None]:
s = (
    select(
            [
               text("users.fullname || ', ' || addresses.email_address AS title")
            ]
        ).where(
            and_(
                text("users.id = addresses.user_id"),
                text("users.name BETWEEN 'm' AND 'z'"),
                text(
                    "(addresses.email_address LIKE :x "
                    "OR addresses.email_address LIKE :y)")
                )
        ).select_from(
            text('test.users, test.addresses')
        )
)

s = s.params({'x':'%@aol.com', 'y':'%@msn.com'})

print_sql(engine, s)

engine.execute(s).fetchall()

如果用文本定义的SQL片段是table,和column, 可以用literal_column, table代替text去处理文本SQL。

In [None]:
from sqlalchemy import literal_column, String,table,literal

users = table('users')
users.schema = 'test'

s = select(
    [
        literal_column('users.id').label('id'),
        (literal('=<')+literal_column('users.fullname',type_ = String)+literal('>=')).label('name')        
    ]
).select_from (
    users
)

print_sql(engine,s1)

read_select(engine,s1)

注意schema不能在构造table时以字符串传入，否则生成的语句会错误

In [None]:
from sqlalchemy import literal_column, String,table,literal


s = select(
    [
        literal_column('users.id').label('id'),
        (literal('=<')+literal_column('users.fullname',type_ = String)+literal('>=')).label('name')        
    ]
).select_from (
    table('test.users')
)

print_sql(engine,s)

try:
    read_select(engine,s)
except Exception as e:
    print(type(e),e)

用literal_column和table相比text,构造出的object能够更好的被SQLAlchemy支持。看下面的例子。

In [None]:
users, _ = reset_db(engine)
s1 = select(
    [
        users.c.id,
        text('users.fullname AS name')
    ]
)

print_sql(engine,s1)

s2 = select(
    [
        users.c.id,
        literal_column('users.fullname').label('name')
    ]
)

print_sql(engine,s2)


尽管编译出的语句是一样的，但是观察SQLAlchemy识别出的column names,发现SQLAlchemy无法识别text构造的列。

In [None]:
print(s1.c.keys())
print(s2.c.keys())

因此应该优先考虑使用literal_column, table等更"专业"的构造方式。