# Impala

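Using the Impala module requires a configuration file that stores the connection details for the target database. For example, `project.yaml` is our project's configuration file, and its database section looks like this: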

```yaml
databases:
    impala:
        host: 1.1.1.1
        port: 12345
        user: yumingmin
        password: yumingmin
        auth_mechanism: PLAIN

    hive:
        host: 1.1.1.2
        port: 12345
        user: yumingmin
        password: yumingmin
        auth_mechanism: PLAIN
```

In [5]:
# Load the configuration file
import yaml

with open("../project.yaml", "r") as f:
    config = yaml.load(f.read(), Loader=yaml.FullLoader)

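> We do not recommend defining project configuration in a `settings.py` file. Importing variables from a .py file inside the project directory often causes problems, so it is a poor choice for beginners.

> We recommend storing configuration in a `project.yaml` or `project.cfg` file instead.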
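The `project.cfg` option is only mentioned above, so here is a minimal sketch of how such a file could be read with the standard-library `configparser`. The INI layout, the section name `impala`, and the helper `load_cfg_section` are assumptions made for illustration; the keys simply mirror the YAML example, and whether mumu accepts a dict built this way is not confirmed by the source.

```python
import configparser

# Hypothetical project.cfg content; section and key names mirror the YAML above.
CFG_TEXT = """
[impala]
host = 1.1.1.1
port = 12345
user = yumingmin
password = yumingmin
auth_mechanism = PLAIN
"""


def load_cfg_section(text: str, section: str) -> dict:
    """Parse INI-style text and return one section as a plain dict."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    settings = dict(parser[section])
    # configparser returns every value as a string, so convert the port explicitly.
    settings["port"] = parser.getint(section, "port")
    return settings


impala_config = load_cfg_section(CFG_TEXT, "impala")
print(impala_config)
```

For a file on disk, `parser.read("project.cfg")` would take the place of `read_string`.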

## Running a single SQL statement

In [7]:
from mumu.db import ImpalaRunner

In [6]:
query = """SELECT * FROM test.ddd_211015 LIMIT 10"""
runner = ImpalaRunner(config=config['databases']['impala'], sql=query)
df = runner.run_sql_block()
df.head()

INFO:root:Starting run sql:
 SELECT * FROM test.ddd_211015 LIMIT 10

Loading: 100%|██████████| 100/100 [00:00<00:00, 209.92it/s]
INFO:impala.hiveserver2:Closing active operation
INFO:root:cost time: 2.765 seconds


|   | owner_id | dun_case_id | call_time |
|---|----------|-------------|-----------|
| 0 | a | a1 | 1 |
| 1 | a | a1 | 2 |
| 2 | a | a1 | 3 |
| 3 | a | a2 | 4 |
| 4 | a | a2 | 5 |


In [8]:
query = """SELECT * FROM test.ddd_211015 LIMIT 10"""

with ImpalaRunner(config=config['databases']["impala"], sql=query) as runner:
    df = runner.run_sql_block()

df.head()

INFO:root:Starting run sql:
 SELECT * FROM test.ddd_211015 LIMIT 10

Loading: 100%|██████████| 100/100 [00:00<00:00, 5215.82it/s]
INFO:impala.hiveserver2:Closing active operation
INFO:root:cost time: 2.33 seconds
INFO:root:DB connection has been closed!


|   | owner_id | dun_case_id | call_time |
|---|----------|-------------|-----------|
| 0 | a | a1 | 1 |
| 1 | a | a1 | 2 |
| 2 | a | a1 | 3 |
| 3 | a | a2 | 4 |
| 4 | a | a2 | 5 |
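The `with` form logs an extra "DB connection has been closed!" line, which suggests the runner closes its connection when the block exits. The sketch below shows that general context-manager pattern. `FakeConnection` and `SketchRunner` are hypothetical stand-ins written for this illustration and are not mumu's actual implementation.

```python
class FakeConnection:
    """Stand-in for a real database connection (hypothetical)."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


class SketchRunner:
    """Hypothetical runner illustrating the context-manager pattern."""

    def __init__(self):
        self.conn = FakeConnection()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Close the connection whether or not the block raised.
        self.conn.close()
        return False  # do not swallow exceptions from the block


with SketchRunner() as runner:
    conn = runner.conn  # still open inside the block

print(conn.closed)  # True: the connection was closed on exit
```

The practical benefit is that the connection is released even if the query raises, which the plain (non-`with`) call shown first would leave to the caller.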


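## Formatting SQL parameters

SQL statements often carry parameters. In Impala, for example, you can reference variables with `$`, and mumu supports the same feature. Example code: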

In [10]:
query = "SELECT * FROM test.ddd_211015 WHERE 1=1 AND owner_id='${owner_id}' LIMIT 10"
context = {"owner_id": "b"}
with ImpalaRunner(config=config['databases']['impala'], sql=query, context=context) as runner:
    df = runner.run_sql_block()

df.head()

INFO:root:Starting run sql:
 SELECT * FROM test.ddd_211015 WHERE 1=1 AND owner_id='b' LIMIT 10

Loading: 100%|██████████| 100/100 [00:00<00:00, 1216.84it/s]
INFO:impala.hiveserver2:Closing active operation
INFO:root:cost time: 2.199 seconds
INFO:root:DB connection has been closed!


|   | owner_id | dun_case_id | call_time |
|---|----------|-------------|-----------|
| 0 | b | b1 | 1 |
| 1 | b | b1 | 2 |
| 2 | b | b2 | 3 |
| 3 | b | b2 | 4 |
| 4 | b | b2 | 5 |
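To make the `${owner_id}` substitution concrete, here is a minimal sketch that reproduces the rendered SQL from the log using the standard-library `string.Template`, which happens to use the same `${name}` syntax. How mumu performs the substitution internally is not shown in the source; `render_sql` is a hypothetical helper.

```python
from string import Template


def render_sql(sql: str, context: dict) -> str:
    """Fill ${name} placeholders in sql from context.

    Template.substitute raises KeyError when a placeholder has no value,
    which catches a missing parameter before the query is sent.
    """
    return Template(sql).substitute(context)


query = "SELECT * FROM test.ddd_211015 WHERE 1=1 AND owner_id='${owner_id}' LIMIT 10"
rendered = render_sql(query, {"owner_id": "b"})
print(rendered)
# SELECT * FROM test.ddd_211015 WHERE 1=1 AND owner_id='b' LIMIT 10
```

This is plain text substitution with no escaping, so values from untrusted input should not be passed through it.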


## Hiding the progress bar

The `verbose` parameter controls whether the progress bar is shown. It is on by default; set it to `False` to hide the progress bar.

In [11]:
query = "SELECT * FROM test.ddd_211015 WHERE 1=1 AND owner_id='${owner_id}' LIMIT 10"
context = {"owner_id": "b"}
with ImpalaRunner(config=config['databases']['impala'], sql=query, context=context, verbose=False) as runner:
    df = runner.run_sql_block()

df.head()

INFO:root:Starting run sql:
 SELECT * FROM test.ddd_211015 WHERE 1=1 AND owner_id='b' LIMIT 10

INFO:impala.hiveserver2:Closing active operation
INFO:root:cost time: 0.143 seconds
INFO:root:DB connection has been closed!


|   | owner_id | dun_case_id | call_time |
|---|----------|-------------|-----------|
| 0 | b | b1 | 1 |
| 1 | b | b1 | 2 |
| 2 | b | b2 | 3 |
| 3 | b | b2 | 4 |
| 4 | b | b2 | 5 |


## Setting the retry count and retry interval

Use `retry_times` and `sleep_time` to set how many times a failed query is rerun and how long to wait between reruns.

In [None]:
query = "SELECT * FROM test.ddd_211015 WHERE 1=1 AND owner_id='${owner_id}' LIMIT 10"
context = {"owner_id": "b"}
with ImpalaRunner(config=config['databases']['impala'], sql=query, context=context, 
                verbose=False, retry_times=5, sleep_time=120) as runner:
    df = runner.run_sql_block()

df.head()
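The retry behaviour can be pictured as a simple loop. This is a sketch under stated assumptions, not mumu's code: `run_with_retry` is a hypothetical helper, `retry_times` is assumed to be the maximum number of attempts, and `sleep_time` is assumed to be in seconds. The `sleep` argument exists only so the example can run without actually waiting.

```python
import time


def run_with_retry(func, retry_times=5, sleep_time=120, sleep=time.sleep):
    """Call func(); on failure wait sleep_time and try again.

    Assumption: retry_times is the total number of attempts allowed.
    """
    if retry_times < 1:
        raise ValueError("retry_times must be at least 1")
    last_error = None
    for attempt in range(1, retry_times + 1):
        try:
            return func()
        except Exception as exc:  # a real runner would catch narrower errors
            last_error = exc
            if attempt < retry_times:
                sleep(sleep_time)  # pause before the next attempt
    raise last_error


# Usage: a fake query that fails twice, then succeeds.
calls = {"count": 0}


def flaky_query():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


waits = []  # record requested pauses instead of sleeping
result = run_with_retry(flaky_query, retry_times=5, sleep_time=120, sleep=waits.append)
print(result, calls["count"], waits)
# ok 3 [120, 120]
```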

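## Reading a SQL file and running a specific SQL block

Keeping all SQL in a single SQL file is common engineering practice. mumu can run individual SQL blocks from such a file, and all of the features described above still apply.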

In [13]:
context = {"owner_id": "b"}
with ImpalaRunner(config=config['databases']['impala'],
                filename="./sqls/run.sql",
                context=context,
                verbose=False) as runner:
    df = runner.run_sql_block(sql_name="sql1")

df.head()

INFO:root:Starting run sql:
 SELECT * FROM test.ddd_211015 WHERE 1=1 AND owner_id='b' LIMIT 10

INFO:impala.hiveserver2:Closing active operation
INFO:root:cost time: 0.692 seconds
INFO:root:Executed sql is null
INFO:root:cost time: 0.178 seconds
INFO:root:DB connection has been closed!


|   | owner_id | dun_case_id | call_time |
|---|----------|-------------|-----------|
| 0 | b | b1 | 1 |
| 1 | b | b1 | 2 |
| 2 | b | b2 | 3 |
| 3 | b | b2 | 4 |
| 4 | b | b2 | 5 |


> 💡 If you do not specify a block title from the SQL file, the SQL statements are run from top to bottom.

In [14]:
context = {"owner_id": "b"}
with ImpalaRunner(config=config['databases']['impala'],
                filename="./sqls/run.sql",
                context=context,
                verbose=False) as runner:
    df = runner.run_sql_block()

df.head()

INFO:root:Starting run sql:
 SELECT * FROM test.ddd_211015 WHERE 1=1 AND owner_id='b' LIMIT 10

INFO:impala.hiveserver2:Closing active operation
INFO:root:cost time: 0.666 seconds
INFO:root:Executed sql is null
INFO:root:cost time: 0.017 seconds
INFO:root:Starting run sql:
 SELECT * FROM test.ddd_211015 WHERE 1=1 AND owner_id='b' LIMIT 5

INFO:impala.hiveserver2:Closing active operation
INFO:root:cost time: 0.121 seconds
INFO:root:Executed sql is null
INFO:root:cost time: 0.021 seconds
INFO:root:DB connection has been closed!


|   | owner_id | dun_case_id | call_time |
|---|----------|-------------|-----------|
| 0 | b | b1 | 1 |
| 1 | b | b1 | 2 |
| 2 | b | b2 | 3 |
| 3 | b | b2 | 4 |
| 4 | b | b2 | 5 |
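The contents of `./sqls/run.sql` are not shown here, so the following is only a sketch of how a file of titled SQL blocks could be split and selected. The `-- [name]` marker syntax, the block name `sql2`, and the helpers `split_sql_blocks` and `select_blocks` are all assumptions made for illustration; mumu's real file format may differ.

```python
import re

# Hypothetical file content: each block starts with a "-- [title]" comment line.
SQL_TEXT = """\
-- [sql1]
SELECT * FROM test.ddd_211015 WHERE 1=1 AND owner_id='${owner_id}' LIMIT 10;

-- [sql2]
SELECT * FROM test.ddd_211015 WHERE 1=1 AND owner_id='${owner_id}' LIMIT 5;
"""

MARKER = re.compile(r"^--\s*\[(\w+)\]\s*$")


def split_sql_blocks(text: str) -> dict:
    """Return {title: sql} in file order (dicts keep insertion order)."""
    blocks = {}
    current = None
    for line in text.splitlines():
        match = MARKER.match(line)
        if match:
            current = match.group(1)
            blocks[current] = []
        elif current is not None:
            blocks[current].append(line)
    return {name: "\n".join(lines).strip() for name, lines in blocks.items()}


def select_blocks(blocks: dict, sql_name=None) -> list:
    """One named block, or every block from top to bottom when no name is given."""
    if sql_name is None:
        return list(blocks.values())
    return [blocks[sql_name]]


blocks = split_sql_blocks(SQL_TEXT)
print(list(blocks))                         # ['sql1', 'sql2']
print(len(select_blocks(blocks, "sql1")))   # 1
print(len(select_blocks(blocks)))           # 2
```

Selecting by title mirrors `run_sql_block(sql_name="sql1")`, and passing no title mirrors the top-to-bottom run shown in the last example.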
