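# Analytic Window Functions
In day-to-day analysis we often need information about the current record relative to the other rows of its group within a fixed window, such as its rank or its share of the group's cumulative total. In early versions of MySQL this usually required several joins combined together. Because the need is so common, and as big-data technology has matured, many SQL engines now provide window functions for it, including Hive, Presto, ClickHouse, and Doris; MySQL has supported them since 8.0. See [Presto window functions](https://prestodb.io/docs/current/functions/window.html) for more detail. What follows is a brief tour of the commonly used ones.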


In [1]:
# Setup
import numpy as np
import pandas as pd
# Connect to Presto
from sqlalchemy.engine import create_engine

# When running locally, go through a SOCKS5 proxy (default port 18888)
import os
if os.environ.get('JUPYTERENV') != 'release':
    import socks
    import socket
    socks.set_default_proxy(socks.SOCKS5, "localhost", 18888)
    socket.socket = socks.socksocket

presto_engine = create_engine('presto://192.168.28.111:10080/hive/dwd')

## SUM, AVG, MIN, MAX
These compute totals over a whole partition as well as running (cumulative) aggregates within it.

### SUM
The pieces of the SQL are described briefly below; the best way to understand them is still to read them against the SQL itself.
```
SUM(imp) OVER (PARTITION BY user_id ORDER BY ctime ROWS BETWEEN x AND y)
```

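The line above shows the complete syntax.
- If ROWS BETWEEN is omitted but ORDER BY is present, the frame defaults to everything from the start of the partition through the current row (strictly, `RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`, so rows that tie with the current row on the ORDER BY key are included as well);
- If ORDER BY is omitted, all values in the partition are summed;
- The key is understanding ROWS BETWEEN, also called the WINDOW clause:
    + PRECEDING: rows before the current row
    + FOLLOWING: rows after the current row
    + CURRENT ROW: the current row
    + UNBOUNDED: the partition boundary. UNBOUNDED PRECEDING means from the start of the partition; UNBOUNDED FOLLOWING means through the end of the partition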
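
One subtlety of the default frame is worth a quick sketch. In standard SQL the implicit frame with ORDER BY is a RANGE frame, so rows that tie on the ordering key are peers and all receive the same running total, whereas an explicit ROWS frame stops at the physical row. The example below uses Python's built-in `sqlite3` as a stand-in for Presto (an assumption made so that it runs without a cluster; it needs SQLite 3.25 or later), and the table `t` and its values are made up for illustration.

```python
import sqlite3

# In-memory SQLite database standing in for Presto (assumption for illustration)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (ctime TEXT, imp INTEGER)")
conn.executemany(
    "INSERT INTO t VALUES (?, ?)",
    # Two rows share ctime '11:00', so they are peers under a RANGE frame
    [("10:00", 1), ("11:00", 2), ("11:00", 2), ("12:00", 4)],
)

frame_rows = conn.execute("""
    SELECT ctime
        ,  imp
        ,  SUM(imp) OVER (ORDER BY ctime) AS default_frame
        ,  SUM(imp) OVER (ORDER BY ctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rows_frame
    FROM t
    ORDER BY ctime, rows_frame
""").fetchall()

for row in frame_rows:
    print(row)
```

Both '11:00' rows get 5 under the default frame, while the ROWS frame gives them 3 and 5. When the ORDER BY key is unique, as in the examples in this notebook, the two frames produce the same result.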

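Each field in the example SQL is explained below.
- imp1: the sum of imp over all rows in the partition.
- imp2: the running sum of imp from the start of the partition through the current row. E.g. 11:00 (14) = 10:00 (6) + 11:00 (8)
- imp3: same as imp2
- imp4: the current row plus the 2 preceding rows. E.g. 13:00 (30) = 11:00 (8) + 12:00 (10) + 13:00 (12)
- imp5: the current row plus the 2 preceding rows and the 1 following row. E.g. 13:00 (44) = 11:00 (8) + 12:00 (10) + 13:00 (12) + 14:00 (14)
- imp6: the current row plus all following rows. E.g. 13:00 (26) = 13:00 (12) + 14:00 (14)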
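
To make the frame arithmetic concrete, here is a minimal pure-Python sketch of how a `ROWS BETWEEN ... PRECEDING AND ... FOLLOWING` sum could be evaluated over one already-sorted partition. The helper `rows_frame_sum` is hypothetical (it is not part of any SQL engine), and `None` is used here to mean UNBOUNDED. The input values are the imp column of the single user shown in the SUM example.

```python
def rows_frame_sum(values, preceding, following):
    """Sum over a ROWS frame for each row of one sorted partition.

    preceding / following are row counts; None stands for UNBOUNDED.
    """
    n = len(values)
    result = []
    for i in range(n):
        start = 0 if preceding is None else max(0, i - preceding)
        end = n - 1 if following is None else min(n - 1, i + following)
        result.append(sum(values[start:end + 1]))
    return result

imp = [6, 8, 10, 12, 14]  # 10:00 through 14:00 for one user

imp3 = rows_frame_sum(imp, None, 0)  # UNBOUNDED PRECEDING AND CURRENT ROW
imp4 = rows_frame_sum(imp, 2, 0)     # 2 PRECEDING AND CURRENT ROW
imp5 = rows_frame_sum(imp, 2, 1)     # 2 PRECEDING AND 1 FOLLOWING
imp6 = rows_frame_sum(imp, 0, None)  # CURRENT ROW AND UNBOUNDED FOLLOWING

print(imp3, imp4, imp5, imp6, sep="\n")
```

The frame is simply clipped at the partition edges, which is why the first rows of imp4 and the last row of imp5 sum fewer than the nominal number of rows.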

### AVG, MIN, and MAX are used the same way as SUM

In [2]:
# SUM example
df = pd.read_sql('''
SELECT SUBSTR(user_id, 1, 18) AS uid
    ,  ctime
    ,  imp
    ,  SUM(imp) OVER (PARTITION BY user_id) AS imp1    -- all rows in the partition
    ,  SUM(imp) OVER (PARTITION BY user_id ORDER BY ctime) AS imp2  -- default frame: start of partition through current row
    ,  SUM(imp) OVER (PARTITION BY user_id ORDER BY ctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS imp3   -- start of partition through current row, same result as imp2
    ,  SUM(imp) OVER (PARTITION BY user_id ORDER BY ctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)         AS imp4   -- current row + 2 preceding rows
    ,  SUM(imp) OVER (PARTITION BY user_id ORDER BY ctime ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING)         AS imp5   -- current row + 2 preceding rows + 1 following row
    ,  SUM(imp) OVER (PARTITION BY user_id ORDER BY ctime ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS imp6   -- current row + all following rows
    ,  SUM(1)   OVER (PARTITION BY user_id) AS sk
FROM rpt.padabook_window_functions
WHERE uid_type = 'Android'
ORDER BY sk desc, ctime
LIMIT 5
''', presto_engine)
df.head(10)

Unnamed: 0,uid,ctime,imp,imp1,imp2,imp3,imp4,imp5,imp6,sk
0,61A71B2762124CA3F8,2020-03-18 10:00,6,50,6,6,6,14,50,5
1,61A71B2762124CA3F8,2021-03-18 11:00,8,50,14,14,14,24,44,5
2,61A71B2762124CA3F8,2021-03-18 12:00,10,50,24,24,24,36,36,5
3,61A71B2762124CA3F8,2021-03-18 13:00,12,50,36,36,30,44,26,5
4,61A71B2762124CA3F8,2021-03-18 14:00,14,50,50,50,36,36,14,5


In [3]:
# AVG example
df = pd.read_sql('''
SELECT SUBSTR(user_id, 1, 18) AS uid
    ,  ctime
    ,  imp
    ,  AVG(imp) OVER (PARTITION BY user_id) AS imp1    -- all rows in the partition
    ,  AVG(imp) OVER (PARTITION BY user_id ORDER BY ctime) AS imp2  -- default frame: start of partition through current row
    ,  AVG(imp) OVER (PARTITION BY user_id ORDER BY ctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS imp3   -- start of partition through current row, same result as imp2
    ,  AVG(imp) OVER (PARTITION BY user_id ORDER BY ctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)         AS imp4   -- current row + 2 preceding rows
    ,  AVG(imp) OVER (PARTITION BY user_id ORDER BY ctime ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING)         AS imp5   -- current row + 2 preceding rows + 1 following row
    ,  AVG(imp) OVER (PARTITION BY user_id ORDER BY ctime ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS imp6   -- current row + all following rows
    ,  SUM(1)   OVER (PARTITION BY user_id) AS sk
FROM rpt.padabook_window_functions
WHERE uid_type = 'Android'
ORDER BY sk desc, ctime
LIMIT 5
''', presto_engine)
df.head(10)

Unnamed: 0,uid,ctime,imp,imp1,imp2,imp3,imp4,imp5,imp6,sk
0,61A71B2762124CA3F8,2020-03-18 10:00,6,10.0,6.0,6.0,6.0,7.0,10.0,5
1,61A71B2762124CA3F8,2021-03-18 11:00,8,10.0,7.0,7.0,7.0,8.0,11.0,5
2,61A71B2762124CA3F8,2021-03-18 12:00,10,10.0,8.0,8.0,8.0,9.0,12.0,5
3,61A71B2762124CA3F8,2021-03-18 13:00,12,10.0,9.0,9.0,10.0,11.0,13.0,5
4,61A71B2762124CA3F8,2021-03-18 14:00,14,10.0,10.0,10.0,12.0,12.0,14.0,5


In [4]:
# MIN example
df = pd.read_sql('''
SELECT SUBSTR(user_id, 1, 18) AS uid
    ,  ctime
    ,  imp
    ,  MIN(imp) OVER (PARTITION BY user_id) AS imp1    -- all rows in the partition
    ,  MIN(imp) OVER (PARTITION BY user_id ORDER BY ctime) AS imp2  -- default frame: start of partition through current row
    ,  MIN(imp) OVER (PARTITION BY user_id ORDER BY ctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS imp3   -- start of partition through current row, same result as imp2
    ,  MIN(imp) OVER (PARTITION BY user_id ORDER BY ctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)         AS imp4   -- current row + 2 preceding rows
    ,  MIN(imp) OVER (PARTITION BY user_id ORDER BY ctime ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING)         AS imp5   -- current row + 2 preceding rows + 1 following row
    ,  MIN(imp) OVER (PARTITION BY user_id ORDER BY ctime ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS imp6   -- current row + all following rows
    ,  SUM(1)   OVER (PARTITION BY user_id) AS sk
FROM rpt.padabook_window_functions
WHERE uid_type = 'Android'
ORDER BY sk desc, ctime
LIMIT 5
''', presto_engine)
df.head(10)

Unnamed: 0,uid,ctime,imp,imp1,imp2,imp3,imp4,imp5,imp6,sk
0,61A71B2762124CA3F8,2020-03-18 10:00,6,6,6,6,6,6,6,5
1,61A71B2762124CA3F8,2021-03-18 11:00,8,6,6,6,6,6,8,5
2,61A71B2762124CA3F8,2021-03-18 12:00,10,6,6,6,6,6,10,5
3,61A71B2762124CA3F8,2021-03-18 13:00,12,6,6,6,8,8,12,5
4,61A71B2762124CA3F8,2021-03-18 14:00,14,6,6,6,10,10,14,5


In [5]:
# MAX example
df = pd.read_sql('''
SELECT SUBSTR(user_id, 1, 18) AS uid
    ,  ctime
    ,  imp
    ,  MAX(imp) OVER (PARTITION BY user_id) AS imp1    -- all rows in the partition
    ,  MAX(imp) OVER (PARTITION BY user_id ORDER BY ctime) AS imp2  -- default frame: start of partition through current row
    ,  MAX(imp) OVER (PARTITION BY user_id ORDER BY ctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS imp3   -- start of partition through current row, same result as imp2
    ,  MAX(imp) OVER (PARTITION BY user_id ORDER BY ctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)         AS imp4   -- current row + 2 preceding rows
    ,  MAX(imp) OVER (PARTITION BY user_id ORDER BY ctime ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING)         AS imp5   -- current row + 2 preceding rows + 1 following row
    ,  MAX(imp) OVER (PARTITION BY user_id ORDER BY ctime ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS imp6   -- current row + all following rows
    ,  SUM(1)   OVER (PARTITION BY user_id) AS sk
FROM rpt.padabook_window_functions
WHERE uid_type = 'Android'
ORDER BY sk DESC, ctime
LIMIT 5
''', presto_engine)
df.head(10)

Unnamed: 0,uid,ctime,imp,imp1,imp2,imp3,imp4,imp5,imp6,sk
0,61A71B2762124CA3F8,2020-03-18 10:00,6,14,6,6,6,8,14,5
1,61A71B2762124CA3F8,2021-03-18 11:00,8,14,8,8,8,10,14,5
2,61A71B2762124CA3F8,2021-03-18 12:00,10,14,10,10,10,12,14,5
3,61A71B2762124CA3F8,2021-03-18 13:00,12,14,12,12,12,14,14,5
4,61A71B2762124CA3F8,2021-03-18 14:00,14,14,14,14,14,14,14,5


## NTILE, ROW_NUMBER, RANK, DENSE_RANK
These implement ranking within a partition.
### NTILE
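NTILE(n) splits the ordered rows of a partition into n buckets and returns the bucket number of the current row. NTILE does not support ROWS BETWEEN, so a clause such as `NTILE(2) OVER(PARTITION BY cookieid ORDER BY createtime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW)` is not supported. If the rows do not divide evenly, the extra rows go to the leading buckets, beginning with the first.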
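
The uneven case is easy to check locally. The sketch below uses Python's built-in `sqlite3` as a stand-in for Presto (an assumption; SQLite 3.25 or later is needed for window functions) and a made-up five-row table `demo`, then splits it into 2 and 3 buckets.

```python
import sqlite3

# In-memory SQLite database standing in for Presto (assumption for illustration)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo (ctime TEXT)")
conn.executemany(
    "INSERT INTO demo VALUES (?)",
    [(hour,) for hour in ("10:00", "11:00", "12:00", "13:00", "14:00")],
)

ntile_rows = conn.execute("""
    SELECT ctime
        ,  NTILE(2) OVER (ORDER BY ctime) AS half
        ,  NTILE(3) OVER (ORDER BY ctime) AS third
    FROM demo
    ORDER BY ctime
""").fetchall()

halves = [row[1] for row in ntile_rows]
thirds = [row[2] for row in ntile_rows]
print(halves)
print(thirds)
```

Five rows in two buckets come out as sizes 3 and 2, and in three buckets as 2, 2, and 1: the larger buckets are always the earlier ones.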

### ROW_NUMBER
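ROW_NUMBER() numbers the rows of a partition sequentially, starting from 1, in the given order. For example, ordering by pv descending yields each day's pv rank within the partition. Tied rows still receive distinct, consecutive numbers.
ROW_NUMBER() has many uses, such as fetching the top-ranked record of each partition or the first refer of a session.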
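
The "first record per group" pattern can be sketched as a subquery that numbers the rows and an outer filter on `rn = 1`. The example uses Python's built-in `sqlite3` in place of Presto (an assumption; SQLite 3.25 or later), and the `visits` table, its columns, and its rows are hypothetical.

```python
import sqlite3

# In-memory SQLite database standing in for Presto (assumption for illustration)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE visits (session_id TEXT, ctime TEXT, refer TEXT)")
conn.executemany(
    "INSERT INTO visits VALUES (?, ?, ?)",
    [
        ("s1", "10:05", "search"),
        ("s1", "10:01", "home"),
        ("s2", "11:30", "ad"),
        ("s2", "11:45", "detail"),
    ],
)

# Number the rows of each session by time, then keep only the earliest one
first_refers = conn.execute("""
    SELECT session_id, refer
    FROM (
        SELECT session_id
            ,  refer
            ,  ROW_NUMBER() OVER (PARTITION BY session_id ORDER BY ctime) AS rn
        FROM visits
    ) ranked
    WHERE rn = 1
    ORDER BY session_id
""").fetchall()

print(first_refers)
```

ROW_NUMBER is the natural choice here because it guarantees exactly one row per partition even when the ordering key has ties; RANK could return several.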

### RANK
RANK() returns the rank of a row within its partition; tied rows share a rank and leave gaps in the ranks that follow.

### DENSE_RANK
DENSE_RANK() returns the rank of a row within its partition; tied rows share a rank and leave no gaps in the ranks that follow.
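
The difference between the three functions only shows up when there are ties. A minimal sketch, using Python's built-in `sqlite3` as a stand-in for Presto (an assumption; SQLite 3.25 or later) and a made-up table `demo` holding the imp values 9, 11, 11, 13:

```python
import sqlite3

# In-memory SQLite database standing in for Presto (assumption for illustration)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo (imp INTEGER)")
conn.executemany("INSERT INTO demo VALUES (?)", [(9,), (11,), (11,), (13,)])

rank_rows = conn.execute("""
    SELECT imp
        ,  ROW_NUMBER() OVER (ORDER BY imp) AS rn1
        ,  RANK()       OVER (ORDER BY imp) AS rn2
        ,  DENSE_RANK() OVER (ORDER BY imp) AS rn3
    FROM demo
    ORDER BY imp, rn1
""").fetchall()

for row in rank_rows:
    print(row)
```

The two rows with imp = 11 get row numbers 2 and 3 but share rank 2; the next row is ranked 4 by RANK (a gap at 3) and 3 by DENSE_RANK (no gap).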

In [6]:
# NTILE
df = pd.read_sql('''
SELECT SUBSTR(user_id, 1, 18) AS uid
    ,  ctime
    ,  imp
    ,  NTILE(2) OVER (PARTITION BY user_id ORDER BY ctime) AS rn1  -- split each partition into 2 buckets
    ,  NTILE(3) OVER (PARTITION BY user_id ORDER BY ctime) AS rn2  -- split each partition into 3 buckets
    ,  NTILE(4) OVER (ORDER BY ctime) AS rn3  -- split all rows into 4 buckets
FROM rpt.padabook_window_functions
WHERE uid_type = 'Android'
ORDER BY user_id, ctime
''', presto_engine)
df.head(10)

Unnamed: 0,uid,ctime,imp,rn1,rn2,rn3
0,348DC48A8D0F5610FD,2020-03-18 10:00,9,1,1,1
1,348DC48A8D0F5610FD,2021-03-18 11:00,11,1,1,2
2,348DC48A8D0F5610FD,2021-03-18 12:00,11,2,2,3
3,348DC48A8D0F5610FD,2021-03-18 13:00,13,2,3,4
4,4CFA886BA563FD6286,2020-03-18 10:00,8,1,1,1
5,4CFA886BA563FD6286,2021-03-18 11:00,8,1,2,2
6,4CFA886BA563FD6286,2021-03-18 12:00,12,2,3,3
7,61A71B2762124CA3F8,2020-03-18 10:00,6,1,1,1
8,61A71B2762124CA3F8,2021-03-18 11:00,8,1,1,2
9,61A71B2762124CA3F8,2021-03-18 12:00,10,1,2,3


In [7]:
# ROW_NUMBER, RANK, DENSE_RANK
df = pd.read_sql('''
SELECT SUBSTR(user_id, 1, 18) AS uid
    ,  ctime
    ,  imp
    ,  ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY imp) AS rn1
    ,  RANK()       OVER (PARTITION BY user_id ORDER BY imp) AS rn2 
    ,  DENSE_RANK() OVER (PARTITION BY user_id ORDER BY imp) AS rn3
FROM rpt.padabook_window_functions
WHERE uid_type = 'Android'
ORDER BY user_id, imp
''', presto_engine)
df.head(10)

Unnamed: 0,uid,ctime,imp,rn1,rn2,rn3
0,348DC48A8D0F5610FD,2020-03-18 10:00,9,1,1,1
1,348DC48A8D0F5610FD,2021-03-18 11:00,11,2,2,2
2,348DC48A8D0F5610FD,2021-03-18 12:00,11,3,2,2
3,348DC48A8D0F5610FD,2021-03-18 13:00,13,4,4,3
4,4CFA886BA563FD6286,2020-03-18 10:00,8,1,1,1
5,4CFA886BA563FD6286,2021-03-18 11:00,8,2,1,1
6,4CFA886BA563FD6286,2021-03-18 12:00,12,3,3,2
7,61A71B2762124CA3F8,2020-03-18 10:00,6,1,1,1
8,61A71B2762124CA3F8,2021-03-18 11:00,8,2,2,2
9,61A71B2762124CA3F8,2021-03-18 12:00,10,3,3,3


## LAG, LEAD, FIRST_VALUE, LAST_VALUE
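These fetch a column's value from a row N rows before or after the current one within a partition, or the first or last value of the frame. They are useful, for example, for period-over-period comparisons.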
### LAG
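LAG(col, n, DEFAULT) returns the value from the n-th row above the current row within the window. The first argument is the column, the second is the offset n (optional, default 1), and the third is the default value returned when there is no row n rows above (NULL if not specified).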

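Each field in the example SQL is explained below.
- pre_time: the time from 1 row above, with default value '1970-01-01 00:00'
    - 348DC48A8D0F5610, row 1: there is no row above, so the default '1970-01-01 00:00' is used
    - 348DC48A8D0F5610, row 4: 1 row above is row 3, with value '2021-03-18 12:00'
- pre2_imp: the impression count from 2 rows above, with no default specified
    - 348DC48A8D0F5610, row 2: there is no row 2 rows above, so the result is NULL
    - 348DC48A8D0F5610, row 3: 2 rows above is row 1, with value '9.0'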
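
As a small illustration of the period-over-period use case, the sketch below subtracts the previous row's value from the current one. It uses Python's built-in `sqlite3` in place of Presto (an assumption; SQLite 3.25 or later), and the table `demo` and the column names `prev_imp`, `prev_or_zero`, and `delta` are made up for the example.

```python
import sqlite3

# In-memory SQLite database standing in for Presto (assumption for illustration)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo (ctime TEXT, imp INTEGER)")
conn.executemany(
    "INSERT INTO demo VALUES (?, ?)",
    [("10:00", 6), ("11:00", 8), ("12:00", 10)],
)

lag_rows = conn.execute("""
    SELECT ctime
        ,  imp
        ,  LAG(imp)       OVER (ORDER BY ctime) AS prev_imp      -- offset defaults to 1
        ,  LAG(imp, 1, 0) OVER (ORDER BY ctime) AS prev_or_zero  -- explicit default value
        ,  imp - LAG(imp) OVER (ORDER BY ctime) AS delta         -- change vs. previous row
    FROM demo
    ORDER BY ctime
""").fetchall()

for row in lag_rows:
    print(row)
```

The first row has no predecessor, so `prev_imp` and `delta` are NULL (`None` in Python) unless a default is supplied, as in `prev_or_zero`.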


### LEAD
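The opposite of LAG: LEAD(col, n, DEFAULT) returns the value from the n-th row below the current row within the window. The first argument is the column, the second is the offset n (optional, default 1), and the third is the default value returned when there is no row n rows below (NULL if not specified).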

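Each field in the example SQL is explained below.
- next_time: the time from 1 row below, with default value '1970-01-01 00:00'
    - 348DC48A8D0F5610, row 1: 1 row below is row 2, with value '2021-03-18 11:00'
    - 348DC48A8D0F5610, row 4: there is no row below, so the default '1970-01-01 00:00' is used
- next2_imp: the impression count from 2 rows below, with no default specified
    - 348DC48A8D0F5610, row 2: 2 rows below is row 4, with value '13.0'
    - 348DC48A8D0F5610, row 3: there is no row 2 rows below, so the result is NULL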
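
The same two LEAD columns can be reproduced on a tiny local table. This sketch uses Python's built-in `sqlite3` instead of Presto (an assumption; SQLite 3.25 or later); the table `demo` and its shortened time strings are made up, while the imp values 9, 11, 11, 13 follow the first user in the example output.

```python
import sqlite3

# In-memory SQLite database standing in for Presto (assumption for illustration)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo (ctime TEXT, imp INTEGER)")
conn.executemany(
    "INSERT INTO demo VALUES (?, ?)",
    [("10:00", 9), ("11:00", 11), ("12:00", 11), ("13:00", 13)],
)

lead_rows = conn.execute("""
    SELECT ctime
        ,  LEAD(ctime, 1, '1970-01-01 00:00') OVER (ORDER BY ctime) AS next_time
        ,  LEAD(imp, 2)                       OVER (ORDER BY ctime) AS next2_imp
    FROM demo
    ORDER BY ctime
""").fetchall()

for row in lead_rows:
    print(row)
```

The last row falls back to the default for `next_time`, and the last two rows have no row two positions below them, so `next2_imp` is NULL there.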

### FIRST_VALUE
FIRST_VALUE(col) returns the first value in the sorted partition, considering rows up to the current row.

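Each field in the example SQL is explained below.
- first_imp: the first value up to the current row
    - the first value for 348DC48A8D0F5610 is 9
    - the first value for 4CFA886BA563FD62 is 8
    - the first value for 61A71B2762124CA3 is 6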

### LAST_VALUE
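LAST_VALUE(col) returns the last value in the sorted partition, considering rows up to the current row. To get the last value of the whole sorted partition, a workaround is to reverse the sort order and take FIRST_VALUE.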

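Each field in the example SQL is explained below.
- last1_imp: the last value up to the current row
    - 348DC48A8D0F5610, row 2: the last value up to the current row is its own value, 11
- last2_imp: the last value of the whole sorted partition
    - 348DC48A8D0F5610, row 2: the last value of the sorted partition is row 4's value, 13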

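If ORDER BY is omitted, rows are taken in their storage order (by default, the record offset within the file), which is not a meaningful ordering and can produce unexpected results.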
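
The LAST_VALUE behavior comes from the default frame ending at the current row. Besides the reverse-sort workaround, standard SQL also allows widening the frame explicitly with `ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING`; that variant is not used in this notebook and is shown here only for comparison. The sketch runs on Python's built-in `sqlite3` rather than Presto (an assumption; SQLite 3.25 or later), with a made-up table `demo`.

```python
import sqlite3

# In-memory SQLite database standing in for Presto (assumption for illustration)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo (ctime TEXT, imp INTEGER)")
conn.executemany(
    "INSERT INTO demo VALUES (?, ?)",
    [("10:00", 9), ("11:00", 11), ("12:00", 11), ("13:00", 13)],
)

last_rows = conn.execute("""
    SELECT ctime
        ,  LAST_VALUE(imp)  OVER (ORDER BY ctime) AS last_default       -- frame ends at current row
        ,  LAST_VALUE(imp)  OVER (ORDER BY ctime
               ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS last_full
        ,  FIRST_VALUE(imp) OVER (ORDER BY ctime DESC) AS first_desc     -- reverse-sort workaround
    FROM demo
    ORDER BY ctime
""").fetchall()

for row in last_rows:
    print(row)
```

With the default frame each row simply sees its own value, while both the explicit full frame and the reversed FIRST_VALUE return 13 for every row.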

In [8]:
# LAG, LEAD, FIRST_VALUE, LAST_VALUE
df = pd.read_sql('''
SELECT SUBSTR(user_id, 1, 18) AS uid
    ,  ctime
    ,  imp
    ,  ROW_NUMBER() OVER(PARTITION BY user_id ORDER BY ctime) AS rn
    ,  LAG(ctime,1,'1970-01-01 00:00') OVER(PARTITION BY user_id ORDER BY ctime) AS pre_time
    ,  LEAD(ctime,1,'1970-01-01 00:00') OVER(PARTITION BY user_id ORDER BY ctime) AS next_time
    ,  LAG(imp,2) OVER(PARTITION BY user_id ORDER BY ctime) AS pre2_imp 
    ,  LEAD(imp,2) OVER(PARTITION BY user_id ORDER BY ctime) AS next2_imp
    ,  FIRST_VALUE(imp) OVER(PARTITION BY user_id ORDER BY ctime) AS first_imp
    ,  LAST_VALUE(imp) OVER(PARTITION BY user_id ORDER BY ctime) AS last1_imp
    ,  FIRST_VALUE(imp) OVER(PARTITION BY user_id ORDER BY ctime DESC) AS last2_imp
FROM rpt.padabook_window_functions
WHERE uid_type = 'Android'
ORDER BY user_id, ctime
''', presto_engine)
df.head(10)

Unnamed: 0,uid,ctime,imp,rn,pre_time,next_time,pre2_imp,next2_imp,first_imp,last1_imp,last2_imp
0,348DC48A8D0F5610FD,2020-03-18 10:00,9,1,1970-01-01 00:00,2021-03-18 11:00,,11.0,9,9,13
1,348DC48A8D0F5610FD,2021-03-18 11:00,11,2,2020-03-18 10:00,2021-03-18 12:00,,13.0,9,11,13
2,348DC48A8D0F5610FD,2021-03-18 12:00,11,3,2021-03-18 11:00,2021-03-18 13:00,9.0,,9,11,13
3,348DC48A8D0F5610FD,2021-03-18 13:00,13,4,2021-03-18 12:00,1970-01-01 00:00,11.0,,9,13,13
4,4CFA886BA563FD6286,2020-03-18 10:00,8,1,1970-01-01 00:00,2021-03-18 11:00,,12.0,8,8,12
5,4CFA886BA563FD6286,2021-03-18 11:00,8,2,2020-03-18 10:00,2021-03-18 12:00,,,8,8,12
6,4CFA886BA563FD6286,2021-03-18 12:00,12,3,2021-03-18 11:00,1970-01-01 00:00,8.0,,8,12,12
7,61A71B2762124CA3F8,2020-03-18 10:00,6,1,1970-01-01 00:00,2021-03-18 11:00,,10.0,6,6,14
8,61A71B2762124CA3F8,2021-03-18 11:00,8,2,2020-03-18 10:00,2021-03-18 12:00,,12.0,6,8,14
9,61A71B2762124CA3F8,2021-03-18 12:00,10,3,2021-03-18 11:00,2021-03-18 13:00,6.0,14.0,6,10,14


## CUME_DIST, PERCENT_RANK
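These are used less often. They compute the relative position of the current row within its partition's ordering, which tells you roughly which percentile the row falls at.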
### CUME_DIST
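CUME_DIST() = (number of rows with a value less than or equal to the current value) / (total number of rows in the partition). For example, it gives the share of all users whose impression count is at most the current user's.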

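Each field in the example SQL is explained below.
- cd1: there is no PARTITION BY, so all rows form a single group of 5 rows
    - Row 1: 1 row has a value less than or equal to 28, so 1/5 = 0.2
    - Row 3: 3 rows have a value less than or equal to 44, so 3/5 = 0.6
- cd2: partitioned by device type; uid_type='Android' has 3 rows
    - Row 3: 2 rows have a value less than or equal to 44, so 2/3 = 0.6666666666666666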
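
The definition translates directly into a few lines of plain Python. The helper `cume_dist` below is a hypothetical illustration of the formula for a single partition, not an engine implementation; the input lists are the per-user imp totals from the example output.

```python
def cume_dist(values):
    """For each value: (rows with value <= current value) / (rows in partition)."""
    n = len(values)
    return [sum(1 for other in values if other <= v) / n for v in values]

all_imp = [28, 36, 44, 48, 50]  # no PARTITION BY: one group of 5 rows
android_imp = [28, 44, 50]      # the uid_type = 'Android' partition

cd1 = cume_dist(all_imp)
cd2 = cume_dist(android_imp)
print(cd1)
print(cd2)
```

The value 44 sits at 3/5 = 0.6 across all rows but at 2/3 within the Android partition, matching the cd1 and cd2 columns.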

### PERCENT_RANK
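PERCENT_RANK() = (RANK of the current row within the partition - 1) / (total number of rows in the partition - 1). Its use cases are less obvious; it expresses a row's relative rank on a scale from 0 to 1, which may be useful when implementing some specialized algorithms.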

Each field in the example SQL is explained below.
- pr1: pr1 = (rn1-1) / (trn-1)
    - Row 1: (1-1)/(5-1) = 0/4 = 0
    - Row 2: (2-1)/(5-1) = 1/4 = 0.25
    - Row 4: (4-1)/(5-1) = 3/4 = 0.75
- pr2: partitioned by device type; uid_type='Android' has 3 rows
    - Row 1: (1-1)/(3-1) = 0.0
    - Row 3: (2-1)/(3-1) = 0.5
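
The formula can be checked against an engine by selecting RANK, the row count, and PERCENT_RANK side by side. This sketch uses Python's built-in `sqlite3` as a stand-in for Presto (an assumption; SQLite 3.25 or later) with a made-up table `demo` holding the five imp totals from the example output.

```python
import sqlite3

# In-memory SQLite database standing in for Presto (assumption for illustration)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo (imp INTEGER)")
conn.executemany("INSERT INTO demo VALUES (?)", [(28,), (36,), (44,), (48,), (50,)])

pr_rows = conn.execute("""
    SELECT imp
        ,  RANK()         OVER (ORDER BY imp) AS rn
        ,  COUNT(*)       OVER ()             AS trn
        ,  PERCENT_RANK() OVER (ORDER BY imp) AS pr
    FROM demo
    ORDER BY imp
""").fetchall()

# Recompute (rank - 1) / (rows - 1) in Python and compare with the engine's value
formula_holds = all(pr == (rn - 1) / (trn - 1) for _, rn, trn, pr in pr_rows)
print(pr_rows)
print(formula_holds)
```

Note that the formula divides by (rows - 1), so a single-row partition is a special case; engines typically return 0 there.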

In [9]:
# CUME_DIST, PERCENT_RANK
df = pd.read_sql('''
SELECT uid_type
    ,  uid
    ,  imp
    ,  CUME_DIST() OVER(ORDER BY imp) AS cd1
    ,  CUME_DIST() OVER(PARTITION BY uid_type ORDER BY imp) AS cd2 
    ,  PERCENT_RANK() OVER (ORDER BY imp) AS pr1  -- percent rank over all rows
    ,  RANK() OVER (ORDER BY imp)         AS rn1  -- RANK value over all rows
    ,  SUM(1) OVER ()                     AS trn  -- total number of rows
    ,  PERCENT_RANK() OVER(PARTITION BY uid_type ORDER BY imp) AS pr2 
FROM (
    SELECT uid_type
        ,  SUBSTR(user_id, 1, 18) AS uid
        ,  SUM(imp) AS imp
    FROM rpt.padabook_window_functions
    GROUP BY 1, 2
) a
ORDER BY imp
''', presto_engine)
df.head(10)

Unnamed: 0,uid_type,uid,imp,cd1,cd2,pr1,rn1,trn,pr2
0,Android,4CFA886BA563FD6286,28,0.2,0.333333,0.0,1,5,0.0
1,iOS,161A4989-BEA6-40F7,36,0.4,0.5,0.25,2,5,0.0
2,Android,348DC48A8D0F5610FD,44,0.6,0.666667,0.5,3,5,0.5
3,iOS,83308879-DA6C-47C1,48,0.8,1.0,0.75,4,5,1.0
4,Android,61A71B2762124CA3F8,50,1.0,1.0,1.0,5,5,1.0


## Appendix
Creating the test table and loading the test data
```SQL
create table padabook_window_functions (
      user_id  string
    , uid_type string
    , ctime    string
    , imp      int
)
ROW FORMAT DELIMITED 
    FIELDS TERMINATED BY ','
    LINES TERMINATED BY '\n'
STORED AS TEXTFILE;

LOAD DATA LOCAL INPATH '/home/da/dylanwu/padabook_window_functions.csv' OVERWRITE  INTO TABLE padabook_window_functions;
```
