In [1]:
import os
import sys
import time
import datetime

spark_home = '/opt/cloudera/parcels/SPARK2_INCLUDE_SPARKR/lib/spark2'
os.environ['SPARK_HOME'] = spark_home
os.environ['PYSPARK_DRIVER_PYTHON'] = 'python'
os.environ['PYSPARK_PYTHON'] = '/opt/cloudera/parcels/PYENV.ZNO20008661/bin/python'
sys.path.insert(0, os.path.join (spark_home,'python'))
sys.path.insert(0, os.path.join (spark_home,'python/lib/py4j-0.10.4-src.zip'))
from pyspark import SparkContext, SparkConf, HiveContext

N_INSTANCES = 100
N_CORES = 1
PARALLELISM = N_INSTANCES*N_CORES*10
conf = SparkConf().setAppName('Prep-SET').setMaster("yarn-client")
conf.setAll(
    [
        ('spark.driver.memory','20g'),
        ('spark.executor.memory','8g'),
        ('spark.driver.maxResultSize','5g'),
        ('spark.port.maxRetries', '150'),
        ('spark.executor.cores', N_CORES),
        ('spark.executor.instances',N_INSTANCES),
        ('spark.default.parallelism',PARALLELISM),
        ('spark.sql.shuffle.partitions',PARALLELISM),
        ('spark.yarn.executor.memoryOverhead','20g'),
        ('spark.dynamicAllocation.enabled', 'false'),
        ('spark.dynamicAllocation.enabled', 'false')
        #('spark.kryoserializer.buffer.max','1g'),
        #('spark.dynamicAllocation.minExecutors',50),
        #('spark.dynamicAllocation.maxExecutors',100),
    ]
)

sc = SparkContext.getOrCreate(conf=conf)
sqlc=HiveContext(sc)
sqlContext=HiveContext(sc)
print('Context is ready {DTTM}'.format(DTTM = datetime.datetime.fromtimestamp(time.time()).isoformat()))   

Context is ready 2018-04-26T14:19:34.189465


In [2]:
import os
import sys
import time
import datetime
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.cm import inferno
from matplotlib.backends import backend_pdf
from dateutil.relativedelta import relativedelta
from gc import collect
%matplotlib inline
print('libs ready {}' .format(datetime.datetime.fromtimestamp(time.time()).isoformat()))

libs ready 2018-04-26T14:20:09.366170


In [34]:
#Згрузим данные из HDFS в Pandas
paym =  sqlContext.sql('select * from t_team_k7m_aux_p.set_paym')
tr=paym.toPandas()
wro =  sqlContext.sql('select * from t_team_k7m_aux_p.set_wro')
wo=wro.toPandas()
tc4 =  sqlContext.sql('select * from t_team_k7m_aux_p.set_cred_enriched')
deals=tc4.toPandas()
print('step {}' .format(datetime.datetime.fromtimestamp(time.time()).isoformat()))

step 2018-04-26T16:12:09.706741


In [35]:
#переведем даты в даты
tr['c_date_notunix'] = pd.to_datetime(tr['c_date'])
tr = tr.sort_values('c_date_notunix')
tr = tr.reset_index(drop = True)

deals['c_date_ending'] = pd.to_datetime(deals['c_date_ending'])
deals['ddog'] = pd.to_datetime(deals['ddog'])
deals['date_close'] = pd.to_datetime(deals['date_close'])


wo['c_date_notunix'] = pd.to_datetime(wo['c_date'])
wo = wo.sort_values('c_date_notunix')
wo = wo.reset_index(drop = True)
print('step {}' .format(datetime.datetime.fromtimestamp(time.time()).isoformat()))

step 2018-04-26T16:12:10.709666


In [36]:
#на всякий пожарный оставим в транзакциях только транзакции которые относятся к нашим договорам
tr = tr[ tr['pr_cred_id'].isin(deals['pr_cred_id'].unique()) ].reset_index(drop = True)
print('step {}' .format(datetime.datetime.fromtimestamp(time.time()).isoformat()))

step 2018-04-26T16:12:16.357069


In [37]:
#создадис датасет = ответ
agg=deals.copy().reset_index()
print('step {}' .format(datetime.datetime.fromtimestamp(time.time()).isoformat()))

