Skip to content

Commit

Permalink
[bugfix] 加入对自定义处理函数的文档;
Browse files Browse the repository at this point in the history
  • Loading branch information
马钰鹏 committed Oct 14, 2018
1 parent aec7aff commit 7413c47
Show file tree
Hide file tree
Showing 9 changed files with 232 additions and 26 deletions.
Binary file modified docs/build/doctrees/environment.pickle
Binary file not shown.
Binary file modified docs/build/doctrees/hello.doctree
Binary file not shown.
81 changes: 74 additions & 7 deletions docs/build/html/_sources/hello.md.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
![Alt text](https://github.com/m358807551/images/blob/master/images/MysqlsMom.jpeg?raw=true)
![Alt text](https://github.com/m358807551/images/blob/master/images/mysqlsmom/mysqlsmom_red.png?raw=true)

## 简介

Expand Down Expand Up @@ -313,7 +313,7 @@ TASKS = [

```python
"pipeline": [
{"only_fields": {"fields": ["id", "name"]}}, # 只同步 id 和 name字段
{"only_fields": {"fields": ["id", "name"]}}, # 只同步 id 和 name字段
{"set_id": {"field": "id"}} # 然后设置 id 字段为es中文档的_id
]
```
Expand All @@ -325,7 +325,7 @@ TASKS = [
```python
"pipeline": [
# 将name重命名为name1,age 重命名为age1
{"replace_fields": {"name": ["name1"], "age": ["age1"]}},
{"replace_fields": {"name": ["name1"], "age": ["age1"]}},
{"set_id": {"field": "id"}}
]
```
Expand All @@ -335,7 +335,7 @@ TASKS = [
```python
"pipeline": [
# 先重命名 name 为 name1
{"replace_fields": {"name": ["name1"]}},
{"replace_fields": {"name": ["name1"]}},
# 再重命名 age 为 age1
{"replace_fields": {"age": ["age1"]}},
{"set_id": {"field": "id"}}
Expand All @@ -348,7 +348,7 @@ TASKS = [

```python
"pipeline": [
{"replace_fields": {"name": ["name_default", "name_raw"]}},
{"replace_fields": {"name": ["name_default", "name_raw"]}},
{"set_id": {"field": "id"}}
]
```
Expand All @@ -361,8 +361,8 @@ TASKS = [

```python
"pipeline": [
# tags 存储类似"aaa|bbb|ccc"的字符串,将 tags 字段的值按符号 `|` 切分成数组
{"split": {"field": "tags", "flag": "|"}},
# tags 存储类似"aaa|bbb|ccc"的字符串,将 tags 字段的值按符号 `|` 切分成数组
{"split": {"field": "tags", "flag": "|"}},
{"set_id": {"field": "id"}}
]
```
Expand Down Expand Up @@ -413,6 +413,73 @@ TASKS = [

### 更多示例正在更新

## 自定义处理函数

所有 row_handlers.py 文件里的函数都支持在 pipeline 里使用。

每一个同步任务都有一个 pipeline 配置,从 Mysql 取出的数据经过 pipeline 里的函数处理后传入es;每个处理函数的输入都是上一个处理函数的输出;

除此之外,可以自定义处理函数,以 binlog 同步为例:

1. 在同步配置文件的同一目录下创建文件:`my_handlers.py`
2. 修改同步的配置文件,取消倒数第二行 `# CUSTOM_ROW_HANDLERS = "./my_handlers.py"`的注释
3. 在`my_handlers.py`中编写自定义函数,写好的函数可以直接在 pipeline 中使用。

### 自定义函数的格式

`my_handlers.py `的文件内容按照如下格式

```python
# coding=utf-8
import copy

# 示例自定义函数,将 my_field 字段的值改为 my_value
def my_func(row, my_field, my_value):
"""
参数 row: 这个参数一定要有,是一个字典,存储着行信息。示例:{"id": 123, "name": "啦啦啦"}
参数 my_field: 在本例中期待传入要修改的字段名。示例:id, name
参数 my_value: 将 my_field 的值设置为 my_value
"""
new_row = copy.deepcopy(row) # 一定要有

# 中间的处理逻辑,主要自定义这部分
new_row[my_field] = my_value

return new_row # 一定要有,返回处理后的结果,来交给接下来的处理函数,或者同步到es

# 自定义函数2,如果 _id 为空,就强行指定 _id 为 "123"
def my_func2(row):
row = copy.deepcopy(row)
if not row["_id"]:
row["_id"] = "123"
return row

# 自定义函数3...

```

然后可以直接在pipeline里使用了

```python
...
"pipeline": [
{
"my_func": # 刚刚自定义的函数名
# 自定义的参数
# 其中row参数 不用写
# 将所有的 name 字段的值 改为 "Jack"
{
"my_field": "name",
"my_value": "Jack"
}
},
{"set_id": {"field": "id"}} # 设置 id 字段的值为 es 中文档 _id
],
...
```



## 常见问题

#### 为什么我的增量同步不及时?
Expand Down
1 change: 1 addition & 0 deletions docs/build/html/genindex.html
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
<li class="toctree-l1"><a class="reference internal" href="hello.html#">增量同步</a></li>
<li class="toctree-l1"><a class="reference internal" href="hello.html#">组织架构</a></li>
<li class="toctree-l1"><a class="reference internal" href="hello.html#mysqlsmom"><em>Mysqlsmom</em> 使用实战</a></li>
<li class="toctree-l1"><a class="reference internal" href="hello.html#">自定义处理函数</a></li>
<li class="toctree-l1"><a class="reference internal" href="hello.html#">常见问题</a></li>
<li class="toctree-l1"><a class="reference internal" href="hello.html#">待改进</a></li>
<li class="toctree-l1"><a class="reference internal" href="hello.html#">未完待续</a></li>
Expand Down

0 comments on commit 7413c47

Please sign in to comment.