# グループ処理

データ分析の中で、グループ処理は特に重要な役割を果たします。この章では、Polarsのグループ処理に関する主要な関数を紹介し、それぞれの使い方と応用例を説明します。

以下の関数について詳しく見ていきます：

- `group_by`: データフレームを特定の列でグループ化します。
- `agg`: グループ化したデータに対して集計処理を行います。
- `over`: ウィンドウ関数を使用して、グループごとに計算を行います。
- `group_by_dynamic`: 時間ベースの動的なグループ化を実現します。
- `rolling`: ローリングウィンドウによる計算を行います。

これらの機能を活用することで、大規模なデータセットに対する高度な分析を効率的に行うことができます。

In [1]:
import polars as pl
from helper.jupyter import row

## group_by

データフレームの`group_by()`メソッドを使用して、グループ化オブジェクト`GroupBy`を作成します。`maintain_order`引数を`True`に指定することで、元のデータの順序を維持したままグループ化を行います。これにより、グループの順番は元データ中に表れる順番を使用します。

In [2]:
df = pl.DataFrame(
    dict(
        g=['A', 'B', 'C', 'A', 'B', 'B'], 
        x=[1, 2, 3, 4, 5, 6])
)
g = df.group_by('g', maintain_order=True)
type(g)

polars.dataframe.group_by.GroupBy

`GroupBy`オブジェクトの`.agg()`メソッドを使用して、複数の演算式でグループごとのデータを処理し、その結果をまとめます。一部のよく使用する機能は、`GroupBy`オブジェクトのメソッドとしても提供されます。たとえば、`.mean()`は各グループの平均値を計算し、`.first()`は各グループの最初の行を出力します。以下のプログラムは、`.mean()`と`.first()`に相当する演算式の例です。

In [3]:
row(
    g.mean(), 
    g.agg(pl.all().mean()),
    g.first(),
    g.agg(pl.all().first())
)

g,x,Unnamed: 2_level_0,Unnamed: 3_level_0
str,f64,Unnamed: 2_level_1,Unnamed: 3_level_1
g,x,Unnamed: 2_level_2,Unnamed: 3_level_2
str,f64,Unnamed: 2_level_3,Unnamed: 3_level_3
g,x,Unnamed: 2_level_4,Unnamed: 3_level_4
str,i64,Unnamed: 2_level_5,Unnamed: 3_level_5
g,x,Unnamed: 2_level_6,Unnamed: 3_level_6
str,i64,Unnamed: 2_level_7,Unnamed: 3_level_7
"""A""",2.5,,
"""B""",4.333333,,
"""C""",3.0,,
"""A""",2.5,,
"""B""",4.333333,,
"""C""",3.0,,
"""A""",1,,
"""B""",2,,
"""C""",3,,
"""A""",1,,

g,x
str,f64
"""A""",2.5
"""B""",4.333333
"""C""",3.0

g,x
str,f64
"""A""",2.5
"""B""",4.333333
"""C""",3.0

g,x
str,i64
"""A""",1
"""B""",2
"""C""",3

g,x
str,i64
"""A""",1
"""B""",2
"""C""",3


`.agg()`メソッド内の演算式が複数の値を出力する場合、結果列はリストになります。`.head()`と同じ結果を得るためには、`.explode()`を使用してリスト内のデータを縦に結合する必要があります。

In [4]:
row(
    g.head(2), 
    g.agg(pl.all().head(2)), 
    g.agg(pl.all().head(2)).explode('x')
)

g,x,Unnamed: 2_level_0
str,i64,Unnamed: 2_level_1
g,x,Unnamed: 2_level_2
str,list[i64],Unnamed: 2_level_3
g,x,Unnamed: 2_level_4
str,i64,Unnamed: 2_level_5
"""A""",1,
"""A""",4,
"""B""",2,
"""B""",5,
"""C""",3,
"""A""","[1, 4]",
"""B""","[2, 5]",
"""C""",[3],
"""A""",1,
"""A""",4,

g,x
str,i64
"""A""",1
"""A""",4
"""B""",2
"""B""",5
"""C""",3

g,x
str,list[i64]
"""A""","[1, 4]"
"""B""","[2, 5]"
"""C""",[3]

