In [1]:
import os

# Work from the directory at the same level as easymlops (two levels above the kernel's
# starting directory), so relative paths such as ./data/demo.csv and ./local.db resolve.
# The starting directory is remembered on the first execution: a bare os.chdir("../../")
# would climb two more levels every time this cell is re-run.
if "_NOTEBOOK_START_DIR" not in globals():
    _NOTEBOOK_START_DIR = os.getcwd()
os.chdir(os.path.join(_NOTEBOOK_START_DIR, "..", ".."))

### 拆分数据

In [2]:
import pandas as pd

# Read the passenger demo dataset (columns are shown in the output below) and preview it.
data = pd.read_csv("./data/demo.csv")
data.head(n=5)

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S


In [3]:
# Positional split: the first 500 rows fit the pipeline, the remaining rows play the role
# of unseen data. The "Survived" label is pulled out so the feature frames do not contain it.
y_train = data["Survived"].iloc[:500]
y_test = data["Survived"].iloc[500:]
x_train = data.drop(columns=["Survived"]).iloc[:500]
x_test = data.drop(columns=["Survived"]).iloc[500:]

### 简单跑一个模型

In [5]:
from easymlops import TablePipeLine
from easymlops.table.preprocessing import *
from easymlops.table.encoding import *
from easymlops.table.classification import *
from easymlops.table.storage import LocalStorage

In [6]:
# Pipeline stages, in order:
#   FixInput -> FillNa -> OneHot/WOE/Label encodings
#   -> LocalStorage (table "label_encoding", i.e. table[-3]) persisting the listed feature columns
#   -> LightGBM classifier emitting lgbm_0 / lgbm_1
#   -> LocalStorage (table "predict", i.e. table[-1]) persisting the predictions.
table = TablePipeLine()
(
    table
    .pipe(FixInput())
    .pipe(FillNa())
    .pipe(OneHotEncoding(cols=["Pclass", "Sex"], drop_col=False))
    .pipe(WOEEncoding(cols=["Ticket", "Embarked", "Cabin", "Sex", "Pclass"], y=y_train))
    .pipe(LabelEncoding(cols=["Name"]))
    .pipe(LocalStorage(db_name="./local.db", table_name="label_encoding",
                       cols=['PassengerId', 'Pclass', 'Name', 'Sex', 'Age', 'SibSp', 'Parch',
                             'Ticket', 'Fare', 'Cabin', 'Embarked',
                             'Pclass_3', 'Pclass_1', 'Pclass_2', 'Sex_male', 'Sex_female']))
    .pipe(LGBMClassification(y=y_train,
                             native_init_params={"max_depth": 2},
                             native_fit_params={"num_boost_round": 128},
                             prefix="lgbm"))
    .pipe(LocalStorage(db_name="./local.db", table_name="predict", cols=["lgbm_0", "lgbm_1"]))
)

# Fit on the training rows, then batch-transform the held-out rows.
x_test_new = table.fit(x_train).transform(x_test)
x_test_new.head(5)

Unnamed: 0,lgbm_0,lgbm_1
500,0.923326,0.076674
501,0.373652,0.626348
502,0.37838,0.62162
503,0.670166,0.329834
504,0.06847,0.93153


### 模拟线上数据流

In [7]:
# tqdm was used here without ever being imported in this notebook; import it explicitly
# so the cell works on a fresh kernel.
from tqdm.auto import tqdm

# Replay the held-out rows one at a time, as an online service would receive them.
# storage_base_dict adds a "key" column (here the PassengerId) to what the LocalStorage
# stages write; select_key(...) below looks rows up by it.
# "records" is the full orient name: the "record" abbreviation was deprecated and then
# removed in pandas 2.0.
for record in tqdm(x_test.to_dict("records")):
    table.transform_single(record, storage_base_dict={"key": record.get("PassengerId")})

100%|███████████████████████████████████████████████████████████████████████████████| 391/391 [00:02<00:00, 163.11it/s]


### 存储结果分析  
这部分需要特别注意：先弄清楚分析数据所需的计算资源是由存储引擎提供，还是由本地机器提供；如果是本地机器，应尽量避免复杂的查询。

