In [None]:
import pandas as pd
import numpy as np
from sklearn import preprocessing
from sklearn.preprocessing import LabelEncoder
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

In [None]:
def data_base_preprocess(data_base,test_newin=""):
    
    '''
    功能：预处理data_base格式
    1. data_base:读取的VSIM表格
    2.test_newin默认空，检测数据的项目号，需要在data_base中删除
    '''
    
    #提取所需要的列
    data_base = data_base[["NUMBER","NEWIN","NAME","PSS","PLANT","DRAWING NO","EXP","ORD","REQ","S","STATE","LC","UL","LENGTH(MM)","WIDTH(MM)","HEIGHT(MM)","EMB1","FGRP","POS"]]
    #删掉test检测数据
    data_base = data_base[data_base["NEWIN"] !=test_newin]
    
    # 通过NAME拆分INFO TYPE
    data_base["NAMELIST"] = data_base["NAME"].str.rsplit("-", n=0)
    
    data_base["TYPE"] = data_base["NAMELIST"].str[0].str.rsplit(",").str[0]
    data_base["INFO"] = data_base["NAMELIST"].str[1]
    
    #剔除UL=0，UL或者LC或者长宽高为空的数据
    data_base = data_base.loc[-( (data_base["UL"] == 0) | (data_base["UL"].isna())| (data_base["LENGTH(MM)"].isna() | data_base["LC"].isna()))]
    data_base = data_base.loc[-((data_base["EXP"] == "Y") | (data_base["S"] == "N"))]
    
    data_base["single_volume"] = data_base["LENGTH(MM)"] * data_base["WIDTH(MM)"]*data_base["HEIGHT(MM)"]/data_base["UL"]
    
    return data_base

In [None]:
def test_preprocess(axa,single_bom,plant):
        
        test1 = axa[["PART NO","PART NAME","PART DESCRIPTION"]]
        test2 = single_bom[["Fgrp","Pos ","PSS","-Partno-","Description-------","Proj"]]
        test1[["PART NAME","PART DESCRIPTION"]] = test1[["PART NAME","PART DESCRIPTION"]].apply(lambda x : x.apply(lambda y:str(y).strip()))
        test2["Description-------"] = test2["Description-------"].apply(lambda y:str(y).strip())
        test = pd.merge(test1,test2,how = "right",left_on = "PART NO",right_on = "-Partno-")
        
        test["NAME"] = test["PART NAME"]+"-"+test["PART DESCRIPTION"]
        test["NAME"] = test["NAME"].apply(lambda x : str(x).replace(">",""))
        
        test["NAMELIST"] = test["NAME"].str.rsplit("-", n=0)
        
        test["TYPE"] = test["NAMELIST"].str[0].str.rsplit(",").str[0]
        test["INFO"] = test["NAMELIST"].str[1]
        
        test.rename(columns = {"Fgrp":"FGRP","Pos ":"POS","PART NO":"NUMBER","Proj":"NEWIN"},inplace = True)
        
        test.dropna(subset = ["NAME","FGRP","POS","NUMBER"],inplace = True)
        
        test["PLANT"] = plant
        
        return test[["NUMBER","NAME","PLANT","TYPE","INFO","FGRP","POS","PSS","NEWIN"]].drop_duplicates().reset_index(drop=True)

In [None]:
def TYPE_process(data_base,mylist):
    
    '''
    删除type中的干扰项
    '''
    tt = data_base["TYPE"].apply(lambda x: x.replace(",", " "))
    for word in mylist:
        tt = tt.apply(lambda x: x.replace(word, ""))
    data_base["TYPE"] = tt.apply(lambda x: x.strip())
    
    return data_base

In [None]:
#从database中提取test
def test_from_database(df,size=0.02):
    
    
    train,test = train_test_split(df,test_size = size)
    
    return train.reset_index(drop = True),test.reset_index(drop = True)

以上3个cell说明同流程图

### 定义betaencoder类

