In [1]:
import pandas as pd
import pyspark.pandas as ps
import requests
import json

from sqlalchemy import create_engine
from pca import pca
from pyspark.sql.functions import pandas_udf, PandasUDFType, max, col, countDistinct, when, rank, lit
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
from pyspark.sql.window import Window

from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.impute import SimpleImputer
from sklearn.metrics import roc_auc_score
from sklearn.pipeline import Pipeline
from imblearn.under_sampling import ClusterCentroids

# from backend_spark.doris_common.doris_client import DorisClient
from functools import reduce
from pyspark.sql import DataFrame
from typing import Optional



In [None]:
#####################################################################################
#######################################解析SQL########################################
#####################################################################################
# doris 数据库连接
client = DorisClient("10.52.199.81", 18030, 9030, user="root", password="Nexchip@123", data_base="etl",
                     mem_limit="68719476736")
                     
"""解析sql 的辅助函数"""
def read_sql(sql_stat, read_client=client, session=spark):
    df1 = read_client.doris_read(session, sql_stat)
    return df1


def process_like(key: str, value: list[str]) -> str:
    # 处理模糊条件的匹配: (key like 'aa%' or key like "bb%")
    key = keyword_map_from_json_to_table.get(key)
    v_join = ' or '.join([f"d1.{key} like  '{v.replace('*', '%')}' " for v in value])
    return "({})".format(v_join)


def process_not_like(key: str, value: list[str]) -> str:
    # 处理非模糊条件的匹配:key in ('aa', 'bb')
    key = keyword_map_from_json_to_table.get(key)
    v_join = ",".join([f"'{v}'" for v in value])
    return "d1.{} in ({})".format(key, v_join)


def test_not_like():
    result = (process_not_like("tool_name", ["aa", "bb", "cc"]))
    assert "tool_name in ('aa','bb','cc')" == result, "not like 验证失败"


def test_like():
    result = process_like("tool_name", ["aa*", "bb*", "cc*"])
    assert "(tool_name like  'aa%'  or tool_name like  'bb%'  or tool_name like  'cc%' )" == result, "like 验证失败"


def process_one_keyword(key, value: list[str]) -> Optional[str]:
    if len(value) == 0:
        return None

    not_like_list = [v for v in value if "*" not in v]
    like_list = [v for v in value if "*" in v]

    # 处理模糊条件
    if len(not_like_list) != 0:
        not_like_sql_str = process_not_like(key, not_like_list)
    else:
        not_like_sql_str = ""

    # 处理非模糊条件

    if len(like_list) != 0:
        like_sql_str = process_like(key, like_list)
    else:
        like_sql_str = ""

    # 去除为一个元素为空字符串的情况的情况的情况
    concat_sql_str_list = [sql_str for sql_str in [like_sql_str, not_like_sql_str] if len(sql_str) != 0]
    # 使用or 操作 单字段过滤 的like 和 not like 语句
    return "(" + " or ".join(concat_sql_str_list) + ")"


def check_time_start_end(min_time, max_time):
    if min_time is not None and max_time is not None:
        pass
    else:
        raise ValueError("起始时间和结束时间必须全填")

def get_time_selection_sql(time_keyword, max_time=None, min_time=None):
    """
    获取时间区间的筛选的sql, 起始时间和结束时间都是可选的
    :param time_keyword:
    :param max_time:
    :param min_time:
    :return:
    """
    # 根据取值，生成单个时间过滤条件
    if min_time:
        time_part_min = f"d1.{time_keyword} >= '{min_time}'"
    else:
        time_part_min = " "

    if max_time:
        time_part_max = f"d1.{time_keyword} < '{max_time}'"
    else:
        time_part_max = " "

    # 如果存在，拼接多个查询条件，或者只保留一个过滤条件
    if (max_time is not None) and (min_time is not None):
        time_sql = f' {time_part_min} and {time_part_max}'
    elif (max_time is None) and (min_time is None):
        time_sql = " "
    else:
        time_sql = time_part_max if max_time else time_part_min

    return time_sql

def concat_time_filter_sql_with_other_keyword_sql(time_filter_sql: str, other_keyword_sql:str) -> str:
    """
    拼接时间过滤条件与非时间过滤条件
    :param time_filter_sql:
    :param other_keyword_sql:
    :return:
    """
    time_strip = time_filter_sql.strip()
    other_strip = other_keyword_sql.strip()
    if len(time_strip) == 0 and len(other_strip) == 0:
        return ""
    elif len(time_strip) != 0 and len(other_strip) == 0:
        return  time_filter_sql
    elif len(time_strip) == 0 and len(other_strip) != 0:
        return other_keyword_sql
    else:
        return f'{time_filter_sql} and {other_keyword_sql}'
    
    
def trans_select_condition_to_sql_with_label(select_condition_dict: dict, table_name: str) -> str:
    # 查询条件转sql,并打标签，label '0': good wafer, '1': bad wafer
    filter_sql_list = []
    for keyword, value in select_condition_dict.items():
        if keyword not in ["dateRange", "waferId", "uploadId", "mergeProdg1"]:
            sql_filter_one_keyword = process_one_keyword(keyword, value)
            if sql_filter_one_keyword is not None:
                filter_sql_list.append(sql_filter_one_keyword)

    # 处理时间区间
    time_bin = select_condition_dict.get("dateRange")


    if len(time_bin) == 1: # list[dict]
        time_bin_dict = time_bin[0]
        min_time = time_bin_dict.get("start")
        max_time = time_bin_dict.get("end")
    else:
        min_time = None
        max_time = None 

    # 去除时间检查，时间范围为可选输入
    # 检查起始时间和结束时间全部非空
    # check_time_start_end(min_time, max_time)

    # 处理waferId
    waferId = select_condition_dict.get("waferId")
    good_wafer_list = waferId.get("good")
    bad_wafer_list = waferId.get("bad")
    upload_id = select_condition_dict.get("uploadId")
    # upload_id = '20231116152808771'

    # 根据time 过滤条件,生成sql
    time_filter_sql = get_time_selection_sql(time_keyword=keyword_map_from_json_to_table.get('dateRange'), max_time=max_time, min_time=min_time)


    #if len(good_wafer_list) > 0 and len(bad_wafer_list) > 0:
    if upload_id is not None and len(upload_id) > 0:

        # good wafer, bad wafe 均有指定，需要从层层字段的过滤的条件下选择
        # good_wafer_filter_sql = process_one_keyword("waferId", good_wafer_list)
        # bad_wafer_filter_sql = process_one_keyword("waferId", bad_wafer_list)
        # # or 拼接
        # wafer_filter_sql = " or ".join([good_wafer_filter_sql, bad_wafer_filter_sql])
        # wafer_filter_sql = f"({wafer_filter_sql})"
        # # 加入wafer 过滤条件
        # filter_sql_list.append(wafer_filter_sql)
        other_keyword_filter = " and ".join(filter_sql_list)

        case_when_statment = f"""  (case
        when d2.GB_FLAG = 'good' then 0 
        else 1
        end ) as label
        """

        filter_sql_concat = concat_time_filter_sql_with_other_keyword_sql(time_filter_sql, other_keyword_filter)

        if filter_sql_concat != '':
            select_sql = f"""select *, {case_when_statment} from {table_name} d1  
            join etl.UPLOADED_WAFER d2
        on d1.WAFER_ID = d2.WAFER_ID

            where {filter_sql_concat} and d2.UPLOAD_ID = '{upload_id}'"""
        else:
            select_sql = f"""select *, {case_when_statment} from {table_name} d1  
            join etl.UPLOADED_WAFER d2
        on d1.WAFER_ID = d2.WAFER_ID where d2.UPLOAD_ID = '{upload_id}'"""

    else: 
        raise ValueError("good bad wafer 都必须选！")

    # elif len(good_wafer_list) > 0 and len(bad_wafer_list) == 0:
    #     # 选good, 剩余为 bad

    #     other_keyword_filter = " and ".join(filter_sql_list)
    #     good_wafer_filter_sql = process_one_keyword("waferId", good_wafer_list)

    #     case_when_statment = f"(case when {good_wafer_filter_sql} then 0 else 1  end ) label"
    #     filter_sql_concat = concat_time_filter_sql_with_other_keyword_sql(time_filter_sql, other_keyword_filter)
    #     select_sql = f"select *, {case_when_statment} from {table_name} where {filter_sql_concat}"

    # elif len(good_wafer_list) == 0 and len(bad_wafer_list) > 0:
    #     # 选bad, 剩余为good
    #     other_keyword_filter = " and ".join(filter_sql_list)
    #     bad_wafer_filter_sql = process_one_keyword("waferId", bad_wafer_list)
    #     case_when_statment = f"""(case when {bad_wafer_filter_sql} then 1 else 0 end ) label"""
    #     filter_sql_concat = concat_time_filter_sql_with_other_keyword_sql(time_filter_sql, other_keyword_filter)
    #     select_sql = f"select *, {case_when_statment} from {table_name} where {filter_sql_concat}"
    #     # case1 stat results 表没有case_info 时间列，暂时去掉
        # select_sql = f"select *, {case_when_statment} from {table_name} where {other_keyword_filter}"

    # print(select_sql)    
    select_sql = select_sql.replace("*",  "d1.WAFER_ID, d1.TOOL_ID, d1.RUN_ID, d1.EQP_NAME, d1.PRODUCT_ID, d1.PRODG1, d1.TOOL_NAME, d1.LOT_ID, d1.RECIPE_NAME, d1.OPER_NO, d1.parametric_name, d1.CASE_INFO, d1.STATUS, d1.STATISTIC_RESULT")
    if table_name == "etl.DWD_POC_CASE_FD_UVA_DATA_TEST":
        select_sql = f"{select_sql} and d1.STATUS != 'ERROR'"
    print("select_sql", select_sql) 
    return select_sql
 