#### 1. 查询某key对应的记录

In [8]:
# table[-3] is the LocalStorage stage right after LabelEncoding (table_name="label_encoding");
# fetch the stored encoded-feature row whose key (PassengerId) is 501.
encoded_feature_store = table[-3]
encoded_feature_store.select_key(key=501)

Unnamed: 0,key,transform_time,PassengerId,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked,Pclass_3,Pclass_1,Pclass_2,Sex_male,Sex_female
0,501,2023-02-27 14:29:58,501,0.4824385226871724,0,1.1113788020761075,17.0,0,0,0.0,8.664,0.299607189091587,0.2248488292116022,1,0,0,1,0


#### 2. 放宽到更复杂的查询条件

In [9]:
# Filter the stored encoded features with an SQL-style condition; limit caps the rows returned.
feature_condition = "Pclass>0.4 and Sex>1 and Sex_male=1"
table[-3].where(feature_condition, limit=5)

Unnamed: 0,key,transform_time,PassengerId,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked,Pclass_3,Pclass_1,Pclass_2,Sex_male,Sex_female
0,501,2023-02-27 14:29:58,501,0.4824385226871724,0,1.1113788020761075,17.0,0,0,0.0,8.664,0.299607189091587,0.2248488292116022,1,0,0,1,0
1,509,2023-02-27 14:29:58,509,0.4824385226871724,0,1.1113788020761075,28.0,0,0,0.0,22.53,0.299607189091587,0.2248488292116022,1,0,0,1,0
2,510,2023-02-27 14:29:58,510,0.4824385226871724,0,1.1113788020761075,26.0,0,0,-0.4641575586823115,56.5,0.299607189091587,0.2248488292116022,1,0,0,1,0
3,511,2023-02-27 14:29:58,511,0.4824385226871724,0,1.1113788020761075,29.0,0,0,0.0,7.75,0.299607189091587,-0.5086093212531454,1,0,0,1,0
4,512,2023-02-27 14:29:58,512,0.4824385226871724,0,1.1113788020761075,0.0,0,0,0.0,8.05,0.299607189091587,0.2248488292116022,1,0,0,1,0


In [10]:
# Passengers the model considers more likely to survive (lgbm_1 is the "survived" score).
prediction_store = table[-1]
prediction_store.where("lgbm_1>0.8", limit=5)

Unnamed: 0,key,transform_time,lgbm_0,lgbm_1
0,505,2023-02-27 14:29:58,0.0684702857122879,0.931529714287712
1,514,2023-02-27 14:29:58,0.0718778453665381,0.9281221546334618
2,517,2023-02-27 14:29:58,0.1139278092213909,0.886072190778609
3,519,2023-02-27 14:29:58,0.1945796833533618,0.8054203166466383
4,521,2023-02-27 14:29:58,0.0710767441929253,0.9289232558070748


#### 3. 聚合分析  
比如：统计不同 Sex_male、Pclass_3 组合下的各项指标

In [11]:
# For every (Sex_male, Pclass_3) combination: max PassengerId, mean of the WOE-encoded Sex
# value, and number of stored rows.
# NOTE(review): the transform_time cut-off is a hard-coded timestamp. Every row written in the
# run shown here is later than it (group counts sum to the 391 streamed rows), so it currently
# filters nothing out. Set it to the start of the run you want to analyse.
table[-3].group_agg_where(
    group_by="Sex_male,Pclass_3",
    agg_sql="Sex_male,Pclass_3,max(PassengerId) as PassengerId_max,sum(Sex)/count(Sex) as Sex_mean,count(Ticket) as Ticket_cnt",
    where_sql="transform_time>='2023-02-27 13:44:12'",
    limit=10,
)

Unnamed: 0,Sex_male,Pclass_3,PassengerId_max,Sex_mean,Ticket_cnt
0,0,0,888,-1.56999,75
1,0,1,889,-1.56999,54
2,1,0,890,1.111379,104
3,1,1,891,1.111379,158


#### 4. 纯sql  
以上接口不支持复杂的SQL嵌套查询；如有需要，可以通过sql方法直接编写嵌套SQL进行分析。