step 2018-04-26T16:12:16.670384


In [38]:
#сагрегируем данные о транзакциях гашения
trs = tr.groupby('pr_cred_id')['c_summa'].sum().reset_index()
trs['c_summa_in_cr_v'] = tr.groupby(['pr_cred_id'])['c_summa_in_cr_v'].sum().values
trs['start_date'] = tr.groupby(['pr_cred_id'])['c_date_notunix'].min().values
trs['end_date'] = tr.groupby(['pr_cred_id'])['c_date_notunix'].max().values
trs['count'] = tr.groupby(['pr_cred_id']).size().values
agg = agg.merge(trs, on = 'pr_cred_id', how='left')
print('step {}' .format(datetime.datetime.fromtimestamp(time.time()).isoformat()))

step 2018-04-26T16:12:54.879599


In [39]:
#сагрегируем данные о транзакциях списания
trs_wo = wo.groupby('pr_cred_id')['c_summa'].sum().reset_index()[['pr_cred_id','c_summa']]
trs_wo['c_summa_in_cr_v'] = wo.groupby(['pr_cred_id'])['c_summa_in_cr_v'].sum().values
trs_wo.rename(columns={'c_summa':'wo_summa', 'c_summa_in_cr_v':'wo_summa_in_cr_v'}, inplace=True)
agg = agg.merge(trs_wo, on = 'pr_cred_id', how='left')
agg['wo_summa'].fillna(0,inplace=True)
agg['wo_summa_in_cr_v'].fillna(0,inplace=True)
print('step {}' .format(datetime.datetime.fromtimestamp(time.time()).isoformat()))

step 2018-04-26T16:12:55.107875


In [40]:
#продолжим формировать агрегаты. сумма по транзакциям, накопленную сумму по транзакциям, когда мы прошли 5%, 10%, 15% уровень.
tr['total'] = tr.groupby('pr_cred_id')['c_summa'].transform('sum')
tr['total_in_cr_v'] = tr.groupby('pr_cred_id')['c_summa_in_cr_v'].transform('sum')
tr['c_summa'] = pd.to_numeric(tr['c_summa'])
tr['cumsum'] = tr.groupby('pr_cred_id')['c_summa'].cumsum()
tr['c_summa_in_cr_v'] = pd.to_numeric(tr['c_summa_in_cr_v'])
tr['cumsum_in_cr_v'] = tr.groupby('pr_cred_id')['c_summa_in_cr_v'].cumsum()
tr['total_in_cr_v'] = pd.to_numeric(tr['total_in_cr_v'])
tr['alpha_005'] = (tr['cumsum_in_cr_v']>tr['total_in_cr_v']*float(0.05))
tr['alpha_01'] = (tr['cumsum_in_cr_v']>tr['total_in_cr_v']*0.1)
tr['alpha_015'] = (tr['cumsum_in_cr_v']>tr['total_in_cr_v']*0.15)
print('step {}' .format(datetime.datetime.fromtimestamp(time.time()).isoformat()))

step 2018-04-26T16:13:34.892329


In [41]:
#для  5%, 10%, 15% уровеня подсчитаем сумму до и дату
argmax_custom = lambda x : x.argmax()

trs2 = tr.groupby('pr_cred_id')['alpha_005'].apply( argmax_custom ).reset_index()
trs2.rename(columns={'alpha_005':'alpha_005_id'}, inplace=True)

trs2['alpha_01_id'] = tr.groupby('pr_cred_id')['alpha_01'].apply( argmax_custom ).values
trs2['alpha_015_id'] = tr.groupby('pr_cred_id')['alpha_015'].apply( argmax_custom ).values
trs2['total']=tr.groupby('pr_cred_id')['c_summa_in_cr_v'].sum().values

trs2['alpha_005_cumsum'] = tr.ix[trs2['alpha_005_id'], 'cumsum_in_cr_v'].values
trs2['alpha_01_cumsum'] = tr.ix[trs2['alpha_01_id'], 'cumsum_in_cr_v'].values
trs2['alpha_015_cumsum'] = tr.ix[trs2['alpha_015_id'], 'cumsum_in_cr_v'].values

