In [1]:
import gc
import json
import time

import numpy as np
import pandas as pd
from mypackage.common_tools import *
import phonenumbers
from phonenumbers import PhoneNumber
from multiprocessing import Pool,Process,Queue,Manager,resource_tracker
from collections import defaultdict
import gc
import asyncio
import  traceback
from joblib import Parallel, delayed
from datetime import datetime
# 遍历文件夹sms_url的文件，文件名格式为raw_sms2023-06-01/raw_sms2024-07-01，按月份统计逾期率
import os
import re

In [2]:
from pathlib import Path
raw_dir = Path('/home/longxiaolei/raw_mx/newcust')
work_dir = Path('/home/mayongzhi/marshall/sender_word_call_app')

In [3]:
def get_res_from_proc(pid):
    try:
        with open(f'/proc/{pid}/statm', 'r') as f:
            data = f.read().split() 
            res = data[1]  # 第二个字段是 RES
            # 转成GB
            res = int(res) * 4 / (1024 * 1024)
            return res
    except FileNotFoundError:
        print(f"PID {pid} not found.")
    except Exception as e:
        print(f"Error reading /proc: {e}")
    return None

In [4]:
def parse_json_data(df, json_column, id_column, retain_column=None):
    """
    将df中的json字符串的摊平，并根据想要保留的字段，保留相关的值
    :param df:  待摊平处理的dataframe
    :param json_column: json字符串所在的那一列
    :param id_column: 唯一关联主键，后续用于定位转化失败的数据，或外部唯一关联的id
    :param retain_column: 待保留的列，可以是字符串，也可以是数组保存多列
    :return:
    """
    new_rows = []
    for i, row in df.iterrows():
        json_data_str = row[json_column]
        id_column_value = row[id_column]
        if isinstance(json_data_str, str):
            try:
                json_data = json.loads(json_data_str)
                for item in json_data:
                    new_row = item
                    new_row[id_column] = id_column_value
                    if isinstance(retain_column, str) and len(retain_column) > 0:
                        new_row[retain_column] = row[retain_column]
                    elif isinstance(retain_column, list) and len(retain_column) > 0:
                        for column in retain_column:
                            new_row[column] = row[column]
                    new_rows.append(new_row)
            except json.JSONDecodeError:
                print(f"Failed to parse JSON data for {id_column}:{id_column_value}")
                continue
    return pd.DataFrame(new_rows)

In [5]:
def data_of_dir(dir_path: str, contains_flags="", start_date='2023-01-01', end_date='2999-01-01'):
    def _fetch_filenams(dir_path: str, contain_flag, start_date=None, end_date=None):
        file_paths = []
        contain_flag = contain_flag or ""
        pattern = r"\d{4}-\d{2}-\d{2}"
        for file_name in os.listdir(dir_path):
            if (contain_flag in file_name) and (
            file_name.endswith(('.pqt', '.parquet', '.csv', '.xlsx', '.pickle', '.pkl'))):
                if start_date is None:
                    file_paths.append(os.path.join(dir_path, file_name))
                else:
                    match = re.search(pattern, file_name)
                    date = match.group()  # type: ignore
                    if (date >= start_date) and (date < end_date):
                        file_paths.append(os.path.join(dir_path, file_name))
        file_paths.sort()
        return file_paths

    if isinstance(contains_flags, str) or contains_flags is None:
        return _fetch_filenams(dir_path, contains_flags, start_date, end_date)
    elif isinstance(contains_flags, list):
        file_names = None
        for contains_flag in contains_flags:  # type: ignore
            if file_names is None:
                file_names = _fetch_filenams(dir_path, contains_flag, start_date, end_date)
            else:
                file_names = file_names + _fetch_filenams(dir_path, contains_flag, start_date, end_date)
        return file_names

In [6]:
def month_window(month_list:list[str],window_size:int):
    from dateutil.relativedelta import relativedelta
    date_objects = [datetime.strptime(date, '%Y-%m') for date in month_list]
    results = []
    for date in date_objects:
        end_month = date - relativedelta(months=1)
        start_month = date - relativedelta(months=window_size)
        results.append((date.strftime('%Y-%m'),end_month.strftime('%Y-%m'),start_month.strftime('%Y-%m')))
    return results

In [7]:
def parallel_process(task_function,task_list,process_num=10):
    start_time = time.time()
    def _task(task_function,queue,task_params):
        try:
            task_function(*task_params)
        finally:
            queue.put(True)
    from multiprocessing import Queue, Process
    mp_queue = Queue()
    for i in range(process_num):
        mp_queue.put(True)
    for task_params in tqdm(task_list):
        mp_queue.get()
        Process(target=_task,args=(task_function,mp_queue,task_params)).start()
    for i in range(process_num):
        mp_queue.get()
    mp_queue.close()
    gc.collect()
    print(f'任务完成，总计{len(task_list)}个任务,耗时{time.time()-start_time}s')

# 提取phone的逻辑
1. 保留字母型号码
2. 查看是否有以‘+’开头且包含字母的号码
3. 将以‘+’开头且不包含字母的号码和纯数字号码进行phone_normalize验证

In [8]:
def normalize_phone(phone, country_id):
    # 去掉phone中的非数字和非字母
    def phone_normalize(phone):
        if len(phone) == 0 or phone is None:
            return 0, '', ''
        if phone[0] == '+':
            phone = '+' + phone.replace('+', '').lstrip('0')
            length = len(phone) - 1
        else:
            phone = phone.lstrip('0')
            length = len(phone)
        is_vaild, country_code, national_phone = 0, '', phone.lstrip('+')

        if length < 7:
            return str(national_phone)

        try:
            if phone.startswith('+'):
                parse_info = phonenumbers.parse(phone)
            else:
                parse_info = phonenumbers.parse(phone, country_id.upper())

            # print(phonenumbers.is_valid_number(parse_info))
            is_vaild = 1 if phonenumbers.is_valid_number(parse_info) else 0
            if is_vaild:
                # country_code = parse_info.country_code
                national_phone = parse_info.national_number
                return str(national_phone)
        except Exception:
            pass
        return str(national_phone)

    phone = str(phone)
    try:
        if len(phone) == 0 or phone is None:
            return ''
        if phone[0] == '+':
            phone = '+' + ''.join(filter(str.isalnum, phone[1:])).lower()
        else:
            phone = ''.join(filter(str.isalnum, phone)).lower()  # 去掉phone中的非数字和非字母
    except Exception as e:
        pass
    if any(c.isalpha() for c in phone):
        return phone.replace('+', '')
    return phone_normalize(phone)

# data_sample处理过程
1. 每一行parse_json_data提取出'src_phone'、'app_order_id'、'apply_time'、'agr_pd7'、'def_pd7'列
2. 对'src_phone'列的数据进行normalize_phone处理，将结果存入parse后的'national_phone'列
3. 统计'national_phone'列的数据

# 按月份统计逾期率

$ phone_i\_overdue\_rate = \frac{\frac{Bad\_Count_i}{Total\_Count_i}} {\frac{Bad\_label}{Total\_label}} $

# sender

In [9]:
def get_senders(sms_data):
    """
    获取每行的phone
    :param sms_data: df 中的'sms_data'列
    :return: 
    """
    # row = pd.DataFrame(row).T
    # print(row)
    # 如果‘sms_data'列为空或者为'[]'，则返回空字典
    
    if sms_data == '[]' or sms_data == '' or sms_data is None:
        return []
    parse_df = pd.DataFrame(json.loads(sms_data))
    if 'src_phone' not in parse_df.columns:
        print('src_phone not in columns')
        return []
    # if len(parse_df[parse_df['src_phone'].str.len()>30]) > 0:
    #     print(parse_df[parse_df['src_phone'].str.len()>30]['src_phone'])
    parse_df['national_phone'] = parse_df['src_phone'].apply(lambda x: normalize_phone(x,'MX'))
    senders = parse_df['national_phone'].tolist()
    return senders

In [10]:

def generate_month_sender_config(file_path=None, month=None, sender_data=None):
    """
    生成某个月sender逾期率文件
    :param file_path: 生成配置文件的文件路径
    :param month: 月份,'YYYY-MM'
    :param sender_data: 如果sender_data不为空，则直接使用sender_data
    :return: 
    """
    tmp_dir = Path('/home/mayongzhi/marshall/sender_word_call_app/tmp_config')
    if not os.path.exists(tmp_dir):
        os.makedirs(tmp_dir)
    # 确保当sender_data不为空时，file_path和month为空，反之亦然
    assert (sender_data is None) != (file_path is None and month is None), "if sender_data is not None, file_path and month should be None and vice versa"
    if sender_data is None:
        file_names = os.listdir(file_path)

        month_files = defaultdict(list)
        for file_name in file_names:
            file_name = file_path / file_name
            match = re.search(r'(\d{4}-\d{2})',str(file_name))
            if match and match.group() == month:
                month_files[month].append(str(file_name))
        files = month_files[month]
        print(files)
        sender_data = batch_load_files_to_df(files)
    sender_data = sender_data[['sms_data','def_pd7','agr_pd7']] 
    sender_data['bad'] = np.where(sender_data['agr_pd7'] == 1, sender_data['def_pd7'], None)
    sender_data = sender_data[sender_data['bad'].notnull()]
    bad_rate_total = sender_data['bad'].mean()

    df_month_sender = pd.DataFrame()
    df_month_sender['sender'] = Parallel(n_jobs=10)(delayed(get_senders)(sms_data) for sms_data in sender_data['sms_data'])
    df_month_sender['def_pd7'] = sender_data['def_pd7']
    df_month_sender = df_month_sender.explode('sender')
    df_month_sender = df_month_sender[df_month_sender['sender'].apply(lambda x: isinstance(x, str))]
    
    print(df_month_sender.shape)
    overdue_rate_df = df_month_sender.groupby('sender')['def_pd7'].agg(['sum','count']).reset_index()
    overdue_rate_df = overdue_rate_df.rename(columns={'sum':'bad_count','count':'total_count'})
    overdue_rate_df['bad_rate_total'] =  bad_rate_total
    overdue_rate_df['bad_rate'] = overdue_rate_df['bad_count'] / overdue_rate_df['total_count']
    
    print(month)
    print(overdue_rate_df.shape)
    print('before:',get_res_from_proc(os.getpid()))
    # 释放内存
    del df_month_sender, sender_data
    gc.collect()
    print('after:',get_res_from_proc(os.getpid()))
    overdue_rate_df.to_parquet(tmp_dir / f'sender_overdue_rate_{month}.parquet',compression='zstd')
    return overdue_rate_df

In [13]:
tmp_dir = Path('/home/mayongzhi/marshall/sender_word_call_app/tmp_config')
if not os.path.exists(tmp_dir):
    os.makedirs(tmp_dir)

In [11]:
for month in ['2023-08','2023-09','2023-10','2023-11','2023-12','2024-01','2024-02','2024-03','2024-04','2024-05','2024-06','2024-07','2024-08']:
    overdue_rate_config = generate_month_sender_config(file_path=raw_dir,month=month)
    overdue_rate_config.to_parquet(f'sender_overdue_rate_{month}.parquet',compression='zstd')

