In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.window import Window
from pyspark.sql.types import *
from pyspark.sql import Row
from datetime import datetime
import pyspark.sql.functions as F
import os
import json

In [2]:
spark = SparkSession.builder.appName('x').getOrCreate()

In [3]:
lines = sc.textFile('file:///root/notebooks/installments_sample.json')
loanDF = spark.read.csv('file:///root/notebooks/loans_sample.csv',header=True)
paymentsDF = spark.read.parquet('file:///root/notebooks/payments_sample.parquet')

In [4]:
def to_list(a):
    return [a]
def append(a,b):
    a.append(b)
    return a
def extend(a,b):
    a.extend(b)
    return a
def loadsJson():
    loan = [x for x in json.loads(lines.collect()[0])['loan_id'].items()]
    number = [x for x in json.loads(lines.collect()[0])['number'].items()]
    due_date = [x for x in json.loads(lines.collect()[0])['due_date'].items()]
    installment_value = [x for x in json.loads(lines.collect()[0])['installment_value'].items()]
    return loan,number,due_date,installment_value
def schemaJson():
    schemaLoan = StructType([\
        StructField("id", StringType()),\
        StructField("loan_id", IntegerType(),True)])

    schemaNumber = StructType([\
        StructField("id", StringType(),True),\
        StructField("qtd", IntegerType(),True)])

    schemaDate = StructType([\
        StructField("id", StringType(),True),\
        StructField("due_date", StringType(),True)])

    schemaValue = StructType([\
        StructField("id", StringType(),True),\
        StructField("installment_value", FloatType(),True)])
    return schemaLoan, schemaNumber, schemaDate, schemaValue
def schemaPayments():
    columns_struct_fields = list()
    columns_struct_fields.append(StructField("loan_id",StringType(), True))
    columns_struct_fields.append(StructField("payments",ArrayType(StructType([
                                    StructField("id",StringType(),True),
                                    StructField("payment_date",StringType(),True),
                                    StructField("method",StringType(), True),
                                    StructField("amount",DoubleType(), True)
                                    ]))))
    schema = StructType(columns_struct_fields)
    return schema
def schemaInstallments():
    columns_struct_fields = list()
    columns_struct_fields.append(StructField("loan_id",StringType(), True))
    columns_struct_fields.append(StructField("installments",ArrayType(StructField("element",\
                                             MapType(IntegerType(),StringType()), True))))
    schema = StructType(columns_struct_fields)
    return schema

def createDFs(*items):
    dfs = list()
    schemaLoan, schemaNumber, schemaDate, schemaValue = schemaJson()
    dfs.append(spark.createDataFrame(items[0],schemaJson()[0]))
    dfs.append(spark.createDataFrame(items[1],schemaJson()[1]))
    dfs.append(spark.createDataFrame(items[2],schemaJson()[2]))
    dfs.append(spark.createDataFrame(items[3],schemaJson()[3]))
    return dfs
def installmentsFormat(key,value):
    dic = dict()
    dic[int(key)] = value
    return dic
def paymentsFormat(*cols):
    return (cols[0],str(cols[1]),cols[2],cols[3])
def calcDays(date0,date1):
    return (date0 - date1).days
def initMetrics(date):
    import datetime
    latency = [False]
    over = [False]
    lastDay = datetime.datetime(2019, 9, 3)
    result = (lastDay  - date).days
    latency *= result
    over *= result
    return (latency,over)

def calcMetricLatency(metrics,installments,payments,accepted_at):
    installments = sorted(installments, key = lambda i: i[list(i.keys())[0]])
    payments = sorted(a,key= lambda i: i.payment_date)
    while payments:
        dtInstall =  list(installments.pop().values())[0][:-3]
        dtPayments = payments.pop().payment_date
        
        dtPayments = datetime.strptime(dtPayments, '%Y-%m-%d')
        dtInstall = datetime.fromtimestamp(eval(dtInstall))
        day = (dtInstall - dtPayments).days
        if day > 0:
            metrics.latency[day] = True
    return (metrics.latency,metrics.over)
