/
kinesis-stream-shard-split-merge.sh
141 lines (97 loc) · 3.07 KB
/
kinesis-stream-shard-split-merge.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# Kinesis Stream シャード増減スクリプト
# (kinesis-stream-shard-split-merge-function.sh)
# 実行例
# . ./kinesis-stream-shard-split-merge-function.sh
# STREAM_NAME='stream-1'
#
# 有効シャード確認
# get-json-active-shards ${STREAM_NAME} | jq .
#
# シャード倍増
# split-shards ${STREAM_NAME}
#
# シャード半減
# merge-shards ${STREAM_NAME}
## シャード倍増(x2)
function split-shards () {
STREAM_NAME=$1
JSON_DESC_STREAM=`get-json-active-shards ${STREAM_NAME}`
echo ${JSON_DESC_STREAM} | jq .
echo ${JSON_DESC_STREAM}| jq -r "[.ShardId, .HashKeyRange.StartingHashKey, .HashKeyRange.EndingHashKey]|@csv" | sed 's/"//g' | while read -r TMP1; do
IFS=","
set -- $TMP1
SHARD_ID=$1
STARTING_HASH=$2
ENDING_HASH=$3
IFS=$' \t\n'
# 分割地点となるハッシュ値計算
NEW_STARTING_HASH=`expr \( ${ENDING_HASH} - ${STARTING_HASH} \) / 2 + ${STARTING_HASH} + 1`
echo "STREAM_NAME: ${STREAM_NAME}"
echo "SHARD_ID: ${SHARD_ID}"
echo "NEW_STARTING_HASH: ${NEW_STARTING_HASH}"
kinesis-split-shard ${STREAM_NAME} ${SHARD_ID} ${NEW_STARTING_HASH}
done
get-json-active-shards ${STREAM_NAME} | jq .
}
## シャード半減(1/2)
function merge-shards () {
STREAM_NAME=$1
i=0
JSON_DESC_STREAM=`get-json-active-shards ${STREAM_NAME}`
echo ${JSON_DESC_STREAM} | jq .
echo ${JSON_DESC_STREAM} | jq -r '[.ShardId]|@csv'| sed 's/"//g' | sort | while read -r TMP1; do
if [ ${i} -eq 0 ] ; then
echo "$TMP1"
SHARD_TO_MERGE="${TMP1}"
i=1
else
ADJACENT_SHARD="${TMP1}"
echo "STREAM_NAME: $STREAM_NAME"
echo "SHARD_TO_MERGE: $SHARD_TO_MERGE"
kinesis-merge-shard ${STREAM_NAME} ${SHARD_TO_MERGE} ${ADJACENT_SHARD}
i=0
fi
done
get-json-active-shards ${STREAM_NAME}| jq .
}
## シャード分割
function kinesis-split-shard () {
STREAM_NAME=$1
SHARD_ID=$2
NEW_STARTING_HASH=$3
aws kinesis split-shard --stream-name ${STREAM_NAME} \
--shard-to-split ${SHARD_ID} \
--new-starting-hash-key ${NEW_STARTING_HASH}
check-stream-active ${STREAM_NAME}
}
## シャード統合
function kinesis-merge-shard () {
STREAM_NAME=$1
SHARD_TO_MERGE=$2
ADJACENT_SHARD=$3
aws kinesis merge-shards --stream-name ${STREAM_NAME} \
--shard-to-merge ${SHARD_TO_MERGE} \
--adjacent-shard-to-merge ${ADJACENT_SHARD}
check-stream-active ${STREAM_NAME}
}
## シャードステータス確認
# split-shard, merge-shard の 完了検知)
function check-stream-active () {
STREAM_NAME=$1
while :
do
sleep 1
r="`aws kinesis describe-stream --stream-name ${STREAM_NAME} | jq -r .StreamDescription.StreamStatus`"
if [ ${r} = "ACTIVE" ]; then
break
fi
done
}
## 有効シャードのみ取得
function get-json-active-shards () {
STREAM_NAME=$1
aws kinesis describe-stream --stream-name ${STREAM_NAME} \
| jq '.StreamDescription.Shards[]' \
| jq 'select(.SequenceNumberRange.EndingSequenceNumber == null)' \
| jq -c .
}