In [1]:
from data_io.oss.obs_client import ObsLib
from data_io.mongo.mongo_client import MongoPoolDao
from data_io.mysql.mysql_client import MysqlPoolDao
from tqdm.notebook import tqdm

from tqdm.contrib.concurrent import thread_map

import pandas as pd
import time
from collections import defaultdict

from sqlalchemy import create_engine

%reload_ext autoreload
%autoreload 2

In [2]:
# MySQL pool handle for the "super" database (pool id 202102).
# NOTE(review): IpContactBlackList opens its own MysqlPoolDao(202102) inside each
# query method, so none of the cells below read this global — confirm before removing.
db_super = MysqlPoolDao(
    202102
)

In [3]:
class IpContactBlackList:
    """IP / emergency-contact overdue statistics and blacklists for first-loan orders.

    Orders come from MySQL (``orders`` joined with ``repay_plan``). Each order dict
    is enriched in place with the applicant IP (Mongo ``mexico.origin_data`` ->
    OBS object) and its two emergency contacts (Mongo ``mexico.risk_data``).
    Per IP / contact the class counts applications and overdue orders, computes
    ``rate = overdue_cnt / apply_cnt``, and flags high-risk keys.
    """

    # An order is "overdue" when overdue_days is strictly greater than this.
    OVERDUE_DAYS_THRESHOLD = 3
    # A key is blacklisted when apply_cnt is strictly greater than its minimum
    # AND rate is strictly greater than MIN_OVERDUE_RATE.
    IP_MIN_APPLY_CNT = 10
    CONTACT_MIN_APPLY_CNT = 4
    MIN_OVERDUE_RATE = 0.5
    # Default look-back window (in days) used by increase_orders_first().
    INCREASE_WINDOW_DAYS = 14

    @classmethod
    def total_order_first(cls):
        """Fetch every first-loan order (is_reloan=0) with check_status 8 or 9.

        Each row carries order_id, check_status, repay_date (repay_date / 1000 as
        Unix seconds, formatted "%Y-%m-%d"), is_reloan and overdue_days.
        """
        db_super = MysqlPoolDao(202102)
        total_orders=db_super.get_many('''
        select
            repay_plan.order_id,
            check_status,
            FROM_UNIXTIME(repay_plan.repay_date/1000, "%Y-%m-%d") repay_date,
            is_reloan,
            repay_plan.overdue_days
        from orders inner join repay_plan on orders.id=repay_plan.order_id
        where check_status in (8,9) and is_reloan=0
        '''
        )
        return total_orders

    @classmethod
    def increase_orders_first(cls, days=None):
        """Fetch first-loan orders that came due recently (the incremental batch).

        Selects the same columns as total_order_first(), restricted to orders
        whose repay_date falls after ``now() - days`` (database clock) and at or
        before local midnight of today.

        :param days: window length in days; defaults to INCREASE_WINDOW_DAYS (14).
        :return: rows as returned by ``MysqlPoolDao.get_many``.
        """
        if days is None:
            days = cls.INCREASE_WINDOW_DAYS
        db_super = MysqlPoolDao(202102)

        # Local-time midnight of today as a Unix timestamp (seconds).
        today = pd.Timestamp.now().strftime('%Y-%m-%d')
        current_date = int(time.mktime(time.strptime(today, '%Y-%m-%d')))

        # Both interpolated values are ints computed here, so str.format is safe.
        increase_orders = db_super.get_many('''
        select
            repay_plan.order_id,
            check_status,
            FROM_UNIXTIME(repay_plan.repay_date/1000, "%Y-%m-%d") repay_date,
            is_reloan,
            repay_plan.overdue_days
        from orders inner join repay_plan on orders.id=repay_plan.order_id
        where check_status in (8,9) and is_reloan=0 and repay_plan.repay_date/1000 <= {current_date} and repay_plan.repay_date/1000 > UNIX_TIMESTAMP((now()-INTERVAL {days} DAY))
        '''.format(current_date=current_date, days=int(days)))
        return increase_orders

    @classmethod
    def get_ip(cls, bx):
        """Set ``bx['ip']`` to the applicant IP recorded for ``bx['order_id']``.

        Looks up ``file_key`` in Mongo ``mexico.origin_data`` and reads
        ``user_auth.base.ip`` from the OBS object. Any missing document, missing
        path level, or empty IP yields ``None``, so one malformed record does not
        abort a whole ``thread_map`` run. Mutates ``bx`` in place; returns None.
        """
        mongo_client = MongoPoolDao().get_mongo_client()
        origin_dao = mongo_client['mexico']['origin_data']
        bean = origin_dao.find_one({'order_id': bx['order_id']}, {'file_key': 1})
        file_key = bean.get('file_key') if bean else None
        if not file_key:
            bx['ip'] = None
            return
        # NOTE(review): assumes ObsLib.obs_get returns a dict-like payload.
        payload = ObsLib.obs_get(file_key) or {}
        base = (payload.get('user_auth') or {}).get('base') or {}
        bx['ip'] = base.get('ip') or None

    @classmethod
    def fill_contact_info(cls, bx):
        """Set ``bx['contact1']`` / ``bx['contact2']`` (emergency contacts) for the order.

        Reads ``req.hyrule_req_info.user_info`` from Mongo ``mexico.risk_data``.
        Missing documents, missing path levels, or empty values yield ``None``.
        Mutates ``bx`` in place; returns None.
        """
        mongo_client = MongoPoolDao().get_mongo_client()
        risk_dao = mongo_client['mexico']['risk_data']
        bean = risk_dao.find_one({'order_id': bx['order_id']}, {'req': 1})
        user_info = {}
        if bean:
            req_info = (bean.get('req') or {}).get('hyrule_req_info') or {}
            user_info = req_info.get('user_info') or {}
        bx['contact1'] = user_info.get('contact1') or None
        bx['contact2'] = user_info.get('contact2') or None

    @classmethod
    def _is_overdue(cls, order):
        """True when ``order['overdue_days']`` exceeds OVERDUE_DAYS_THRESHOLD (None counts as 0)."""
        return (order['overdue_days'] or 0) > cls.OVERDUE_DAYS_THRESHOLD

    @classmethod
    def _count_orders(cls, orders, keys_of, wanted):
        """Count applications and overdue orders per key.

        :param orders: iterable of order dicts containing ``overdue_days``.
        :param keys_of: callable ``order -> iterable of keys``; a key repeated within
            one order (e.g. contact1 == contact2) is counted once for that order.
        :param wanted: set of keys to count; all others are ignored.
        :return: ``(apply_counts, overdue_counts)`` as ``defaultdict(int)``.
        """
        apply_counts = defaultdict(int)
        overdue_counts = defaultdict(int)
        for order in orders:
            overdue = cls._is_overdue(order)
            for key in set(keys_of(order)):
                if key in wanted:
                    apply_counts[key] += 1
                    if overdue:
                        overdue_counts[key] += 1
        return apply_counts, overdue_counts

    @classmethod
    def _rate_frame(cls, key_column, keys, apply_counts, overdue_counts):
        """Build ``[key_column, apply_cnt, overdue_cnt, rate]`` rows for ``keys``.

        The columns are always created, so an empty ``keys`` gives an empty frame
        with the expected schema instead of a KeyError on a missing column.
        """
        rows = [
            {key_column: key, 'apply_cnt': apply_counts[key], 'overdue_cnt': overdue_counts[key]}
            for key in keys
        ]
        df = pd.DataFrame(rows, columns=[key_column, 'apply_cnt', 'overdue_cnt'])
        df = df.astype({'apply_cnt': 'int64', 'overdue_cnt': 'int64'})
        df['rate'] = df['overdue_cnt'] / df['apply_cnt']
        return df

    @classmethod
    def get_df_ip(cls, orders_table, ips):
        """Per-IP application / overdue counts over ``orders_table``, limited to ``ips``.

        :return: DataFrame with columns ip_id, apply_cnt, overdue_cnt, rate.
        """
        apply_counts, overdue_counts = cls._count_orders(
            tqdm(orders_table), lambda order: (order['ip'],), ips)
        return cls._rate_frame('ip_id', ips, apply_counts, overdue_counts)

    @classmethod
    def get_df_contact(cls, orders_table, contacts):
        """Per-contact application / overdue counts, using both contact1 and contact2.

        An order listing the same contact in both slots counts once for it.

        :return: DataFrame with columns contact_id, apply_cnt, overdue_cnt, rate.
        """
        apply_counts, overdue_counts = cls._count_orders(
            tqdm(orders_table), lambda order: (order['contact1'], order['contact2']), contacts)
        return cls._rate_frame('contact_id', contacts, apply_counts, overdue_counts)

    @classmethod
    def _filter_blacklist(cls, df, min_apply_cnt):
        """Rows with ``apply_cnt > min_apply_cnt`` and ``rate > MIN_OVERDUE_RATE``."""
        return df[(df['apply_cnt'] > min_apply_cnt) & (df['rate'] > cls.MIN_OVERDUE_RATE)]

    @classmethod
    def get_black_list(cls, orders_table):
        """Enrich ``orders_table`` and compute distributions and blacklists.

        Only IPs / contacts with at least one overdue order are tracked.
        Each order dict is mutated in place (``ip``, ``contact1``, ``contact2`` added).

        :return: ``(df_ip, df_contact, df_ip_blacklist, df_contact_blacklist)``.
        """
        thread_map(cls.get_ip, orders_table, max_workers=10)
        thread_map(cls.fill_contact_info, orders_table, max_workers=10)

        # Plain Python sets avoid pandas dtype coercion (e.g. None -> NaN) and
        # work even when orders_table is empty.
        overdue_orders = [order for order in orders_table if cls._is_overdue(order)]
        ips = {order['ip'] for order in overdue_orders}
        contacts = {order[slot] for order in overdue_orders for slot in ('contact1', 'contact2')}
        ips.discard(None)
        contacts.discard(None)

        df_ip = cls.get_df_ip(orders_table, ips)
        df_contact = cls.get_df_contact(orders_table, contacts)
        df_ip_blacklist = cls._filter_blacklist(df_ip, cls.IP_MIN_APPLY_CNT)
        df_contact_blacklist = cls._filter_blacklist(df_contact, cls.CONTACT_MIN_APPLY_CNT)
        return df_ip, df_contact, df_ip_blacklist, df_contact_blacklist

    @classmethod
    def _merge_counts(cls, df_total, df_increase, key_column):
        """Sum apply/overdue counts per key across two frames and recompute rate."""
        columns = [key_column, 'apply_cnt', 'overdue_cnt']
        merged = pd.concat([df_total[columns], df_increase[columns]]).groupby(key_column).sum().reset_index()
        merged['rate'] = merged['overdue_cnt'] / merged['apply_cnt']
        return merged

    @classmethod
    def _publish(cls, df, name):
        """Write ``df`` to ``<name>.csv`` in the working directory and to MySQL table ``name``."""
        df.to_csv(f'{name}.csv', index=False)
        cls.save_to_sql(df, name)

    @classmethod
    def update_result(cls):
        """Fold the recent-window statistics into the running totals and publish them.

        Reads and rebinds the module globals ``df_total_ip``, ``df_total_contact``,
        ``df_total_ip_blacklist`` and ``df_total_contact_balcklist``. Each result
        is written to CSV and MySQL (super_ip_distribution,
        super_contact_distribution, super_ip_blacklist, super_contact_blacklist).

        NOTE(review): increase_orders_first() spans INCREASE_WINDOW_DAYS (14) days.
        If this runs daily, an order can be added to the totals on several
        consecutive runs, and orders already in the initial totals can be added
        again. Confirm the intended incremental semantics.
        """
        global df_total_ip, df_total_contact, df_total_ip_blacklist, df_total_contact_balcklist

        df_increase_ip, df_increase_contact, _, _ = cls.get_black_list(cls.increase_orders_first())

        # Compute into locals and rebind the globals only after every publish
        # succeeded. A retrying caller then re-applies the increment to the same
        # base instead of adding it twice.
        new_ip = cls._merge_counts(df_total_ip, df_increase_ip, 'ip_id')
        new_contact = cls._merge_counts(df_total_contact, df_increase_contact, 'contact_id')
        # Re-derive blacklists from the merged totals. Summing per-run blacklist
        # subsets would keep stale entries and miss keys that only cross the
        # thresholds cumulatively.
        new_ip_blacklist = cls._filter_blacklist(new_ip, cls.IP_MIN_APPLY_CNT)
        new_contact_blacklist = cls._filter_blacklist(new_contact, cls.CONTACT_MIN_APPLY_CNT)

        cls._publish(new_ip, 'super_ip_distribution')
        cls._publish(new_contact, 'super_contact_distribution')
        cls._publish(new_ip_blacklist, 'super_ip_blacklist')
        cls._publish(new_contact_blacklist, 'super_contact_blacklist')

        df_total_ip = new_ip
        df_total_contact = new_contact
        df_total_ip_blacklist = new_ip_blacklist
        df_total_contact_balcklist = new_contact_blacklist

    @classmethod
    def save_to_sql(cls, df, table_name, db_url=None):
        """Write ``df`` to MySQL table ``table_name``, replacing the table if it exists.

        :param db_url: SQLAlchemy URL. Defaults to the ``RISK_CENTER_DB_URL``
            environment variable, then to the legacy hardcoded URL.
        """
        import os

        print(f">>> saving to sql {table_name}...")
        if db_url is None:
            # FIXME(security): credentials must not live in source. Set
            # RISK_CENTER_DB_URL, then remove this fallback and rotate the password.
            db_url = os.environ.get(
                'RISK_CENTER_DB_URL',
                'mysql+pymysql://mexico_risk:BupQ$H4UFNgvy5!#@10.10.1.153:3306/risk_center')
        engine = create_engine(db_url, pool_pre_ping=True)
        try:
            df.to_sql(table_name, engine, index=False, if_exists='replace')
        finally:
            # A fresh engine is created per call; dispose it so its connection pool is released.
            engine.dispose()
        