calcMetricLatencyUDF = F.udf(calcMetricLatency, StructType([
                                            StructField("latency",ArrayType(BooleanType())),\
                                            StructField("over",ArrayType(BooleanType()))
                                              ])) 

initMetricsUDF = F.udf(initMetrics,StructType([StructField("latency",ArrayType(BooleanType())),\
                                   StructField("over",ArrayType(BooleanType()))
                                  ]))


paymentsFormatUDF = F.udf(paymentsFormat,StructType([
                                            StructField("id",StringType(),True),
                                            StructField("payment_date",StringType(),True),
                                            StructField("method",StringType(), True),
                                            StructField("amount",DoubleType(), True)
                                            ]))
installmentsFormatUDF = F.udf(installmentsFormat,MapType(IntegerType(),StringType()))

In [5]:
dfs = list()
dfs.append(spark.createDataFrame(loadsJson()[0],schemaJson()[0]))
dfs.append(spark.createDataFrame(loadsJson()[1],schemaJson()[1]))
dfs.append(spark.createDataFrame(loadsJson()[2],schemaJson()[2]))
dfs.append(spark.createDataFrame(loadsJson()[3],schemaJson()[3]))
installmentsDF = dfs[0].join(dfs[1],'id').join(dfs[2],'id').join(dfs[3],'id')

In [6]:
loanDF = loanDF.drop('_c0')

In [7]:
installmentsDF.printSchema()
loanDF.printSchema()
paymentsDF.printSchema()

root
 |-- id: string (nullable = true)
 |-- loan_id: integer (nullable = true)
 |-- qtd: integer (nullable = true)
 |-- due_date: string (nullable = true)
 |-- installment_value: float (nullable = true)

root
 |-- loan_id: string (nullable = true)
 |-- period: string (nullable = true)
 |-- accepted_at: string (nullable = true)
 |-- payday: string (nullable = true)
 |-- interest_rate: string (nullable = true)