In [None]:
class BetaEncoder(object):
    
    def __init__(self, group):
        
        self.group = group
        self.stats = None
    
    # get counts from df
    def fit(self, df, target_col):
        # 先验均值
        self.prior_mean = np.mean(df[target_col]) 
        stats           = df[[target_col, self.group]].groupby(self.group)
        # count和sum
        stats           = stats.agg(['sum', 'count'])[target_col]    
        stats.rename(columns={'sum': 'n', 'count': 'N'}, inplace=True)
        stats.reset_index(level=0, inplace=True)           
        self.stats      = stats
    
    # extract posterior statistics
    def transform(self, df, stat_type, N_min=1):
        
        df_stats = pd.merge(df[[self.group]], self.stats, how='left')
        n        = df_stats['n'].copy()
        N        = df_stats['N'].copy()
        
        # fill in missing
        nan_indexs    = np.isnan(n)
        n[nan_indexs] = self.prior_mean
        N[nan_indexs] = 1.0
        
        # prior parameters
        N_prior     = np.maximum(N_min-N, 0)
        alpha_prior = self.prior_mean*N_prior
        beta_prior  = (1-self.prior_mean)*N_prior
        
        # posterior parameters
        alpha       =  alpha_prior + n
        beta        =  beta_prior  + N-n
        
        # calculate statistics
        if stat_type=='mean':
            num = alpha
            dem = alpha+beta
        
        elif stat_type=='mode':
            num = alpha-1
            dem = alpha+beta-2
        
        elif stat_type=='median':
            num = alpha-1/3
            dem = alpha+beta-2/3
        
        elif stat_type=='var':
            num = alpha*beta
            dem = (alpha+beta)**2*(alpha+beta+1)
        
        elif stat_type=='skewness':
            num = 2*(beta-alpha)*np.sqrt(alpha+beta+1)
            dem = (alpha+beta+2)*np.sqrt(alpha*beta)
        
        elif stat_type=='kurtosis':
            num = 6*(alpha-beta)**2*(alpha+beta+1) - alpha*beta*(alpha+beta+2)
            dem = alpha*beta*(alpha+beta+2)*(alpha+beta+3)
        
        # replace missing
        value = num/dem
        #value[np.isnan(value)] = np.nanmedian(value)
        return value


### 基于beta_target_encoding对特征X进行编码

In [None]:
def to_encode_X(train,test,cat_cols,N_min):
    
    feature_cols = []    
    target_cols = "single_volume"
    x_train=pd.DataFrame()
    x_test = pd.DataFrame()
    
    # encode variables
    for c in cat_cols:
        
        # fit encoder
        be = BetaEncoder(c)
        be.fit(train, 'single_volume')
        
        # mean
        feature_name = f'{c}_mean'
        x_train[feature_name] = be.transform(train, 'mean', N_min)
        x_test[feature_name]  = be.transform(test,  'mean', N_min)
        feature_cols.append(feature_name)
        
        # mode
        feature_name = f'{c}_mode'
        x_train[feature_name] = be.transform(train, 'mode', N_min)
        x_test[feature_name]  = be.transform(test,  'mode', N_min)
        feature_cols.append(feature_name)
        
        # median
        feature_name = f'{c}_median'
        x_train[feature_name] = be.transform(train, 'median', N_min)
        x_test[feature_name]  = be.transform(test,  'median', N_min)
        feature_cols.append(feature_name)    
        
        # var
        feature_name = f'{c}_var'
        x_train[feature_name] = be.transform(train, 'var', N_min)
        x_test[feature_name]  = be.transform(test,  'var', N_min)
        feature_cols.append(feature_name)        
        
        # skewness
        feature_name = f'{c}_skewness'
        x_train[feature_name] = be.transform(train, 'skewness', N_min)
        x_test[feature_name]  = be.transform(test,  'skewness', N_min)
        feature_cols.append(feature_name)    
        
        # kurtosis
        feature_name = f'{c}_kurtosis'
        x_train[feature_name] = be.transform(train, 'kurtosis', N_min)
        x_test[feature_name]  = be.transform(test,  'kurtosis', N_min)
        feature_cols.append(feature_name)  

        x_train = x_train.dropna(axis=1)
        x_test = x_test.dropna(axis=1)
        
    return x_train,x_test