In [4]:
# One-off full load: every first-loan order with check_status 8 or 9
# (603,123 rows at the time of the recorded run below).
total_orders = IpContactBlackList.total_order_first()

In [5]:
# Enrich every order with IP / contacts (mutates total_orders in place) and build
# the initial (df_ip, df_contact, df_ip_blacklist, df_contact_blacklist) tuple.
df_total_result = IpContactBlackList.get_black_list(total_orders)

  0%|          | 0/603123 [00:00<?, ?it/s]

  0%|          | 0/603123 [00:00<?, ?it/s]

  0%|          | 0/603123 [00:00<?, ?it/s]

  0%|          | 0/603123 [00:00<?, ?it/s]

In [6]:
# Seed the running totals that IpContactBlackList.update_result() reads and
# rebinds (via `global`) on each scheduled run.
(
    df_total_ip,
    df_total_contact,
    df_total_ip_blacklist,
    df_total_contact_balcklist,
) = df_total_result

In [7]:
from prefect import task, Flow, Parameter
from prefect.executors import LocalDaskExecutor
from prefect.schedules import IntervalSchedule
from datetime import timedelta, datetime
import pendulum

In [8]:
@task(
    log_stdout=True,
    max_retries=3,
    retry_delay=timedelta(seconds=10),
)
def result_output():
    """Prefect task: refresh the IP / contact distributions and blacklists.

    Delegates to ``IpContactBlackList.update_result()``. Retried up to 3 times,
    10 seconds apart; stdout is forwarded to the Prefect logger.
    """
    return IpContactBlackList.update_result()

In [10]:
# Run tasks on a thread pool inside the flow-run process.
executor = LocalDaskExecutor(scheduler="threads")

# Daily schedule whose first run is ~3 seconds after this cell executes (UTC clock).
schedule = IntervalSchedule(
    start_date=pendulum.now("UTC").add(seconds=3),
    interval=timedelta(days=1),
)

with Flow("mx_get_ip_contact_blacklist", executor=executor, schedule=schedule) as flow:
    result_output()

# Register with Prefect Cloud under project "mx-server"; the cell output is the
# flow ID. For a one-off local execution use flow.run() instead.
flow.register(project_name='mx-server')

Flow URL: https://cloud.prefect.io/risk-doowintech-com-s-account/flow/2c5f813a-f079-4164-9311-bccf11422bae
 └── ID: ed414ffe-8d78-40b0-b985-587dff45ca1f
 └── Project: mx-server
 └── Labels: ['ecs-64b9']


'ed414ffe-8d78-40b0-b985-587dff45ca1f'