def get_data_from_doris(select_condition_list, table_name):
    select_df_list = [read_sql(trans_select_condition_to_sql_with_label(select_condition_dict, table_name)) for select_condition_dict in select_condition_list]
    # 多个进行union
    df1 = reduce(DataFrame.unionAll, select_df_list)
    return df1

In [84]:
############################################################################
##############################从kafka消息读取需要的资料#########################
############################################################################
def get_some_info(df:pd.DataFrame):
    if len(df) > 0:
        df = df.head(1)

    request_id = df["requestId"].values[0]
    request_params = df["requestParam"].values[0]
    # 避免存在单引号，因为json 引号只有双引号
    request_params = request_params.replace('\'', "\"")   
    parse_dict = json.loads(request_params)
    merge_prodg1 = parse_dict[0]['mergeProdg1']
    
    try:
        merge_operno = list(parse_dict[0]['mergeOperno'])
    except KeyError:
        merge_operno = None

    if merge_prodg1 == '1':
        grpby_list = ['OPER_NO', 'TOOL_NAME']
    elif merge_prodg1 == '0':
        grpby_list = ['PRODG1', 'OPER_NO', 'TOOL_NAME']
    else:
        raise ValueError
    return parse_dict, request_id, grpby_list, merge_operno

In [88]:
# 真正的kafka消息里全都是双引号
json_loads_dict = {
    "requestId": "fff",
    "requestParam": [
        {'dateRange': [{'start': "2023-12-01 00:00:00", 'end': "2024-01-15 00:00:00"}], 
         'lot': [], 
         'operNo': ["1G.EEG1R","1G.PPB10"], 
         'prodg1': [], 
         'productId': [], 
         'eqp': [], 
         'tool': [], 
         'recipeName': [], 
         'waferId': {'good': ["NBX392-15","NBX392-20","NBX392-24","NBX391-24","NBX391-25","NBX548-09",
                     "NBX391-01","NBX391-02","NBX391-13","NBX391-17"], 
                     'bad': ["NBX500-10","NBX500-01","NBX500-09"]}, 
         'uploadId': '20240110170016023', 
         'mergeProdg1': '0',
#          'mergeOperno': [{"2F.CDS10_XX.TDS01": ["2F.CDS10", "XX.TDS01"]},
#                            {"2F.CDS20_XX.CDS20": ["2F.CDS20", "XX.CDS20"]}]
        }
    ]
}

df_pa = pd.DataFrame({
    "requestId": [json_loads_dict["requestId"]], 
    "requestParam": [json.dumps(json_loads_dict["requestParam"])]})