g,x
str,i64
"""A""",1
"""A""",4
"""B""",2
"""B""",5
"""C""",3


(over)=
## over

演算式の中に`.over()`を使えば、グループごとの処理も行えます。次のプログラムでは、`g`列のグループごとに`x`列の平均値を計算します。`.group_by().agg()`を使う場合は、1グループは1行になりますが、`.over()`を使用する場合は、各グループの計算結果が元データの該当する場所に反映されます。

In [5]:
df.with_columns(
    pl.col('x')
    .mean()
    .over('g')
    .alias('x_mean'),
)

g,x,x_mean
str,i64,f64
"""A""",1,2.5
"""B""",2,4.333333
"""C""",3,3.0
"""A""",4,2.5
"""B""",5,4.333333
"""B""",6,4.333333


次のプログラムでは、`g`列のグループごとに`x`列の平均値を計算し、その平均値を元に`x`列から平均値を引いた結果を新しい列に格納します。この演算でグループごとの計算結果の長さが不変の場合も、元データの該当する場所に反映されます。

In [6]:
df.with_columns(
    (pl.col('x') - pl.col('x').mean())
    .over('g')
    .alias('x_sub_mean')
)

g,x,x_sub_mean
str,i64,f64
"""A""",1,-1.5
"""B""",2,-2.333333
"""C""",3,0.0
"""A""",4,1.5
"""B""",5,0.666667
"""B""",6,1.666667


グループごとの計算結果の長さが1でも不変でもない場合、エラーになりますが、`over()`の`mapping_strategy`引数を`'join'`に設定すると、この結果を一つのリストとして元の場所に入れます。次のプログラムでは、`g`列のグループごとに`x`列の先頭2つの値を取得し、それをリストとして新しい列に格納します。

In [7]:
df.with_columns(
    pl.col('x')
    .head(2)
    .over('g', mapping_strategy='join')
    .alias('x_head')
)

g,x,x_head
str,i64,list[i64]
"""A""",1,"[1, 4]"
"""B""",2,"[2, 5]"
"""C""",3,[3]
"""A""",4,"[1, 4]"
"""B""",5,"[2, 5]"
"""B""",6,"[2, 5]"


次のプログラムでは、`g`列のグループごとに`x`列の最小値と最大値を取得し、それをリストとして新しい列に格納します。

In [8]:
df.with_columns(
    pl.col('x')
    .min()
    .append(pl.col('x').max())
    .over('g', mapping_strategy='join')
    .alias('x_min_max')
)

g,x,x_min_max
str,i64,list[i64]
"""A""",1,"[1, 4]"
"""B""",2,"[2, 6]"
"""C""",3,"[3, 3]"
"""A""",4,"[1, 4]"
"""B""",5,"[2, 6]"
"""B""",6,"[2, 6]"


`mapping_strategy`を`'explode'`に設定すると、各グループの結果を縦に結合します。これは、`group_by().agg().explode()`と同じ結果になります。次のプログラムでは、`g`列のグループごとに`x`列の先頭2つの値を取得し、各グループの結果を縦に結合します。

In [9]:
df.select(
    pl.col('g', 'x')
    .head(2)
    .over('g', mapping_strategy='explode')
)

g,x
str,i64
"""A""",1
"""A""",4
"""B""",2
"""B""",5
"""C""",3


`filter()`メソッドにも`over()`の式を使うことができます。次のプログラムでは、`g`列のグループごとに、`x`列の値がそのグループの平均値以上の行をフィルタリングして出力します。

In [10]:
df.filter(
    (pl.col('x') >= pl.col('x').mean())
    .over('g')
)

g,x
str,i64
"""C""",3
"""A""",4
"""B""",5
"""B""",6


## group_by_dynamic

`group_by_dynamic(index)`メソッドは、`index`列の値に基づいて動的にグループ化を行います。グループの範囲の開始点`start`は`start_by`、`every`、`offset`、および`index`列の最初の値`value`によって決まります。`start_by`はデフォルト値の`"window"`の場合、次のように計算されます：

`start = value - value % every + offset`

この式により、次のようなグループ範囲が設定されます：

```
[start, start + period)
[start + every, start + every + period)
...
[start + i*every, start + i*every + period)
```

