In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import *
from pyspark.sql.functions import isnan

In [2]:
sc

# Base 1: Cadastro

In [3]:
cadastral = sc.textFile('../data_neurotech/CADASTRAL.txt')

In [4]:
header1 = cadastral.first()
header1

u'CODIGO_CLIENTE\tDATA_NASCIMENTO\tSEXO\tESTADO_CIVIL\tCIDADE\tUF\tBAIRRO\tCEP\tQTD_DEPENDENTES\tDATA_CADASTRO\tDATA_ADMISSAO\tCATEGORIAL_PROFISSAO\tTIPO_RESIDENCIA\tRENDA_TITULAR\tRENDA_CJ\tOUTRAS_RENDAS\tQTD_CARTOES_ADICIONAIS\tFLAG_CONTA_BANCO\tVALOR_HISTORICO_COMPRAS'

In [5]:
cadastral = cadastral.filter(lambda line: line != header1)
cadastral_df = cadastral.map(lambda k: k.split("\t")).toDF(header1.split("\t"))

In [6]:
cadastral_df.filter((isnan(cadastral_df["DATA_CADASTRO"])) | (cadastral_df["DATA_CADASTRO"] == "") | (cadastral_df["DATA_CADASTRO"].isNull())).count()

1433665

In [7]:
cadastral_df = cadastral_df.withColumn("DATA_CADASTRO", to_timestamp("DATA_CADASTRO", "yyyy/MM/dd HH:mm:ss"))
cadastral_df.show(5)

+--------------+--------------------+----+------------+-------------+---+--------+--------+---------------+-------------------+--------------------+--------------------+---------------+-------------+--------+-------------+----------------------+----------------+-----------------------+
|CODIGO_CLIENTE|     DATA_NASCIMENTO|SEXO|ESTADO_CIVIL|       CIDADE| UF|  BAIRRO|     CEP|QTD_DEPENDENTES|      DATA_CADASTRO|       DATA_ADMISSAO|CATEGORIAL_PROFISSAO|TIPO_RESIDENCIA|RENDA_TITULAR|RENDA_CJ|OUTRAS_RENDAS|QTD_CARTOES_ADICIONAIS|FLAG_CONTA_BANCO|VALOR_HISTORICO_COMPRAS|
+--------------+--------------------+----+------------+-------------+---+--------+--------+---------------+-------------------+--------------------+--------------------+---------------+-------------+--------+-------------+----------------------+----------------+-----------------------+
|             3|1951/10/21 00:00:...|   F|          CA|   SANTA ROSA| RS|  GLORIA|98900000|              1|1998-10-28 00:00:00|1990/01/01 0

In [8]:
cadastral_df = cadastral_df.select(['CODIGO_CLIENTE', 'DATA_CADASTRO'])

### Regra 1) Cadastrados a partir de 2017

In [9]:
cadastral_df = cadastral_df.where(cadastral_df["DATA_CADASTRO"] >= "2017-01-01 00:00:00")

### Regra 2) Cadastrados até dezembro de 2017

In [10]:
cadastral_df = cadastral_df.where(cadastral_df["DATA_CADASTRO"] < "2018-01-01 00:00:00")
cadastral_df.count()

178403

In [11]:
cadastral_df.show(5)

+--------------+-------------------+
|CODIGO_CLIENTE|      DATA_CADASTRO|
+--------------+-------------------+
|           697|2017-01-20 00:00:00|
|           875|2017-10-31 00:00:00|
|          1415|2017-09-15 00:00:00|
|          1689|2017-11-30 00:00:00|
|          1871|2017-02-23 00:00:00|
+--------------+-------------------+
only showing top 5 rows



### Regra 3) Data limite: 6 meses após cadastro

In [12]:
cadastral_df = cadastral_df.withColumn("DATA_LIMITE", add_months(cadastral_df['DATA_CADASTRO'], 6))

In [13]:
cadastral_df.show(5)

+--------------+-------------------+-----------+
|CODIGO_CLIENTE|      DATA_CADASTRO|DATA_LIMITE|
+--------------+-------------------+-----------+
|           697|2017-01-20 00:00:00| 2017-07-20|
|           875|2017-10-31 00:00:00| 2018-04-30|
|          1415|2017-09-15 00:00:00| 2018-03-15|
|          1689|2017-11-30 00:00:00| 2018-05-31|
|          1871|2017-02-23 00:00:00| 2017-08-23|
+--------------+-------------------+-----------+
only showing top 5 rows



