In [1]:
import pymysql
import pandas as pd
import yaml

import pickle
import os

import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif']=['SimHei'] # Use the SimHei font so Chinese labels render correctly
plt.rcParams['axes.unicode_minus'] = False # Use an ASCII hyphen for minus signs so they display correctly with this font

# 缺失值可视化库
import missingno as msno

In [6]:

def makedirs(path):
    """
    Ensure the directory for ``path`` exists, creating it and any missing parents.

    path: str, a file path or a directory path. A path whose last component has an
        extension (e.g. ``.pkl``) is treated as a file, and only its parent directory
        is created. Anything else is treated as a directory.
        Note: a directory whose name contains a dot (e.g. ``v1.0``) is therefore treated
        as a file as well.

    A bare file name such as ``'data.pkl'`` has no directory component; nothing is
    created in that case (the current directory always exists).
    """
    if os.path.splitext(path)[1] != '':
        # A file: keep only its parent directory
        path = os.path.dirname(path)

    # os.makedirs('') raises FileNotFoundError, so skip the empty (current-dir) case.
    if path:
        # exist_ok avoids the race between an existence check and the creation.
        os.makedirs(path, exist_ok=True)

        
# Common helper functions
def yaml_read(yaml_path, encoding='utf-8'):
    """
    Read a YAML configuration file.

    yaml_path: str, path of the YAML file.
    encoding: str, text encoding used to open the file.
    Returns the parsed YAML content (a dict of variable -> value for a mapping document).
    """
    # NOTE(review): FullLoader is used; yaml.safe_load is preferable if the file may be untrusted.
    with open(yaml_path, encoding=encoding) as handle:
        raw_text = handle.read()
    return yaml.load(raw_text, Loader=yaml.FullLoader)


def miss(data):
    """
    Plot the missing-value pattern of ``data`` as a missingno matrix (with column
    labels) and show the figure.
    """
    msno.matrix(df=data, labels=True)
    plt.show()
    

def pickle_save(path, content):
    """
    Pickle a Python object to ``path``.

    The parent directory of ``path`` is created first if it does not exist yet.
    """
    makedirs(path)
    with open(path, mode='wb') as handle:
        pickle.dump(content, handle)


def pickle_load(path):
    """
    Load and return the Python object pickled at ``path``.

    Only use this on trusted files: unpickling can execute arbitrary code.
    """
    with open(path, mode='rb') as handle:
        return pickle.load(handle)


In [7]:
# Domain-specific helpers
def fault_map_func(x, fault_dict):
    """
    Extract the fault ids mentioned in a free-text failure reason.

    x: the failure-reason text. Non-string values (e.g. NaN for a missing cell) yield [].
    fault_dict: mapping of feature word -> fault id.
    Returns the fault id of every feature word that occurs as a substring of ``x``, in
    ``fault_dict`` iteration order. Repeated ids are kept when several words map to the
    same fault.
    """
    # Missing values may be present (e.g. float NaN). isinstance also accepts str
    # subclasses such as numpy.str_, which an exact type() check rejected.
    if not isinstance(x, str):
        return []
    return [fault_id for word, fault_id in fault_dict.items() if word in x]



def find_fault_id(x, focus_id, mode='common'):
    """
    Decide whether a record's fault-id list matches ``focus_id``.

    x: list of fault ids for one record (as produced by fault_map_func); may be empty.
    focus_id: the fault id to look for.
    mode:
        'common' -- True if focus_id appears anywhere in x.
        'only'   -- True if x refers to focus_id and nothing else.
        'first'  -- True if the first id in x equals focus_id (multi-fault records are
                    judged by their first fault).
    Returns a bool; an empty x is always False.
    Raises AssertionError for an unknown mode.
    """
    assert mode in ['only', 'common', 'first'], '未知模式，请选择:only, common, first 模式'

    if len(x) == 0:
        return False
    if mode == 'common':
        return focus_id in x
    if mode == 'only':
        # Several feature words can map to the same fault, so x may contain repeats such
        # as [0, 0]. That record still refers only to focus_id; `[focus_id] == x` missed it.
        return all(fault_id == focus_id for fault_id in x)
    if mode == 'first':
        return x[0] == focus_id
    # Only reachable when asserts are disabled (python -O)
    return False