たとえば、次のプログラムでは、`index`列の値に基づき、3単位の移動量で3の単位でグループ化を行い、それぞれのグループに対して集計を行います。

In [11]:
index = [4, 5, 9, 10, 11, 12, 13]
df = pl.DataFrame(dict(index=index, value=index)).set_sorted('index')

g = df.group_by_dynamic('index', every='3i', offset='0i', include_boundaries=True)
g.agg(pl.col("value"))

_lower_boundary,_upper_boundary,index,value
i64,i64,i64,list[i64]
3,6,3,"[4, 5]"
9,12,9,"[9, 10, 11]"
12,15,12,"[12, 13]"


次のプログラムでは、`index`列に基づいて、1単位の移動量で3単位ごとにグループ化を行います。

In [12]:
g = df.group_by_dynamic('index', every='1i', period='3i', offset='0i', include_boundaries=True)
g.agg(pl.col("value"))

_lower_boundary,_upper_boundary,index,value
i64,i64,i64,list[i64]
4,7,4,"[4, 5]"
5,8,5,[5]
7,10,7,[9]
8,11,8,"[9, 10]"
9,12,9,"[9, 10, 11]"
10,13,10,"[10, 11, 12]"
11,14,11,"[11, 12, 13]"
12,15,12,"[12, 13]"
13,16,13,[13]


わかりやすくするために、上の例は整数列に対してグループ処理の方法を示しましたが、実際には時系列データに対してグループ処理を行うことが多いです。時系列の場合、`every`、`period`、`offset`などの単位は次のような形式で指定します：

- `1ns`  (1ナノ秒)
- `1us`  (1マイクロ秒)
- `1ms`  (1ミリ秒)
- `1s`   (1秒)
- `1m`   (1分)
- `1h`   (1時間)
- `1d`   (1暦日)
- `1w`   (1暦週)
- `1mo`  (1暦月)
- `1q`   (1暦四半期)
- `1y`   (1暦年)

これらの単位を使って、時系列データのグループ化を柔軟に行うことができます。

## rolling

`rolling(index)`は、`index`列の値に基づいてローリングウィンドウを作成します。デフォルトでは、`offset`の値は`-period`に設定されています。ローリングウィンドウは次のようにグループ化されます：

```
(index[0] + offset, index[0] + offset + period]
...
(index[i] + offset, index[i] + offset + period]
```

次のプログラムでは、`index`列に基づいて、前の2つのデータ点を含む3点のローリングウィンドウを作成し、それに対して集計を行います。

In [13]:
g = df.rolling('index', period='3i')
g.agg(pl.col("value"))

index,value
i64,list[i64]
4,[4]
5,"[4, 5]"
9,[9]
10,"[9, 10]"
11,"[9, 10, 11]"
12,"[10, 11, 12]"
13,"[11, 12, 13]"


次のプログラムでは、`index`列に基づいて、データ点とその後の2つのデータ点を含む3点のローリングウィンドウを作成し、それに対して集計を行います。`offset`を`'-1i'`に設定することで、ウィンドウの開始位置を1インデックス分後ろにずらしています。

In [14]:
g = df.rolling('index', period='3i', offset='-1i')
g.agg(pl.col("value"))

index,value
i64,list[i64]
4,"[4, 5]"
5,[5]
9,"[9, 10, 11]"
10,"[10, 11, 12]"
11,"[11, 12, 13]"
12,"[12, 13]"
13,[13]


固定点数の移動窓で処理する場合は、`int_range()`で作成した行番号に対して`rolling`処理を行います。以下のプログラムでは、データフレームの各行に対して、前の2つの行を含む合計3行のローリングウィンドウを適用し、`A`列の最大値と`B`列の最小値を計算します。

次のコードでは、`int_range(0, pl.len())`を使って行番号を生成し、その行番号に基づいてローリングウィンドウを作成します。ウィンドウの期間は3行（`period='3i'`）で、`offset='-2i'`により、各ウィンドウは現在の行の2行前から始まります。集計では、`A`列の最大値と`B`列の最小値が計算され、結果として各行に対してこれらの統計値が追加されます。