root
 |-- loan_id: long (nullable = true)
 |-- payment_date: date (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- payment_id: string (nullable = true)
 |-- paid_amount: double (nullable = true)



In [8]:
installmentsDF.show(5)
loanDF.show(5,False)
paymentsDF.show(5)

+----+-------+---+-------------+-----------------+
|  id|loan_id|qtd|     due_date|installment_value|
+----+-------+---+-------------+-----------------+
|1090|    151| 10|1520121600000|           233.81|
|1159|    170| 10|1519257600000|           261.29|
|1436|    202| 10|1519257600000|           494.02|
|1512|    216|  5|1506902400000|           394.18|
|1572|    264|  5|1508630400000|           505.79|
+----+-------+---+-------------+-----------------+
only showing top 5 rows

+-------+------+--------------------------+------+--------------------+
|loan_id|period|accepted_at               |payday|interest_rate       |
+-------+------+--------------------------+------+--------------------+
|0      |12    |2017-05-19 10:09:47.285105|25    |3.120000000000000000|
|1      |12    |2017-05-18 20:27:41.197904|25    |7.550000000000000000|
|2      |12    |2017-05-18 22:04:41.276742|25    |7.550000000000000000|
|3      |12    |2017-05-17 23:47:24.822334|5     |7.550000000000000000|
|4      |9  

# Format Loan

In [9]:
loanDF = loanDF.withColumn('accepted_at',F.unix_timestamp('accepted_at').cast('timestamp')).\
        withColumn('interest_rate',F.col('interest_rate').cast('double')).\
        withColumn('payday',F.col('payday').cast('int'))

# Format Installments

In [10]:
installmentsDF = installmentsDF.withColumn('installments',installmentsFormatUDF(F.col('id'),F.col('due_date')))

In [11]:
installmentsDF = installmentsDF.select('loan_id','installments')

installmentsDF = installmentsDF.rdd.map(lambda x: (x.loan_id,x.installments)).combineByKey(to_list, append, extend)
installments = installmentsDF.collect()
installmentsDF = spark.createDataFrame(installments,['loan_id','installments']).\
                withColumn('loan_id',F.col('loan_id').cast(StringType()))

In [12]:
installmentsDF.printSchema()
installmentsDF.show(2)

root
 |-- loan_id: string (nullable = true)
 |-- installments: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: long
 |    |    |-- value: string (valueContainsNull = true)

+-------+--------------------+
|loan_id|        installments|
+-------+--------------------+
|    600|[Map(4838 -> 1504...|
|      0|[Map(2587 -> 1512...|
+-------+--------------------+
only showing top 2 rows



# Format Payments

In [13]:
paymentsDF = paymentsDF.withColumn('x',paymentsFormatUDF(F.col('payment_id'),
                                           F.col('payment_date'),
                                           F.col('payment_method'),
                                           F.col('paid_amount'),
                                          )).select('loan_id','x')
paymentsDF = paymentsDF.rdd.combineByKey(to_list, append, extend)
payments = paymentsDF.collect()
paymentsDF = spark.createDataFrame(payments,schemaPayments())

In [14]:
paymentsDF.printSchema()
paymentsDF.show(2)

root
 |-- loan_id: string (nullable = true)
 |-- payments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- payment_date: string (nullable = true)
 |    |    |-- method: string (nullable = true)
 |    |    |-- amount: double (nullable = true)

+-------+--------------------+
|loan_id|            payments|
+-------+--------------------+
|    291|[[ed5a489a-bb07-4...|
|    670|[[439ba70a-c831-4...|
+-------+--------------------+
only showing top 2 rows



# Create - loan_documents

In [15]:
print (loanDF.count())
print(loanDF.select('loan_id').distinct().count())

print (paymentsDF.count())
print(paymentsDF.select('loan_id').distinct().count())

print (installmentsDF.count())
print(installmentsDF.select('loan_id').distinct().count())

788
788
775
775
775
775


In [16]:
installmentsDF = installmentsDF.select('loan_id','installments')

loan_documentsDF = loanDF.join(installmentsDF,'loan_id').join(paymentsDF,'loan_id').cache()
loan_documentsDF = loan_documentsDF.withColumn('metrics',initMetricsUDF(F.col('accepted_at'))).cache()

In [17]:
installmentsDF = installmentsDF.select('loan_id','installments')

In [18]:
loan_documentsDF = loanDF.join(installmentsDF,'loan_id').join(paymentsDF,'loan_id').cache()

In [19]:
loan_documentsDF.printSchema()
loan_documentsDF.limit(3).toPandas()

root
 |-- loan_id: string (nullable = true)
 |-- period: string (nullable = true)
 |-- accepted_at: timestamp (nullable = true)
 |-- payday: integer (nullable = true)
 |-- interest_rate: double (nullable = true)
 |-- installments: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: long
 |    |    |-- value: string (valueContainsNull = true)
 |-- payments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- payment_date: string (nullable = true)
 |    |    |-- method: string (nullable = true)
 |    |    |-- amount: double (nullable = true)



Unnamed: 0,loan_id,period,accepted_at,payday,interest_rate,installments,payments
0,467,12,2017-05-23 11:33:11,15,7.55,"[{3517: '1498089600000'}, {3526: '151925760000...","[(358aa6f7-7c5f-4e1e-a4a8-97a38fe40020, 2017-0..."
1,675,12,2017-06-07 12:18:06,15,7.55,"[{5523: '1521676800000'}, {5520: '151390080000...","[(0d531144-694f-4227-ac9d-4e00bbc4c969, 2017-0..."
2,691,12,2017-06-05 00:51:46,25,7.55,"[{296: '1498953600000'}, {6582: '1504224000000...","[(35fddc8b-cb4c-4034-844c-91eb71576136, 2017-0..."


In [None]:
loan_documentsDF = loan_documentsDF.withColumn('metrics',calcMetricLatencyUDF(F.col('metrics'),
                                                    F.col('installments'),
                                                    F.col('payments'),
                                                    F.col('accepted_at')))