class FindReason(object):
    """Fault-cause lookup over a fault-mapping table loaded from YAML."""

    def __init__(self, data_path):
        """
        data_path: str, path to the fault-mapping YAML file.

        The YAML content is read with yaml_read, wrapped in a DataFrame and transposed, so
        the YAML top-level keys become row labels and the nested keys become columns.
        Assumption (confirm against the file): the columns include 'CN', 'RU' and 'EN',
        and each cell holds a list of feature words.
        """
        super().__init__()
        # Feature words of each fault type, as a pd.DataFrame
        fault_mapping = yaml_read(data_path, encoding='utf-8')
        self.fault_mapping_df = pd.DataFrame(fault_mapping).T

    def get_fault_mapping_df(self):
        """Return the fault-mapping DataFrame (the stored object, not a copy)."""
        return self.fault_mapping_df

    @staticmethod
    def _as_word_list(cell):
        """Normalise one mapping-table cell to a list of feature words."""
        # Cells normally hold a list. A lone scalar string is a single word (iterating it
        # would yield characters), and a missing entry (e.g. NaN) contributes no words.
        if isinstance(cell, (list, tuple)):
            return list(cell)
        if isinstance(cell, str):
            return [cell]
        return []

    def bag_of_words_in_language(self, fault_focus_language):
        """
        Return every feature word of the given language, de-duplicated in first-seen order.

        fault_focus_language: str, feature-word language, one of ['CN', 'RU', 'EN'].
        """
        assert fault_focus_language in ['CN', 'RU', 'EN'], '选择的语言只能为CN,RU,EN其中一种'

        bag_of_words = []
        for cell in self.fault_mapping_df[fault_focus_language]:
            bag_of_words.extend(self._as_word_list(cell))

        # dict.fromkeys de-duplicates while keeping the original order
        return list(dict.fromkeys(bag_of_words))

    def find_reason(self, fault_focus_reason, fault_focus_language):
        """
        Return the row labels of every fault type whose word list contains a feature word.

        fault_focus_reason: str, the feature word (matched exactly against list entries).
        fault_focus_language: str, feature-word language, one of ['CN', 'RU', 'EN'].
        A word may belong to several fault types, so a list is returned (empty if none).
        """
        assert fault_focus_language in ['CN', 'RU', 'EN'], '选择的语言只能为CN,RU,EN其中一种'

        # Walk (label, cell) pairs. Feeding positional indices to .loc only worked when
        # the row labels were exactly 0..n-1, and raised KeyError otherwise.
        column = self.fault_mapping_df[fault_focus_language]
        return [label for label, cell in column.items()
                if fault_focus_reason in self._as_word_list(cell)]


In [8]:

class Database:
    """Thin pymysql wrapper that fetches table rows for one well over a date range."""

    def __init__(self, host=None, user=None, password=None, database=None):
        """
        Open a MySQL connection and a cursor on it.

        Each argument defaults to the SCREW_PUMP_DB_HOST / SCREW_PUMP_DB_USER /
        SCREW_PUMP_DB_PASSWORD / SCREW_PUMP_DB_NAME environment variable, and then to the
        value previously hard-coded here, so ``Database()`` keeps working unchanged.
        SECURITY: the server address and credentials are still committed in source; set
        the environment variables (or pass arguments) and remove these fallbacks.
        """
        def _setting(value, env_name, fallback):
            return value if value is not None else os.environ.get(env_name, fallback)

        # Keep the connection itself (not only the cursor) so query() can ping/reconnect it
        self.connection = pymysql.connect(
            host=_setting(host, 'SCREW_PUMP_DB_HOST', '47.95.112.122'),
            user=_setting(user, 'SCREW_PUMP_DB_USER', 'root'),
            password=_setting(password, 'SCREW_PUMP_DB_PASSWORD', 'root'),
            database=_setting(database, 'SCREW_PUMP_DB_NAME', 'clouddatabase'),
        )
        self.cursor = self.connection.cursor()

    @staticmethod
    def _quote_identifier(name):
        """Backtick-quote a MySQL identifier, escaping embedded backticks."""
        return '`%s`' % str(name).replace('`', '``')

    def query(self, table, Wellbore, Date, columns=None):
        """
        Fetch rows of ``table`` for one well whose `Date` lies in an inclusive range.

        table: str, table name.
        Wellbore: value matched against the table's `Wellbore` column.
        Date: (start, end) pair, matched with SQL BETWEEN (inclusive on both ends).
        columns: list of column names to select; None selects every column reported
            by SHOW COLUMNS.
        Returns a pd.DataFrame whose column labels are the selected columns, or None
        when no row matches.
        Raises ValueError if there are no columns to select.
        """
        # Transparently reconnect if the server dropped an idle connection
        self.connection.ping(reconnect=True)

        if columns is None:
            # No columns given: query every column of the table
            self.cursor.execute('SHOW COLUMNS FROM ' + self._quote_identifier(table))
            columns_list = [row[0] for row in self.cursor.fetchall()]
        else:
            columns_list = list(columns)
        if not columns_list:
            raise ValueError('no columns to select from table %r' % (table,))

        # Identifiers cannot be bound as parameters, so they are quoted. '%' is doubled
        # because pymysql %-formats the statement when args are passed
        # (e.g. the column `Efficiency %`).
        columns_sql = ','.join(self._quote_identifier(c).replace('%', '%%') for c in columns_list)
        table_sql = self._quote_identifier(table).replace('%', '%%')
        # Values are bound as parameters so pymysql escapes them (no SQL injection)
        sql = (f"select {columns_sql} from {table_sql} "
               "where `Wellbore`=%s and `Date` between %s and %s")
        self.cursor.execute(sql, (Wellbore, Date[0], Date[1]))
        res = self.cursor.fetchall()

        # Return the result if rows were found, otherwise None
        if not res:
            return None
        return pd.DataFrame(res, columns=columns_list)