### 基于labelencoder对目标Y值进行编码

In [None]:
def to_encode_Y(train):
    

    enc=preprocessing.LabelEncoder()
    #y_encoded=pd.Series(enc.fit_transform(mydata["single_volume"]),index = mydata.index ) #训练LabelEncoder
    
    y_encoded = enc.fit_transform(train["single_volume"])
    
    y_train = enc.transform(train["single_volume"])
    
    return y_train

### 训练数据集，此处为随机森林模型

In [None]:
def model_fit(x_train,y_train):

    model = RandomForestClassifier(n_estimators=15,random_state=0,max_depth=30)  # 实例化模型RandomForestClassifier
    print("model is fitting")
    model.fit(x_train,y_train)
    print("Done")
    return model

### 预测测试集

In [None]:
def model_predict(x_test,y_train,train):
    
    
    pre = model.predict(x_test)

    
    pair = dict(zip(y_train,train["single_volume"]))
    
    predict_volume = [pair[i] for i in model.predict(x_test)]
    
    return pre,predict_volume,pair

In [None]:
import warnings

warnings.filterwarnings("ignore")

if __name__=="__main__":
    #读取VSIM表格
    data_base = pd.read_excel(r"C:\Users\zli242\Desktop\VOLVO\0815\data base.xlsx")
    axa = pd.read_excel(r"C:\Users\zli242\Desktop\VOLVO\packaging final_version\AXA P519.xlsx",header =4 ).iloc[4:,:]
    single_bom = pd.read_excel(r"C:\Users\zli242\Desktop\VOLVO\packaging final_version\Single bom.xlsx",header = 3).iloc[3:,:]
    plant = "CHN03"
    #清洗表格，删除样本数据的项目号
    data_base_processed = data_base_preprocess(data_base,test_newin = "")
    #清洗test
    test_processed = test_preprocess(axa, single_bom, plant)
    # 删除样本数据和database的干扰项
    mylist = [
        "UNCOLORED",
        "KSOCOLOR",
        "CKDCOLOR",
        "CHARCOAL",
        "OFFBLACK",
        "CHARCOAL SOLID",
        "DAWN",
        "BRIGHT",
        "CHROME",
        "GRAINED",
        "PAINTED",
        "CARDAMOM",
        "BLACK",
        "WHITE",
        "WOOD",
        "LIGHT ASH",
        "LIGHT GUI",
        "METAL",
        "LUMIERE",
        "FOG MELANGE",
    ]
    
    train = TYPE_process(data_base_processed,mylist)
    test = TYPE_process(test_processed,mylist)
    '''
    #从data_base中提取test,test
    train,test = test_from_database(data_base_final,size=0.05)
    '''
    
    #把data_base进行编码
    cat_cols = ['NAME',
     "NUMBER",
     "NEWIN",
     'PLANT',
     'TYPE',
     'INFO',
     "POS",
     "PSS",
     "FGRP"]
    N_min =5
    
    x_train,x_test = to_encode_X(train,test,cat_cols,N_min)
    y_train = to_encode_Y(train)
    
    #进行模型训练
    model= model_fit(x_train,y_train) 
    #检测
    pre,predict_volume,pair = model_predict(x_test, y_train, train)
    

### 若test为从data_base中随意挑选

In [None]:
#完全一致

sum([pair[i] for i in pre]==test["single_volume"])/len(test)


#0.3 1061 
sum((abs([pair[i] for i in pre]- test["single_volume"])/test["single_volume"])<0.3)/len(test)

#0.2 1038 
sum((abs([pair[i] for i in pre]- test["single_volume"])/ test["single_volume"])<0.2)/len(test)