trs2['alpha_005_date'] = tr.ix[trs2['alpha_005_id'], 'c_date_notunix'].values
trs2['alpha_01_date'] = tr.ix[trs2['alpha_01_id'], 'c_date_notunix'].values
trs2['alpha_015_date'] = tr.ix[trs2['alpha_015_id'], 'c_date_notunix'].values

agg = agg.merge(trs2, on = 'pr_cred_id', how='left')
print('step {}' .format(datetime.datetime.fromtimestamp(time.time()).isoformat()))

.ix is deprecated. Please use
.loc for label based indexing or
.iloc for positional indexing

See the documentation here:
http://pandas.pydata.org/pandas-docs/stable/indexing.html#deprecate_ix


step 2018-04-26T16:14:05.371028


In [42]:
#определим баллон, и его дату
tr3 = tr.groupby('pr_cred_id')['c_date_notunix'].last().reset_index()
tr3.rename(columns={'c_date_notunix':'balon_date'}, inplace=True)
agg = agg.merge(tr3, on = 'pr_cred_id', how='left')

agg['balon_date']=agg['c_date_ending'].where(agg['balon_date'].isnull(),other = agg['balon_date'])

agg['days_after_balon'] = (agg['balon_date'] - agg['ddog']).apply( lambda x :\
                                                 relativedelta(days = int(x.days*0.9)).days)
agg['date_balon_starts'] = (agg['balon_date'] - agg['ddog']).apply( lambda x :\
                                                 relativedelta(days = int(x.days*0.9)))
agg['days_balon_starts'] = agg.apply( lambda x : x['ddog'] + x['date_balon_starts'], axis = 1)

tr = tr.merge(agg[['pr_cred_id', 'days_balon_starts'] ], on = 'pr_cred_id', how='left')

sum_balon = lambda x : x['c_summa_in_cr_v']*(x['c_date_notunix']>=x['days_balon_starts'])
tr['isin_balon'] = tr['c_date_notunix']>=tr['days_balon_starts']
tr['balon_part'] = tr.apply( lambda x : sum_balon(x), axis = 1)

tr4 = tr.groupby('pr_cred_id')['balon_part'].sum().reset_index()
tr4.rename(columns={'balon_part':'balon_sum'}, inplace=True)
agg = agg.merge(tr4, on = 'pr_cred_id', how='left')
print('step {}' .format(datetime.datetime.fromtimestamp(time.time()).isoformat()))

step 2018-04-26T16:18:24.284325


In [43]:
#переведем даты в дистанции от ddog
agg['days_over_lim'] = (agg['c_date_ending'] - agg['end_date']).apply( lambda x : x.days)
print('step {}' .format(datetime.datetime.fromtimestamp(time.time()).isoformat()))

step 2018-04-26T16:18:25.264318


In [44]:
agg['start_date'] = agg['start_date'] - agg['ddog']
agg['end_date']  = (agg['end_date'] - agg['ddog']).apply( lambda x : x.days)
agg['avp_in_days'].fillna(0,inplace=True)
print('step {}' .format(datetime.datetime.fromtimestamp(time.time()).isoformat()))

step 2018-04-26T16:18:26.276878


In [45]:

agg['alpha_005_date'] = (agg['alpha_005_date'] - agg['ddog'])# - agg['avp_in_days'].apply(lambda x: timedelta(days = x))
agg['alpha_01_date'] = (agg['alpha_01_date'] - agg['ddog'] )#- agg['avp_in_days'].apply(lambda x: timedelta(days = x ))
agg['alpha_015_date'] = agg['alpha_015_date'] - agg['ddog']#- agg['avp_in_days'].apply(lambda x: timedelta(days = x))
agg['balon_date'] = agg['balon_date'] - agg['ddog']
print('step {}' .format(datetime.datetime.fromtimestamp(time.time()).isoformat()))

step 2018-04-26T16:18:26.303266


In [46]:
#определим параметры гаммы
tr['graph_y'] = 1-(tr['cumsum_in_cr_v']-tr['c_summa_in_cr_v'])/tr['total_in_cr_v']
tr = tr.merge(deals[['pr_cred_id', 'ddog', 'dur_in_days']], on = 'pr_cred_id')
tr['graph_x'] = (tr['c_date_notunix'] - tr['ddog']).apply( lambda x : x.days)/tr['dur_in_days']
tr['ro'] = np.sqrt( (1-tr['graph_y'])**2 + (1-tr['graph_x'])**2 )
 