In [12]:
# Base data folder. Set the SCREW_PUMP_DATA_PATH environment variable to override this
# machine-specific absolute path instead of editing the notebook.
data_path = os.environ.get('SCREW_PUMP_DATA_PATH', 'D:/work/screw_pump/data')

In [13]:
failed_info = pd.read_excel(io=f'{data_path}/部分故障类型标记.xlsx')  # failure-record table

In [14]:
failed_info.head()  # preview the first rows of the failure-record table

Unnamed: 0,Wellbore,Completion,Condition_Code,Well_Type,Open_Date,Fail_Date,Workover_Purpose,Fail_Reason,Running_Time,Pump_Model,Well_Condition,Length,Pump_Depth
0,39-3,K,0,ннс,2016-10-28 00:00:00,2017-01-21 00:00:00,Нет подачи.Очистка забоя.Ревизия ГНО.,13-я штанга слом по телу,85.0,TP 28-1000,новый,5.5,330.56
1,1021-3,J,0,ннс,2016-10-31 00:00:00,2017-04-07 00:00:00,Нет подачи.Очистка забоя.Ревизия ГНО.,28-яштанга слом по телу,158.0,Netzsh 40-D-90SH,подн.,4.05,419.72
2,6210-3S,J,0,бгс,2017-01-12,2017-04-27,Заклин плунжера. Промывка.,Отбракован НКТ 3 1/2-1шт(трещина по телу).,105.0,API 30-250 RWAM 18-2,поднятый,,345.12
3,688,J,0,вс,2014-11-16 00:00:00,2017-04-28 00:00:00,Нет подачи. Очистка забоя. Смена насоса и ВП.,"(отбраковка штанги 1""-20шт коррозия по телу).о...",894.0,Anek (Centrilift) 43-D-124,новый,4.6,412.26
4,1052-3S,J,0,бгс,2016-10-24 00:00:00,2017-05-21 00:00:00,Нет подачи.Очистка забоя.Ревизия ГНО.,"Штанги 1""-20шт(коррозия по телу).Отбракован НК...",209.0,TP 75-1000,новый,5.67,332.87


In [15]:
failed_info.shape  # (rows, columns) of the failure-record table

(174, 13)

In [None]:
# Build the fault-mapping lookup from the YAML table
findReason = FindReason(f'{data_path}/fault_mapping_dict.yaml')

# Mapping table as a DataFrame (presumably one row per fault type)
fault_mapping_df = findReason.get_fault_mapping_df()

# De-duplicated bag of feature words from the Russian column
fault_mapping_bow = findReason.bag_of_words_in_language(fault_focus_language='RU')
print(f'词袋长度: {len(fault_mapping_bow)},\n{fault_mapping_bow}')



In [None]:


# Feature-word language, one of ['CN', 'RU', 'EN']
fault_focus_language = 'RU'

# Feature word -> fault id mapping.
# The bag of words is rebuilt here in fault_focus_language: the earlier cell hard-codes
# 'RU', so changing the language above would otherwise leave every word unmatched.
fault_dict = {}
for word in findReason.bag_of_words_in_language(fault_focus_language):
    # Fault types whose feature-word list contains this word
    fault_idx_list = findReason.find_reason(word, fault_focus_language)

    if not fault_idx_list:
        # Leave unmatched words out: a None id would make the later
        # sorted(set(fault_dict.values())) raise TypeError.
        print('不存在该特征词', word)
        continue

    # TODO: a word shared by several fault types (len > 1) needs separate handling;
    # for now the first matching fault type is used.
    fault_dict[word] = fault_idx_list[0]



In [None]:
# Failure-record spreadsheet; each record is classified into fault types via feature words
fault_data_info_path = f'{data_path}/部分故障类型标记.xlsx'