In [14]:
tcad = cadastral_df.alias('tcad')

# Base 2: Carnê

In [15]:
carne = sc.textFile('../data_neurotech/CARNE.txt')

In [16]:
header = carne.first()
header

u'DATA_CONTRATO\tCODIGO_CONTRATO\tCODIGO_CLIENTE\tDATA_VENCIMENTO\tDATA_PAGAMENTO\tVALOR_PARCELA\tVALOR_ENCARGO\tVALOR_VAGO\tNUMERO_PARCELA\tQTD_PAGAMENTOS\tMODALIDADE\tTIPO_PRODUTO'

In [17]:
#filter out the header, make sure the rest looks correct
carne = carne.filter(lambda line: line != header)

In [18]:
#here's where the changes take place
#this creates a dataframe using whatever pyspark feels like using (I think string is the default). the header.split is providing the names of the columns
carne_df = carne.map(lambda k: k.split("\t")).toDF(header.split("\t"))

In [19]:
carne_df = carne_df.withColumn("DATA_VENCIMENTO", to_timestamp("DATA_VENCIMENTO", "yyyy/MM/dd HH:mm:ss"))
carne_df = carne_df.withColumn("DATA_PAGAMENTO", to_timestamp("DATA_PAGAMENTO", "yyyy/MM/dd HH:mm:ss"))

In [20]:
carne_df = carne_df.select(['CODIGO_CLIENTE','DATA_VENCIMENTO', 'DATA_PAGAMENTO'])

In [22]:
carne_df = carne_df.join(cadastral_df, carne_df.CODIGO_CLIENTE == cadastral_df.CODIGO_CLIENTE, 'inner').select(carne_df.CODIGO_CLIENTE,
                                                                                        carne_df.DATA_VENCIMENTO,
                                                                                        carne_df.DATA_PAGAMENTO,
                                                                                        cadastral_df.DATA_CADASTRO,
                                                                                        cadastral_df.DATA_LIMITE)

In [23]:
carne_df = carne_df.withColumn("PAGAMENTO_LIMITE", add_months(carne_df['DATA_VENCIMENTO'], 6))

In [24]:
carne_df = carne_df.withColumn("FLAG_LIMITE", (carne_df.DATA_VENCIMENTO <= carne_df.DATA_LIMITE).cast('integer'))

In [41]:
carne_df = carne_df.withColumn("FLAG_ATRASO",\
                               ((carne_df.DATA_PAGAMENTO > carne_df.PAGAMENTO_LIMITE) | \
                               (isnull(carne_df.DATA_PAGAMENTO))).cast('integer'))

In [42]:
carne_df.show(10)

+--------------+-------------------+-------------------+-------------------+-----------+----------------+-----------+-----------+
|CODIGO_CLIENTE|    DATA_VENCIMENTO|     DATA_PAGAMENTO|      DATA_CADASTRO|DATA_LIMITE|PAGAMENTO_LIMITE|FLAG_LIMITE|FLAG_ATRASO|
+--------------+-------------------+-------------------+-------------------+-----------+----------------+-----------+-----------+
|       1061056|2018-06-08 00:00:00|2018-06-08 00:00:00|2017-12-08 00:00:00| 2018-06-08|      2018-12-08|          1|          0|
|       1061056|2018-06-08 00:00:00|2018-06-11 00:00:00|2017-12-08 00:00:00| 2018-06-08|      2018-12-08|          1|          0|
|       1061056|2018-06-11 00:00:00|2018-06-11 00:00:00|2017-12-08 00:00:00| 2018-06-08|      2018-12-11|          0|          0|
|      10812385|2017-05-10 00:00:00|2017-05-12 00:00:00|2017-03-31 00:00:00| 2017-09-30|      2017-11-10|          1|          0|
|      10812385|2017-06-10 00:00:00|2017-06-07 00:00:00|2017-03-31 00:00:00| 2017-09-30|  

In [65]:
carne_df = carne_df.select(['CODIGO_CLIENTE', 'FLAG_LIMITE', 'FLAG_ATRASO'])\
.groupBy(['CODIGO_CLIENTE']).agg({'FLAG_LIMITE': 'max', 'FLAG_ATRASO': 'max'})

In [66]:
carne_df.show(10)