df1 = ps.from_pandas(df_pa).to_spark()

  fields = [
  for column, series in pdf.iteritems():


In [89]:
#  1. 解析json 为字典， df1为kafka输入的结果数据，获取到parse_dict, request_id, grpby_list
df2 = df1.toPandas() 
parse_dict, request_id, grpby_list, merge_operno = get_some_info(df2)
print("parse_dict是：", parse_dict)
print("parse_dict的类型是：", type(parse_dict))
print("request_id是：", request_id)
print("grpby_list是：", grpby_list)
print("merge_operno是：", merge_operno)

# 2. 从kafka 关键字映射都具体数据源中的字段,没有的可以删除
# keyword_map_from_json_to_table: dict = {
#     "prodg1": "PRODG1",
#     "waferId": "WAFER_ID",
#     "dateRange": "START_TIME",
#     "productId": "PRODUCT_ID",
#     "operNo": "OPER_NO",
#     "eqp": "EQP_NAME",
#     "tool": "TOOL_NAME",
#     "lot": "LOT_ID",
#     "recipeName": "RECIPE_NAME"}

# # 3. 获取查询条件list
# select_condition_list = parse_dict

# # 4. 指定查询表名, 根据实际情况需要修改
# table_name = "etl.DWD_POC_CASE_FD_UVA_DATA_TEST"

parse_dict是： [{'dateRange': [{'start': '2023-12-01 00:00:00', 'end': '2024-01-15 00:00:00'}], 'lot': [], 'operNo': ['1G.EEG1R', '1G.PPB10'], 'prodg1': [], 'productId': [], 'eqp': [], 'tool': [], 'recipeName': [], 'waferId': {'good': ['NBX392-15', 'NBX392-20', 'NBX392-24', 'NBX391-24', 'NBX391-25', 'NBX548-09', 'NBX391-01', 'NBX391-02', 'NBX391-13', 'NBX391-17'], 'bad': ['NBX500-10', 'NBX500-01', 'NBX500-09']}, 'uploadId': '20240110170016023', 'mergeProdg1': '0'}]
parse_dict的类型是： <class 'list'>
request_id是： fff
grpby_list是： ['PRODG1', 'OPER_NO', 'TOOL_NAME']
merge_operno是： None


In [72]:
df_pandas = pd.read_csv("small4.csv")
df_pandas.shape

(10, 38)

In [87]:
df_pandas

Unnamed: 0,TOOL_ID,TOOL_NAME,RUN_ID,EQP_NAME,CASE_INFO,PRODUCT_ID,PRODG1,OPER_NO,LOT_ID,WAFER_ID,...,LOWER_OUTLIER,UPPER_OUTLIER,RULES_ENABLED,ALARM_RULE,RESULT,STATUS,REGION,ERROR_MSG,STATISTIC_RESULT,VERSION
0,9279,PBT01_CLHA_4-12,365189,PBT01,2023-12-25,AFPNR901N.0C0C,L2800Z2N,1G.PPB10,NBX500.000,NBX500-01,...,,,1,,149.953478,NORMAL,LOWER_NORMAL,,149.953478,4
1,9280,PBT01_CLHA_4-21,359818,PBT01,2023-12-25,AFPNR901N.0C0C,L2800Z2N,1G.PPB10,NBX500.000,NBX500-10,...,,,1,,150.015,NORMAL,LOWER_NORMAL,,150.015,4
2,9279,PBT01_CLHA_4-12,366799,PBT01,2023-12-28,AFPNR901N.0B01,L2800Z2N,1G.PPB10,NBX548.110,NBX548-09,...,,,1,,149.9875,NORMAL,LOWER_NORMAL,,149.9875,4
3,9279,PBT01_CLHA_4-12,362253,PBT01,2023-12-20,AFPNR901N.0C0C,L2800Z2N,1G.PPB10,NBX500.000,NBX500-01,...,,,1,,149.946667,NORMAL,LOWER_NORMAL,,149.946667,4
4,9280,PBT01_CLHA_4-21,356904,PBT01,2023-12-20,AFPNR901N.0C0C,L2800Z2N,1G.PPB10,NBX500.000,NBX500-10,...,,,1,,150.0075,NORMAL,LOWER_NORMAL,,150.0075,4
5,9279,PBT01_CLHA_4-12,365191,PBT01,2023-12-25,AFPNR901N.0C0C,L2800Z2N,1G.PPB10,NBX500.000,NBX500-09,...,,,1,,149.97125,NORMAL,LOWER_NORMAL,,149.97125,4
6,9279,PBT01_CLHA_4-12,362255,PBT01,2023-12-20,AFPNR901N.0C0C,L2800Z2N,1G.PPB10,NBX500.000,NBX500-09,...,,,1,,149.989167,NORMAL,LOWER_NORMAL,,149.989167,4
7,9279,PBT01_CLHA_4-12,373798,PBT01,2024-01-09,AFPNR901N.0D0D,L2800Z2N,1G.PPB10,NBX392.010,NBX392-24,...,,,1,,149.967083,NORMAL,LOWER_NORMAL,,149.967083,4
8,9279,PBT01_CLHA_4-12,373797,PBT01,2024-01-09,AFPNR901N.0D0D,L2800Z2N,1G.PPB10,NBX392.010,NBX392-20,...,,,1,,149.995833,NORMAL,LOWER_NORMAL,,149.995833,4
9,9278,PBT01_CLHA_4-11,385956,PBT01,2024-01-09,AFPNR901N.0D0D,L2800Z2N,1G.PPB10,NBX392.010,NBX392-15,...,,,1,,149.995217,NORMAL,LOWER_NORMAL,,149.995217,4


In [7]:
# df_pandas[df_pandas['label'] == 0]['WAFER_ID'].unique()
# df_pandas[df_pandas['label'] == 1]['WAFER_ID'].unique()

In [73]:
df1 = ps.from_pandas(df_pandas).to_spark()
df1.count()

  fields = [
  for column, series in pdf.iteritems():


10

In [74]:
#####自己打标签
############################################
######## 1. 客户只定义了bad_wafer = []是什么  ########
############################################
# 将传进来的BAD_WAFER, 用 | 连接起来，
# F.col('WAFER_ID').like('NDJ065%') | F.col('WAFER_ID').like('NDJ067%') 作为条件传入增加label
# 同时将isin模式也作为条件传入增加label

def get_label_single(df, bad_wafer):
    like_conditions = [f"col('WAFER_ID').like('{bad}')" for bad in bad_wafer]
    all_like_conditions = " | ".join(like_conditions)
    isin_conditions = "col('WAFER_ID').isin(bad_wafer)"
    df = df.withColumn('label', 
                when( eval(all_like_conditions) | eval(isin_conditions), int(1)).otherwise(int(0)))
    return df 


############################################
## 2. 客户定义了bad_wafer = [] 和 good_wafer = []######
############################################
# 将传进来的BAD_WAFER, 用 | 连接起来，
# 将传进来的GOOD_WAFER, 也用 | 连接起来，
# 同时将isin模式也作为条件传入增加label

def get_label_double(df, bad_wafer, good_wafer):
    good_like_conditions = [f"col('WAFER_ID').like('{good}')" for good in good_wafer]
    all_good_like_conditions = " | ".join(good_like_conditions)
    good_isin_conditions = "col('WAFER_ID').isin(good_wafer)"

    bad_like_conditions = [f"col('WAFER_ID').like('{bad}')" for bad in bad_wafer]
    all_bad_like_conditions = " | ".join(bad_like_conditions)
    bad_isin_conditions = "col('WAFER_ID').isin(bad_wafer)"

    df = df.withColumn('label',  when(eval(all_good_like_conditions) | eval(good_isin_conditions), int(0)).when(eval(all_bad_like_conditions) | eval(bad_isin_conditions), int(1)).otherwise(222333))
    df = df.filter(df['label'] != int(222333))
    return df

In [75]:
good = ['NBX392-15', 'NBX392-20', 'NBX392-24', 'NBX391-24', 'NBX391-25', 'NBX548-09', 'NBX391-01', 'NBX391-02', 'NBX391-13', 'NBX391-17']
bad  = ['NBX500-10', 'NBX500-01', 'NBX500-09']

if 'label' in df1.columns:
    df1 = df1
else:
    df1 = get_label_double(df1, bad, good)

In [76]:
df1.count()

10

In [90]:
###################################################################
##########################融合OPER_NO字段##########################
###################################################################
def integrate_operno(df, merge_operno_list):
    if merge_operno_list is not None:
        # 将mergeOperno中每个字典的values提取出来，组成一个列表
        values_to_replace = [list(rule.values())[0] for rule in merge_operno_list]

        # 将每一个字典中的values拼接起来
        merged_values = ["_".join( list(rule.values())[0]) for rule in merge_operno_list]

        for values, replacement_value in zip(values_to_replace, merged_values):
            df = df.withColumn("OPER_NO", when(col("OPER_NO").isin(values), replacement_value).otherwise(col("OPER_NO")))
        return df
    else:
        return df

In [14]:
############################################################################
##################################FDC数据预处理###############################
############################################################################
def _pre_process(df):
    """
    param df: 从数据库中读取出来的某个CASE数据
    return: 数据预处理，后面要根据实际情况统一添加
    """
    # 只选出会用到的列
    df = df.select('WAFER_ID', 'TOOL_ID', 'RUN_ID', 'EQP_NAME', 'PRODUCT_ID', 'PRODG1', 'TOOL_NAME',
                   'OPER_NO', 'parametric_name', 'STATISTIC_RESULT', 'label')
    # 剔除NA值
    df = df.filter(col('STATISTIC_RESULT').isNotNull())
    # 按照所有的行进行去重
    df1 = df.dropDuplicates()
    # 选最新的RUN
    df2 = df1.groupBy('WAFER_ID', 'OPER_NO', 'TOOL_ID').agg(max('RUN_ID').alias('RUN_ID'))
    df_run = df1.join(df2.dropDuplicates(subset=['WAFER_ID', 'OPER_NO', 'TOOL_ID', 'RUN_ID']),
                      on=['WAFER_ID', 'OPER_NO', 'TOOL_ID', 'RUN_ID'], how='inner')
    return df_run



def commonality_analysis(df_run, grpby_list):
    """
    param df_run: 数据预处理后的数据
    return: 共性分析后的结果， 返回bad wafer前十的组合
    """
    grps = (df_run.groupBy(grpby_list)
            .agg(countDistinct('WAFER_ID').alias('wafer_count'),
                 countDistinct('WAFER_ID', when(df_run['label'] == 0, 1)).alias('good_num'),
                 countDistinct('WAFER_ID', when(df_run['label'] == 1, 1)).alias('bad_num'))
            .orderBy('bad_num', ascending=False))

    # 单站点+单腔室的情况
    if grps.count() == 1:
        return grps
    else:
        grps = grps.filter(grps['bad_num'] > 0)
        window_sep = Window().orderBy(col("bad_num").desc())
        ranked_df = grps.withColumn("rank", rank().over(window_sep))
        grpss = ranked_df.filter(col("rank") <= 10).drop("rank")
        return grpss

In [15]:
df_run = _pre_process(df1)
print(df_run.count())

7


In [16]:
# df_run.toPandas()

In [17]:
# grpby_list = ['PRODG1', 'OPER_NO', 'TOOL_NAME']
common_res = commonality_analysis(df_run, grpby_list)
common_res.show()

+--------+--------+---------------+-----------+--------+-------+
|  PRODG1| OPER_NO|      TOOL_NAME|wafer_count|good_num|bad_num|
+--------+--------+---------------+-----------+--------+-------+
|L2800Z2N|1G.PPB10|PBT01_CLHA_4-12|          5|       3|      2|
|L2800Z2N|1G.PPB10|PBT01_CLHA_4-21|          1|       0|      1|
+--------+--------+---------------+-----------+--------+-------+



In [None]:
# df_pandas[['WAFER_ID', 'TOOL_ID', 'RUN_ID', 'EQP_NAME', 'PRODUCT_ID', 'PRODG1', 'TOOL_NAME',
#                    'OPER_NO', 'parametric_name', 'STATISTIC_RESULT']]

In [18]:
###########################################################################
#################################获取样本数据#########################
############################################################################
def get_data_list(common_res, grpby_list, big_or_small='big'):
    """
    param common_res: 共性分析后的结果, 按照大样本或者小样本条件筛选出组合
    param grpby_list: 按照PRODG1+OPER_NO+TOOL_NAME分组或OPER_NO+TOOL_NAME分组
    param big_or_small: big或者small
    return: 对应组合的字典形式, 包在一个大列表中
    """
    assert big_or_small in ['big', 'small'], "只能选择big或者small, 请检查拼写"
    if big_or_small == 'big':
        good_bad_grps = common_res.filter("good_num >= 3 AND bad_num >= 3")
    else:
        good_bad_grps = common_res.filter("bad_num >= 1 AND wafer_count >=2")
    good_bad_grps = good_bad_grps.orderBy(col("bad_num").desc(), col("wafer_count").desc(), col("good_num").desc()).limit(5)

    if 'PRODG1' in grpby_list:
        data_list = good_bad_grps['PRODG1', 'OPER_NO', 'TOOL_NAME'].collect()  
    else:
        data_list = good_bad_grps['OPER_NO', 'TOOL_NAME'].collect()

    data_dict_list = [row.asDict() for row in data_list]
    return data_dict_list


def get_train_data(df_run, data_dict_list):
    """
    param df_run: 数据预处理后的数据
    param data_dict: 筛选后的字典结果
    return: 从原始数据中过滤出真正用来建模的组合数据
    """
    if len(data_dict_list[0]) == 3:
        prod, oper, tool = data_dict_list[0]['PRODG1'], data_dict_list[0]['OPER_NO'], data_dict_list[0]['TOOL_NAME']
        df_s = df_run.filter("PRODG1 == '{}' AND OPER_NO == '{}' AND TOOL_NAME == '{}'".format(prod, oper, tool))
        for i in range(1, len(data_dict_list)):
            prod, oper, tool = data_dict_list[i]['PRODG1'], data_dict_list[i]['OPER_NO'], data_dict_list[i]['TOOL_NAME']
            df_m = df_run.filter("PRODG1 == '{}' AND OPER_NO == '{}' and TOOL_NAME == '{}'".format(prod, oper, tool))
            df_s = df_s.union(df_m)
    else:
        oper, tool = data_dict_list[0]['OPER_NO'], data_dict_list[0]['TOOL_NAME']
        df_s = df_run.filter("OPER_NO == '{}' AND TOOL_NAME == '{}'".format(oper, tool))
        for i in range(1, len(data_dict_list)):
            oper, tool = data_dict_list[i]['OPER_NO'], data_dict_list[i]['TOOL_NAME']
            df_m = df_run.filter("OPER_NO == '{}' and TOOL_NAME == '{}'".format(oper, tool))
            df_s = df_s.union(df_m)
    return df_s

In [19]:
data_dict_list_ss = get_data_list(common_res, grpby_list, big_or_small='small')
data_dict_list_ss

[{'PRODG1': 'L2800Z2N', 'OPER_NO': '1G.PPB10', 'TOOL_NAME': 'PBT01_CLHA_4-12'}]

In [20]:
df_run_ss = get_train_data(df_run, data_dict_list_ss)
df_run_ss.count()

5

In [21]:
############################################################################
#########################获取传入的整个数据中的所有bad_wafer个数############
############################################################################
def get_all_bad_wafer_num(df):
    """
    param df: 筛选后的数据
    return: 数据中所有bad_wafer的数量
    """
    return df.filter("label == 1").select('WAFER_ID').distinct().count()

In [22]:
bad_wafer_num_small_sample = get_all_bad_wafer_num(df_run_ss)
bad_wafer_num_small_sample

2

In [23]:
##########################################################################################
#######################################对bad>=1的数据，用pca建模##############################
##########################################################################################
def get_pivot_table(df, by):
    """
    param df: 大样本组合的数据
    param by: 分组字段
    return: 表格透视后的结果
    """
    if len(by) == 3:
        df_pivot = df.dropna(axis=0).pivot_table(index=['WAFER_ID', 'label'], 
                                                     columns=['OPER_NO', 'TOOL_NAME', 'parametric_name', 'PRODG1'],
                                                     values=['STATISTIC_RESULT'])
    else:
        df_pivot = df.dropna(axis=0).pivot_table(index=['WAFER_ID', 'label'], 
                                                     columns=['OPER_NO', 'TOOL_NAME', 'parametric_name'],
                                                     values=['STATISTIC_RESULT'])
    df_pivot.columns = df_pivot.columns.map('#'.join)
    df_pivot = df_pivot.fillna(df_pivot.mean()).reset_index(drop=False)
    return df_pivot



def fit_pca_small_sample(df, by):
    """
    param df: 小样本组合的数据
    param by: 分组字段
    return: PCA建模后的结果
    """
    schema_all = StructType([StructField("PRODG1", StringType(), True),
                             StructField("OPER_NO", StringType(), True),
                             StructField("TOOL_NAME", StringType(), True),
                             StructField("features", StringType(), True),
                             StructField("importance", FloatType(), True),
                             StructField("bad_wafer", IntegerType(), True)]) 

    @pandas_udf(returnType=schema_all, functionType=PandasUDFType.GROUPED_MAP)
    def get_model_result(df_run):
        df_pivot = get_pivot_table(df=df_run, by=by)
        # 由于是小样本，再重新copy一份制造多一点数据传给PCA模型
        df_pivot_copy = df_pivot.copy()
        df_pivot_all = pd.concat([df_pivot, df_pivot_copy], axis=0)

        # 定义自变量
        x_train = df_pivot_all[df_pivot_all.columns.difference(['WAFER_ID', 'label']).tolist()]

        # 建立模型，传入给PCA的n_components选择x_train.shape中的最小值-1；
        # 选择是70%或者80%，出来的特征很有可能只是一两个
        model = pca(n_components=min(x_train.shape[0], x_train.shape[1])-1, verbose=None)
        results = model.fit_transform(x_train)
        res_top = results['topfeat']
        res_top_select = res_top[res_top['type'] == 'best'][['feature', 'loading']]
        res_top_select = res_top_select.drop_duplicates()
        res_top_select['importance'] = abs(res_top_select['loading'])
        res_top_select = res_top_select.rename(columns={'feature': 'features'})
        res_top_select = res_top_select.drop("loading", axis=1)
        
        # 增加一些字段信息
        res_top_select['bad_wafer'] = sum(df_pivot['label'])
        res_top_select['OPER_NO'] = df_run['OPER_NO'].values[0]
        res_top_select['TOOL_NAME'] = df_run['TOOL_NAME'].values[0]
        if len(by) == 3: 
            res_top_select['PRODG1'] = df_run['PRODG1'].values[0]
        else:
            res_top_select['PRODG1'] = 'grplen2'

        return res_top_select
    return df.groupby(by).apply(get_model_result)

In [24]:
res = fit_pca_small_sample(df=df_run_ss, by=grpby_list)
res.show()



+--------+--------+---------------+--------------------+----------+---------+
|  PRODG1| OPER_NO|      TOOL_NAME|            features|importance|bad_wafer|
+--------+--------+---------------+--------------------+----------+---------+
|L2800Z2N|1G.PPB10|PBT01_CLHA_4-12|STATISTIC_RESULT#...|       1.0|        2|
+--------+--------+---------------+--------------------+----------+---------+



In [25]:
#####################################################################################
##################################对bad>=1建模后的结果进行整合############################
#####################################################################################
def split_features(df, index) -> str:
    """
    param df: RandomForest建模后的feature_importance_table
    param index: 顺序值
    return: 字段属性值
    """
    return df['features'].apply(lambda x: x.split('#')[index])


def get_split_feature_importance_table(df, by):
    """
    param df: RandomForest建模后的feature_importance_table
    param by: OPER_NO+TOOL_NAME+PRODG1或者OPER_NO+TOOL_NAME
    return: 分裂features后的表
    """
    df['STATISTIC_RESULT'] = split_features(df, 0)
    df['OPER_NO'] = split_features(df, 1)
    df['TOOL_NAME'] = split_features(df, 2)
    df['parametric_name'] = split_features(df, 3)
    df['step'] = split_features(df, 4)
    df['stats'] = split_features(df, 5)

    if 'PRODG1' in by:
        df['PRODG1'] = split_features(df, 6)
    else:
        df = df.assign(PRODG1 = 'grplen2')

    df = df.drop(['features', 'STATISTIC_RESULT'], axis=1).reset_index(drop=True)
    return df


def add_feature_stats(df):
    """
    param df: 经过处理后的feature_importance_table
    return: 新增一列，含有参数的所有统计特征:feature_stats
    """
    feature_stats = df.groupby(['PRODG1', 'OPER_NO', 'TOOL_NAME', 'parametric_name', 'step'])['stats'].unique().reset_index()
    feature_stats['stats'] = [feature_stats['stats'].iloc[i].tolist() for i in range(len(feature_stats))]
    feature_stats['stats'] = feature_stats['stats'].apply(lambda x: "#".join(x))
    feature_stats = feature_stats.assign(parametric_name=lambda x: x['parametric_name']+str('#')+x['step']).drop('step', axis=1)
    return feature_stats


def split_calculate_features_small_sample(df, by):
    """
    param df: PCA建模后的结果
    param by: 分组字段
    return: features和importance结果
    """
    schema_all = StructType([ StructField("PRODG1", StringType(), True),
                              StructField("OPER_NO", StringType(), True),
                              StructField("TOOL_NAME", StringType(), True),
                              StructField("parametric_name", StringType(), True),
                              StructField("importance", FloatType(), True),
                              StructField("bad_wafer", FloatType(), True),
                              StructField("stats", StringType(), True)]) 

    @pandas_udf(returnType=schema_all, functionType=PandasUDFType.GROUPED_MAP)
    def get_result(model_results):        
        feature_importance_table = model_results[['features',  'importance', 'bad_wafer']].dropna(axis=0)
        # 分裂features
        feature_importance_res_split = get_split_feature_importance_table(feature_importance_table, by)

        # 新增一列，含有参数的所有统计特征:feature_stats
        feature_stats = add_feature_stats(feature_importance_res_split)

        #对同一种组合里的同一个参数进行求和:feature_importance_groupby
        feature_importance_groupby = (feature_importance_res_split.groupby(['PRODG1', 'OPER_NO', 'TOOL_NAME', 'bad_wafer',
                                                                         'parametric_name','step'])['importance'].sum().reset_index())
        feature_importance_groupby = feature_importance_groupby.assign(parametric_name=lambda x: x['parametric_name']+str('#')+x['step']).drop('step', axis=1)

        # feature_stats和feature_importance_groupby连接
        grpby_stats = pd.merge(feature_stats, feature_importance_groupby, on=['PRODG1', 'OPER_NO', 'TOOL_NAME', 'parametric_name']).dropna().reset_index(drop=True)
        return grpby_stats
    return df.groupby(by).apply(get_result)


def get_finall_results_small_sample(f_res, bad_wafer_num):
    """
    param s_res: roc_auc分数结果
    param f_res: features和importance结果
    param bad_wafer_num: 数据中所有bad_wafer的数量
    return: 最后的建模结果
    """
    f_res = f_res.withColumn("bad_ratio", col("bad_wafer") / bad_wafer_num)
    df_merge = f_res.withColumn('weight_original', col('importance') * col('bad_ratio'))

    # 最后再次进行一次归一化
    weight_all = df_merge.agg({"weight_original": "sum"}).collect()[0][0]
    df_merge = df_merge.withColumn("weight", col("weight_original") / weight_all)

    df_merge = df_merge.select(['PRODG1', 'OPER_NO', 'TOOL_NAME',
                                'parametric_name', 'weight', 'stats']).orderBy('weight', ascending=False)
    return df_merge

In [62]:
#####################################################################################
#############################将建模后的结果增加特定的列####################################
#####################################################################################
def add_certain_column(df, by, request_id):
    """
    param df: 最后的建模结果
    param by: 分组字段, 手动增加一列add
    param request_id: 传入的request_id
    return: 最后的建模结果增加特定的列
    """
    schema_all = StructType([
        StructField("PRODG1", StringType(), True),
        StructField("OPER_NO", StringType(), True),
        StructField("TOOL_NAME", StringType(), True),
        StructField("stats", StringType(), True),
        StructField("parametric_name", StringType(), True),
        StructField("weight", FloatType(), True),
        StructField("request_id", StringType(), True),
        StructField("weight_percent", FloatType(), True),
        StructField("index_no", IntegerType(), True)])

    @pandas_udf(returnType=schema_all, functionType=PandasUDFType.GROUPED_MAP)
    def get_result(final_res):
        final_res['weight'] = final_res['weight'].astype(float)
        final_res = final_res.query("weight > 0")
        final_res['request_id'] = request_id
        final_res['weight_percent'] = final_res['weight'] * 100
        final_res = final_res.sort_values('weight', ascending=False)
        final_res['index_no'] = [i + 1 for i in range(len(final_res))]
        final_res = final_res.drop('add', axis=1)
        # final_res['parametric_name'] = final_res['parametric_name'].str.replace("_", "+")
        final_res['PRODG1'] = final_res['PRODG1'].apply(lambda x: None if x == 'grplen2' else x)
        return final_res
    return df.groupby(by).apply(get_result)

In [26]:
f_res = split_calculate_features_small_sample(df=res, by=grpby_list)
f_res.show()

+--------+--------+---------------+--------------------+----------+---------+-----+
|  PRODG1| OPER_NO|      TOOL_NAME|     parametric_name|importance|bad_wafer|stats|
+--------+--------+---------------+--------------------+----------+---------+-----+
|L2800Z2N|1G.PPB10|PBT01_CLHA_4-12|PLATE_TEMP#DHP150...|       1.0|      2.0| MEAN|
+--------+--------+---------------+--------------------+----------+---------+-----+



In [27]:
model_res_ss = get_finall_results_small_sample(f_res=f_res, bad_wafer_num=bad_wafer_num_small_sample)
model_res_ss.show()

+--------+--------+---------------+--------------------+------+-----+
|  PRODG1| OPER_NO|      TOOL_NAME|     parametric_name|weight|stats|
+--------+--------+---------------+--------------------+------+-----+
|L2800Z2N|1G.PPB10|PBT01_CLHA_4-12|PLATE_TEMP#DHP150...|   1.0| MEAN|
+--------+--------+---------------+--------------------+------+-----+



#### 利用CASE1制作一个小样本的CASE

In [50]:
df_case1 = pd.read_csv("../DWD_POC_CASE_FD_UVA_DATA_CASE1_PROCESSED1.csv")
df_case1_small_sample_pandas = df_case1[df_case1['WAFER_ID'].isin(['NGE186-06', 'NGE186-12', 'NGE186-24', 'NGG239-19', 'NGE197-02', 'NGE197-15', 'NGE197-21', 'NGF482-01', 'NGF482-14'])]
df_case1_small_sample_pandas.shape

(736, 16)

In [51]:
df1 = ps.from_pandas(df_case1_small_sample_pandas).to_spark()
df1.count()

  fields = [
  for column, series in pdf.iteritems():


736

In [52]:
grpby_list

['PRODG1', 'OPER_NO', 'TOOL_NAME']

In [53]:
df_run = _pre_process(df1)
print(df_run.count())

736


In [54]:
common_res = commonality_analysis(df_run, grpby_list)
common_res.show()

+--------+--------+---------+-----------+--------+-------+
|  PRODG1| OPER_NO|TOOL_NAME|wafer_count|good_num|bad_num|
+--------+--------+---------+-----------+--------+-------+
|L11CD02A|1F.EEK10|EKT72_PM1|          6|       3|      3|
|L15DV07A|1F.EEK10|EKT72_PM1|          2|       0|      2|
+--------+--------+---------+-----------+--------+-------+



In [55]:
data_dict_list_ss = get_data_list(common_res, grpby_list, big_or_small='small')
data_dict_list_ss

[{'PRODG1': 'L11CD02A', 'OPER_NO': '1F.EEK10', 'TOOL_NAME': 'EKT72_PM1'},
 {'PRODG1': 'L15DV07A', 'OPER_NO': '1F.EEK10', 'TOOL_NAME': 'EKT72_PM1'}]

In [56]:
df_run_ss = get_train_data(df_run, data_dict_list_ss)
df_run_ss.count()

674

In [57]:
bad_wafer_num_small_sample = get_all_bad_wafer_num(df_run_ss)
bad_wafer_num_small_sample

5

In [58]:
res = fit_pca_small_sample(df=df_run_ss, by=grpby_list)
res.show()



+--------+--------+---------+--------------------+----------+---------+
|  PRODG1| OPER_NO|TOOL_NAME|            features|importance|bad_wafer|
+--------+--------+---------+--------------------+----------+---------+
|L11CD02A|1F.EEK10|EKT72_PM1|STATISTIC_RESULT#...| 0.5198736|        3|
|L11CD02A|1F.EEK10|EKT72_PM1|STATISTIC_RESULT#...| 0.9020127|        3|
|L11CD02A|1F.EEK10|EKT72_PM1|STATISTIC_RESULT#...|  0.940183|        3|
|L11CD02A|1F.EEK10|EKT72_PM1|STATISTIC_RESULT#...|0.76628584|        3|
|L11CD02A|1F.EEK10|EKT72_PM1|STATISTIC_RESULT#...| 0.6545386|        3|
|L11CD02A|1F.EEK10|EKT72_PM1|STATISTIC_RESULT#...|0.92193615|        3|
|L11CD02A|1F.EEK10|EKT72_PM1|STATISTIC_RESULT#...| 0.9986149|        3|
|L11CD02A|1F.EEK10|EKT72_PM1|STATISTIC_RESULT#...| 0.9862689|        3|
|L11CD02A|1F.EEK10|EKT72_PM1|STATISTIC_RESULT#...| 0.9045629|        3|
|L11CD02A|1F.EEK10|EKT72_PM1|STATISTIC_RESULT#...|0.98242646|        3|
|L11CD02A|1F.EEK10|EKT72_PM1|STATISTIC_RESULT#...| 0.7724064|   

In [59]:
f_res = split_calculate_features_small_sample(df=res, by=grpby_list)
f_res.show()

+--------+--------+---------+--------------------+----------+---------+----------+
|  PRODG1| OPER_NO|TOOL_NAME|     parametric_name|importance|bad_wafer|     stats|
+--------+--------+---------+--------------------+----------+---------+----------+
|L11CD02A|1F.EEK10|EKT72_PM1|APC_POSITION#AOTU...| 1.9848838|      3.0|MEAN#RANGE|
|L11CD02A|1F.EEK10|EKT72_PM1|BOTTOMFLOWRATE#AO...| 0.9045629|      3.0|      MEAN|
|L11CD02A|1F.EEK10|EKT72_PM1|CENTER_GAS_PRESSU...|0.98242646|      3.0|      MEAN|
|L11CD02A|1F.EEK10|EKT72_PM1|CENTER_HE_PRESSUR...| 0.7724064|      3.0|      MEAN|
|L11CD02A|1F.EEK10|EKT72_PM1|ESC_CURRENT#AOTU_...| 0.6545386|      3.0|       MAX|
|L11CD02A|1F.EEK10|EKT72_PM1|LO_C1_VAR_CAPACIT...|  0.940183|      3.0|     RANGE|
|L11CD02A|1F.EEK10|EKT72_PM1|LO_RF_POWER#AOTU_...|0.92193615|      3.0|      MEAN|
|L11CD02A|1F.EEK10|EKT72_PM1|LO_RF_VPP#AOTU_ST...| 0.5198736|      3.0|      MEAN|
|L11CD02A|1F.EEK10|EKT72_PM1|PROCESS_GAS_5_CHF...| 0.9020127|      3.0|       SUM|
|L11

In [60]:
model_res_ss = get_finall_results_small_sample(f_res=f_res, bad_wafer_num=bad_wafer_num_small_sample)
model_res_ss.show()

+--------+--------+---------+--------------------+--------------------+----------+
|  PRODG1| OPER_NO|TOOL_NAME|     parametric_name|              weight|     stats|
+--------+--------+---------+--------------------+--------------------+----------+
|L11CD02A|1F.EEK10|EKT72_PM1|APC_POSITION#AOTU...|  0.1811487599999665|MEAN#RANGE|
|L11CD02A|1F.EEK10|EKT72_PM1|CENTER_GAS_PRESSU...| 0.08966033032687143|      MEAN|
|L15DV07A|1F.EEK10|EKT72_PM1|APC_POSITION#AOTU...| 0.08591935284863195|      MEAN|
|L11CD02A|1F.EEK10|EKT72_PM1|LO_C1_VAR_CAPACIT...| 0.08580501436300049|     RANGE|
|L11CD02A|1F.EEK10|EKT72_PM1|LO_RF_POWER#AOTU_...|   0.084139732715689|      MEAN|
|L11CD02A|1F.EEK10|EKT72_PM1|BOTTOMFLOWRATE#AO...| 0.08255417630960758|      MEAN|
|L11CD02A|1F.EEK10|EKT72_PM1|PROCESS_GAS_5_CHF...| 0.08232143583157621|       SUM|
|L11CD02A|1F.EEK10|EKT72_PM1|CENTER_HE_PRESSUR...| 0.07049302456904988|      MEAN|
|L11CD02A|1F.EEK10|EKT72_PM1|UPPER_TEMPERATURE...|  0.0699344365422384|      MEAN|
|L15

In [63]:
final_res_ss = model_res_ss.withColumn('add', lit(0))
final_res_add_columns = add_certain_column(df=final_res_ss, by='add', request_id=request_id)
final_res_add_columns.show()

+--------+--------+---------+----------+--------------------+-----------+----------+--------------+--------+
|  PRODG1| OPER_NO|TOOL_NAME|     stats|     parametric_name|     weight|request_id|weight_percent|index_no|
+--------+--------+---------+----------+--------------------+-----------+----------+--------------+--------+
|L11CD02A|1F.EEK10|EKT72_PM1|MEAN#RANGE|APC_POSITION#AOTU...| 0.18114875|       fff|     18.114876|       1|
|L11CD02A|1F.EEK10|EKT72_PM1|      MEAN|CENTER_GAS_PRESSU...| 0.08966033|       fff|      8.966033|       2|
|L15DV07A|1F.EEK10|EKT72_PM1|      MEAN|APC_POSITION#AOTU...| 0.08591935|       fff|      8.591935|       3|
|L11CD02A|1F.EEK10|EKT72_PM1|     RANGE|LO_C1_VAR_CAPACIT...|0.085805014|       fff|      8.580502|       4|
|L11CD02A|1F.EEK10|EKT72_PM1|      MEAN|LO_RF_POWER#AOTU_...|0.084139735|       fff|      8.413973|       5|
|L11CD02A|1F.EEK10|EKT72_PM1|      MEAN|BOTTOMFLOWRATE#AO...| 0.08255418|       fff|      8.255418|       6|
|L11CD02A|1F.EEK10|

In [40]:
# df_run_ss_pandas = df_run_ss.toPandas()
# df_run_ss_pandas

# df_run_bs_pandas[df_run_bs_pandas['label'] == 0]['parametric_name'].nunique()

# df_run_bs_pandas[df_run_bs_pandas['label'] == 1]['parametric_name'].nunique()

# df_pivot = df_run_ss_pandas.dropna(axis=0).pivot_table(index=['WAFER_ID', 'label'], 
#                                                  columns=['OPER_NO', 'TOOL_NAME', 'parametric_name'],
#                                                  values=['STATISTIC_RESULT'])

# df_pivot.columns = df_pivot.columns.map('#'.join)
# df_pivot = df_pivot.fillna(df_pivot.mean()).reset_index(drop=False)
# df_pivot

# df_pivot1 = df_pivot.copy()
# df_pivot_all = pd.concat([df_pivot, df_pivot1], axis=0)

# df_pivot_all

# # 定义自变量
# x_train = df_pivot_all[df_pivot_all.columns.difference(['WAFER_ID', 'label']).tolist()]

# # 建立模型
# model = pca(n_components=min(x_train.shape[0], x_train.shape[1])-1, verbose=None)
# results = model.fit_transform(x_train)
# res_top = results['topfeat']
# res_top_select = res_top[res_top['type'] == 'best'][['feature', 'loading']]
# res_top_select = res_top_select.drop_duplicates()

# res.toPandas().sort_values('importance')

------------------------------

In [77]:
#####################################################################################
####################################小样本算法整合###################################
#####################################################################################

def fit_small_data_model(df_run, common_res, grpby_list, request_id):
    
    df1 = None
    df2 = None

    data_dict_list_ss = get_data_list(common_res=common_res, grpby_list=grpby_list, big_or_small='small')
    print("data_dict_list_ss:", data_dict_list_ss)
    if len(data_dict_list_ss) == 0:
        msg = '该查询条件下数据库中实际BAD_WAFER数量为0, 无法分析'
        df_kafka = pd.DataFrame({"code": 1, "msg": f'{msg}', "requestId": request_id}, index=[0])
        df1 = spark.createDataFrame(df_kafka)
        return df1, df2

    df_run_ss = get_train_data(df_run=df_run, data_dict_list=data_dict_list_ss)
    if df_run_ss.count() == 0:
        msg = '数据库中暂无此类数据!'
        df_kafka = pd.DataFrame({"code": 1, "msg": f'{msg}', "requestId": request_id}, index=[0])
        df1 = spark.createDataFrame(df_kafka)
        return df1, df2

    bad_wafer_num_small_sample = get_all_bad_wafer_num(df_run_ss)
    if bad_wafer_num_small_sample < 1:
        msg = '该查询条件下数据库中实际BAD_WAFER数量小于1片, 请提供更多的BAD_WAFER数量!'
        df_kafka = pd.DataFrame({"code": 1, "msg": f'{msg}', "requestId": request_id}, index=[0])
        df1 = spark.createDataFrame(df_kafka)
        return df1, df2

    res = fit_pca_small_sample(df=df_run_ss, by=grpby_list)
    if res.count() == 0:
        msg = '算法内部暂时异常!'
        df_kafka = pd.DataFrame({"code": 1, "msg": f'{msg}', "requestId": request_id}, index=[0])
        df1 = spark.createDataFrame(df_kafka)
        return df1, df2

    f_res = split_calculate_features_small_sample(df=res, by=grpby_list)
    if f_res.count() == 0:
        msg = '算法结果求和暂时异常'
        df_kafka = pd.DataFrame({"code": 1, "msg": f'{msg}', "requestId": request_id}, index=[0])
        df1 = spark.createDataFrame(df_kafka)
        return df1, df2

    model_res_ss = get_finall_results_small_sample(f_res=f_res, bad_wafer_num=bad_wafer_num_small_sample)
    if model_res_ss.count() == 0:
        msg = '算法结果拼接暂时异常'
        df_kafka = pd.DataFrame({"code": 1, "msg": f'{msg}', "requestId": request_id}, index=[0])
        df1 = spark.createDataFrame(df_kafka)
        return df1, df2

    final_res_ss = model_res_ss.withColumn('add', lit(0))
    final_res_add_columns = add_certain_column(df=final_res_ss, by='add', request_id=request_id)
    if final_res_add_columns.count() == 0:
        msg = '算法结果增加列暂时异常'
        df_kafka = pd.DataFrame({"code": 1, "msg": f'{msg}', "requestId": request_id}, index=[0])
        df1 = spark.createDataFrame(df_kafka)
        return df1, df2
    else:
        return df1, final_res_add_columns

In [78]:
#####################################################################################
################################将最后的结果写回数据库###############################
#####################################################################################
def doris_stream_load_from_df(df, engine, table, is_json=True, chunksize=100000, partitions=None):
    engine_url = engine.url
    url = 'http://%s:18030/api/%s/%s/_stream_load' % (engine_url.host, engine_url.database, table)

    format_str = 'csv' if not is_json else 'json'
    headers = {
        'Content-Type': 'text/plain; charset=UTF-8',
        'format': format_str,
        'Expect': '100-continue'
    }
    if is_json:
        headers['strip_outer_array'] = 'true'
        headers['read_json_by_line'] = 'true'
    else:
        headers['column_separator'] = '@'
    
    if partitions:
        headers['partitions'] = partitions
    
    auth = requests.auth.HTTPBasicAuth(engine_url.username, engine_url.password)
    session = requests.sessions.Session()
    session.should_strip_auth = lambda old_url, new_url: False
    
    l = len(df)
    if l > 0:
        if chunksize and chunksize < l:
            batches = l // chunksize
            if l % chunksize > 0:
                batches += 1
            for i in range(batches):
                si = i * chunksize
                ei = min(si + chunksize, l)
                sub = df[si:ei]
                do_doris_stream_load_from_df(sub, session, url, headers, auth, is_json)
        else:
            do_doris_stream_load_from_df(df, session, url, headers, auth, is_json)


def do_doris_stream_load_from_df(df, session, url, headers, auth, is_json=False):
    data = df.to_csv(header=False, index=False, sep='@') if not is_json else df.to_json(orient='records', date_format='iso')
    #print(data)
    
    resp = session.request(
        'PUT',
        url = url,
        data=data.encode('utf-8'),
        headers=headers,
        auth=auth
    )
    print(resp.reason, resp.text)
    check_stream_load_response(resp.text)


def check_stream_load_response(resp_text):
    resp = json.loads(resp_text)
    if resp['Status'] not in ["Success", "Publish Timeout"]:
        raise Exception(resp['Message'])

In [45]:
##########################################################################################
#######################################正式调用以上函数#######################################
##########################################################################################
# request_id = 'sdd'
# grpby_list = ['OPER_NO', 'TOOL_NAME']

# # 1. 解析json 为字典， df1为kafka输入的结果数据，获取到parse_dict, request_id, grpby_list
# df2 = df1.toPandas() 
# parse_dict, request_id, grpby_list = get_some_info(df2)
# print(type(parse_dict))
# print(grpby_list)

# # 2. 从kafka 关键字映射都具体数据源中的字段,没有的可以删除
# keyword_map_from_json_to_table: dict = {
#     "prodg1": "PRODG1",
#     "waferId": "WAFER_ID",
#     "dateRange": "START_TIME",
#     "productId": "PRODUCT_ID",
#     "operNo": "OPER_NO",
#     "eqp": "EQP_NAME",
#     "tool": "TOOL_NAME",
#     "lot": "LOT_ID",
#     "recipeName": "RECIPE_NAME"}

# # 3. 获取查询条件list
# select_condition_list = parse_dict

# # 4. 指定查询表名, 根据实际情况需要修改
# table_name = "etl.DWD_POC_CASE_FD_UVA_DATA_TEST"

In [None]:
# from pyspark.sql import SparkSession
# spark = (SparkSession.builder
#             .master("local[*]")
#             .config("spark.jars.packages", "ai.catboost:catboost-spark_3.3_2.12:1.2")
#             .appName("RF")
#             .getOrCreate())

-----------------------

In [91]:
df_pandas = pd.read_csv("small4.csv")

df1 = ps.from_pandas(df_pandas).to_spark()
print(df1.count())

good = ['NBX392-15', 'NBX392-20', 'NBX392-24', 'NBX391-24', 'NBX391-25', 'NBX548-09', 'NBX391-01', 'NBX391-02', 'NBX391-13', 'NBX391-17']
bad  = ['NBX500-10', 'NBX500-01', 'NBX500-09']

if 'label' in df1.columns:
    df1 = df1
else:
    df1 = get_label_double(df1, bad, good)

  fields = [
  for column, series in pdf.iteritems():


10


In [92]:
df1.show()

+-------+---------------+------+--------+----------+--------------+--------+--------+----------+---------+--------------------+----------+-----------+----------+--------------------+-------------------+------------+-------------+--------------+-------+---------------+-------------------+-------------------+----------------+----------------+----------------+----------------+----------------+-------------+-------------+-------------+----------+-------------+------+------------+---------+----------------+-------+-----+
+-------+---------------+------+--------+----------+--------------+--------+--------+----------+---------+--------------------+----------+-----------+----------+--------------------+-------------------+------------+-------------+--------------+-------+---------------+-------------------+-------------------+----------------+----------------+----------------+----------------+----------------+-------------+-------------+-------------+----------+-------------+------+-----------

In [96]:
# 主程序
try:
    # 从数据库中获取数据
#     df1 = get_data_from_doris(select_condition_list=select_condition_list, table_name=table_name)
#     print(df1.count())
#     if df1.count() == 0:
#         msg = '解析SQL获取数据异常: 数据库中可能没有数据!'
#         df_kafka = pd.DataFrame({"code": 1, "msg": f'{msg}', "requestId": request_id}, index=[0])
#         df1 = spark.createDataFrame(df_kafka)
#         raise ValueError

    # 1. 站点融合和数据预处理
    df1 = integrate_operno(df=df1, merge_operno_list=merge_operno)
    print(df1.count())
    df_run = _pre_process(df1)
    print(df_run.count())
    if df_run.count() == 0:
        msg = '该条件下数据库中暂无数据，请检查！'
        df_kafka = pd.DataFrame({"code": 1, "msg": f'{msg}', "requestId": request_id}, index=[0])
        df1 = spark.createDataFrame(df_kafka)
        raise ValueError

    # 2. 进行共性分析
    common_res = commonality_analysis(df_run, grpby_list)
    common_res.show()
    if common_res.count() == 0:
        msg = '共性分析结果异常!'
        df_kafka = pd.DataFrame({"code": 1, "msg": f'{msg}', "requestId": request_id}, index=[0])
        df1 = spark.createDataFrame(df_kafka)
        raise ValueError

    # 3. 挑选出数据：bad和good要同时大于3
    data_dict_list_bs = get_data_list(common_res, grpby_list, big_or_small='big')
    print("data_dict_list_bs:", data_dict_list_bs)
    if len(data_dict_list_bs) != 0:
        print("****************大样本算法调用****************")
        df1, final_res_add_columns = fit_big_data_model(df_run, data_dict_list_bs, grpby_list, request_id)
    else:        
        print("****************小样本算法调用****************")
        df1, final_res_add_columns = fit_small_data_model(df_run, common_res, grpby_list, request_id)
    

    if df1 is not None:
        raise ValueError
    else:
        # final_res_add_columns 是最后的结果，要写回数据库
        # ddd = final_res_add_columns.toPandas()
        # user ="root"
        # host = "10.52.199.81"
        # password = "Nexchip%40123"
        # db = "etl"
        # port = 9030
        # engine = create_engine("mysql+pymysql://{user}:{password}@{host}:{port}/{db}".format(user = user,
        #                                                                                     password = password,
        #                                                                                     host = host,
        #                                                                                     port = port,
        #                                                                                     db = db))
        # doris_stream_load_from_df(ddd, engine, "results")

        # # 最终成功的话，就会输出下面这条
        print("运行成功")
        df_kafka = pd.DataFrame({"code": 0, "msg": "运行成功", "requestId": request_id}, index=[0])
        df1 = spark.createDataFrame(df_kafka)

except ValueError as ve:
    pass

except Exception as e:
    df_kafka = pd.DataFrame({"code": 1, "msg": f"主程序发生异常: {str(e)}", "requestId": request_id}, index=[0])
    df1 = spark.createDataFrame(df_kafka)

NameError: name 'spark' is not defined

In [97]:
print("最终的df1是：")
print(type(df1))
df1.show()

print("最终的算法结果是：")
print(type(final_res_add_columns))
final_res_add_columns.show()

最终的df1是：
<class 'NoneType'>


AttributeError: 'NoneType' object has no attribute 'show'

In [95]:
final_res_add_columns.show()

+--------+--------+---------------+-----+--------------------+------+----------+--------------+--------+
|  PRODG1| OPER_NO|      TOOL_NAME|stats|     parametric_name|weight|request_id|weight_percent|index_no|
+--------+--------+---------------+-----+--------------------+------+----------+--------------+--------+
|L2800Z2N|1G.PPB10|PBT01_CLHA_4-12| MEAN|PLATE_TEMP#DHP150...|   1.0|       fff|         100.0|       1|
+--------+--------+---------------+-----+--------------------+------+----------+--------------+--------+