agg = agg.merge(tr[tr['ro'] == tr.groupby('pr_cred_id')['ro'].transform('min')\
            ][['pr_cred_id', 'graph_x', 'graph_y']], on = 'pr_cred_id', how='left')
agg['bias'] = agg['graph_x'] + agg['graph_y']
agg['gamma']= -1 + agg['bias']
print('step {}' .format(datetime.datetime.fromtimestamp(time.time()).isoformat()))

step 2018-04-26T16:18:48.251393


In [47]:
#переведем суммы во флоат
agg['summa_ru'] = agg['summa_ru'].astype('float')
agg['summa_base'] = agg['summa_base'].astype('float')
print('step {}' .format(datetime.datetime.fromtimestamp(time.time()).isoformat()))

step 2018-04-26T16:18:48.296815


In [48]:
#определим накопленные суммы в долях от суммы
agg['alpha_005_cumsum_t'] = agg['alpha_005_cumsum']/agg['total']
agg['alpha_01_cumsum_t'] = agg['alpha_01_cumsum']/agg['total']
agg['alpha_015_cumsum_t'] = agg['alpha_015_cumsum']/agg['total']
agg['balon_sum_t'] = agg['balon_sum']/agg['total']

agg['alpha_005_cumsum_d'] = agg['alpha_005_cumsum']/agg['summa_base']
agg['alpha_01_cumsum_d'] = agg['alpha_01_cumsum']/agg['summa_base']
agg['alpha_015_cumsum_d'] = agg['alpha_015_cumsum']/agg['summa_base']
agg['balon_sum_d'] = agg['balon_sum']/agg['summa_base']
print('step {}' .format(datetime.datetime.fromtimestamp(time.time()).isoformat()))

step 2018-04-26T16:18:48.322482


In [49]:
#а сроки в полях от дюрации
for c in ['start_date', 'alpha_005_date', 'alpha_01_date', 'alpha_015_date', 'balon_date']:
    agg[c] = agg[c].apply( lambda x : x.days)/(agg['dur_in_days']) 
    print('step {}' .format(datetime.datetime.fromtimestamp(time.time()).isoformat()))

step 2018-04-26T16:18:49.405559
step 2018-04-26T16:18:51.010931
step 2018-04-26T16:18:52.564736
step 2018-04-26T16:18:54.237262
step 2018-04-26T16:18:55.348287


In [50]:
#сохранение
agg2=agg.copy()
agg2=agg2.drop('date_balon_starts',axis=1)
print('step {}' .format(datetime.datetime.fromtimestamp(time.time()).isoformat()))

step 2018-04-26T16:18:55.870470


In [70]:
#сохраняем результат в файл
from pyspark.sql.types import *