+--------------+----------------+----------------+
|CODIGO_CLIENTE|max(FLAG_ATRASO)|max(FLAG_LIMITE)|
+--------------+----------------+----------------+
|       1061056|               0|               1|
|      10812385|               0|               1|
|      11126048|               0|               1|
|       1114919|               0|               1|
|      11158644|               0|               1|
|      11254267|               0|               1|
|      11336822|               0|               1|
|      11337289|               0|               1|
|      11346688|               0|               1|
|      11363677|               0|               1|
+--------------+----------------+----------------+
only showing top 10 rows



In [69]:
carne_df.count()

50493

# Base 3: Cartão

In [26]:
cartao = sc.textFile('../data_neurotech/CARTAO.txt')

In [27]:
header3 = cartao.first()
header3

u'CODIGO_CLIENTE\tCODIGO_CARTAO\tVALOR_MORA\tVALOR_MULTA\tVALOR_TOTAL\tVALOR_MIN\tDATA_VENCIMENTO\tVALOR_PAGO\tTARIFA\tOUTROS_VALORES\tLIMITE\tSOMA_VALOR_DIVIDAS_FUTURAS\tDATA_PAGAMENTO\tQTD_CONTRATOS'

In [28]:
cartao = cartao.filter(lambda line: line != header3)

In [29]:
cartao_df = cartao.map(lambda k: k.split("\t")).toDF(header3.split("\t"))
cartao_df.show(5)

+--------------+-------------+----------+-----------+-----------+---------+--------------------+----------+------+--------------+------+--------------------------+--------------------+-------------+
|CODIGO_CLIENTE|CODIGO_CARTAO|VALOR_MORA|VALOR_MULTA|VALOR_TOTAL|VALOR_MIN|     DATA_VENCIMENTO|VALOR_PAGO|TARIFA|OUTROS_VALORES|LIMITE|SOMA_VALOR_DIVIDAS_FUTURAS|      DATA_PAGAMENTO|QTD_CONTRATOS|
+--------------+-------------+----------+-----------+-----------+---------+--------------------+----------+------+--------------+------+--------------------------+--------------------+-------------+
|            23|    105028440|       0.0|        0.0|     298.79|    64.55|2016/06/10 00:00:...|    298.79|   0.0|          5.99|2604.0|                    8118.6|2016/06/09 00:00:...|            5|
|            23|    105670225|       0.0|        0.0|     307.34|    66.26|2016/07/10 00:00:...|    307.34|   0.0|          5.99|2604.0|                   7811.26|2016/07/08 00:00:...|            6|
|    

In [30]:
cartao_df = cartao_df.withColumn("DATA_VENCIMENTO", to_timestamp("DATA_VENCIMENTO", "yyyy/MM/dd HH:mm:ss"))
cartao_df = cartao_df.withColumn("DATA_PAGAMENTO", to_timestamp("DATA_PAGAMENTO", "yyyy/MM/dd HH:mm:ss"))

In [31]:
cartao_df = cartao_df.select(['CODIGO_CLIENTE','DATA_VENCIMENTO', 'DATA_PAGAMENTO'])

In [33]:
cartao_df = cartao_df.join(cadastral_df, cartao_df.CODIGO_CLIENTE == cadastral_df.CODIGO_CLIENTE, 'inner').select(cartao_df.CODIGO_CLIENTE,
                                                                                        cartao_df.DATA_VENCIMENTO,
                                                                                        cartao_df.DATA_PAGAMENTO,
                                                                                        cadastral_df.DATA_CADASTRO,
                                                                                        cadastral_df.DATA_LIMITE)


In [34]:
cartao_df = cartao_df.withColumn("PAGAMENTO_LIMITE", add_months(cartao_df['DATA_VENCIMENTO'], 6))

In [35]:
cartao_df = cartao_df.withColumn("FLAG_LIMITE", (cartao_df.DATA_VENCIMENTO <= cartao_df.DATA_LIMITE).cast('integer'))

In [43]:
cartao_df = cartao_df.withColumn("FLAG_ATRASO",\
                               ((cartao_df.DATA_PAGAMENTO > cartao_df.PAGAMENTO_LIMITE) | \
                               (isnull(cartao_df.DATA_PAGAMENTO))).cast('integer'))

In [44]:
cartao_df.show(10)