In [12]:
# Nested queries are not expressible through where/group_agg_where, so write the SQL directly.
# Per predicted class (lgbm_1 > 0.5 counts as survived): earliest and latest transform_time.
# `sql` is reused later by the standalone LocalStorage section.
table_name = "predict"
sql = f"""
-- 统计是否存活用户的最早最晚transform时间
select survied_pred,min(transform_time) as transform_min,max(transform_time) as transform_max from 
    -- 如果lgbm_1>0.5就视为存活
    (select key,transform_time,case when lgbm_1>0.5 then 1 else 0 end as survied_pred from {table_name}) as t 
group by survied_pred
"""
prediction_store = table[-1]
prediction_store.sql(sql)

Unnamed: 0,survied_pred,transform_min,transform_max
0,0,2023-02-27 14:29:58,2023-02-27 14:30:00
1,1,2023-02-27 14:29:58,2023-02-27 14:30:00


### 异步保存
存储模块强制以异步方式写入数据，因此通常只占用transform_single很少的时间（仅获取当前时间和字典中的数据，并放入队列中等待异步写入）。下面用check_transform_function_pipeline逐个模块检查单条transform的耗时，可以直接看到LocalStorage带来的额外开销。

In [13]:
from easymlops.table.callback import check_transform_function_pipeline

# Run the first 10 held-out rows through every stage and report per-stage transform
# speed / CPU / memory, which shows how much time the LocalStorage stages add.
# NOTE(review): the later transform_max seen in the standalone query below suggests this
# check also writes rows into local.db — confirm if that matters for later analysis.
check_sample = x_test.iloc[:10]
table.callback(check_transform_function_pipeline, check_sample)

(<class 'easymlops.table.preprocessing.core.FixInput'>) module check [transform] complete,speed:[0.0ms]/it,cpu:[0%],memory:[0K]
(<class 'easymlops.table.preprocessing.onevar_operation.FillNa'>) module check [transform] complete,speed:[0.0ms]/it,cpu:[0%],memory:[0K]
(<class 'easymlops.table.encoding.OneHotEncoding'>) module check [transform] complete,speed:[0.0ms]/it,cpu:[0%],memory:[0K]
(<class 'easymlops.table.encoding.WOEEncoding'>) module check [transform] complete,speed:[0.0ms]/it,cpu:[0%],memory:[0K]
(<class 'easymlops.table.encoding.LabelEncoding'>) module check [transform] complete,speed:[0.0ms]/it,cpu:[0%],memory:[0K]
(<class 'easymlops.table.storage.local_storage.LocalStorage'>) module check [transform] complete,speed:[1.56ms]/it,cpu:[0%],memory:[0K]
(<class 'easymlops.table.classification.LGBMClassification'>) module check [transform] complete,speed:[2.43ms]/it,cpu:[0%],memory:[0K]
(<class 'easymlops.table.storage.local_storage.LocalStorage'>) module check [transform] complet

### 独立调用
通过新建一个LocalStorage对象（只需指定db_name和table_name），即可脱离pipeline独立查询已存储的数据。

In [14]:
# Re-open the prediction table outside the pipeline: db_name and table_name are all it needs.
ls = LocalStorage(
    db_name="./local.db",
    table_name="predict",
)
# `sql` is the nested query defined in the pure-SQL section earlier in this notebook.
ls.sql(sql)

Unnamed: 0,survied_pred,transform_min,transform_max
0,0,2023-02-27 14:29:58,2023-02-27 14:31:51
1,1,2023-02-27 14:29:58,2023-02-27 14:31:51


In [16]:
# Point lookup by key (the PassengerId) against the standalone storage object.
lookup_key = 501
ls.select_key(key=lookup_key)

Unnamed: 0,key,transform_time,lgbm_0,lgbm_1
0,501,2023-02-27 14:29:58,0.9233260451690832,0.0766739548309168


In [18]:
# Scratch cell cleared: it held `copy.deepcopy(None)`, which played no part in this demo
# and depends on a `copy` module that this notebook never imports, so it broke
# Restart & Run All.