In [15]:
df = pl.DataFrame(
    dict(
        A=[5, 3, 8, 9, 10, 1, 2, 4],
        B=[10, 20, 10, 2, 3, 5, 0, 7]
    )
)

(
    df
    .rolling(
        pl.int_range(0, pl.len()).alias('index'), 
        period='3i', 
        offset='-2i'
    )
    .agg(
        pl.col.A,
        pl.col.B,
        pl.col.A.max().alias('A_max'),
        pl.col.B.min().alias('B_min'),
    )
)

index,A,B,A_max,B_min
i64,list[i64],list[i64],i64,i64
0,"[5, 3]","[10, 20]",5,10
1,"[5, 3, 8]","[10, 20, 10]",8,10
2,"[3, 8, 9]","[20, 10, 2]",9,2
3,"[8, 9, 10]","[10, 2, 3]",10,2
4,"[9, 10, 1]","[2, 3, 5]",10,2
5,"[10, 1, 2]","[3, 5, 0]",10,0
6,"[1, 2, 4]","[5, 0, 7]",4,0
7,"[2, 4]","[0, 7]",4,0


## CookBook

### 値の変化に応じたグループ化と集計

次のプログラムでは、`g`列の値が変化するたびにグループを変え、各グループの最初の`g`の値と`v`列の値を取得します。`rle_id()`を使用して、`g`列の値が変化するたびに新しいグループIDを割り当てます。`rle_id()`は、連続する同じ値に同じIDを割り当て、値が変化するたびにIDを更新します。

In [16]:
df = pl.DataFrame(
    {
        "g": [1, 1, 2, 2, 2, 1, 1, 0],
        "v": list(range(8))
    }
)

df.group_by(
    pl.col('g').rle_id().alias('group_id')
).agg(
    pl.col('g').first(),
    pl.col('v')
)

group_id,g,v
u32,i64,list[i64]
0,1,"[0, 1]"
1,2,"[2, 3, 4]"
2,1,"[5, 6]"
3,0,[7]


### グループ毎に番号付け

c2列とc3列の値に番号を付けます。番号は0からスタート、新しい値が現れるたびに１増加します。

In [17]:
df = pl.DataFrame(
    {
        "c1": ["a", "a", "a", "a", "a", "d", "d"],
        "c2": ["b", "a", "a", "b", "c", "a", "b"],
        "c3": [2, 1, 1, 1, 1, 1, 1],
    }
)

In [18]:
(
    df
    .group_by("c2", "c3", maintain_order=True)
    .agg(
        pl.col("c2")
        .agg_groups()
        .alias("row_index")
    )
    .with_row_index()
    .explode("row_index")
    .sort(by="row_index")
)

index,c2,c3,row_index
u32,str,i64,u32
0,"""b""",2,0
1,"""a""",1,1
1,"""a""",1,2
2,"""b""",1,3
3,"""c""",1,4
1,"""a""",1,5
2,"""b""",1,6


In [19]:
df.join(
    df.select("c2", "c3")
    .unique(maintain_order=True)
    .with_row_index(),
    on=["c2", "c3"]
)

c1,c2,c3,index
str,str,i64,u32
"""a""","""b""",2,0
"""a""","""a""",1,1
"""a""","""a""",1,1
"""a""","""b""",1,2
"""a""","""c""",1,3
"""d""","""a""",1,1
"""d""","""b""",1,2


In [20]:
df.with_columns(
    pl.format("{} - {}", "c2", "c3")
    .cast(pl.Categorical)
    .to_physical()
    .alias("index")
)

c1,c2,c3,index
str,str,i64,u32
"""a""","""b""",2,0
"""a""","""a""",1,1
"""a""","""a""",1,1
"""a""","""b""",1,2
"""a""","""c""",1,3
"""d""","""a""",1,1
"""d""","""b""",1,2


In [21]:
df.with_columns(
    pl.struct("c2", "c3")
    .rank("dense")
    .cast(pl.String)
    .cast(pl.Categorical)
    .to_physical()
    .alias("index")
)

c1,c2,c3,index
str,str,i64,u32
"""a""","""b""",2,0
"""a""","""a""",1,1
"""a""","""a""",1,1
"""a""","""b""",1,2
"""a""","""c""",1,3
"""d""","""a""",1,1
"""d""","""b""",1,2