mySchema = StructType([\
  StructField("index", IntegerType(), True)\
 ,StructField("client_id", StringType(), True)\
 ,StructField("client_name", StringType(), True)\
 ,StructField("inn", StringType(), True)\
 ,StructField("kpp", StringType(), True)\
 ,StructField("crm_segment", StringType(), True)\
 ,StructField("ndog", StringType(), True)\
 ,StructField("ddog", StringType(), True)\
 ,StructField("dur_in_days", IntegerType(), True)\
 ,StructField("dur_in_month", DoubleType(), True)\
 ,StructField("c_date_ending", StringType(), True)\
 ,StructField("date_close", StringType(), True)\
 ,StructField("summa_base", DoubleType(), True)\
 ,StructField("summa_ru", DoubleType(), True)\
 ,StructField("c_ft_credit", StringType(), True)\
 ,StructField("limit_amt", StringType(), True)\
 ,StructField("regime_prod", StringType(), True)\
 ,StructField("cred_flag", IntegerType(), True)\
 ,StructField("over_flag", IntegerType(), True)\
 ,StructField("instrument", StringType(), True)\
 ,StructField("avp_in_days", DoubleType(), True)\
 ,StructField("avp_in_month", DoubleType(), True)\
 ,StructField("target_cred", StringType(), True)\
 ,StructField("product_status", StringType(), True)\
 ,StructField("c_list_pay", StringType(), True)\
 ,StructField("pr_cred_id", StringType(), True)\
 ,StructField("branch", StringType(), True)\
 ,StructField("branch_ru", StringType(), True)\
 ,StructField("int_revenue", StringType(), True)\
 ,StructField("fok_revenue", StringType(), True)\
 ,StructField("report_month_ago", DoubleType(), True)\
 ,StructField("c_summa", StringType(), True)\
 ,StructField("c_summa_in_cr_v", StringType(), True)\
 ,StructField("start_date", DoubleType(), True)\
 ,StructField("end_date", DoubleType(), True)\
 ,StructField("count", DoubleType(), True)\
 ,StructField("wo_summa", StringType(), True)\
 ,StructField("wo_summa_in_cr_v", StringType(), True)\
 ,StructField("alpha_005_id", DoubleType(), True)\
 ,StructField("alpha_01_id", DoubleType(), True)\
 ,StructField("alpha_015_id", DoubleType(), True)\
 ,StructField("total", DoubleType(), True)\
 ,StructField("alpha_005_cumsum", DoubleType(), True)\
 ,StructField("alpha_01_cumsum", DoubleType(), True)\
 ,StructField("alpha_015_cumsum", DoubleType(), True)\
 ,StructField("alpha_005_date", DoubleType(), True)\
 ,StructField("alpha_01_date", DoubleType(), True)\
 ,StructField("alpha_015_date", DoubleType(), True)\
 ,StructField("balon_date", DoubleType(), True)\
 ,StructField("days_after_balon", IntegerType(), True)\
 ,StructField("days_balon_starts", StringType(), True)\
 ,StructField("balon_sum", DoubleType(), True)\
 ,StructField("days_over_lim", DoubleType(), True)\
 ,StructField("graph_x", DoubleType(), True)\
 ,StructField("graph_y", DoubleType(), True)\
 ,StructField("bias", DoubleType(), True)\
 ,StructField("gamma", DoubleType(), True)\
 ,StructField("alpha_005_cumsum_t", DoubleType(), True)\
 ,StructField("alpha_01_cumsum_t", DoubleType(), True)\
 ,StructField("alpha_015_cumsum_t", DoubleType(), True)\
 ,StructField("balon_sum_t", DoubleType(), True)\
 ,StructField("alpha_005_cumsum_d", DoubleType(), True)\
 ,StructField("alpha_01_cumsum_d", DoubleType(), True)\
 ,StructField("alpha_015_cumsum_d", DoubleType(), True)\
 ,StructField("balon_sum_d", DoubleType())\
 ,StructField("nkl_flag", IntegerType(), True)\
 ,StructField("vkl_flag", IntegerType(), True)\
 ,StructField("num_cr_desc", IntegerType(), True)\
 ])
agg_S =agg2[['index','client_id','client_name','inn','kpp','crm_segment','ndog','ddog','dur_in_days','dur_in_month','c_date_ending','date_close','summa_base','summa_ru','c_ft_credit','limit_amt','regime_prod','cred_flag','over_flag','instrument','avp_in_days','avp_in_month','target_cred','product_status','c_list_pay','pr_cred_id','branch','branch_ru','int_revenue','fok_revenue','report_month_ago','c_summa','c_summa_in_cr_v','start_date','end_date','count','wo_summa','wo_summa_in_cr_v','alpha_005_id','alpha_01_id','alpha_015_id','total','alpha_005_cumsum','alpha_01_cumsum','alpha_015_cumsum','alpha_005_date','alpha_01_date','alpha_015_date','balon_date','days_after_balon','days_balon_starts','balon_sum','days_over_lim','graph_x','graph_y','bias','gamma','alpha_005_cumsum_t','alpha_01_cumsum_t','alpha_015_cumsum_t','balon_sum_t','alpha_005_cumsum_d','alpha_01_cumsum_d','alpha_015_cumsum_d','balon_sum_d','nkl_flag','vkl_flag','num_cr_desc']]
sqlc.createDataFrame(agg_S, schema=mySchema).write.format("parquet").mode('overwrite').saveAsTable("t_team_k7m_aux_p.set_bo_agg") 
print('step {}' .format(datetime.datetime.fromtimestamp(time.time()).isoformat()))

step 2018-04-26T16:35:15.912189
