# Data Engineering Technical Test

Versão do spark:

In [1]:
sc.version

u'2.4.4'

## Carregando arquivos de entrada

In [2]:
payments_sample = spark.read.parquet("input/payments_sample.parquet")

In [3]:
installments_sample = spark.read.json("input/installments_sample.json")

In [4]:
loans_sample = spark.read.csv("input/loans_sample.csv",header=True, inferSchema=True)

## Análise dos dados de entrada

Iniciarei a análise dos dados pelo dataframe payments_sample

In [5]:
payments_sample.printSchema()

root
 |-- loan_id: long (nullable = true)
 |-- payment_date: date (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- payment_id: string (nullable = true)
 |-- paid_amount: double (nullable = true)



In [6]:
payments_sample.show()

+-------+------------+--------------+--------------------+-----------+
|loan_id|payment_date|payment_method|          payment_id|paid_amount|
+-------+------------+--------------+--------------------+-----------+
|    291|  2017-05-30|           ted|ed5a489a-bb07-456...|     547.92|
|    291|  2017-06-26|    creditcard|7abaf860-9632-40e...|     547.92|
|    291|  2017-08-13|    creditcard|e659b8d6-8d7d-48f...|     557.92|
|    291|  2017-10-17|           ted|4f0566cb-c9ef-451...|     552.92|
|    291|  2017-09-11|           ted|8c017813-1df5-459...|     547.92|
|    291|  2018-05-19|    creditcard|9149225e-3d63-4b6...|     570.92|
|    291|  2017-12-09|    creditcard|37f2896e-8d9c-49c...|     547.92|
|    291|  2017-11-04|    creditcard|3caab925-de5a-47f...|     547.92|
|    291|  2018-02-06|    creditcard|7c70cef2-4fc5-4bf...|     547.92|
|    291|  2018-03-26|    creditcard|bf207cf6-6d4f-414...|     547.92|
|    291|  2018-01-09|           ted|3b1d333c-d47e-45c...|     547.92|
|    2

In [7]:
payments_sample.count()

6930

In [8]:
print("Total de valores nulos por coluna:")
for col in payments_sample.columns:
  print(" {}={}".format(col, payments_sample.filter(payments_sample[col].isNull() ).count()) )

Total de valores nulos por coluna:
 loan_id=0
 payment_date=0
 payment_method=0
 payment_id=0
 paid_amount=0


In [9]:
payments_sample.select("loan_id").distinct().count()

775

O dataframe payments_sample apresenta as informações sobre os pagamentos dos emprestimos, possuindo informações como o identificador do emprestimo, data de pagamento, metódo de pagamento e valor pago. Possui 6930 registros e não possui valores nulos. Esses registros são referentes a um total de 775 emprestimos realizados.

In [10]:
loans_sample.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- loan_id: integer (nullable = true)
 |-- period: integer (nullable = true)
 |-- accepted_at: timestamp (nullable = true)
 |-- payday: integer (nullable = true)
 |-- interest_rate: double (nullable = true)



In [11]:
loans_sample.show(truncate=False)

+---+-------+------+-----------------------+------+-------------+
|_c0|loan_id|period|accepted_at            |payday|interest_rate|
+---+-------+------+-----------------------+------+-------------+
|0  |0      |12    |2017-05-19 10:09:47.285|25    |3.12         |
|1  |1      |12    |2017-05-18 20:27:41.197|25    |7.55         |
|2  |2      |12    |2017-05-18 22:04:41.276|25    |7.55         |
|3  |3      |12    |2017-05-17 23:47:24.822|5     |7.55         |
|4  |4      |9     |2017-05-23 00:15:10.971|15    |7.49         |
|5  |5      |12    |2017-05-19 08:42:43.162|25    |3.12         |
|6  |6      |9     |2017-05-16 08:23:54.438|15    |7.49         |
|7  |7      |12    |2017-05-22 17:17:49.956|5     |7.55         |
|8  |8      |12    |2017-05-21 07:03:02.534|15    |7.55         |
|9  |9      |12    |2017-05-23 12:33:22.614|15    |7.55         |
|10 |10     |12    |2017-05-20 18:06:31.745|5     |7.55         |
|11 |11     |12    |2017-05-22 13:36:45.83 |15    |7.55         |
|12 |12   

In [12]:
loans_sample.count()

788

In [13]:
print("Total de valores nulos por coluna:")
for col in loans_sample.columns:
  print(" {}={}".format(col, loans_sample.filter(loans_sample[col].isNull() ).count()) )

Total de valores nulos por coluna:
 _c0=0
 loan_id=0
 period=0
 accepted_at=0
 payday=0
 interest_rate=0


In [14]:
loans_sample.select("loan_id").distinct().count()

788

O dataframe loans_sample apresenta informações como identificador do emprestimo, data aceitação do emprestimo entre outras. Possui 788 registros e não possui valores nulos. Esses registros são referentes a um total de 788 emprestimos realizados.

In [15]:
installments_sample.show()

+--------------------+--------------------+--------------------+--------------------+
|            due_date|   installment_value|             loan_id|              number|
+--------------------+--------------------+--------------------+--------------------+
|[1497225600000, 1...|[547.92, 547.92, ...|[291, 291, 291, 7...|[1, 2, 8, 6, 9, 1...|
+--------------------+--------------------+--------------------+--------------------+



In [16]:
installments_sample.printSchema()

root
 |-- due_date: struct (nullable = true)
 |    |-- 0: long (nullable = true)
 |    |-- 1: long (nullable = true)
 |    |-- 10: long (nullable = true)
 |    |-- 100: long (nullable = true)
 |    |-- 1000: long (nullable = true)
 |    |-- 1001: long (nullable = true)
 |    |-- 1002: long (nullable = true)
 |    |-- 1003: long (nullable = true)
 |    |-- 1004: long (nullable = true)
 |    |-- 1005: long (nullable = true)
 |    |-- 1006: long (nullable = true)
 |    |-- 1007: long (nullable = true)
 |    |-- 1008: long (nullable = true)
 |    |-- 1009: long (nullable = true)
 |    |-- 101: long (nullable = true)
 |    |-- 1010: long (nullable = true)
 |    |-- 1011: long (nullable = true)
 |    |-- 1012: long (nullable = true)
 |    |-- 1013: long (nullable = true)
 |    |-- 1014: long (nullable = true)
 |    |-- 1015: long (nullable = true)
 |    |-- 1016: long (nullable = true)
 |    |-- 1017: long (nullable = true)
 |    |-- 1018: long (nullable = true)
 |    |-- 1019: long (nullabl

O installments_sample apresenta as linha em formato de estrutura, dificultando a visuallização dos valores e do schema. Para poder trabalhar com esse dataframe irei dsaclpar os dados de cada elemento da estrutura e gerar um novo dataframe com os dados em diferentes linhas.

In [17]:
from pyspark.sql import functions as F

In [18]:
#coverter as culunas de struct para array
installments_sample = installments_sample.select(
    F.array(F.expr("loan_id.*")).alias("loan_id"),
    F.array(F.expr("due_date.*")).alias("due_date"),
    F.array(F.expr("installment_value.*")).alias("installment_value"),
    F.array(F.expr("number.*")).alias("number")
)

In [19]:
installments_sample.printSchema()

root
 |-- loan_id: array (nullable = false)
 |    |-- element: long (containsNull = true)
 |-- due_date: array (nullable = false)
 |    |-- element: long (containsNull = true)
 |-- installment_value: array (nullable = false)
 |    |-- element: double (containsNull = true)
 |-- number: array (nullable = false)
 |    |-- element: long (containsNull = true)



In [20]:
#novo dataframe com acess direto aos dados
installments_sample = installments_sample.withColumn("installments", F.explode(F.arrays_zip("loan_id", "due_date","installment_value","number")))\
    .select("installments.loan_id", "installments.due_date","installments.installment_value","installments.number")

In [21]:
installments_sample.printSchema()

root
 |-- loan_id: long (nullable = true)
 |-- due_date: long (nullable = true)
 |-- installment_value: double (nullable = true)
 |-- number: long (nullable = true)



In [26]:
installments_sample.show(truncate=False)

+-------+-------------+-----------------+------+
|loan_id|due_date     |installment_value|number|
+-------+-------------+-----------------+------+
|291    |1497225600000|547.92           |1     |
|291    |1499817600000|547.92           |2     |
|291    |1515715200000|547.92           |8     |
|703    |1513900800000|196.36           |6     |
|150    |1517443200000|1033.75          |9     |
|150    |1520121600000|1033.75          |10    |
|150    |1522540800000|1033.75          |11    |
|150    |1525219200000|1033.75          |12    |
|146    |1496275200000|373.74           |1     |
|146    |1498953600000|373.74           |2     |
|146    |1501545600000|373.74           |3     |
|146    |1504224000000|373.74           |4     |
|146    |1506902400000|373.74           |5     |
|146    |1509494400000|373.74           |6     |
|703    |1516579200000|196.36           |7     |
|146    |1512172800000|373.74           |7     |
|146    |1514764800000|373.74           |8     |
|146    |15174432000

In [22]:
print("Total de valores nulos por coluna:")
for col in installments_sample.columns:
  print(" {}={}".format(col, installments_sample.filter(installments_sample[col].isNull() ).count()) )

Total de valores nulos por coluna:
 loan_id=0
 due_date=0
 installment_value=0
 number=0


In [23]:
installments_sample.count()

8838

In [27]:
installments_sample.select("loan_id").distinct().count()

775

Agora podemos verificar que installments_sample apresenta informações como identificador do emprestimo, data de vencimento da parcela, número e valor da parcela . Possui 8838 registros e não possui valores nulos. Esses registros são referentes a um total de 755 emprestimos realizados, tal como payments_sample.


In [30]:
idsPay = payments_sample.select("loan_id").sort("loan_id").distinct().collect()

In [35]:
idsInst = installments_sample.select("loan_id").sort("loan_id").distinct().collect()

In [31]:
idsLoan = loans_sample.select("loan_id").sort("loan_id").distinct().collect()

In [32]:
idsPay = [ x.__getitem__("loan_id") for x in idsPay]

In [33]:
idsLoan = [ x.__getitem__("loan_id") for x in idsLoan]

In [36]:
idsInst = [ x.__getitem__("loan_id") for x in idsInst]

In [34]:
set(idsLoan).difference(set(idsPay))

{2, 96, 127, 296, 369, 383, 389, 440, 455, 507, 542, 692, 785}

In [37]:
set(idsLoan).difference(set(idsInst))

{2, 96, 127, 296, 369, 383, 389, 440, 455, 507, 542, 692, 785}

In [38]:
set(idsPay).difference(set(idsInst))

set()

como pode se verificar acima payments_sample e installments_sample possuem os mesmo loan_id e loans_sample possui os loan_id {2, 96, 127, 296, 369, 383, 389, 440, 455, 507, 542, 692, 785}. Uma vez que a próxima etapa será feito o join dos 3 dataframes pela coluna loan_id esses loan_id excedentes serão descartados.

## Criando loan_documents

Para criar o novo dataframe loan_documents conforme a especificação do teste irei realizar algumas operações de agrupamento e agregação nos dataframes installments_sample e payments_sample para então junta-los ao loans_sample.

In [39]:
from pyspark.sql.types import IntegerType,StringType

In [40]:
#agrupando valores por loan_id e agregando number e due_date no MAP<INT,STRING>
df_installments = installments_sample.groupBy("loan_id").\
    agg(F.collect_list(F.create_map(F.col("number").cast(IntegerType()),
    F.col("due_date").cast(StringType()))).alias("installments"))

In [41]:
df_installments.show(5,truncate=False)

+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|loan_id|installments                                                                                                                                                                                                                                                               |
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|26     |[[1 -> 1498089600000], [2 -> 1500681600000], [3 -> 1503360000000], [4 -> 1506038400000], [5 -> 1508630400000], [6 -> 1511308800000], [7 -> 1513900800000], [8

In [42]:
df_installments.printSchema()

root
 |-- loan_id: long (nullable = true)
 |-- installments: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: integer
 |    |    |-- value: string (valueContainsNull = true)



In [43]:
#escrevendo resultado do agrupamento e agregação em um arquivo parquet auxiliar
df_installments.write.parquet("temp/install_group.parquet")

In [44]:
#agrupando valores por loan_id e agregando as demais colunas em um array de struct
df_payments = payments_sample.groupBy("loan_id").\
    agg(F.collect_list(F.struct(F.col("payment_id").alias("id"),
    F.col("payment_date"),F.col("payment_method").alias("method"),F.col("paid_amount").alias("amount"))).alias("payments"))

In [45]:
df_payments.printSchema()

root
 |-- loan_id: long (nullable = true)
 |-- payments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- payment_date: date (nullable = true)
 |    |    |-- method: string (nullable = true)
 |    |    |-- amount: double (nullable = true)



In [47]:
#escrevendo resultado do agrupamento e agregação em um arquivo parquet auxiliar
df_payments.write.parquet("temp/payments_group.parquet")

In [48]:
df_payments.show(2,truncate=False)

+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|loan_id|payments                                                                                                                                                                                                                                                                                      

Nesse momento o kernel do jupyter é reiniciado e teste continua carregando os arquivos parquets escritos anteriormente.

In [1]:
#carregando dataframe com as informaçoes de pagamento agregadas
payments = spark.read.parquet("payments_group.parquet")

In [2]:
#carregando dataframe com as informaçoes dos vencimentos das parcelas
installments = spark.read.parquet("install_group.parquet")

In [3]:
#recarregando loan_sample
loans_sample = spark.read.csv("input/loans_sample.csv",header=True, inferSchema=True)

In [4]:
#join dos 3 dataframes por loan_id
loan_doc = loans_sample.join(installments, ["loan_id"]).join(payments,["loan_id"])

In [5]:
#schema do dataframe resultado do join
loan_doc.printSchema()

root
 |-- loan_id: integer (nullable = true)
 |-- _c0: integer (nullable = true)
 |-- period: integer (nullable = true)
 |-- accepted_at: timestamp (nullable = true)
 |-- payday: integer (nullable = true)
 |-- interest_rate: double (nullable = true)
 |-- installments: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: integer
 |    |    |-- value: string (valueContainsNull = true)
 |-- payments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- payment_date: date (nullable = true)
 |    |    |-- method: string (nullable = true)
 |    |    |-- amount: double (nullable = true)



In [6]:
#escrevendo resultado do join em um arquivo parquet auxiliar
loan_doc.write.parquet("temp/loan_doc.parquet")

In [7]:
#exemplo de linha
loan_doc.head()

Row(loan_id=53, _c0=53, period=12, accepted_at=datetime.datetime(2017, 5, 23, 8, 48, 32, 318000), payday=25, interest_rate=7.55, installments=[{1: u'1498953600000'}, {2: u'1501545600000'}, {3: u'1504224000000'}, {4: u'1506902400000'}, {6: u'1512172800000'}, {9: u'1520121600000'}, {11: u'1525219200000'}, {12: u'1527811200000'}, {10: u'1522540800000'}, {7: u'1514764800000'}, {8: u'1517443200000'}, {5: u'1509494400000'}], payments=[Row(id=u'40018e79-f4da-40cb-a045-2997f22487e3', payment_date=datetime.date(2017, 6, 15), method=u'creditcard', amount=292.36), Row(id=u'a7c87c2a-98f0-4c71-94a3-da5013a658d5', payment_date=datetime.date(2017, 7, 30), method=u'ted', amount=292.36), Row(id=u'177bf69e-f82a-4679-9385-13607366c160', payment_date=datetime.date(2017, 9, 3), method=u'ted', amount=337.36), Row(id=u'71e207b2-fcd4-4b25-9dea-e246244d2957', payment_date=datetime.date(2017, 9, 21), method=u'ted', amount=292.36), Row(id=u'72e63027-5aef-43c5-90d0-6fa7509d3af1', payment_date=datetime.date(2017, 

In [8]:
#carregando parquet com o join dos 3 dataframes
loan = spark.read.parquet("loan_doc.parquet")

In [9]:
#validando número de linhas
loan.select("loan_id").count()

775

o número de loan_id do arquivo parquet auxiliar é igual ao número de loan_id dos dataframes de entrada

In [10]:
# função que irá ler cada linha dataframe juntado 
# e gerar uma nova linha com os valores da forma descrita na especificação de loan_documents
def parser(line):
    from pyspark.sql import Row
    from collections import defaultdict
    from datetime import datetime
    
    row = defaultdict() # dicionario que ira armazenar os elementos da nova linha que será retornada
    
    #campos que não serão transformados
    row["loan_id"] = line["loan_id"]
    row["period"] = line["period"]
    row["payday"] = line["payday"]
    row["accepted_at"] = line["accepted_at"]
    row["interest_rate"] = line["interest_rate"]
    
    #criar instalments no formato Map
    instDict = defaultdict() 
    vencimentos= [] # lista com as datas de vencimentos das parcelas
    for install in line["installments"]:
        date = datetime.fromtimestamp(long(install.values()[0])/1000.0) #convert timestamp milisegundos para datetime
        vencimentos.append(date) #insere data de vencimento em datetime
        instDict[install.keys()[0]] = date.strftime("%Y-%m-%d") #converte para string formato ano-mes-dia
    row["installments"] = instDict
    
    

        
    vencimentos.sort() #ordena as datas de vencimento
    
    #converter payment_date para string formato ano-mes-dia e pegar datas de pagamentos a serem usadas em metrics
    row["payments"] = line["payments"]
    p = [] 
    pagamentos = [] # lista das datas de pagamentos
    for pay in row["payments"]:
        pagamentos.append(pay["payment_date"])
        p.append(Row(id=pay["id"],
      payment_date=pay["payment_date"].strftime("%Y-%m-%d"),method=pay["method"],amount=pay["amount"])
        )
    row["payments"] = p
    
    #metrics
    pagamentos.sort() #ordena data de pagamentops
    days = (datetime.now() - line["accepted_at"] ).days # total de dias da data de aceitação até hoje
    latency = [False]*days #inicializa array de tamanho days latency com false
    over30 = [False]*days #inicializa array de tamanho days over30 com false
    
    
   
    
    venc_size = len(vencimentos) #total de datas de vencimento
    pag_size = len(pagamentos) #total de datas de pagamento
    
    #Assumindo que não há mais pagamentos do que numero de parelas, caso contrario lança exceção
    if(pag_size > venc_size ):
        raise ValueError('Numero de pagamentos maior que o de parcelas')
        
    # verifica se há data de pagamento maior que a data atual
#     if(datetime.now() - datetime.combine(pagamentos[-1], datetime.min.time())).days < 0:
#         raise ValueError('data de pagamento maior que hoje( {}): loan_id = {}'\
#                          .format(pagamentos[-1].strftime("%Y-%m-%d"),line["loan_id"], ))
        
    
    for v,p in zip(vencimentos,pagamentos): #faz um par das datas de vencimento e pagamento ordenadas
        venc_index = (v -line["accepted_at"] ).days #posicao da data de vencimento no array
        #posicao da data de pagamento no array
        pg = datetime.combine(p, datetime.min.time())
        pag_index = (pg - line["accepted_at"] ).days
        
        pag_index = min(pag_index,days) #tratamento para data de pagamentos maiores que a data atual
        
        for index in range(venc_index,pag_index):# marca como true do dia posterior a data de vencimento até o dia anterior a data de pagamento
            latency[index]=True
            
        for index in range(venc_index+30,pag_index): #marca como true a apartir de 30 dias da data de vencimento até a data de pagamento
            over30[index]=True
    
    #Nos casos que o numero de pagamentos é inferior ao total de parcelas existentes 
    # preenche do dia da primeira parcela não até o dia atual como true
    if( venc_size > pag_size):
        venc_index = (vencimentos[pag_size] - line["accepted_at"]).days
        for index in range(venc_index,days):
            latency[index]=True
        for index in range(venc_index+30,days):
            over30[index]=True
            
    row["metrics"] = Row(latency=latency,over30=over30)
            
    return Row(**row)
    
        
        
    

In [11]:
#faz parser das linhas e adiciona metrics
loan_parsed = loan.rdd.map(parser).toDF()

In [12]:
loan_parsed.printSchema()

root
 |-- accepted_at: timestamp (nullable = true)
 |-- installments: map (nullable = true)
 |    |-- key: long
 |    |-- value: string (valueContainsNull = true)
 |-- interest_rate: double (nullable = true)
 |-- loan_id: long (nullable = true)
 |-- metrics: struct (nullable = true)
 |    |-- latency: array (nullable = true)
 |    |    |-- element: boolean (containsNull = true)
 |    |-- over30: array (nullable = true)
 |    |    |-- element: boolean (containsNull = true)
 |-- payday: long (nullable = true)
 |-- payments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- amount: double (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- method: string (nullable = true)
 |    |    |-- payment_date: string (nullable = true)
 |-- period: long (nullable = true)



In [13]:
loan_parsed.show()

+--------------------+--------------------+-------------+-------+--------------------+------+--------------------+------+
|         accepted_at|        installments|interest_rate|loan_id|             metrics|payday|            payments|period|
+--------------------+--------------------+-------------+-------+--------------------+------+--------------------+------+
|2017-05-23 08:48:...|[1 -> 2017-07-01,...|         7.55|     53|[[false, false, f...|    25|[[292.36, 40018e7...|    12|
|2017-04-30 18:46:...|[1 -> 2017-06-11,...|         7.49|    171|[[false, false, f...|     5|[[439.84, 2680a71...|     9|
|2017-05-11 20:11:...|[1 -> 2017-06-21,...|         7.55|    312|[[false, false, f...|    15|[[582.65, 896568e...|    12|
|2017-05-24 04:35:...|[1 -> 2017-07-11,...|         3.12|    466|[[false, false, f...|     5|[[916.97, b395237...|    12|
|2017-06-07 18:29:...|[1 -> 2017-07-11,...|         3.12|    747|[[false, false, f...|     5|[[517.13, 053a125...|    12|
|2017-06-06 10:57:...|[1

Como se pode observar no schema depois do parser ainda é necessário fazer algums conversções de tipo e por em ordem as colunas

In [18]:
#gerando dataframe loan_documents
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType,StringType,MapType
loan_documents = loan_parsed.select(F.col("loan_id").cast(IntegerType()),F.col("period").cast(IntegerType()),\
         F.col("accepted_at"),F.col("payday").cast(IntegerType()),F.col("interest_rate"),\
         F.col("installments").cast(MapType(IntegerType(),StringType() )),\
        F.col("payments"),F.col("metrics"))

In [19]:
#escrevendo loan_documents em um parquet de saida
loan_documents.write.parquet("output/loan_documents.parquet")

## Validação

In [20]:
pagamentos2019 = loan_documents.select(F.explode("payments").alias("payments") )\
    .filter(F.year("payments.payment_date") == 2019)\
    .groupBy(F.month("payments.payment_date").alias("month"),F.year("payments.payment_date").alias("year"))\
    .agg(F.sum("payments.amount").alias("amount")).sort("month")

In [21]:
#total do valor pago a cada mês de 2019
pagamentos2019.show()

+-----+----+------+
|month|year|amount|
+-----+----+------+
|    1|2019|377.88|
|    2|2019|377.88|
|    3|2019|420.88|
|    4|2019|755.76|
|    5|2019|770.49|
|    6|2019|392.61|
|    8|2019|392.61|
+-----+----+------+