['/home/longxiaolei/raw_mx/newcust/raw_YM_2023-08-18.pqt', '/home/longxiaolei/raw_mx/newcust/raw_YM_2023-08-19.pqt', '/home/longxiaolei/raw_mx/newcust/raw_YM_2023-08-20.pqt', '/home/longxiaolei/raw_mx/newcust/raw_YM_2023-08-21.pqt', '/home/longxiaolei/raw_mx/newcust/raw_YM_2023-08-23.pqt', '/home/longxiaolei/raw_mx/newcust/raw_YM_2023-08-24.pqt', '/home/longxiaolei/raw_mx/newcust/raw_YM_2023-08-25.pqt', '/home/longxiaolei/raw_mx/newcust/raw_YM_2023-08-26.pqt', '/home/longxiaolei/raw_mx/newcust/raw_YM_2023-08-28.pqt', '/home/longxiaolei/raw_mx/newcust/raw_YM_2023-08-29.pqt', '/home/longxiaolei/raw_mx/newcust/raw_YM_2023-08-30.pqt', '/home/longxiaolei/raw_mx/newcust/raw_YM_2023-08-31.pqt', '/home/longxiaolei/raw_mx/newcust/raw_M_2023-08-01.pqt', '/home/longxiaolei/raw_mx/newcust/raw_M_2023-08-02.pqt', '/home/longxiaolei/raw_mx/newcust/raw_M_2023-08-03.pqt', '/home/longxiaolei/raw_mx/newcust/raw_M_2023-08-04.pqt', '/home/longxiaolei/raw_mx/newcust/raw_M_2023-08-05.pqt', '/home/longxiaolei

In [14]:
check = pd.read_parquet('sender_overdue_rate_2023-06.parquet')
check.head()

Unnamed: 0,sender,bad_count,total_count,bad_rate,woe
0,,57,117,0.487179,0.075665
1,1.0,7,9,0.777778,1.37972
2,10.0,1,1,1.0,0.0
3,100.0,0,7,0.0,0.0
4,1000.0,2,2,1.0,0.0


In [48]:
# sender_overdue_rate_2024_07  文件大小，单位为MB
os.path.getsize('sender_overdue_rate_2024-07.parquet') / 1024 / 1024

11.449203491210938

In [10]:
def parallel_process(task_function,task_list,process_num=10):
    """
    task_function : 任务函数
    task_list : 任务列表
    """
    start_time = time.time()
    def _task(task_function,queue,task_params):
        try:
            task_function(*task_params)
        finally:
            queue.put(True)
    from multiprocessing import Queue, Process
    mp_queue = Queue()
    for i in range(process_num):
        mp_queue.put(True)
    for task_params in tqdm(task_list):
        mp_queue.get()
        Process(target=_task,args=(task_function,mp_queue,task_params)).start() # mq_queue 用于控制进程数
        # 完成后kill掉进程
        process_id = os.getpid()
        os.system(f'kill -9 {process_id}')
    for i in range(process_num):
        mp_queue.get()
    mp_queue.close()
    gc.collect()
    print(f'任务完成，总计{len(task_list)}个任务,耗时{time.time()-start_time}s')

# words

In [9]:
# 西班牙语停用词
# import nltk
# nltk.download('stopwords')
from nltk.corpus import stopwords
stop_words = set(stopwords.words('spanish'))
# 添加自定义停用词
user_stop_words = ['body','phone', 'src_phone', 'read', 'time', 'type']
stop_words.update(user_stop_words)
print(stop_words)
print('body' in stop_words)
len(stop_words)

{'son', 'entre', 'sus', 'estando', 'tuyas', 'serás', 'seas', 'estuvo', 'tenido', 'tendrá', 'tuviéramos', 'ante', 'fuimos', 'type', 'seré', 'fuesen', 'vuestros', 'ese', 'tengamos', 'uno', 'vosotros', 'desde', 'eso', 'tenía', 'tendríamos', 'durante', 'las', 'todo', 'hubieran', 'mío', 'unos', 'tendréis', 'antes', 'estábamos', 'nosotros', 'fue', 'somos', 'tuvieseis', 'sintiendo', 'le', 'ellos', 'estén', 'estaréis', 'serías', 'tendremos', 'estaríamos', 'fuésemos', 'tiene', 'habrán', 'seremos', 'fuera', 'time', 'estés', 'contra', 'serían', 'sentidos', 'nada', 'muy', 'tuvieras', 'tuve', 'tengan', 'estarán', 'un', 'quien', 'otro', 'fueras', 'vuestras', 'sois', 'tuvieran', 'serán', 'estamos', 'teníais', 'hubiesen', 'estaría', 'es', 'habían', 'tengo', 'estarás', 'otras', 'eres', 'body', 'tendrían', 'su', 'de', 'estás', 'habríamos', 'como', 'tú', 'eran', 'fueran', 'seríamos', 'he', 'seríais', 'el', 'otros', 'hemos', 'estas', 'tenida', 'erais', 'los', 'hubo', 'habidos', 'tienen', 'estuvieron', 'su

319

In [18]:
from concurrent.futures import ProcessPoolExecutor
# parse后统计‘body’列的词频
from collections import Counter
# 并行处理

def get_words(sms_data):
    """
    获取每行的word
    :param sms_data: df 中的'sms_data'列
    :return: 根据sms_data提取的word
    """
    if sms_data == '[]' or sms_data == '' or sms_data is None:
        return []
    parse_df = pd.DataFrame(json.loads(sms_data))
    if 'body' not in parse_df.columns:
        print('body not in columns','id:')
        return []
    
    words = parse_df['body'].apply(lambda x: set(re.compile(r'\b[Ññáéíóú¡a-z]+\b').findall(x.lower() if isinstance(x,str) else '')).difference(stop_words))
    words = words.apply(lambda x:list(filter(lambda x: len(x) > 2, x)))
    words = set(sum(words.tolist(),[]))
    words = list(words)
    return words
    
def generate_month_word_config(file_path=None, month=None, word_data=None):
    """
    生成某个月word逾期率文件
    :param file_path: 生成配置文件的文件路径
    :param month: 月份
    :param word_data: 如果word_data不为空，则直接使用word_data
    """
    try:
        tmp_dir = Path('/home/mayongzhi/marshall/sender_word_call_app/tmp_config')
        assert (word_data is None) != (file_path is None and month is None), "if word_data is not None, file_path and month should be None and vice versa"
        if word_data is None:
            file_names = os.listdir(file_path)
            
            month_files = defaultdict(list)
            for file_name in file_names:
                manager = Manager()
                file_names = os.listdir(file_path)

                month_files = defaultdict(list)
                for file_name in file_names:
                    file_name = file_path / file_name
                    match = re.search(r'(\d{4}-\d{2})',str(file_name))
                    if match and match.group() == month:
                        month_files[month].append(str(file_name))
            files = month_files[month]
            print(files)
            word_data = batch_load_files_to_df(files)
        word_data = word_data[['sms_data','def_pd7','agr_pd7']]
        word_data['bad'] = np.where(word_data['agr_pd7'] == 1, word_data['def_pd7'], None)
        word_data = word_data[word_data['bad'].notnull()]
        bad_rate_total = word_data['bad'].mean()
        
        df_month_word = pd.DataFrame()
        df_month_word['word'] = Parallel(n_jobs=15)(delayed(get_words)(sms_data) for sms_data in word_data['sms_data'])
        df_month_word['def_pd7'] = word_data['def_pd7']
        df_month_word = df_month_word.explode('word')
        df_month_word = df_month_word[df_month_word['word'].apply(lambda x: isinstance(x, str))]
        print(df_month_word.shape)
        overdue_rate_df = df_month_word.groupby('word')['def_pd7'].agg(['sum','count']).reset_index()
        overdue_rate_df = overdue_rate_df.rename(columns={'sum':'bad_count','count':'total_count'})
        overdue_rate_df['bad_rate_total'] =  bad_rate_total
        overdue_rate_df['bad_rate'] = overdue_rate_df['bad_count'] / overdue_rate_df['total_count']
        overdue_rate_df.to_parquet(tmp_dir / f'word_overdue_rate_{month}.parquet',compression='zstd')
        # del df_month_word, word_data, overdue_rate_df
        # gc.collect()
        return 0
    except Exception as e:
        print(f"Error: {e}, month: {month}",traceback.format_exc())
        return 1
    # return overdue_rate_df

In [11]:
for month in ['2024-06','2024-07','2024-08']:
    generate_month_word_config(file_path=raw_dir,month=month)

KeyboardInterrupt: 

In [None]:
print('a')

In [75]:
months = ['2024-06','2024-07','2024-08']
task_list = [(raw_dir,month) for month in months]
parallel_process(generate_month_word_config,task_list,process_num=2)

  0%|          | 0/3 [00:00<?, ?it/s]

['/home/longxiaolei/raw_mx/newcust/raw_AM_2024-07-02.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-07-03.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-07-04.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-07-05.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-07-07.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-07-08.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-07-09.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-07-10.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-07-12.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-07-13.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-07-14.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-07-15.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-07-18.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-07-19.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-07-20.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-07-21.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-07-23.pqt', '/home/longxi



(68232189, 2)
进程50682完成


100%|██████████| 3/3 [15:35<00:00, 311.76s/it]


['/home/longxiaolei/raw_mx/newcust/raw_AM_2024-08-02.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-08-03.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-08-04.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-08-05.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-08-07.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-08-08.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-08-09.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-08-10.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-08-12.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-08-13.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-08-14.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-08-15.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-08-17.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-08-18.pqt', '/home/longxiaolei/raw_mx/newcust/raw_AM_2024-08-19.pqt', '/home/longxiaolei/raw_mx/newcust/raw_YM_2024-08-02.pqt', '/home/longxiaolei/raw_mx/newcust/raw_YM_2024-08-03.pqt', '/home/longxi



(105627345, 2)
进程50681完成
任务完成，总计3个任务,耗时1273.6771664619446s


## check_word_config

In [None]:
word_config = pd.read_parquet(tmp_dir / 'word_overdue_rate_2023-06.parquet')
word_config['total_count'].describe()

In [None]:
for month in ['2023-08','2023-09','2023-10','2023-11','2023-12','2024-01','2024-02','2024-03','2024-04','2024-05','2024-06','2024-07','2024-08']:
    overdue_rate_config = generate_month_word_config(file_path=raw_dir,month=month)
    overdue_rate_config.to_parquet(f'word_overdue_rate_{month}.parquet',compression='zstd')

# applist

In [27]:
def get_apps(applist_data):
    """
    获取每行的app
    :param applist_data: df 中的'applist_data'列
    :return: 
    """
    if applist_data == '[]' or applist_data == '' or applist_data is None:
        return []
    parse_df = pd.DataFrame(json.loads(applist_data))
    if 'app_package' not in parse_df.columns:
        print('app_package not in columns')
        return []
    parse_df['app_package'] = parse_df['app_package'].apply(lambda x: x.lower() if isinstance(x,str) else '')
    apps = parse_df['app_package'].tolist()
    return apps

In [59]:
def generate_month_applist_config(file_path=None, month=None, applist_data=None):
    """
    生成某个月app逾期率文件
    :param file_path: 生成配置文件的文件路径
    :param month: 月份,'YYYY-MM'
    :param applist_data: 如果applist_data不为空，则直接使用applist_data
    :return: 
    """
    try:
        tmp_dir = Path('/home/mayongzhi/marshall/sender_word_call_app/tmp_config')
        assert (applist_data is None) != (file_path is None and month is None), "if applist_data is not None, file_path and month should be None and vice versa"
        if applist_data is None:
            file_names = os.listdir(file_path)
            month_files = defaultdict(list)
            for file_name in file_names:
                file_name = file_path / file_name   
                match = re.search(r'(\d{4}-\d{2})',str(file_name))
                if match and match.group() == month: 
                    month_files[month].append(str(file_name))
            files = month_files[month]
            print(files)
            applist_data = batch_load_files_to_df(files)
        applist_data = applist_data[['applist_data','def_pd7','agr_pd7']]
        applist_data['bad'] = np.where(applist_data['agr_pd7'] == 1, applist_data['def_pd7'], None)
        applist_data = applist_data[applist_data['bad'].notnull()]
        bad_rate_total = applist_data['bad'].mean()

        df_month_app = pd.DataFrame()
        df_month_app['app'] = Parallel(n_jobs=15)(delayed(get_apps)(applist_data) for applist_data in applist_data['applist_data'])
        df_month_app['def_pd7'] = applist_data['def_pd7']
        df_month_app = df_month_app.explode('app')
        print(df_month_app.shape)
        overdue_rate_df = df_month_app.groupby('app')['def_pd7'].agg(['sum','count']).reset_index()
        overdue_rate_df = overdue_rate_df.rename(columns={'sum':'bad_count','count':'total_count'})
        overdue_rate_df['bad_rate_total'] =  bad_rate_total
        overdue_rate_df['bad_rate'] = overdue_rate_df['bad_count'] / overdue_rate_df['total_count']
     
        print(month)
        print(overdue_rate_df.shape)
        # 释放内存
        del df_month_app, applist_data
        gc.collect()
        overdue_rate_df = overdue_rate_df[overdue_rate_df['app'].apply(lambda x: isinstance(x, str))]
        print(overdue_rate_df.shape)
        overdue_rate_df.to_parquet(tmp_dir / f'app_overdue_rate_{month}.parquet',compression='zstd')
        return 1
    except Exception as e:
        print(f"Error: {e}, month: {month}",traceback.format_exc())
        return 0

In [None]:
months = ['2024-03','2024-04','2024-05','2024-06','2024-07','2024-08']
task_list = [(raw_dir,month) for month in months]
parallel_process(generate_month_applist_config,task_list,process_num=2)

In [None]:
for month in ['2023-10','2023-11','2023-12','2024-01','2024-02','2024-03','2024-04','2024-05','2024-06','2024-07','2024-08']:
    generate_month_applist_config(file_path=raw_dir,month=month)

In [15]:
overdue_rate_config_07 = generate_month_applist_config(file_path=raw_dir,month='2024-07')
overdue_rate_config_07.to_parquet('app_overdue_rate_2024-07.parquet',compression='zstd')

(9868965, 2)
(56128, 3)


##  case_check

# calllog

m:becallPhone, ym:becallPhone, am:call_log_number

In [13]:
manager = Manager()
file_names = os.listdir('sms_url')

month_files = defaultdict(list)
for file_name in file_names:
    file_name = 'sms_url/' + file_name
    try:
        month = re.findall(r'(\d{4}-\d{2})_\w+', file_name)
        # print(month)
        month = re.findall(r'(\d{4}-\d{2})_\w+', file_name)[0][:7]
        month_files[month].append(file_name)
    except:
        pass
        # print(file_name)
month_files

defaultdict(list,
            {'2023-06': ['sms_url/2023-06_AM',
              'sms_url/2023-06_YM',
              'sms_url/2023-06_M'],
             '2023-07': ['sms_url/2023-07_AM',
              'sms_url/2023-07_YM',
              'sms_url/2023-07_M'],
             '2023-08': ['sms_url/2023-08_AM',
              'sms_url/2023-08_YM',
              'sms_url/2023-08_M'],
             '2023-09': ['sms_url/2023-09_AM',
              'sms_url/2023-09_YM',
              'sms_url/2023-09_M'],
             '2023-10': ['sms_url/2023-10_AM',
              'sms_url/2023-10_YM',
              'sms_url/2023-10_M'],
             '2023-11': ['sms_url/2023-11_AM',
              'sms_url/2023-11_YM',
              'sms_url/2023-11_M'],
             '2023-12': ['sms_url/2023-12_AM',
              'sms_url/2023-12_YM',
              'sms_url/2023-12_M'],
             '2024-01': ['sms_url/2024-01_AM',
              'sms_url/2024-01_YM',
              'sms_url/2024-01_M'],
             '2024-02': ['sms_

In [14]:
def extract_phone(data):
    if data is None or data == '[]':
        return set()
    entries = json.loads(data)
    phones = []
    for entry in entries:
        if 'becallPhone' in entry:
            phone = normalize_phone(entry.get('becallPhone'))
            if phone:
                phones.extend(phone)
        if 'call_log_number' in entry:
            phone = normalize_phone(entry.get('call_log_number'))
            if phone:
                phones.extend(phone)
    return set(phones)

def generate_month_calllog_config(month):
    """
    生成calllog配置文件：逾期率
    :param month: 
    :return: 
    """
    try:
        files = month_files[month]
        print(month)
        print(files)
        calllog_data = batch_load_files_to_df(files)
        calllog_data = calllog_data[calllog_data['agr_pd7'] == 1]
        month_overdue_rate = calllog_data['def_pd7'].mean()
        calllog_data = calllog_data[(calllog_data['call_records_url'].notnull()) & (calllog_data['call_records_url'] != '[]')]
        calllog_data['calllog'] = calllog_data['call_records_url_data'].apply(extract_phone)
        calllog_overdue_rate = {}
        for phones, label in zip(calllog_data['calllog'], calllog_data['def_pd7']):
            for phone in phones:
                phone = normalize_phone(str(phone))[-1]
                if phone not in calllog_overdue_rate:
                    calllog_overdue_rate[phone] = [0, 0, 0]
                if label == 1:
                    calllog_overdue_rate[phone][0] += 1
                calllog_overdue_rate[phone][1] += 1
                calllog_overdue_rate[phone][2] = round(calllog_overdue_rate[phone][0] / calllog_overdue_rate[phone][1], 4)
        overdue_rate_df = pd.DataFrame(calllog_overdue_rate).T
        overdue_rate_df.rename(columns={0: 'bad_count', 1: 'total_count', 2: 'rate'}, inplace=True)
        overdue_rate_df.sort_values('total_count', ascending=False, inplace=True)
        overdue_rate_df['woe'] = overdue_rate_df['rate'].apply(lambda x: x / month_overdue_rate)
        overdue_rate_df.reset_index(inplace=True)
        overdue_rate_df.rename(columns={'index':'phone'},inplace=True)
        overdue_rate_df['phone'] = overdue_rate_df['phone'].astype(str)
        overdue_rate_df.to_parquet(f'calllog_overdue_rate_{month}.parquet',compression='zstd')
        print(f"Finish processing {month}")
    except Exception as e:
        print(f"Error: {e}, month: {month}")
    finally:
        local_vars = list(locals().keys())
        for var in local_vars:
            del locals()[var]
        gc.collect()

for month in month_files.keys():
    generate_month_calllog_config(month)

2023-06
['sms_url/2023-06_AM', 'sms_url/2023-06_YM', 'sms_url/2023-06_M']


KeyboardInterrupt: 

In [11]:
files = month_files[month]
print(month)
print(files)
calllog_data = batch_load_files_to_df(files)
calllog_data = calllog_data[calllog_data['agr_pd7'] == 1]
calllog_data = calllog_data[(calllog_data['call_records_url'].notnull()) & (calllog_data['call_records_url'] != '[]')]
month_overdue_rate = calllog_data['def_pd7'].mean()
calllog_data['calllog'] = calllog_data['call_records_url_data'].apply(extract_phone)
calllog_overdue_rate = {}
for phones, label in zip(calllog_data['calllog'], calllog_data['def_pd7']):
    for phone in phones:
        phone = normalize_phone(str(phone))[-1]
        if phone not in calllog_overdue_rate:
            calllog_overdue_rate[phone] = [0, 0, 0]
        if label == 1:
            calllog_overdue_rate[phone][0] += 1
        calllog_overdue_rate[phone][1] += 1
        calllog_overdue_rate[phone][2] = round(calllog_overdue_rate[phone][0] / calllog_overdue_rate[phone][1], 4)
overdue_rate_df = pd.DataFrame(calllog_overdue_rate).T
overdue_rate_df.rename(columns={0: 'bad_count', 1: 'total_count', 2: 'rate'}, inplace=True)
overdue_rate_df.sort_values('total_count', ascending=False, inplace=True)
overdue_rate_df['woe'] = overdue_rate_df['rate'].apply(lambda x: x / month_overdue_rate)
overdue_rate_df.to_parquet(f'callog_overdue_rate_{month}.parquet',compression='zstd')
print(f"Finish processing {month}")

2023-07
['sms_url/2023-07_AM', 'sms_url/2023-07_YM', 'sms_url/2023-07_M']


ArrowTypeError: ("Expected bytes, got a 'int' object", 'Conversion failed for column None with type object')

In [14]:
calllog_data.shape

(5, 29)

In [13]:
overdue_rate_df.describe()

Unnamed: 0,bad_count,total_count,rate,woe
count,1677.0,1677.0,1677.0,1677.0
mean,0.494335,1.008348,0.489565,0.815941
std,0.506047,0.114266,0.498846,0.831409
min,0.0,1.0,0.0,0.0
25%,0.0,1.0,0.0,0.0
50%,0.0,1.0,0.0,0.0
75%,1.0,1.0,1.0,1.666667
max,3.0,4.0,1.0,1.666667


In [17]:
overdue_rate_df = overdue_rate_df.astype('float64')

In [18]:
overdue_rate_df.to_parquet(f'callog_overdue_rate_{month}.parquet',compression='zstd')

ArrowTypeError: ("Expected bytes, got a 'int' object", 'Conversion failed for column None with type object')

In [23]:
overdue_rate_df.columns

Index(['bad_count', 'total_count', 'rate', 'woe'], dtype='object')

In [30]:
overdue_rate_df.reset_index(inplace=True)

In [32]:
overdue_rate_df.head()

Unnamed: 0,index,bad_count,total_count,rate,woe
0,86,2.0,4.0,0.5,0.833333
1,4429800292,3.0,3.0,1.0,1.666667
2,4734540036,1.0,2.0,0.5,0.833333
3,5544380802,1.0,2.0,0.5,0.833333
4,5592735569,1.0,2.0,0.5,0.833333


In [33]:
overdue_rate_df.dtypes

index           object
bad_count      float64
total_count    float64
rate           float64
woe            float64
dtype: object

In [34]:
overdue_rate_df.isnull().sum()

index          0
bad_count      0
total_count    0
rate           0
woe            0
dtype: int64

In [35]:
overdue_rate_df.rename(columns={'index':'phone'},inplace=True)
overdue_rate_df['phone'] = overdue_rate_df['phone'].astype(str)

In [29]:
print(overdue_rate_df.dtypes)

bad_count      float64
total_count    float64
rate           float64
woe            float64
dtype: object


In [24]:
overdue_rate_df.head()

Unnamed: 0,bad_count,total_count,rate,woe
86,2.0,4.0,0.5,0.833333
4429800292,3.0,3.0,1.0,1.666667
4734540036,1.0,2.0,0.5,0.833333
5544380802,1.0,2.0,0.5,0.833333
5592735569,1.0,2.0,0.5,0.833333


In [25]:
overdue_rate_df.isnull().sum()

bad_count      0
total_count    0
rate           0
woe            0
dtype: int64

In [27]:
overdue_rate_df.fillna(0,inplace=True)

In [36]:
overdue_rate_df.to_parquet(f'callog_overdue_rate_{month}.parquet',compression='zstd')

In [21]:
# overdue_rate_df.to_parquet(f'callog_overdue_rate_{month}.parquet',compression='zstd')，报错：ArrowTypeError: ("Expected bytes, got a 'int' object", 'Conversion failed for column None with type object')

new_df = pd.DataFrame({'bad_count': overdue_rate_df['bad_count'],
                       'total_count': overdue_rate_df['total_count'],
                       'rate': overdue_rate_df['rate'],
                       'woe': overdue_rate_df['woe']})
new_df.to_parquet(f'callog_overdue_rate_{month}.parquet', compression='zstd')


ArrowTypeError: ("Expected bytes, got a 'int' object", 'Conversion failed for column None with type object')

In [19]:
overdue_rate_df['woe'].describe()

count    1677.000000
mean        0.815941
std         0.831409
min         0.000000
25%         0.000000
50%         0.000000
75%         1.666667
max         1.666667
Name: woe, dtype: float64

In [12]:
overdue_rate_df.head()

Unnamed: 0,bad_count,total_count,rate,woe
86,2.0,4.0,0.5,0.833333
4429800292,3.0,3.0,1.0,1.666667
4734540036,1.0,2.0,0.5,0.833333
5544380802,1.0,2.0,0.5,0.833333
5592735569,1.0,2.0,0.5,0.833333


In [10]:
process_month_calllog(month)

2023-07
['sms_url/2023-07_AM', 'sms_url/2023-07_YM', 'sms_url/2023-07_M']
Error: ("Expected bytes, got a 'int' object", 'Conversion failed for column None with type object'), month: 2023-07


In [9]:
for month in month_files.keys():
    process_month_calllog(month)

2023-06
['sms_url/2023-06_AM', 'sms_url/2023-06_YM', 'sms_url/2023-06_M']
Error: ("Expected bytes, got a 'int' object", 'Conversion failed for column None with type object'), month: 2023-06
2023-07
['sms_url/2023-07_AM', 'sms_url/2023-07_YM', 'sms_url/2023-07_M']


KeyboardInterrupt: 

In [32]:
list('abc') + list('def')

['a', 'b', 'c', 'd', 'e', 'f']

In [7]:
files = month_files[month]
print(month)
print(files)
calllog_data = batch_load_files_to_df(files)
calllog_data = calllog_data[calllog_data['agr_pd7'] == 1]
calllog_data = calllog_data[(calllog_data['call_records_url'].notnull()) & (calllog_data['call_records_url'] != '[]')]
month_overdue_rate = calllog_data['def_pd7'].mean()
# pattern 提取becallPhone或者call_log_number
def extract_phone(data):
    if data is None or data == '[]':
        return set()
    entries = json.loads(data)
    return set(
        normalize_phone(entry.get('becallPhone'))[-1]
        for entry in entries
        if 'becallPhone' in entry or 'call_log_number' in entry
    ).union(
        normalize_phone(entry.get('call_log_number'))[-1]
        for entry in entries
        if 'call_log_number' in entry
    )
# 如果becallPhone不存在，就提取call_log_number

calllog_data['calllog'] = calllog_data['call_records_url_data'].apply(extract_phone)

2024-06
['sms_url/2024-06_AM', 'sms_url/2024-06_YM', 'sms_url/2024-06_M']


In [8]:
calllog_data['calllog'].sample(5)

991      {, 528016203428, 527950448115, 5521620992, 554...
23998    {4432457732, 4435419142, 4434375688, 443174401...
1693     {9243951105, 5509154819, 8145302535, 555774976...
37051    {2201065985, 5535492867, 7971351819, 220171367...
26514    {4344978434, 4438672134, 4341006599, 434128615...
Name: calllog, dtype: object

## check_case_call

In [4]:
case_check_call = pd.read_parquet('calllog_overdue_rate_2024-03.parquet')

In [6]:
case_check_call.head()

Unnamed: 0,phone,bad_count,total_count,rate,woe
0,4429800292.0,2809.0,32426.0,0.0866,0.712457
1,86.0,3678.0,30154.0,0.122,1.003692
2,,2420.0,17894.0,0.1352,1.112288
3,3341690119.0,1960.0,16783.0,0.1168,0.960911
4,5512046092.0,1274.0,15470.0,0.0824,0.677903


In [7]:
case_check_call.describe()

Unnamed: 0,bad_count,total_count,rate,woe
count,1652206.0,1652206.0,1652206.0,1652206.0
mean,1.268689,10.94403,0.2634886,2.167716
std,7.910757,56.86508,0.4251789,3.497939
min,0.0,1.0,0.0,0.0
25%,0.0,1.0,0.0,0.0
50%,0.0,4.0,0.0,0.0
75%,1.0,10.0,0.5,4.113491
max,3678.0,32426.0,1.0,8.226982


In [8]:
case_check_call['total_count'].quantile(0.99)

100.0

In [9]:
case_check_call[case_check_call['total_count'] > case_check_call['total_count'].quantile(0.99)]['woe'].describe()

count    16481.000000
mean         0.550947
std          1.306492
min          0.000000
25%          0.000000
50%          0.000000
75%          0.587406
max          8.226982
Name: woe, dtype: float64

In [10]:
pd.cut(case_check_call[case_check_call['total_count'] > 100]['woe'], bins=20).value_counts().sort_index()

woe
(-0.00823, 0.411]    11560
(0.411, 0.823]        1837
(0.823, 1.234]        1305
(1.234, 1.645]         605
(1.645, 2.057]         224
(2.057, 2.468]         114
(2.468, 2.879]          67
(2.879, 3.291]          33
(3.291, 3.702]          42
(3.702, 4.113]          30
(4.113, 4.525]          21
(4.525, 4.936]          10
(4.936, 5.348]          11
(5.348, 5.759]           8
(5.759, 6.17]          108
(6.17, 6.582]          426
(6.582, 6.993]          11
(6.993, 7.404]           7
(7.404, 7.816]           3
(7.816, 8.227]          59
Name: count, dtype: int64

In [None]:
calllog_data['call_records_url_data'][0]

In [10]:
overdue_rate_df.shape

(753, 4)

In [11]:
overdue_rate_df.head(50)

Unnamed: 0,bad_count,total_count,rate,woe
,140570012.0,358673722.0,0.3919,0.935081
l,43818216.0,109943373.0,0.3986,0.951067
a,30738172.0,76884488.0,0.3998,0.953931
e,23558191.0,61365372.0,0.3839,0.915993
c,22720694.0,56901435.0,0.3993,0.952738
1,14817549.0,39629738.0,0.3739,0.892133
t,15665635.0,38636940.0,0.4055,0.967531
o,13254040.0,30520903.0,0.4343,1.036248
2,11100916.0,30186558.0,0.3677,0.877339
5,10392366.0,26687675.0,0.3894,0.929116


In [8]:
for phones, label in zip(calllog_data['call_records_url_data'], calllog_data['def_pd7']):
    if phones is None:
        continue
    for phone in phones:
        phone = normalize_phone(phone)[-1]
        if phone not in calllog_overdue_rate:
            calllog_overdue_rate[phone] = [0, 0, 0]
        if label == 1:
            calllog_overdue_rate[phone][0] += 1
        calllog_overdue_rate[phone][1] += 1
        calllog_overdue_rate[phone][2] = round(calllog_overdue_rate[phone][0] / calllog_overdue_rate[phone][1], 4)
overdue_rate_df = pd.DataFrame(calllog_overdue_rate).T
overdue_rate_df.rename(columns={0: 'bad_count', 1: 'total_count', 2: 'rate'}, inplace=True)
overdue_rate_df.sort_values('total_count', ascending=False, inplace=True)
overdue_rate_df['woe'] = overdue_rate_df['rate'].apply(lambda x: x / month_overdue_rate)
overdue_rate_df.to_parquet(f'callog_overdue_rate_{month}.parquet',compression='zstd')
print(f"Finish processing {month}")

NameError: name 'calllog_data' is not defined

# 滑动窗口特征

In [22]:
## todo 定期把配置文件算好，ssh发送到固定路径

In [None]:
# 平滑后的woe 分n箱后+套路 5=< n <= 10

In [None]:
# 通话时长+频次

In [None]:
# 南美其他国家只看Y

In [None]:
# 按模块做

In [96]:
def generate_month_badcount(file_path=None, month=None):
    """
    生成某个月app逾期率文件
    :param file_path: 生成配置文件的文件路径
    :param month: 月份,'YYYY-MM'
    :param applist_data: 如果applist_data不为空，则直接使用applist_data
    :return: 
    """
    file_names = os.listdir(file_path)
    month_files = defaultdict(list)
    for file_name in file_names:
        file_name = file_path / file_name   
        match = re.search(r'(\d{4}-\d{2})',str(file_name))
        if match and match.group() == month: 
            month_files[month].append(str(file_name))
    files = month_files[month]
    print(files)
    data = batch_load_files_to_df(files)
    data = data[['def_pd7','agr_pd7']]
    data['bad'] = np.where(data['agr_pd7'] == 1, data['def_pd7'], None)
    data = data[data['bad'].notnull()]
    total_count = data['def_pd7'].count()
    bad_count = data['def_pd7'].sum()
    return month, total_count, bad_count

In [97]:
tmp_dir = Path('/home/mayongzhi/marshall/sender_word_call_app/tmp_config')
month_badcount = pd.DataFrame()
months = ['2023-06','2023-07','2023-08','2023-09','2023-10','2023-11','2023-12','2024-01','2024-02','2024-03','2024-04','2024-05','2024-06','2024-07','2024-08']
task_list = [(raw_dir,month) for month in months]
for month in months:
    month, total_count, bad_count = generate_month_badcount(raw_dir, month)
    month_badcount = pd.concat([month_badcount, pd.DataFrame({'month': [month], 'total_count': [total_count], 'bad_count': [bad_count]})], axis=0)


['/home/longxiaolei/raw_mx/newcust/raw_M_2023-06-01.pqt', '/home/longxiaolei/raw_mx/newcust/raw_M_2023-06-02.pqt', '/home/longxiaolei/raw_mx/newcust/raw_M_2023-06-03.pqt', '/home/longxiaolei/raw_mx/newcust/raw_M_2023-06-04.pqt', '/home/longxiaolei/raw_mx/newcust/raw_M_2023-06-05.pqt', '/home/longxiaolei/raw_mx/newcust/raw_M_2023-06-06.pqt', '/home/longxiaolei/raw_mx/newcust/raw_M_2023-06-07.pqt', '/home/longxiaolei/raw_mx/newcust/raw_M_2023-06-08.pqt', '/home/longxiaolei/raw_mx/newcust/raw_M_2023-06-09.pqt', '/home/longxiaolei/raw_mx/newcust/raw_M_2023-06-10.pqt', '/home/longxiaolei/raw_mx/newcust/raw_M_2023-06-11.pqt', '/home/longxiaolei/raw_mx/newcust/raw_M_2023-06-12.pqt', '/home/longxiaolei/raw_mx/newcust/raw_M_2023-06-13.pqt', '/home/longxiaolei/raw_mx/newcust/raw_M_2023-06-14.pqt', '/home/longxiaolei/raw_mx/newcust/raw_M_2023-06-15.pqt', '/home/longxiaolei/raw_mx/newcust/raw_M_2023-06-16.pqt', '/home/longxiaolei/raw_mx/newcust/raw_M_2023-06-17.pqt', '/home/longxiaolei/raw_mx/newc

In [99]:
month_badcount.to_parquet(tmp_dir / 'month_badcount.parquet',compression='zstd')

In [139]:



def generate_app_config_data(
        calc_month:str,
        rlevel_bins:int, 
        window_size:int
):
    """
    生成配置数据，前三个月的数据
    :param calc_month:  计算月份
    :param rlevel_bins:  分箱数
    :param window_size: 滑动窗口大小
    :return:
    """
    now_month = datetime.strptime(calc_month, '%Y-%m')
    config_all = pd.DataFrame()
    # 遍历当前文件夹下的所有文件
    for file in os.listdir(tmp_dir):
        if file.startswith('app_overdue_rate'):
            config_month = datetime.strptime(file.split('_')[-1].split('.')[0], '%Y-%m')
            month_diff = (now_month.year - config_month.year) * 12 + now_month.month - config_month.month
            if 1 < month_diff <= window_size+1:
                config = pd.read_parquet(file)
                mask = (config['total_count'] > 50)     # 滑动窗口中的每个月的app的总数大于50
                config = config[mask]
                config_all = pd.concat([config_all, config], axis=0)
        else:
            continue
    if config_all.empty:
        print(f'config_all is empty, month: {calc_month}')
        return 
        # config_all = config_all[config_all.groupby('app')['total_count'].transform('count') == num_of_configs]  # 选取在每个月都出现的app
    config_all = config_all[config_all.groupby('app')['bad_rate'].transform('std') < config_all.groupby('app')['bad_rate'].transform('std').quantile(0.85)] # 去除overdue_rate的标准差大于0.9分位数的app
    if config_all.empty:
        return
    config_all = config_all.groupby('app').agg({'bad_count': 'sum', 'total_count': 'sum'}).reset_index()
    config_all['bad_rate'] = config_all['bad_count'] / config_all['total_count']
    config_all = config_all.copy()
    now_month = month_badcount[month_badcount['month'] == calc_month]
    bad_rate_total = now_month['bad_count'] / now_month['total_count']
    bad_rate_total = bad_rate_total.values[0]
  
    config_all['woe'] = np.where(
        (config_all['bad_rate'] == 0) | (config_all['bad_rate'] == 1),
        0,
        np.log( (config_all['bad_rate'] / bad_rate_total / 
                ((1 - config_all['bad_rate']) / (1 - bad_rate_total)))
                )
    )
    config_all['woe_bin_freq'] = pd.qcut(config_all['woe'], rlevel_bins, duplicates='drop')
    config_all['level_freq'] = config_all['woe_bin_freq'].cat.codes+1
    config_all['woe_bin_dist'] = pd.cut(config_all['woe'], rlevel_bins, duplicates='drop')
    config_all['level_dist'] = config_all['woe_bin_dist'].cat.codes+1
    config_all['woe_bin_freq'] = config_all['woe_bin_freq'].astype(str)
    config_all['woe_bin_dist'] = config_all['woe_bin_dist'].astype(str)
    print('freq', config_all['level_freq'].value_counts())
    print('dist', config_all['level_dist'].value_counts())
    return config_all


def generate_sender_config_data(
        calc_month:str,
        rlevel_bins:int, 
        window_size:int
):
    """
    生成配置数据，前三个月的数据
    :param calc_month:  计算月份
    :param rlevel_bins:  分箱数
    :param window_size: 滑动窗口大小
    :return:
    """
    now_month = datetime.strptime(calc_month, '%Y-%m')
    config_all = pd.DataFrame()
    for file in os.listdir(tmp_dir):
        if file.startswith('sender_overdue_rate'):
            config_month = datetime.strptime(file.split('_')[-1].split('.')[0], '%Y-%m')
            month_diff = (now_month.year - config_month.year) * 12 + now_month.month - config_month.month
            if 1 < month_diff <= window_size+1:
                config = pd.read_parquet(file)
                mask = (config['total_count'] > 50) 
                config = config[mask]
                config_all = pd.concat([config_all, config], axis=0)
        else:
            continue
    if config_all.empty:
        print(f'config_all is empty, month: {calc_month}')
        return
                
                
    config_all = config_all[config_all.groupby('sender')['bad_rate'].transform('std') < config_all.groupby('sender')['bad_rate'].transform('std').quantile(0.85)]
    if config_all.empty:
        return 
    config_all = config_all.groupby('sender').agg({'bad_count': 'sum', 'total_count': 'sum'}).reset_index()
    config_all['bad_rate'] = config_all['bad_count'] / config_all['total_count']
    config_all = config_all.copy()
    now_month = month_badcount[month_badcount['month'] == calc_month]
    bad_rate_total = now_month['bad_count'] / now_month['total_count']
    bad_rate_total = bad_rate_total.values[0]
    
    config_all['woe'] = np.where(
        (config_all['bad_rate'] == 0) | (config_all['bad_rate'] == 1),
        0,
        np.log( (config_all['bad_rate'] / bad_rate_total) / 
                ((1 - config_all['bad_rate']) / (1 - bad_rate_total)))
    )
    config_all['woe_bin_dist'] = pd.cut(config_all['woe'], rlevel_bins, duplicates='drop')
    # 选出占比超过40%的woe_bin_dist，再将其等频分10箱，每箱随机选百分之10的数据
    woe_bin_dist = config_all['woe_bin_dist'].value_counts(normalize=True)
    woe_bin_dist = woe_bin_dist[woe_bin_dist > 0.4]
    woe_bin_dist_index = woe_bin_dist.index
    woe_bin_dist = config_all[config_all['woe_bin_dist'].isin(woe_bin_dist_index)]
    woe_bin_dist['woe_bin_dist'] = pd.qcut(woe_bin_dist['woe'], rlevel_bins, duplicates='drop')
    # 每箱随机选百分之10的数据
    woe_bin_dist = config_all[config_all['woe_bin_dist'].isin(woe_bin_dist_index)].groupby('woe_bin_dist').apply(lambda x: x.sample(frac=0.1))
    retain_index = woe_bin_dist.index.get_level_values(1)
    # 将config_all的占比超过40%的数据用woe_bin_dist替换
    print(woe_bin_dist.head())
    print(config_all.head())
    config_all = pd.concat([config_all[~config_all['woe_bin_dist'].isin(woe_bin_dist_index)], config_all[config_all.index.isin(retain_index)]],  axis=0)
    
    config_all['level_dist'] = config_all['woe_bin_dist'].cat.codes+1
    
    config_all['woe_bin_freq'] = pd.qcut(config_all['woe'], rlevel_bins, duplicates='drop')
    config_all['level_freq'] = config_all['woe_bin_freq'].cat.codes+1
    
    config_all['woe_bin_freq'] = config_all['woe_bin_freq'].astype(str)
    config_all['woe_bin_dist'] = config_all['woe_bin_dist'].astype(str)
    print(config_all.dtypes)
    config_all = config_all[['sender','bad_count','total_count','bad_rate','woe','level_freq','level_dist']]
    print('freq', config_all['level_freq'].value_counts())
    print('dist', config_all['level_dist'].value_counts())
    return config_all

def generate_word_config_data(
        calc_month:str,
        rlevel_bins:int, 
        window_size:int
):
    """
    生成配置数据，前三个月的数据
    :param calc_month:  计算月份
    :param rlevel_bins:  分箱数
    :param window_size: 滑动窗口大小
    :return:
    """

    now_month = datetime.strptime(calc_month, '%Y-%m')
    config_all = pd.DataFrame()
    for file in os.listdir(tmp_dir):
        if file.startswith('word_overdue_rate'):
            config_month = datetime.strptime(file.split('_')[-1].split('.')[0], '%Y-%m')
            month_diff = (now_month.year - config_month.year) * 12 + now_month.month - config_month.month
            if 1 < month_diff <= window_size+1:
                config = pd.read_parquet(file)
                mask = (config['total_count'] > 100) 
                config = config[mask]
                config_all = pd.concat([config_all, config], axis=0)
        else:
            continue
    if config_all.empty:
        print(f'config_all is empty, month: {calc_month}')
        return
    
    config_all = config_all[config_all.groupby('word')['bad_rate'].transform('std') < config_all.groupby('word')['bad_rate'].transform('std').quantile(0.85)]
    if config_all.empty:
        return 
    config_all = config_all.groupby('word').agg({'bad_count': 'sum', 'total_count': 'sum'}).reset_index()
    config_all['bad_rate'] = config_all['bad_count'] / config_all['total_count']
    config_all = config_all.copy()
    
    now_month = month_badcount[month_badcount['month'] == calc_month]
    bad_rate_total = now_month['bad_count'] / now_month['total_count']
    bad_rate_total = bad_rate_total.values[0]
    
    config_all['woe'] = np.where(
        (config_all['bad_rate'] == 0) | (config_all['bad_rate'] == 1),
        0,
        np.log( (config_all['bad_rate'] / bad_rate_total) / 
                ((1 - config_all['bad_rate']) / (1 - bad_rate_total)))
    )
    
    config_all['woe_bin_freq'] = pd.qcut(config_all['woe'], rlevel_bins, duplicates='drop')
    config_all['level_freq'] = config_all['woe_bin_freq'].cat.codes+1
    config_all['woe_bin_dist'] = pd.cut(config_all['woe'], rlevel_bins, duplicates='drop')
    config_all['level_dist'] = config_all['woe_bin_dist'].cat.codes+1
    config_all['woe_bin_freq'] = config_all['woe_bin_freq'].astype(str)
    config_all['woe_bin_dist'] = config_all['woe_bin_dist'].astype(str)
    print('freq', config_all['level_freq'].value_counts())
    print('dist', config_all['level_dist'].value_counts())
    return config_all
    

In [79]:
tmp_dir = Path('/home/mayongzhi/marshall/sender_word_call_app/tmp_config')

In [116]:
word_config_dir = work_dir / 'word_config_window'
if not word_config_dir.exists():
    word_config_dir.mkdir(exist_ok=True)
app_config_dir = work_dir / 'app_config_window'
if not app_config_dir.exists():
    app_config_dir.mkdir(exist_ok=True)
sender_config_dir = work_dir / 'sender_config_window'
if not sender_config_dir.exists():
    sender_config_dir.mkdir(exist_ok=True)

In [140]:
for month in ['2023-06','2023-07','2023-08','2023-09','2023-10','2023-11','2023-12','2024-01','2024-02','2024-03','2024-04','2024-05','2024-06','2024-07','2024-08']:
    print('=' * 20, month, '=' * 20)
    print('sender')
    overdue_rate_df = generate_sender_config_data(calc_month=month,rlevel_bins=10,window_size=3)
    if overdue_rate_df is None:
        continue
    print(overdue_rate_df['level_dist'].value_counts())
    
    overdue_rate_df.to_parquet(sender_config_dir / f'sender_overdue_rate_window3_{month}.parquet',compression='zstd')

sender
config_all is empty, month: 2023-06
sender
config_all is empty, month: 2023-07
sender
sender


  result = getattr(ufunc, method)(*inputs, **kwargs)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  woe_bin_dist['woe_bin_dist'] = pd.qcut(woe_bin_dist['woe'], rlevel_bins, duplicates='drop')
  woe_bin_dist = config_all[config_all['woe_bin_dist'].isin(woe_bin_dist_index)].groupby('woe_bin_dist').apply(lambda x: x.sample(frac=0.1))
  woe_bin_dist = config_all[config_all['woe_bin_dist'].isin(woe_bin_dist_index)].groupby('woe_bin_dist').apply(lambda x: x.sample(frac=0.1))


                          sender  bad_count  total_count  bad_rate       woe  \
woe_bin_dist                                                                   
(-0.452, 0.624] 7649  5593383815         78          157  0.496815  0.500339   
                30    2201711391         54          123  0.439024  0.267956   
                8108  5594180566         85          169  0.502959  0.524913   
                7944  5594180332         76          176  0.431818  0.238641   
                6575  5592738824         72          161  0.447205  0.301108   

                         woe_bin_dist  
woe_bin_dist                           
(-0.452, 0.624] 7649  (-0.452, 0.624]  
                30    (-0.452, 0.624]  
                8108  (-0.452, 0.624]  
                7944  (-0.452, 0.624]  
                6575  (-0.452, 0.624]  
  sender  bad_count  total_count  bad_rate       woe     woe_bin_dist
0  10080       2056         3638  0.565146  0.775151     (0.624, 1.7]
1  10086      39000

  result = getattr(ufunc, method)(*inputs, **kwargs)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  woe_bin_dist['woe_bin_dist'] = pd.qcut(woe_bin_dist['woe'], rlevel_bins, duplicates='drop')
  woe_bin_dist = config_all[config_all['woe_bin_dist'].isin(woe_bin_dist_index)].groupby('woe_bin_dist').apply(lambda x: x.sample(frac=0.1))
  woe_bin_dist = config_all[config_all['woe_bin_dist'].isin(woe_bin_dist_index)].groupby('woe_bin_dist').apply(lambda x: x.sample(frac=0.1))


                          sender  bad_count  total_count  bad_rate       woe  \
woe_bin_dist                                                                   
(-0.61, 0.489] 5938   4141880493         92          224  0.410714  0.107300   
               8477   4427322631         71          150  0.473333  0.361545   
               20127  8141738606         92          207  0.444444  0.245169   
               15626  5594193280         92          221  0.416290  0.130289   
               1722   3341604489        140          326  0.429448  0.184209   

                        woe_bin_dist  
woe_bin_dist                          
(-0.61, 0.489] 5938   (-0.61, 0.489]  
               8477   (-0.61, 0.489]  
               20127  (-0.61, 0.489]  
               15626  (-0.61, 0.489]  
               1722   (-0.61, 0.489]  
  sender  bad_count  total_count  bad_rate       woe    woe_bin_dist
0  10080       2762         5371  0.514243  0.525301  (0.489, 1.588]
1  10086      52555       10

  result = getattr(ufunc, method)(*inputs, **kwargs)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  woe_bin_dist['woe_bin_dist'] = pd.qcut(woe_bin_dist['woe'], rlevel_bins, duplicates='drop')
  woe_bin_dist = config_all[config_all['woe_bin_dist'].isin(woe_bin_dist_index)].groupby('woe_bin_dist').apply(lambda x: x.sample(frac=0.1))
  woe_bin_dist = config_all[config_all['woe_bin_dist'].isin(woe_bin_dist_index)].groupby('woe_bin_dist').apply(lambda x: x.sample(frac=0.1))


                           sender  bad_count  total_count  bad_rate       woe  \
woe_bin_dist                                                                    
(-0.325, 0.747] 2393   3341593131         97          200  0.485000  0.328456   
                19529  8141630674         72          184  0.391304 -0.053359   
                18357  8141599215        198          496  0.399194 -0.020353   
                12825  5593023849         41          111  0.369369 -0.146450   
                8850   4429254452         85          178  0.477528  0.298525   

                          woe_bin_dist  
woe_bin_dist                            
(-0.325, 0.747] 2393   (-0.325, 0.747]  
                19529  (-0.325, 0.747]  
                18357  (-0.325, 0.747]  
                12825  (-0.325, 0.747]  
                8850   (-0.325, 0.747]  
  sender  bad_count  total_count  bad_rate       woe     woe_bin_dist
0  10080       2639         5410  0.487800  0.339665  (-0.325, 0.747]
1  10

  result = getattr(ufunc, method)(*inputs, **kwargs)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  woe_bin_dist['woe_bin_dist'] = pd.qcut(woe_bin_dist['woe'], rlevel_bins, duplicates='drop')
  woe_bin_dist = config_all[config_all['woe_bin_dist'].isin(woe_bin_dist_index)].groupby('woe_bin_dist').apply(lambda x: x.sample(frac=0.1))
  woe_bin_dist = config_all[config_all['woe_bin_dist'].isin(woe_bin_dist_index)].groupby('woe_bin_dist').apply(lambda x: x.sample(frac=0.1))


                           sender  bad_count  total_count  bad_rate       woe  \
woe_bin_dist                                                                    
(-0.651, 0.492] 17891  5595062552        103          219  0.470320  0.170793   
                1908   3341591324         85          231  0.367965 -0.251301   
                9303   4951460087         49          139  0.352518 -0.318335   
                16403  5594183665        101          278  0.363309 -0.271375   
                3904   3341602109         48          152  0.315789 -0.483535   

                          woe_bin_dist  
woe_bin_dist                            
(-0.651, 0.492] 17891  (-0.651, 0.492]  
                1908   (-0.651, 0.492]  
                9303   (-0.651, 0.492]  
                16403  (-0.651, 0.492]  
                3904   (-0.651, 0.492]  
  sender  bad_count  total_count  bad_rate       woe     woe_bin_dist
0  10080       2058         4865  0.423022 -0.020727  (-0.651, 0.492]
1  10

  result = getattr(ufunc, method)(*inputs, **kwargs)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  woe_bin_dist['woe_bin_dist'] = pd.qcut(woe_bin_dist['woe'], rlevel_bins, duplicates='drop')
  woe_bin_dist = config_all[config_all['woe_bin_dist'].isin(woe_bin_dist_index)].groupby('woe_bin_dist').apply(lambda x: x.sample(frac=0.1))
  woe_bin_dist = config_all[config_all['woe_bin_dist'].isin(woe_bin_dist_index)].groupby('woe_bin_dist').apply(lambda x: x.sample(frac=0.1))


                             sender  bad_count  total_count  bad_rate  \
woe_bin_dist                                                            
(-1.078, -0.0178] 14678  5592738799         43          155  0.277419   
                  13867  5592283932        104          289  0.359862   
                  27776  8141596845         61          172  0.354651   
                  34730  8673001933         80          224  0.357143   
                  5699   3341713598         47          129  0.364341   

                              woe       woe_bin_dist  
woe_bin_dist                                          
(-1.078, -0.0178] 14678 -0.525670  (-1.078, -0.0178]  
                  13867 -0.144336  (-1.078, -0.0178]  
                  27776 -0.167028  (-1.078, -0.0178]  
                  34730 -0.156158  (-1.078, -0.0178]  
                  5699  -0.124943  (-1.078, -0.0178]  
  sender  bad_count  total_count  bad_rate       woe       woe_bin_dist
0  10080       1868         457

  result = getattr(ufunc, method)(*inputs, **kwargs)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  woe_bin_dist['woe_bin_dist'] = pd.qcut(woe_bin_dist['woe'], rlevel_bins, duplicates='drop')
  woe_bin_dist = config_all[config_all['woe_bin_dist'].isin(woe_bin_dist_index)].groupby('woe_bin_dist').apply(lambda x: x.sample(frac=0.1))
  woe_bin_dist = config_all[config_all['woe_bin_dist'].isin(woe_bin_dist_index)].groupby('woe_bin_dist').apply(lambda x: x.sample(frac=0.1))


                           sender  bad_count  total_count  bad_rate       woe  \
woe_bin_dist                                                                    
(-0.206, 0.685] 23044  8121865534         66          152  0.434211  0.171512   
                23745  8141595513         89          246  0.361789 -0.131405   
                22998  7821966399        414          414  1.000000  0.000000   
                30875  8159805393         81          220  0.368182 -0.103820   
                11875  5592739613         73          192  0.380208 -0.052459   

                          woe_bin_dist  
woe_bin_dist                            
(-0.206, 0.685] 23044  (-0.206, 0.685]  
                23745  (-0.206, 0.685]  
                22998  (-0.206, 0.685]  
                30875  (-0.206, 0.685]  
                11875  (-0.206, 0.685]  
  sender  bad_count  total_count  bad_rate       woe     woe_bin_dist
0  10080       2235         5261  0.424824  0.133204  (-0.206, 0.685]
1  10

  result = getattr(ufunc, method)(*inputs, **kwargs)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  woe_bin_dist['woe_bin_dist'] = pd.qcut(woe_bin_dist['woe'], rlevel_bins, duplicates='drop')
  woe_bin_dist = config_all[config_all['woe_bin_dist'].isin(woe_bin_dist_index)].groupby('woe_bin_dist').apply(lambda x: x.sample(frac=0.1))
  woe_bin_dist = config_all[config_all['woe_bin_dist'].isin(woe_bin_dist_index)].groupby('woe_bin_dist').apply(lambda x: x.sample(frac=0.1))


                          sender  bad_count  total_count  bad_rate       woe  \
woe_bin_dist                                                                   
(-0.61, 0.607] 26615  8141657401         65          174  0.373563  0.047525   
               19680  5595491373         46          158  0.291139 -0.325372   
               20967  5595493064         61          208  0.293269 -0.315073   
               14317  5594522981        213          710  0.300000 -0.282812   
               13407  5594522060        259          751  0.344874 -0.077165   

                        woe_bin_dist  
woe_bin_dist                          
(-0.61, 0.607] 26615  (-0.61, 0.607]  
               19680  (-0.61, 0.607]  
               20967  (-0.61, 0.607]  
               14317  (-0.61, 0.607]  
               13407  (-0.61, 0.607]  
  sender  bad_count  total_count  bad_rate       woe    woe_bin_dist
0  10080       2254         5349  0.421387  0.247404  (-0.61, 0.607]
1  10086      42103       10

  result = getattr(ufunc, method)(*inputs, **kwargs)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  woe_bin_dist['woe_bin_dist'] = pd.qcut(woe_bin_dist['woe'], rlevel_bins, duplicates='drop')
  woe_bin_dist = config_all[config_all['woe_bin_dist'].isin(woe_bin_dist_index)].groupby('woe_bin_dist').apply(lambda x: x.sample(frac=0.1))
  woe_bin_dist = config_all[config_all['woe_bin_dist'].isin(woe_bin_dist_index)].groupby('woe_bin_dist').apply(lambda x: x.sample(frac=0.1))


                           sender  bad_count  total_count  bad_rate       woe  \
woe_bin_dist                                                                    
(-0.772, 0.446] 26061  8159802472         43          151  0.284768 -0.517736   
                7755   4779650690         84          271  0.309963 -0.397097   
                24324  8141655919         67          232  0.288793 -0.498058   
                11573  5594526023         96          299  0.321070 -0.345663   
                10710  5594522561        106          391  0.271100 -0.585855   

                          woe_bin_dist  
woe_bin_dist                            
(-0.772, 0.446] 26061  (-0.772, 0.446]  
                7755   (-0.772, 0.446]  
                24324  (-0.772, 0.446]  
                11573  (-0.772, 0.446]  
                10710  (-0.772, 0.446]  
  sender  bad_count  total_count  bad_rate       woe     woe_bin_dist
0            611727      1354605  0.451591  0.208949  (-0.772, 0.446]
1  10

  result = getattr(ufunc, method)(*inputs, **kwargs)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  woe_bin_dist['woe_bin_dist'] = pd.qcut(woe_bin_dist['woe'], rlevel_bins, duplicates='drop')
  woe_bin_dist = config_all[config_all['woe_bin_dist'].isin(woe_bin_dist_index)].groupby('woe_bin_dist').apply(lambda x: x.sample(frac=0.1))
  woe_bin_dist = config_all[config_all['woe_bin_dist'].isin(woe_bin_dist_index)].groupby('woe_bin_dist').apply(lambda x: x.sample(frac=0.1))


                            sender  bad_count  total_count  bad_rate  \
woe_bin_dist                                                           
(-1.023, -0.153] 13386  5595494070         83          274  0.302920   
                 19253  8141638144         68          218  0.311927   
                 21934  8141656050         68          242  0.280992   
                 5446   5541728857         37          108  0.342593   
                 5922   5592627269         41          127  0.322835   

                             woe      woe_bin_dist  
woe_bin_dist                                        
(-1.023, -0.153] 13386 -0.606958  (-1.023, -0.153]  
                 19253 -0.564653  (-1.023, -0.153]  
                 21934 -0.713073  (-1.023, -0.153]  
                 5446  -0.425287  (-1.023, -0.153]  
                 5922  -0.514301  (-1.023, -0.153]  
  sender  bad_count  total_count  bad_rate       woe      woe_bin_dist
0            838714      1816778  0.461649  0.072769 

  result = getattr(ufunc, method)(*inputs, **kwargs)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  woe_bin_dist['woe_bin_dist'] = pd.qcut(woe_bin_dist['woe'], rlevel_bins, duplicates='drop')
  woe_bin_dist = config_all[config_all['woe_bin_dist'].isin(woe_bin_dist_index)].groupby('woe_bin_dist').apply(lambda x: x.sample(frac=0.1))
  woe_bin_dist = config_all[config_all['woe_bin_dist'].isin(woe_bin_dist_index)].groupby('woe_bin_dist').apply(lambda x: x.sample(frac=0.1))


                            sender  bad_count  total_count  bad_rate  \
woe_bin_dist                                                           
(-1.211, -0.318] 1124   3341596015         54          174  0.310345   
                 6728   5594185667         44          158  0.278481   
                 740    3341595508         37          168  0.220238   
                 2075   3341597193         51          179  0.284916   
                 20593  8141655902         44          168  0.261905   

                             woe      woe_bin_dist  
woe_bin_dist                                        
(-1.211, -0.318] 1124  -0.556831  (-1.211, -0.318]  
                 6728  -0.710332  (-1.211, -0.318]  
                 740   -1.022602  (-1.211, -0.318]  
                 2075  -0.678528  (-1.211, -0.318]  
                 20593 -0.794415  (-1.211, -0.318]  
  sender  bad_count  total_count  bad_rate       woe      woe_bin_dist
0      1         56          170  0.329412 -0.469170 

  result = getattr(ufunc, method)(*inputs, **kwargs)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  woe_bin_dist['woe_bin_dist'] = pd.qcut(woe_bin_dist['woe'], rlevel_bins, duplicates='drop')
  woe_bin_dist = config_all[config_all['woe_bin_dist'].isin(woe_bin_dist_index)].groupby('woe_bin_dist').apply(lambda x: x.sample(frac=0.1))
  woe_bin_dist = config_all[config_all['woe_bin_dist'].isin(woe_bin_dist_index)].groupby('woe_bin_dist').apply(lambda x: x.sample(frac=0.1))


                            sender  bad_count  total_count  bad_rate  \
woe_bin_dist                                                           
(-1.447, -0.484] 15042  8141636296         52          207  0.251208   
                 1059   3341595777         52          175  0.297143   
                 10626  5595589148         46          150  0.306667   
                 6634   5594186671         47          161  0.291925   
                 2139   3341597081         94          299  0.314381   

                             woe      woe_bin_dist  
woe_bin_dist                                        
(-1.447, -0.484] 15042 -0.856006  (-1.447, -0.484]  
                 1059  -0.624765  (-1.447, -0.484]  
                 10626 -0.579574  (-1.447, -0.484]  
                 6634  -0.649875  (-1.447, -0.484]  
                 2139  -0.543539  (-1.447, -0.484]  
  sender  bad_count  total_count  bad_rate       woe    woe_bin_dist
0      1        145          362  0.400552 -0.166988  (

  result = getattr(ufunc, method)(*inputs, **kwargs)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  woe_bin_dist['woe_bin_dist'] = pd.qcut(woe_bin_dist['woe'], rlevel_bins, duplicates='drop')
  woe_bin_dist = config_all[config_all['woe_bin_dist'].isin(woe_bin_dist_index)].groupby('woe_bin_dist').apply(lambda x: x.sample(frac=0.1))
  woe_bin_dist = config_all[config_all['woe_bin_dist'].isin(woe_bin_dist_index)].groupby('woe_bin_dist').apply(lambda x: x.sample(frac=0.1))


In [117]:
for month in ['2023-06','2023-07','2023-08','2023-09','2023-10','2023-11','2023-12','2024-01','2024-02','2024-03','2024-04','2024-05','2024-06','2024-07','2024-08']:
    print('=' * 20, month, '=' * 20)
    print('word')
    overdue_rate_config = generate_word_config_data(calc_month=month,rlevel_bins=10,window_size=3)
    if overdue_rate_config is None:
        continue
    overdue_rate_config.to_parquet(word_config_dir / f'word_overdue_rate_window3_{month}.parquet',compression='zstd')
    
    print('app') 
    overdue_rate_df = generate_app_config_data(calc_month=month,rlevel_bins=10,window_size=3)
    if overdue_rate_df is None:
        continue
    overdue_rate_df.to_parquet(app_config_dir / f'app_overdue_rate_window3_{month}.parquet',compression='zstd')
    
    print('sender')
    overdue_rate_df = generate_sender_config_data(calc_month=month,rlevel_bins=10,window_size=3)
    if overdue_rate_df is None:
        continue
    overdue_rate_df.to_parquet(sender_config_dir / f'sender_overdue_rate_window3_{month}.parquet',compression='zstd')

word
config_all is empty, month: 2023-06
word
config_all is empty, month: 2023-07
word
word
freq level_freq
8     1691
10    1690
1     1690
3     1690
7     1689
2     1689
6     1689
5     1689
4     1688
9     1687
Name: count, dtype: int64
dist level_dist
4     10087
5      3655
3      2106
6       620
2       291
7        73
1        36
8        15
9         6
10        3
Name: count, dtype: int64
app
freq level_freq
3     338
8     338
1     337
5     337
2     336
9     336
10    336
7     336
6     336
4     335
Name: count, dtype: int64
dist level_dist
7     1370
6     1310
8      278
5      203
9       83
4       59
10      32
3       27
2        2
1        1
Name: count, dtype: int64
sender


  result = getattr(ufunc, method)(*inputs, **kwargs)


freq level_freq
1     1240
6     1240
3     1237
9     1235
8     1233
5     1232
10    1231
4     1229
2     1227
7     1225
Name: count, dtype: int64
dist level_dist
6     11465
7       806
5        44
8         6
3         2
4         2
9         2
1         1
10        1
Name: count, dtype: int64
word
freq level_freq
1     1822
9     1822
7     1822
5     1822
3     1821
10    1821
6     1821
2     1821
4     1821
8     1820
Name: count, dtype: int64
dist level_dist
4     10561
5      5456
3       978
6       933
2       177
7        85
8        11
1         6
9         4
10        2
Name: count, dtype: int64
app
freq level_freq
1     372
10    372
5     371
9     371
2     371
3     371
6     371
4     371
8     371
7     371
Name: count, dtype: int64
dist level_dist
6     1979
7      865
5      468
8      211
4       92
9       46
3       28
10      13
2        8
1        2
Name: count, dtype: int64
sender


  result = getattr(ufunc, method)(*inputs, **kwargs)


freq level_freq
3     2182
2     2168
7     2128
5     2128
9     2126
1     2126
6     2124
10    2124
8     2120
4     2027
Name: count, dtype: int64
dist level_dist
6     20029
7      1185
5        19
8         6
3         4
10        4
4         3
1         2
9         1
Name: count, dtype: int64
word
freq level_freq
1     1897
3     1896
9     1896
6     1896
8     1896
7     1895
10    1895
4     1895
2     1895
5     1895
Name: count, dtype: int64
dist level_dist
5     8595
4     8012
6     1048
3      982
7      152
2      133
8       14
1       12
9        6
10       2
Name: count, dtype: int64
app
freq level_freq
9     381
1     379
7     379
4     379
3     378
8     378
5     378
2     378
6     378
10    376
Name: count, dtype: int64
dist level_dist
6     2057
7      924
5      395
8      212
4      105
9       36
3       31
10      18
1        3
2        3
Name: count, dtype: int64
sender


  result = getattr(ufunc, method)(*inputs, **kwargs)


freq level_freq
2     2574
1     2566
6     2566
9     2563
5     2559
8     2559
10    2556
3     2551
4     2550
7     2549
Name: count, dtype: int64
dist level_dist
6     24161
5      1370
7        33
3         9
4         8
10        4
8         3
1         2
2         2
9         1
Name: count, dtype: int64
word
freq level_freq
8     1934
10    1932
4     1932
1     1932
2     1932
6     1932
5     1931
3     1931
7     1931
9     1929
Name: count, dtype: int64
dist level_dist
4     13296
5      3400
3      1363
6       805
2       248
7       142
8        27
1        26
10        5
9         4
Name: count, dtype: int64
app
freq level_freq
6     384
3     374
5     369
1     369
10    369
9     368
2     368
8     368
4     362
7     352
Name: count, dtype: int64
dist level_dist
5     2385
6      643
4      388
7      165
3       64
2       17
8       15
9        3
1        2
10       1
Name: count, dtype: int64
sender


  result = getattr(ufunc, method)(*inputs, **kwargs)


freq level_freq
9     2892
2     2885
1     2883
5     2881
8     2881
7     2880
4     2880
6     2880
3     2874
10    2867
Name: count, dtype: int64
dist level_dist
5     27963
4       793
3        21
6        15
2         5
1         3
9         2
10        1
Name: count, dtype: int64
word
freq level_freq
8     1993
10    1993
7     1993
6     1993
5     1993
9     1993
4     1993
1     1993
2     1993
3     1993
Name: count, dtype: int64
dist level_dist
4     9743
5     7595
6     1334
3      821
7      236
2      135
8       40
1       12
9        9
10       5
Name: count, dtype: int64
app
freq level_freq
3     394
9     383
8     374
5     374
1     374
2     373
6     373
7     373
10    364
4     353
Name: count, dtype: int64
dist level_dist
5     1677
6     1538
7      239
4      191
3       49
8       28
9       10
10       1
2        1
1        1
Name: count, dtype: int64
sender


  result = getattr(ufunc, method)(*inputs, **kwargs)


freq level_freq
6     3632
3     3623
9     3620
8     3610
2     3609
1     3609
5     3605
4     3599
10    3597
7     3585
Name: count, dtype: int64
dist level_dist
4     29738
5      6300
3        25
2        15
1         6
6         2
9         1
7         1
10        1
Name: count, dtype: int64
word
freq level_freq
1     2050
6     2049
9     2049
10    2048
5     2048
8     2048
4     2048
3     2048
7     2047
2     2047
Name: count, dtype: int64
dist level_dist
4     10505
5      8000
6       927
3       797
7       114
2       101
8        26
9         5
1         4
10        3
Name: count, dtype: int64
app
freq level_freq
1     383
8     383
6     383
10    383
3     383
2     383
5     382
7     382
4     382
9     382
Name: count, dtype: int64
dist level_dist
4     2135
5     1176
3      260
6      159
7       44
2       33
8       10
1        5
10       2
9        2
Name: count, dtype: int64
sender


  result = getattr(ufunc, method)(*inputs, **kwargs)


freq level_freq
3     3657
8     3471
1     3465
6     3464
2     3462
5     3461
7     3460
10    3460
9     3456
4     3267
Name: count, dtype: int64
dist level_dist
6     20835
5     13683
7        32
4        31
10       12
2        10
3         9
9         6
8         3
1         2
Name: count, dtype: int64
word
freq level_freq
1     2032
10    2031
6     2031
5     2030
9     2030
8     2030
4     2030
3     2030
7     2029
2     2029
Name: count, dtype: int64
dist level_dist
4     12772
3      3465
5      3190
6       460
2       276
7        80
1        29
8        16
9         9
10        5
Name: count, dtype: int64
app
freq level_freq
6     388
4     386
9     385
1     385
2     384
3     384
10    384
8     384
5     383
7     381
Name: count, dtype: int64
dist level_dist
4     2852
5      493
3      376
6       73
2       34
7        9
10       5
1        2
Name: count, dtype: int64
sender


  result = getattr(ufunc, method)(*inputs, **kwargs)


freq level_freq
1     3398
3     3397
7     3396
10    3395
6     3395
9     3394
8     3393
4     3393
5     3393
2     3392
Name: count, dtype: int64
dist level_dist
6     33468
5       213
7       171
8        29
9        21
10       18
4        15
3         8
1         2
2         1
Name: count, dtype: int64
word
freq level_freq
7     1989
5     1986
2     1986
10    1986
4     1986
9     1986
1     1986
6     1985
3     1985
8     1982
Name: count, dtype: int64
dist level_dist
5     9141
4     8960
6      862
3      700
7       96
2       65
8       12
9        9
1        7
10       5
Name: count, dtype: int64
app
freq level_freq
5     379
2     376
7     375
1     375
10    375
4     375
8     374
9     374
3     372
6     369
Name: count, dtype: int64
dist level_dist
4     1989
3     1292
5      178
2      168
1       74
6       30
10       6
7        4
9        3
Name: count, dtype: int64
sender


  result = getattr(ufunc, method)(*inputs, **kwargs)


freq level_freq
5     3038
9     3032
4     3031
3     3030
1     3030
2     3029
7     3029
10    3025
8     3023
6     3019
Name: count, dtype: int64
dist level_dist
6     29703
5       260
7       211
8        62
9        17
4        17
10        8
3         6
1         1
2         1
Name: count, dtype: int64
word
freq level_freq
6     1920
10    1920
4     1920
1     1920
9     1919
5     1919
8     1919
7     1919
3     1919
2     1919
Name: count, dtype: int64
dist level_dist
5     8798
6     7574
7     1260
4      972
8      276
3      209
9       61
2       26
10      12
1        6
Name: count, dtype: int64
app
freq level_freq
1     353
2     353
8     353
10    353
4     353
6     353
3     352
7     352
5     352
9     352
Name: count, dtype: int64
dist level_dist
5     2029
6      679
4      454
7      146
3      138
2       35
8       22
9       10
1        8
10       5
Name: count, dtype: int64
sender


  result = getattr(ufunc, method)(*inputs, **kwargs)


freq level_freq
3     2459
1     2458
7     2456
6     2456
4     2455
10    2455
8     2454
5     2453
2     2453
9     2452
Name: count, dtype: int64
dist level_dist
6     22939
7      1171
5       413
4        11
3         7
1         5
10        2
2         2
9         1
Name: count, dtype: int64
word
freq level_freq
4     1885
1     1885
9     1884
10    1884
6     1884
3     1884
7     1884
8     1884
2     1884
5     1883
Name: count, dtype: int64
dist level_dist
4     11578
5      5901
3       668
6       563
2        58
7        55
8        12
10        3
9         2
1         1
Name: count, dtype: int64
app
freq level_freq
3     360
1     344
7     344
10    344
2     344
5     344
9     344
6     343
8     343
4     327
Name: count, dtype: int64
dist level_dist
4     2024
5      895
3      276
6      120
2       62
7       29
1       20
8        6
9        3
10       2
Name: count, dtype: int64
sender


  result = getattr(ufunc, method)(*inputs, **kwargs)


freq level_freq
4     2317
8     2310
1     2301
7     2299
10    2299
3     2299
6     2297
2     2297
9     2287
5     2281
Name: count, dtype: int64
dist level_dist
5     19852
6      3024
4        85
7         8
3         6
2         4
1         4
8         2
9         1
10        1
Name: count, dtype: int64
word
freq level_freq
8     1932
1     1932
4     1931
10    1931
5     1931
2     1931
7     1931
3     1931
6     1931
9     1930
Name: count, dtype: int64
dist level_dist
4     12313
3      4561
5      1799
2       296
6       243
7        51
8        21
1        19
9         6
10        2
Name: count, dtype: int64
app
freq level_freq
1     355
10    355
5     355
3     354
9     354
4     354
2     354
8     354
7     354
6     354
Name: count, dtype: int64
dist level_dist
3     1621
4     1546
5      194
2      115
1       25
6       25
7       13
8        2
9        1
10       1
Name: count, dtype: int64
sender


  result = getattr(ufunc, method)(*inputs, **kwargs)


freq level_freq
8     1951
1     1951
3     1951
10    1950
6     1949
5     1949
7     1949
2     1948
9     1947
4     1947
Name: count, dtype: int64
dist level_dist
5     11284
4      8137
8        22
3        18
6        16
1         6
2         4
7         3
9         1
10        1
Name: count, dtype: int64
word
freq level_freq
9     1885
4     1878
7     1878
1     1878
6     1877
5     1877
3     1877
8     1877
2     1877
10    1870
Name: count, dtype: int64
dist level_dist
4     8488
5     6428
3     2636
6      738
2      229
7      151
8       58
1       29
9       15
10       2
Name: count, dtype: int64
app
freq level_freq
3     421
2     410
8     410
6     410
10    410
1     410
5     409
7     409
9     409
4     398
Name: count, dtype: int64
dist level_dist
3     2392
4     1180
2      225
5      223
1       44
6       20
7        9
10       3
Name: count, dtype: int64
sender
freq level_freq
6     2092
1     2087
8     2083
10    2083
2     2083
5     2083
4     2083
9

  result = getattr(ufunc, method)(*inputs, **kwargs)


In [12]:
month = '2024-08'
app_config_dir = work_dir / 'app_config_window'
word_config_dir = work_dir / 'word_config_window'
sender_config_dir = work_dir / 'sender_config_window'
overdue_rate_config = generate_word_config_data(calc_month=month,rlevel_bins=10,window_size=3)
overdue_rate_config.to_parquet(word_config_dir / f'word_overdue_rate_window3_{month}.parquet',compression='zstd')
overdue_rate_config = generate_app_config_data(calc_month=month,rlevel_bins=10,window_size=3)
overdue_rate_config.to_parquet(app_config_dir / f'app_overdue_rate_window3_{month}.parquet',compression='zstd')
overdue_rate_config = generate_sender_config_data(calc_month=month,rlevel_bins=10,window_size=3)
overdue_rate_config.to_parquet(sender_config_dir / f'sender_overdue_rate_window3_{month}.parquet',compression='zstd')

freq level_freq
5     2311
10    2311
9     2311
6     2311
3     2311
8     2311
7     2311
1     2311
2     2311
4     2311
Name: count, dtype: int64
dist level_dist
5     11000
4      5326
6      3588
3      2294
7       623
2       171
8        80
1        13
9        11
10        4
Name: count, dtype: int64
freq level_freq
1     360
10    360
6     359
2     359
3     359
9     359
5     359
8     359
4     359
7     359
Name: count, dtype: int64
dist level_dist
3     2024
4     1127
2      189
5      185
1       43
6       19
7        2
8        2
10       1
Name: count, dtype: int64
freq level_freq
9     1180
3     1179
4     1179
7     1178
1     1178
8     1177
2     1177
6     1177
10    1175
5     1174
Name: count, dtype: int64
dist level_dist
5     4781
4     2863
6     1991
3     1072
7      606
2      318
8      106
1       25
9       10
10       2
Name: count, dtype: int64


In [40]:
app_config_dir = work_dir / 'app_config_window'
if not app_config_dir.exists():
    app_config_dir.mkdir(exist_ok=True)
    
for month in ['2023-06','2023-07','2023-08','2023-09','2023-10','2023-11','2023-12','2024-01','2024-02','2024-03','2024-04','2024-05','2024-06','2024-07']:
    overdue_rate_config = generate_app_config_data(calc_month=month,rlevel_bins=10,window_size=3)
    overdue_rate_config.to_parquet(app_config_dir / f'app_overdue_rate_window3_{month}.parquet',compression='zstd')

freq level_freq
1     357
9     357
5     357
3     356
2     356
8     356
10    356
6     356
7     356
4     356
Name: count, dtype: int64
dist level_dist
6     1531
7     1204
8      298
5      286
9       89
4       75
3       36
10      35
2        8
1        1
Name: count, dtype: int64
freq level_freq
7     352
9     349
3     349
1     348
5     348
2     348
6     348
10    347
4     347
8     344
Name: count, dtype: int64
dist level_dist
6     1730
7      939
5      398
8      219
4       85
9       54
3       25
10      17
2       12
1        1
Name: count, dtype: int64
freq level_freq
1     367
4     367
10    367
9     367
8     367
2     367
3     367
5     367
6     367
7     367
Name: count, dtype: int64
dist level_dist
6     1853
7      684
5      573
8      317
4      141
3       46
9       38
2       12
10       4
1        2
Name: count, dtype: int64
freq level_freq
6     376
9     363
1     363
10    363
2     363
4     363
5     363
3     362
8     362
7     349
Na

In [42]:
sender_config_dir = work_dir / 'sender_config_window'
if not sender_config_dir.exists():
    sender_config_dir.mkdir(exist_ok=True)

for month in ['2023-06','2023-07','2023-08','2023-09','2023-10','2023-11','2023-12','2024-01','2024-02','2024-03','2024-04','2024-05','2024-06','2024-07']:
    overdue_rate_config = generate_sender_config_data(calc_month=month,rlevel_bins=10,window_size=3)
    overdue_rate_config.to_parquet(sender_config_dir / f'sender_overdue_rate_window3_{month}.parquet',compression='zstd')

freq level_freq
5     1289
2     1287
1     1287
9     1286
4     1286
7     1285
8     1284
6     1284
10    1283
3     1282
Name: count, dtype: int64
dist level_dist
5     4164
4     3884
6     1886
3     1719
7      503
2      437
8      166
1       63
9       28
10       3
Name: count, dtype: int64
freq level_freq
1     654
4     653
10    652
9     651
7     651
8     651
6     651
5     650
3     650
2     650
Name: count, dtype: int64
dist level_dist
5     2355
4     1801
6     1215
3      507
7      322
8      169
2       88
9       40
1       11
10       5
Name: count, dtype: int64
freq level_freq
10    431
7     431
1     431
4     431
8     430
9     430
2     430
5     430
6     430
3     430
Name: count, dtype: int64
dist level_dist
5     1351
4     1140
6      799
3      457
7      291
8      121
2       68
9       61
1        8
10       8
Name: count, dtype: int64
freq level_freq
1     288
9     287
10    287
8     287
6     287
3     287
7     287
5     287
2     287
4 

In [41]:
overdue_rate_config.head()

Unnamed: 0,app,bad_count,total_count,overdue_rate,overdue_bin_freq,level_freq,overdue_bin_dist,level_dist
0,acb2b.app.pr,58,213,0.2723,"(0.224, 0.36]",1,"(0.27, 0.316]",2
1,agency.equilibrio.sodexomexico,1042,2743,0.379876,"(0.374, 0.382]",3,"(0.361, 0.406]",4
2,ai.bohemian.multipolls,121,239,0.506276,"(0.436, 0.678]",10,"(0.497, 0.542]",7
3,ai.powerup.stori,6490,17258,0.376057,"(0.374, 0.382]",3,"(0.361, 0.406]",4
4,air.cinepolis,2479,6482,0.382444,"(0.382, 0.387]",4,"(0.361, 0.406]",4


In [46]:
# 内存占用大小 
overdue_rate_config.memory_usage().sum() / 1024 / 1024

0.6050891876220703

# 新方法

In [29]:
def data_of_dir(dir_path: str, contains_flags="", start_date='2023-01-01', end_date='2999-01-01'):
    """
    获取指定目录下的文件
    :param dir_path:  目录路径
    :param contains_flags: 包含的标志
    :param start_date:  开始日期
    :param end_date:  结束日期
    :return: 
    """
    def _fetch_filenams(dir_path: str, contain_flag, start_date=None, end_date=None):
        file_paths = []
        contain_flag = contain_flag or ""
        pattern = r"\d{4}-\d{2}-\d{2}"
        for file_name in os.listdir(dir_path):
            if (contain_flag in file_name) and (
            file_name.endswith(('.pqt', '.parquet', '.csv', '.xlsx', '.pickle', '.pkl'))):
                if start_date is None:
                    file_paths.append(os.path.join(dir_path, file_name))
                else:
                    match = re.search(pattern, file_name)
                    date = match.group()  # type: ignore
                    if (date >= start_date) and (date < end_date):
                        file_paths.append(os.path.join(dir_path, file_name))
        file_paths.sort()
        return file_paths

    if isinstance(contains_flags, str) or contains_flags is None:
        return _fetch_filenams(dir_path, contains_flags, start_date, end_date)
    elif isinstance(contains_flags, list):
        file_names = None
        for contains_flag in contains_flags:  # type: ignore
            if file_names is None:
                file_names = _fetch_filenams(dir_path, contains_flag, start_date, end_date)
            else:
                file_names = file_names + _fetch_filenams(dir_path, contains_flag, start_date, end_date)
        return file_names

In [30]:
def parse_json_data(df, json_column, id_column, retain_column=None):
    new_rows = []
    for i, row in df.iterrows():
        json_data_str = row[json_column]
        id_column_value = row[id_column]
        if isinstance(json_data_str, str) and len(json_data_str.strip())>0 :
            try:
                json_data = json.loads(json_data_str)
                for item in json_data:
                    new_row = item
                    new_row[id_column] = id_column_value
                    if isinstance(retain_column, str) and len(retain_column) > 0:
                        new_row[retain_column] = row[retain_column]
                    elif isinstance(retain_column, list) and len(retain_column) > 0:
                        for column in retain_column:
                            new_row[column] = row[column]
                    new_rows.append(new_row)
            except Exception as e:
                print(f"异常数据 {id_column_value}:{traceback.format_exc()}")
                continue
    new_df = pd.DataFrame(new_rows)
    return new_df

In [64]:
def parallel_process(task_function,task_list,process_num=10):
    """
    task_function : 任务函数
    task_list : 任务列表
    """
    start_time = time.time()
    def _task(task_function,queue,task_params):
        try:
            task_function(*task_params)
        finally:
            queue.put(True)
    from multiprocessing import Queue, Process
    mp_queue = Queue()
    for i in range(process_num):
        mp_queue.put(True)
    for task_params in tqdm(task_list):
        mp_queue.get()
        Process(target=_task,args=(task_function,mp_queue,task_params)).start() # mq_queue 用于控制进程数
    for i in range(process_num):
        mp_queue.get()
    mp_queue.close()
    gc.collect()
    print(f'任务完成，总计{len(task_list)}个任务,耗时{time.time()-start_time}s')

In [72]:
import time
from multiprocessing import Queue, Process
from tqdm import tqdm

def parallel_process(task_function, task_list, process_num=10):
    """
    task_function : 任务函数
    task_list : 任务列表
    """
    start_time = time.time()
    
    def _task(task_function, queue, task_params):
        try:
            task_function(*task_params)
        finally:
            queue.put(True)
            pid = os.getpid()
            print(f'进程{pid}完成')
            # kill pid
            os.system(f'kill -9 {pid}')
    
    mp_queue = Queue()
    
    for i in range(process_num):
        mp_queue.put(True)
    
    processes = []
    
    for task_params in tqdm(task_list):
        mp_queue.get()
        p = Process(target=_task, args=(task_function, mp_queue, task_params))
        p.start()
        processes.append(p)
    
    for p in processes:
        p.join()
    
    mp_queue.close()
    gc.collect()
    
    print(f'任务完成，总计{len(task_list)}个任务,耗时{time.time()-start_time}s')


In [57]:
import time
from multiprocessing import Pool
from tqdm import tqdm

def parallel_process(task_function, task_list, process_num=10):
    """
    task_function : 任务函数
    task_list : 任务列表
    """
    start_time = time.time()
    
    with Pool(processes=process_num) as pool:
        results = list(tqdm(pool.imap(task_function, task_list), total=len(task_list)))
    
    print(f'任务完成，总计{len(task_list)}个任务,耗时{time.time()-start_time}s')
    return results


# woe 配置文件计算

## 先计算每一天的applist的bad数、order数、user数

In [12]:
# 业务逻辑，读取指定目录的文件，然后解析其中的applist，再计算出其对应的月份的woe的切片
def computer_app_woe_daily(file_path:str,save_path:str):
    try:
        raw_df = pd.read_parquet(file_path) # 
        raw_df['apply_month'] = raw_df['apply_time'].dt.strftime('%Y-%m')
        raw_df['bad'] = np.where(raw_df['agr_pd7']==1,raw_df['def_pd7'],None)
        df = raw_df[raw_df['bad'].notnull()]
        if len(df) == 0:
            return 0
        user_apps = parse_json_data(df,'applist_data','app_order_id',['apply_month','bad','apply_time','user_id'])
        tmp_arr = []
        for key,grp in user_apps.groupby(['apply_month','app_package']):
            apply_month,app_package = key
            rs = {}
            bad = grp['bad'].sum()
            order = grp['bad'].count()
            user = grp['user_id'].nunique()
            rs['apply_month'] = apply_month
            rs['app_package'] = app_package
            rs['bad'] = bad
            rs['order'] = order
            rs['user'] = user
            rs['app_name'] = ';'.join(grp['app_name'].unique())
            tmp_arr.append(rs)
        df = pd.DataFrame(tmp_arr)
        df.to_parquet(save_path,compression='zstd')
        return 0 
    except:
        print(f'{file_path}计算失败 \n', traceback.format_exc())
        return 1


In [None]:
tmp_dir = work_dir/'applist_tmp' # 设定计算结果保存的路径
if not tmp_dir.exists():
    tmp_dir.mkdir()
file_paths = data_of_dir(raw_dir,['_YM','_M','_AM'])
task_list = [ (x,  str(tmp_dir/x.split('/')[-1]) )  for x in file_paths ]  # 生成任务的参数列表,[('/home/longxiaolei/raw_mx/newcust/raw_YM_2023-08-16.pqt','/home/mayongzhi/marshall/sender_word_call_app/applist_tmp/raw_YM_2023-08-16.pqt'),  ...]
parallel_process(computer_app_woe_daily,task_list,46)

In [31]:
# 查看计算结果
files = list(tmp_dir.glob('*'))
df = pd.read_parquet(files[0])
df.head()

Unnamed: 0,apply_month,app_package,bad,order,user,app_name
0,2023-08,abunda.unox.tasew.mbak.ssqc.bbva.cfiqoza,1,3,3,Abundance
1,2023-08,ai.powerup.stori,5,9,9,Stori
2,2023-08,air.Cinepolis,1,1,1,Cinépolis
3,2023-08,air.com.cetesdirecto.cetesmobile,1,1,1,cetesdirecto
4,2023-08,android,17,27,27,Sistema Android


In [63]:
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor(16) as executor:
    # 使用 list comprehension 来收集结果
    results = list(tqdm(executor.map(pd.read_parquet, files), total=len(files)))

# 合并所有结果
app_woe_info_20240827 = pd.concat(results, ignore_index=True)

# 进行 groupby 和聚合操作
app_woe_info_20240827 = app_woe_info_20240827.groupby(['apply_month', 'app_package', 'app_name']).agg({
    'bad': 'sum',
    'order': 'sum',
    'user': 'sum'
}).reset_index()

100%|██████████| 1214/1214 [00:03<00:00, 347.07it/s]


In [68]:
app_woe_info_20240827.shape

NameError: name 'app_woe_info_20240827' is not defined

In [67]:
app_woe_info_20240827.head()

Unnamed: 0,apply_month,app_package,bad,order,user
0,2023-01,Adavan.CACInv,0,2,2
1,2023-01,Adavan.PyCCAC,0,2,2
2,2023-01,CFE.CAsistencia,0,1,1
3,2023-01,CMG.GMF,0,1,1
4,2023-01,DOCECG.Doctor,0,4,4


In [56]:
print(app_woe_info_20240827.dtypes)

apply_month    object
app_name       object
bad             int64
order           int64
user            int64
dtype: object


In [70]:
app_woe_monthly.to_parquet(work_dir/'app_woe_info20240827.pqt',compression='zstd')

In [14]:
tmp_dir = work_dir/'applist_tmp'
list_of_M = data_of_dir(tmp_dir, '_M') # 获取所有的M文件
print(len(list_of_M))

584

In [18]:
raw_M_20240827_df = pd.DataFrame()
for raw_file in data_of_dir(raw_dir,'_M_'):
    # 不读'applis_data'列和‘sms_data’列
    raw_file_df = pd.read_parquet(raw_file, columns=['apply_time','agr_pd7','def_pd7','user_id'])
    raw_M_20240827_df = pd.concat([raw_M_20240827_df,raw_file_df])
raw_M_20240827_df.to_parquet(work_dir/'raw_M_20240827.pqt',compression='zstd')

In [22]:
raw_AM_20240827_df = pd.DataFrame()
for raw_file in data_of_dir(raw_dir,'_AM_'):
    raw_file_df = pd.read_parquet(raw_file, columns=['apply_time','agr_pd7','def_pd7','user_id'])
    raw_AM_20240827_df = pd.concat([raw_AM_20240827_df,raw_file_df])
raw_AM_20240827_df.to_parquet(work_dir/'raw_AM_20240827.pqt',compression='zstd')

raw_YM_20240827_df = pd.DataFrame()
for raw_file in data_of_dir(raw_dir,'_YM_'):
    raw_file_df = pd.read_parquet(raw_file, columns=['apply_time','agr_pd7','def_pd7','user_id'])
    raw_YM_20240827_df = pd.concat([raw_YM_20240827_df,raw_file_df])
raw_YM_20240827_df.to_parquet(work_dir/'raw_YM_20240827.pqt',compression='zstd')

## 根据窗口计算woe特性的配置文件

In [35]:
calc_month = ['2023-10', '2023-11', '2023-12','2024-01', '2024-02',
              '2024-03', '2024-04', '2024-05', '2024-06','2024-07', '2024-08']


In [37]:
raw_df_m = pd.read_parquet(work_dir/'raw_M_20240827.pqt')
raw_df_ym = pd.read_parquet(work_dir/'raw_YM_20240827.pqt')
raw_df_am = pd.read_parquet(work_dir/'raw_AM_20240827.pqt')
raw_df_ym['apply_month'] = raw_df_ym['apply_time'].dt.strftime('%Y-%m')
raw_df_m['apply_month'] = raw_df_m['apply_time'].dt.strftime('%Y-%m')
raw_df_am['apply_month'] = raw_df_am['apply_time'].dt.strftime('%Y-%m')
brate_ym=raw_df_ym.groupby('apply_month').agg({
    'def_pd7':'sum',
    'agr_pd7':'sum',
})
brate_m=raw_df_m.groupby('apply_month').agg({
    'def_pd7':'sum',
    'agr_pd7':'sum',
})
brate_am=raw_df_am.groupby('apply_month').agg({
    'def_pd7':'sum',
    'agr_pd7':'sum',
})
brate_df = pd.concat([brate_m, brate_ym, brate_am]).groupby(level=0).sum()
brate_df['brate'] = brate_df['def_pd7']/brate_df['agr_pd7'] # apply_month,def_pd7,agr_pd7,brate

In [38]:
brate_df

Unnamed: 0_level_0,def_pd7,agr_pd7,brate
apply_month,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2023-01,15185,36151,0.420044
2023-02,9057,23574,0.384194
2023-03,11506,27719,0.415094
2023-04,15567,35852,0.434202
2023-05,11907,29663,0.401409
2023-06,10076,21516,0.468303
2023-07,18954,35067,0.540508
2023-08,12157,27531,0.441575
2023-09,9312,24867,0.374472
2023-10,10211,26521,0.385016


In [34]:
example_file = files[0]
pd.read_parquet(example_file).head()

Unnamed: 0,id,user_id,acq_channel,app_order_id,contract_no,repayment_date,installment_num,installment_amount,principal,interest,...,is_extension,app_order_id_x,repaid,payout,profit,def_ltv,file_name,call_log_data,applist_data,sms_data
1,363,1171937404689764352,ADNF,1171937419898310656,1171937432632217600,2023-11-14,1,155.0,145.0,10.0,...,0,1171937419898310656,155.0,2213.0,-2058.0,1,raw_AM_2023-11-08.pqt,[],"[{""app_name"":""com.android.cts.priv.ctsshim"",""a...","[{""body"":""[FinMex] 3497 es su codigo de verifi..."


In [None]:
# 生成任务的参数列表
tmp_dir = work_dir/'applist_tmp' # 设定计算结果保存的路径
if not tmp_dir.exists():
    tmp_dir.mkdir()
file_paths = data_of_dir(raw_dir,['_YM','_M','_AM'])
task_list = [ (x,  str(tmp_dir/x.split('/')[-1]) )  for x in file_paths ]

# 同时开始46个进程执行任务
parallel_process(computer_app_woe_daily,task_list,46)

### 3个月滑动窗口

In [70]:
app_woe_monthly.head()

Unnamed: 0,apply_month,app_package,bad,order,user
0,2023-01,Adavan.CACInv,0,2,2
1,2023-01,Adavan.PyCCAC,0,2,2
2,2023-01,CFE.CAsistencia,0,1,1
3,2023-01,CMG.GMF,0,1,1
4,2023-01,DOCECG.Doctor,0,4,4


In [42]:
window_size = 3
level_num = 10
app_woe_window_df = pd.DataFrame()
app_woe_window_list = []
for month, end_month, start_month in tqdm(month_window(calc_month,window_size)):
    window_df = app_woe_monthly[(app_woe_monthly['apply_month'] >= start_month) & (app_woe_monthly['apply_month'] <= end_month)]   # 窗口内的所有app
    window_brate_df = brate_df[(brate_df.index >= start_month) & (brate_df.index <= end_month)] # 月逾期率
    brate_total = window_brate_df['def_pd7'].sum() / window_brate_df['agr_pd7'].sum()
    print(f'window_df:{window_df.shape}')
    print('brate_total:',brate_total)
    grouped = window_df.groupby('app_package').agg({
        'bad':'sum',
        'order':'sum',
        'apply_month':'nunique'
    }).reset_index()
    grouped['brate'] = grouped['bad'] / grouped['order']
    grouped['woe'] = np.where(
        (grouped['brate'] == 0) | (grouped['brate'] == 1),
        0,
        np.log((grouped['brate'] / brate_total) / ((1 - grouped['brate']) / (1 - brate_total) + 1e-6))
    )
    grouped['woe'] = grouped['woe'].round(4)
    grouped.rename(columns={'apply_month':'month_nums'},inplace=True)
    grouped['apply_month'] = month
    app_woe_window_list.append(grouped)
app_woe_window_df = pd.concat(app_woe_window_list, ignore_index=True)

  result = getattr(ufunc, method)(*inputs, **kwargs)
  9%|▉         | 1/11 [00:00<00:01,  5.19it/s]

window_df:(157930, 5)
brate_total: 0.46216200766020693
window_df:(154620, 5)
brate_total: 0.4014242451120769


  result = getattr(ufunc, method)(*inputs, **kwargs)
  result = getattr(ufunc, method)(*inputs, **kwargs)
 27%|██▋       | 3/11 [00:00<00:01,  5.38it/s]

window_df:(144945, 5)
brate_total: 0.38852633556710714
window_df:(145036, 5)
brate_total: 0.4065006032219147


  result = getattr(ufunc, method)(*inputs, **kwargs)
  result = getattr(ufunc, method)(*inputs, **kwargs)
 45%|████▌     | 5/11 [00:00<00:01,  5.35it/s]

window_df:(142636, 5)
brate_total: 0.40925671514519874
window_df:(140601, 5)
brate_total: 0.4059470740050135


  result = getattr(ufunc, method)(*inputs, **kwargs)
  result = getattr(ufunc, method)(*inputs, **kwargs)
 64%|██████▎   | 7/11 [00:01<00:00,  5.38it/s]

window_df:(136515, 5)
brate_total: 0.383948587655059
window_df:(133742, 5)
brate_total: 0.38549342580551943


  result = getattr(ufunc, method)(*inputs, **kwargs)
  result = getattr(ufunc, method)(*inputs, **kwargs)
 82%|████████▏ | 9/11 [00:01<00:00,  5.37it/s]

window_df:(142065, 5)
brate_total: 0.408191671714503
window_df:(170498, 5)
brate_total: 0.4331914433101259


  result = getattr(ufunc, method)(*inputs, **kwargs)
  result = getattr(ufunc, method)(*inputs, **kwargs)
100%|██████████| 11/11 [00:02<00:00,  5.17it/s]

window_df:(182629, 5)
brate_total: 0.44126642501132757





In [43]:
app_woe_window_df.head()

Unnamed: 0,app_package,bad,order,month_nums,brate,woe,apply_month
0,7.0.9.1,0,1,1,0.0,0.0,2023-10
1,r_aml_301400200,1,1,1,1.0,0.0,2023-10
2,-,1,2,2,0.5,0.1516,2023-10
3,0,1,1,1,1.0,0.0,2023-10
4,0.0.0,1,2,1,0.5,0.1516,2023-10


In [86]:
app_woe_window_df[app_woe_window_df['month_nums']>1].shape

(422124, 7)

In [87]:
app_woe_window_df[app_woe_window_df['month_nums'] == window_size].shape

(256544, 7)

In [46]:
app_woe_window_df[app_woe_window_df['woe'] < 0 ].shape

(184282, 7)

In [44]:
tmp = []
for apply_month,grp in app_woe_window_df.groupby('apply_month'):
    print(grp[grp['month_nums'] == window_size].shape)
    condition = (grp['month_nums'] == window_size)  & (grp['order'] > 50)
    grp = grp[condition].copy()
    print(apply_month, len(grp))
    if len(grp) > level_num:
        grp.loc[:,'woe_bin_freq'] = pd.qcut(grp['woe'],level_num,duplicates='drop')
        grp.loc[:,'level_freq'] = grp['woe_bin_freq'].cat.codes+1
        grp.loc[:,'woe_bin_dist'] = pd.cut(grp['woe'],level_num,duplicates='drop')
        grp.loc[:,'level_dist'] = grp['woe_bin_dist'].cat.codes+1
        tmp.append(grp)
        print('freq',grp['level_freq'].value_counts().sort_index())
        print('dist',grp['level_dist'].value_counts().sort_index())
    print('='*20)
app_woe_config = pd.concat(tmp)

(23457, 7)
2023-10 7584
freq level_freq
1     759
2     758
3     758
4     759
5     760
6     756
7     759
8     759
9     757
10    759
Name: count, dtype: int64
dist level_dist
1        3
2       22
3      121
4      595
5     2926
6     3001
7      788
8      108
9       17
10       3
Name: count, dtype: int64
(23018, 7)
2023-11 7221
freq level_freq
1     736
2     720
3     735
4     698
5     723
6     721
7     723
8     725
9     724
10    716
Name: count, dtype: int64
dist level_dist
1        1
2        7
3       35
4      174
5      861
6     4088
7     1708
8      289
9       45
10      13
Name: count, dtype: int64
(22381, 7)
2023-12 7318
freq level_freq
1     732
2     734
3     731
4     733
5     729
6     732
7     731
8     732
9     733
10    731
Name: count, dtype: int64
dist level_dist
1        6
2       37
3      255
4     1385
5     4250
6     1204
7      155
8       18
9        6
10       2
Name: count, dtype: int64
(22532, 7)
2024-01 7566
freq level_freq
1     

In [47]:
app_woe_config[app_woe_config['apply_month'] == '2024-05'].head(30)

Unnamed: 0,app_package,bad,order,month_nums,brate,woe,apply_month,woe_bin_freq,level_freq,woe_bin_dist,level_dist
605400,acb2b.app.pr,70,232,3,0.301724,-0.3728,2024-05,"(-1.6749999999999998, -0.344]",1,"(-0.648, -0.306]",4
605412,actinver.bursanet,66,206,3,0.320388,-0.2857,2024-05,"(-0.344, -0.196]",2,"(-0.306, 0.0354]",5
605424,addon.sprd.browser.plugindrm,43,105,3,0.409524,0.1004,2024-05,"(0.0608, 0.124]",8,"(0.0354, 0.377]",6
605426,addon.sprd.documentsui.plugindrm,44,109,3,0.40367,0.0761,2024-05,"(0.0608, 0.124]",8,"(0.0354, 0.377]",6
605427,addon.sprd.downloadprovider,38,123,3,0.308943,-0.3388,2024-05,"(-0.344, -0.196]",2,"(-0.648, -0.306]",4
605440,advanced.scientific.calculator.calc991.plus,34,103,3,0.330097,-0.2415,2024-05,"(-0.344, -0.196]",2,"(-0.306, 0.0354]",5
605457,agency.equilibrio.sodexomexico,936,2564,3,0.365055,-0.0872,2024-05,"(-0.122, -0.0814]",4,"(-0.306, 0.0354]",5
605474,ai.bohemian.multipolls,90,181,3,0.497238,0.4552,2024-05,"(0.264, 1.745]",10,"(0.377, 0.719]",7
605502,ai.moises,85,198,3,0.429293,0.1816,2024-05,"(0.124, 0.264]",9,"(0.0354, 0.377]",6
605504,ai.mybuddy.talkingflashcards_new,40,112,3,0.357143,-0.1215,2024-05,"(-0.122, -0.0814]",4,"(-0.306, 0.0354]",5


In [48]:
app_woe_config['woe'].describe()

count    83951.000000
mean        -0.009053
std          0.279553
min         -1.941600
25%         -0.135300
50%         -0.020600
75%          0.117100
max          4.206500
Name: woe, dtype: float64

In [49]:
app_woe_config.shape

(83951, 11)

In [106]:
app_woe_config[app_woe_config['level_dist'] > 8]

Unnamed: 0,app_package,bad,order,month_nums,brate,woe,apply_month,woe_bin_freq,level_freq,woe_bin_dist,level_dist
12287,call.blacklist.blocker,110,144,3,0.763889,1.3258,2023-10,"(0.415, 1.451]",10,"(1.306, 1.451]",10
48590,com.mi.mxmcocoxme.credit,48,65,3,0.738462,1.1896,2023-10,"(0.415, 1.451]",10,"(1.161, 1.306]",9
57530,com.prestamos.confiables,44,56,3,0.785714,1.4509,2023-10,"(0.415, 1.451]",10,"(1.306, 1.451]",10
95870,1,70,102,3,0.686275,1.1823,2023-11,"(0.39, 1.304]",10,"(1.174, 1.304]",10
95871,1.0,3856,5629,3,0.685024,1.1765,2023-11,"(0.39, 1.304]",10,"(1.174, 1.304]",10
95873,1.0.0,485,697,3,0.695839,1.2271,2023-11,"(0.39, 1.304]",10,"(1.174, 1.304]",10
96156,1.0.8,51,73,3,0.69863,1.2403,2023-11,"(0.39, 1.304]",10,"(1.174, 1.304]",10
96169,1.0.9,37,55,3,0.672727,1.1201,2023-11,"(0.39, 1.304]",10,"(1.043, 1.174]",9
96294,1.1.3,44,66,3,0.666667,1.0927,2023-11,"(0.39, 1.304]",10,"(1.043, 1.174]",9
97484,11,2833,4027,3,0.703501,1.2636,2023-11,"(0.39, 1.304]",10,"(1.174, 1.304]",10


In [50]:
app_woe_config.to_excel(work_dir/f'app_woe_config_{window_size}.xlsx',index=False)

## 每一天的sms数据sender、word的bad数、order数、user数

In [12]:

def get_words(sms_body):
    """
    获取每行的word
    :param sms_body: df 中的'body'列
    :return: 根据sms_data提取的word
    """

    words =  set(re.compile(r'\b[Ññáéíóú¡a-z]+\b').findall(sms_body.lower()))
    words = words.apply(lambda x: x - stop_words)
    words = words.apply(lambda x:list(filter(lambda x: len(x) > 2, x)))
    words = set(sum(words.tolist(),[]))
    words = list(words)
    return words

In [26]:
def get_words(sms_data):
    """
    获取每行的word
    :param sms_data: df 中的'sms_data'列
    :return: 根据sms_data提取的word
    """
    if sms_data == '[]' or sms_data == '' or sms_data is None:
        return []
    parse_df = pd.DataFrame(json.loads(sms_data))
    if 'body' not in parse_df.columns:
        print('body not in columns','id:')
        return []

    words = parse_df['body'].apply(lambda x: set(re.compile(r'\b[Ññáéíóú¡a-z]+\b').findall(str(x).lower())))
    words = words.apply(lambda x: x - stop_words)
    words = words.apply(lambda x:list(filter(lambda x: len(x) > 2, x)))
    # print(words)
    words = set(sum(words.tolist(),[]))
    words = list(words)
    return words

In [14]:
from nltk.corpus import stopwords
stop_words = set(stopwords.words('spanish'))
# 添加自定义停用词
user_stop_words = ['body','phone', 'src_phone', 'read', 'time', 'type']
stop_words.update(user_stop_words)

In [27]:
def computer_sms_woe_daily(file_path:str,save_path:str):
    try:
        raw_df = pd.read_parquet(file_path) #
        raw_df['apply_month'] = raw_df['apply_time'].dt.strftime('%Y-%m')
        raw_df['bad'] = np.where(raw_df['agr_pd7']==1,raw_df['def_pd7'],None)
        df = raw_df[raw_df['bad'].notnull()]
        if len(df) == 0:
            return 0
        user_word = df[['def_pd7','user_id','bad','apply_month']]
        user_word.loc[:,'words'] = df['sms_data'].apply(get_words)
        user_word = user_word.explode('words')

        tmp_arr = []
        for key,grp in user_word.groupby(['apply_month','words']):
            apply_month,word = key
            rs = {}
            bad = grp['bad'].sum()
            order = grp['bad'].count()
            user = grp['user_id'].nunique()
            rs['apply_month'] = apply_month
            rs['word'] = word
            rs['bad'] = bad
            rs['order'] = order
            rs['user'] = user
            tmp_arr.append(rs)
        df = pd.DataFrame(tmp_arr)
        df.to_parquet(save_path,compression='zstd')
        return 0
    except:
        print(f'{file_path}计算失败 \n', traceback.format_exc())
        return 1

In [29]:
tmp_dir = work_dir/'sms_tmp' # 设定计算结果保存的路径
if not tmp_dir.exists():
    tmp_dir.mkdir()
file_paths = data_of_dir(raw_dir,['_YM','_M','_AM'])
# 如果tmp_dir/file_name存在，就不再计算
task_list = [ (x,  str(tmp_dir/x.split('/')[-1]) )  for x in file_paths if not (tmp_dir/x.split('/')[-1]).exists() ]  # 生成任务的参数列表,[('/home/longxiaolei/raw_mx/newcust/raw_YM_2023-08-16.pqt','/home/mayongzhi/marshall/sender_word_call_app/applist_tmp/raw_YM_2023-08-16.pqt'),  ...]
# 生成任务的参数列表,[('/home/longxiaolei/raw_mx/newcust/raw_YM_2023-08-16.pqt','/home/mayongzhi/marshall/sender_word_call_app/applist_tmp/raw_YM_2023-08-16.pqt'),  ...]


In [30]:
len(task_list)

385

In [31]:
parallel_process(computer_sms_woe_daily,task_list,30)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  user_word.loc[:,'words'] = df['sms_data'].apply(get_words)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  user_word.loc[:,'words'] = df['sms_data'].apply(get_words)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  user_word.loc[:,'words'] = df['sms_data'].apply(get_words)
A value is trying to be set 

任务完成，总计385个任务,耗时1556.030321598053s


In [53]:
from concurrent.futures import ProcessPoolExecutor

tmp_dir = work_dir/'sms_tmp'
files = list(tmp_dir.glob('*'))
with ProcessPoolExecutor(16) as executor:
    results = list(tqdm(executor.map(pd.read_parquet, files), total=len(files)))

# 合并所有结果
word_woe_info_20240827 = pd.concat(results, ignore_index=True)

# 进行 groupby 和聚合操作
word_woe_info_20240827 = word_woe_info_20240827.groupby(['apply_month', 'word']).agg({
    'bad': 'sum',
    'order': 'sum',
    'user': 'sum'
}).reset_index()


100%|██████████| 1214/1214 [00:10<00:00, 115.06it/s]


In [54]:
word_woe_info_20240827.shape

(11640982, 5)

In [55]:
word_woe_info_20240827.head()

Unnamed: 0,apply_month,word,bad,order,user
0,2023-01,aaa,961,2580,2580
1,2023-01,aaaa,245,570,570
2,2023-01,aaaaa,144,314,314
3,2023-01,aaaaaa,78,150,150
4,2023-01,aaaaaaa,21,43,43


In [57]:
word_woe_info_20240827[word_woe_info_20240827['word'].str.contains('á')]

Unnamed: 0,apply_month,word,bad,order,user
73,2023-01,aaaaaáaáaaaaaaaaaaaaa,1,1,1
121,2023-01,aaaaáa,3,3,3
287,2023-01,aacá,1,1,1
1055,2023-01,aaá,0,4,4
1056,2023-01,aaáaa,3,3,3
...,...,...,...,...,...
11640776,2024-08,ávila,0,3,3
11640777,2024-08,ávisar,1,1,1
11640778,2024-08,ávise,1,2,2
11640779,2024-08,áviso,1,2,2


In [64]:
word_woe_info_20240827.to_parquet(work_dir/'word_woe_info_20240827.pqt',compression='zstd')

In [65]:
word_woe_info_20240827 = pd.read_parquet(work_dir/'word_woe_info_20240827.pqt')

## sender

In [32]:
def get_senders(sms_data):
    """
    获取每行的phone
    :param sms_data: df 中的'sms_data'列
    :return:
    """
    # row = pd.DataFrame(row).T
    # print(row)
    # 如果‘sms_data'列为空或者为'[]'，则返回空字典

    if sms_data == '[]' or sms_data == '' or sms_data is None:
        return []
    parse_df = pd.DataFrame(json.loads(sms_data))
    if 'src_phone' not in parse_df.columns:
        print('src_phone not in columns')
        return []
    # if len(parse_df[parse_df['src_phone'].str.len()>20]) > 0:
    #     print(parse_df[parse_df['src_phone'].str.len()>20]['src_phone'])
    parse_df = parse_df[parse_df['src_phone'].str.len() <= 30]
    parse_df['national_phone'] = parse_df['src_phone'].apply(lambda x: normalize_phone(x,'MX'))
    phones = parse_df['national_phone'].tolist()
    return phones

def computer_sender_woe_daily(file_path:str,save_path:str):
    try:
        raw_df = pd.read_parquet(file_path) #
        raw_df['apply_month'] = raw_df['apply_time'].dt.strftime('%Y-%m')
        raw_df['bad'] = np.where(raw_df['agr_pd7']==1,raw_df['def_pd7'],None)
        df = raw_df[raw_df['bad'].notnull()]
        if len(df) == 0:
            return 0
        user_word = df[['def_pd7','user_id','bad','apply_month']]
        user_word.loc[:,'senders'] = df['sms_data'].apply(get_senders)
        user_word = user_word.explode('senders')

        tmp_arr = []
        for key,grp in user_word.groupby(['apply_month','senders']):
            apply_month, sender = key
            rs = {}
            bad = grp['bad'].sum()
            order = grp['bad'].count()
            user = grp['user_id'].nunique()
            rs['apply_month'] = apply_month
            rs['sender'] = sender
            rs['bad'] = bad
            rs['order'] = order
            rs['user'] = user
            tmp_arr.append(rs)
        df = pd.DataFrame(tmp_arr)
        df.to_parquet(save_path,compression='zstd')
        return 0
    except:
        print(f'{file_path}计算失败 \n', traceback.format_exc())
        return 1

In [33]:
tmp_dir = work_dir/'sender_tmp' # 设定计算结果保存的路径
if not tmp_dir.exists():
    tmp_dir.mkdir()
file_paths = data_of_dir(raw_dir,['_YM','_M','_AM'])
# 如果tmp_dir/file_name存在，就不再计算
task_list = [ (x,  str(tmp_dir/x.split('/')[-1]) )  for x in file_paths if not (tmp_dir/x.split('/')[-1]).exists() ]  #

In [None]:
parallel_process(computer_sender_woe_daily,task_list,30)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  user_word.loc[:,'senders'] = df['sms_data'].apply(get_senders)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  user_word.loc[:,'senders'] = df['sms_data'].apply(get_senders)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  user_word.loc[:,'senders'] = df['sms_data'].apply(get_senders)
A value is tryin

## 滑动窗口计算的woe