# Re-read the failure table so this cell starts from a fresh frame
failed_info = pd.read_excel(fault_data_info_path)

# New column: the fault ids whose feature words occur in Fail_Reason
failed_info['Fail_id_list'] = failed_info['Fail_Reason'].apply(fault_map_func, args=(fault_dict,))


In [None]:
# Exploratory filter for a single fault type
focus_id = 0      # fault id of interest
mode = 'first'    # filter mode: 'common', 'only' or 'first'

# Boolean mask over failure records: does each record belong to fault `focus_id` under `mode`?
fault_id_flag = failed_info['Fail_id_list'].apply(find_fault_id, args=(focus_id,), mode=mode)



In [None]:
fault_dict  # inspect the feature-word -> fault-id mapping

In [None]:
# [raw daily production DataFrame, fault id] pairs, one per matched failure record
xy_raw_list = []

# TODO: the database connection may be dropped by the server after a period of inactivity
database = Database()

# Records are assigned to a fault type by their first listed fault (loop-invariant)
mode = 'first'

# Ids are filtered for None so they stay sortable (unmatched words may have been mapped to None)
for focus_id in sorted({fid for fid in fault_dict.values() if fid is not None}):
    # Failure records of this fault type: well name plus open / failure dates
    fault_id_flag = failed_info['Fail_id_list'].map(lambda x: find_fault_id(x, focus_id, mode=mode))
    all_well_df_focus_id = failed_info[fault_id_flag]

    for _, row in all_well_df_focus_id.iterrows():
        # Purely numeric well names (e.g. 688 in the preview) are likely parsed by
        # read_excel as numbers; str() avoids a TypeError on concatenation.
        wellname = 'NB' + str(row['Wellbore'])
        start2fault_date = [row['Open_Date'], row['Fail_Date']]
        df = database.query(table='Production_Daily_Data_pcponly',
                            Wellbore=wellname,
                            Date=start2fault_date,
                            columns=None)
        # No data for this well in this period: skip it
        if df is None:
            continue

        xy_raw_list.append([df, focus_id])
        

In [None]:
# Columns kept for modelling: 'Wellbore' and 'Date' come first; all remaining columns are numeric
float_domain_columns = ['Wellbore', 'Date', 'WHT_0С', 'TP_psig', 'CP_psig', 'Prod_Hrs', 'Wtr_Cut_Dec', 'Sand_Cut_Dec', 'Test_Gross_Bbls', 'Test_Oil_Bbls', 'Test_Oil_ton', 'Gas(103/d)', 'GOR(m3/t)', 'Prod_FL_From_Surface_m', 'Prod_BHP_psia', 'Amps_Torq', 'Efficiency %', 'Depth_m', 'FL_above_Pump', 'Water_m3', 'SPM/RPM', 'Density(g/cm3)', 'Diameter / Factor', 'Displacement']



In [None]:
# Simple missing-value imputation: per-column means of the numeric columns.
# The means are computed from the raw query results. The previous version used
# all_concat_df (only built later, from already-imputed data) and the leftover loop
# variable `xy`, so it failed on a fresh kernel. float_domain_columns[2:] skips
# 'Wellbore' and 'Date'. Values are coerced to numbers (unparseable -> NaN) so that
# object-dtype columns still yield a mean.
numeric_columns = float_domain_columns[2:]
raw_numeric_df = pd.concat(
    [xy[0][numeric_columns] for xy in xy_raw_list]
).apply(pd.to_numeric, errors='coerce')

float_na_fill_mean = raw_numeric_df.mean().tolist()
# Keys and means come from the same column order, so the pairing is aligned
float_na_fill_dict = dict(zip(numeric_columns, float_na_fill_mean))

In [None]:
# Raw-data cleaning

xy_list_process = []

# Skip records with fewer than this many rows (presumably days) between well opening and failure
alive_min_day = 20

for xy in xy_raw_list:
    # Keep only the well name, date and numeric feature columns
    x, y = xy[0][float_domain_columns], xy[1]

    if len(x) >= alive_min_day:
        # Simple mean imputation of missing values
        x = x.fillna(float_na_fill_dict)
        xy_list_process.append([x, y])


In [None]:
# Persist the cleaned [x, y] pairs to disk
path = f'{data_path}/xy_list_process.pkl'
pickle_save(path=path, content=xy_list_process)

In [None]:
# Stack every cleaned frame to check the overall size and any remaining missing values
all_concat_df = pd.concat([x_frame for x_frame, _ in xy_list_process])
print(all_concat_df.shape)
miss(all_concat_df)