+--------------+-------------------+-------------------+-------------------+-----------+----------------+-----------+-----------+
|CODIGO_CLIENTE|    DATA_VENCIMENTO|     DATA_PAGAMENTO|      DATA_CADASTRO|DATA_LIMITE|PAGAMENTO_LIMITE|FLAG_LIMITE|FLAG_ATRASO|
+--------------+-------------------+-------------------+-------------------+-----------+----------------+-----------+-----------+
|       1061056|2018-06-08 00:00:00|2018-06-11 00:00:00|2017-12-08 00:00:00| 2018-06-08|      2018-12-08|          1|          0|
|      10680802|2017-08-25 00:00:00|2017-08-25 00:00:00|2017-07-29 00:00:00| 2018-01-29|      2018-02-25|          1|          0|
|      10680802|2017-09-25 00:00:00|               null|2017-07-29 00:00:00| 2018-01-29|      2018-03-25|          1|          1|
|      10680802|2017-10-25 00:00:00|               null|2017-07-29 00:00:00| 2018-01-29|      2018-04-25|          1|          1|
|      10680802|2017-11-25 00:00:00|               null|2017-07-29 00:00:00| 2018-01-29|  

In [62]:
cartao_df = cartao_df.select(['CODIGO_CLIENTE', 'FLAG_LIMITE', 'FLAG_ATRASO'])\
.groupBy(['CODIGO_CLIENTE']).agg({'FLAG_LIMITE': 'max', 'FLAG_ATRASO': 'max'})

In [64]:
cartao_df.show(10)

+--------------+----------------+----------------+
|CODIGO_CLIENTE|max(FLAG_ATRASO)|max(FLAG_LIMITE)|
+--------------+----------------+----------------+
|       1061056|               0|               1|
|      10680802|               1|               1|
|       1068245|               1|               1|
|       1114919|               0|               1|
|      11158644|               0|               1|
|      11254267|               1|               1|
|      11336822|               0|               1|
|      11342640|               0|               1|
|      11346688|               1|               1|
|      11363677|               1|               1|
+--------------+----------------+----------------+
only showing top 10 rows



In [68]:
cartao_df.count()

86865

# Concatenação das bases

In [67]:
concat = carne_df.union(cartao_df)

In [70]:
concat.count()

137358

In [None]:
clients = concat.select(col("CODIGO_CLIENTE").alias("CODIGO_CLIENTE"),
                        col("max(FLAG_ATRASO)").alias("CLASS"),
                       col("max(FLAG_LIMITE)").alias("ELEGIBLE"))
clients = clients.select(['CODIGO_CLIENTE', 'CLASS', 'ELEGIBLE'])\
.groupBy(['CODIGO_CLIENTE']).agg({'CLASS': 'max', 'ELEGIBLE': 'max'})
clients.count()
clients = clients.select(['CODIGO_CLIENTE', 'max(CLASS)']).where(clients['max(ELEGIBLE)'] > 0)
clients = clients.select(col("CODIGO_CLIENTE").alias("CODIGO_CLIENTE"),
                        col("max(CLASS)").alias("CLASS"))
base_final = cadastral_df.join(clients, clients.CODIGO_CLIENTE == cadastral_df.CODIGO_CLIENTE, 'inner').select(cadastral_df.CODIGO_CLIENTE,
                                                                                                              cadastral_df.DATA_NASCIMENTO,
                                                                                                              cadastral_df.SEXO,
                                                                                                              cadastral_df.ESTADO_CIVIL,
                                                                                                              cadastral_df.CIDADE,
                                                                                                              cadastral_df.UF,
                                                                                                              cadastral_df.BAIRRO,
                                                                                                              cadastral_df.CEP,
                                                                                                              cadastral_df.QTD_DEPENDENTES,
                                                                                                              cadastral_df.DATA_CADASTRO,
                                                                                                              cadastral_df.DATA_ADMISSAO,
                                                                                                              cadastral_df.CATEGORIAL_PROFISSAO,
                                                                                                              cadastral_df.TIPO_RESIDENCIA,
                                                                                                              cadastral_df.RENDA_TITULAR,
                                                                                                              cadastral_df.RENDA_CJ,
                                                                                                              cadastral_df.OUTRAS_RENDAS,
                                                                                                              cadastral_df.QTD_CARTOES_ADICIONAIS,
                                                                                                              cadastral_df.FLAG_CONTA_BANCO,
                                                                                                              cadastral_df.VALOR_HISTORICO_COMPRAS,
                                                                                                              clients.CLASS)
base_final.show(5)
