<a href="https://colab.research.google.com/github/joaohsr/spark-challenge/blob/master/Spark%20Challenge.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-eu.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install -q pyspark

In [0]:
import os

# Export the Spark and Java home directories as environment variables.
os.environ.update(
    SPARK_HOME="/content/spark-2.4.5-bin-hadoop2.7",
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
)

In [0]:
import findspark

# findspark.init() is called before the pyspark import below; presumably it makes
# the SPARK_HOME distribution importable -- keep this ordering.
findspark.init()

from pyspark.sql import SparkSession

# Local Spark session using all available cores ("local[*]").
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [0]:
from pyspark import SparkFiles

In [0]:
import pandas as pd

In [0]:
# Register the remote loans CSV with the SparkContext; it is later resolved by
# file name through SparkFiles.get('loans_sample.csv').
spark.sparkContext.addFile(
    path="https://noverde-data-engineering-test.s3.amazonaws.com/loans_sample.csv"
)

In [0]:
# Register the remote installments JSON with the SparkContext; it is later
# resolved by file name through SparkFiles.get('installments_sample.json').
spark.sparkContext.addFile(
    path="https://noverde-data-engineering-test.s3.amazonaws.com/installments_sample.json"
)

In [0]:
# Register the remote payments parquet file with the SparkContext; it is later
# resolved by file name through SparkFiles.get('payments_sample.parquet').
spark.sparkContext.addFile(
    path="https://noverde-data-engineering-test.s3.amazonaws.com/payments_sample.parquet"
)

In [0]:
# Load the loans CSV, taking column names from the header row. No schema is
# supplied, so every column is read as a string (see the printed schema below).
loans = spark.read.csv(
    path=SparkFiles.get('loans_sample.csv'),
    header=True,
)

In [0]:
# Load the payments parquet file that was registered with the SparkContext.
payments = spark.read.parquet(
    SparkFiles.get('payments_sample.parquet'),
)

I could not find a way to make Spark read the JSON file's schema from its "schema" field, so I load the file with pandas (`orient='table'`) and convert the result to a Spark DataFrame.

In [0]:
# Parse the installments JSON with pandas (orient='table' reads the schema
# embedded in the file) and convert the resulting frame to a Spark DataFrame.
installments = spark.createDataFrame(
    pd.read_json(
        SparkFiles.get('installments_sample.json'),
        orient='table',
    )
)

In [0]:
from pyspark.sql import functions as F

In [0]:
# Sort installments by loan and then by installment number.
installments = installments.orderBy('loan_id', 'number')

In [0]:
# Rename the key to 'id' so it does not clash with loans.loan_id in the join below.
new_installments = installments.withColumnRenamed(existing='loan_id', new='id')

In [0]:
# Collapse installments to one row per loan: an array of {number: due_date} maps.
# collect_list gives no ordering guarantee after the groupBy shuffle, so the
# (number, due_date) pairs are collected as structs, sorted with sort_array
# (ascending by number, then due_date) and only then turned into single-entry
# maps. Maps themselves are not orderable, hence the intermediate struct.
new_installments = new_installments.groupBy('id').agg(
    F.expr(
        "transform("
        "sort_array(collect_list(struct(number, due_date))), "
        "x -> map(x.number, x.due_date)"
        ")"
    ).alias('installments')
)

In [0]:
# Attach the per-loan installments array to each loan (inner join), then drop
# the duplicate key column coming from new_installments.
loan_documents = (
    loans
    .join(new_installments, on=loans.loan_id == new_installments.id, how='inner')
    .drop('id')
)

In [0]:
# Rename the key to 'id' so it does not clash with loan_documents.loan_id when joined.
new_payments = payments.withColumnRenamed(existing='loan_id', new='id')

In [0]:
# Collapse payments to one row per loan: an array of payment structs.
new_payments = (
    new_payments
    .groupBy("id")
    .agg(
        F.collect_list(
            F.struct('payment_id', 'payment_date', 'payment_method', 'paid_amount')
        ).alias("payments")
    )
)

In [0]:
# Inspect the aggregated schema: struct fields still carry the original
# payment_* column names at this point.
new_payments.printSchema()

root
 |-- id: long (nullable = true)
 |-- payments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- payment_id: string (nullable = true)
 |    |    |-- payment_date: date (nullable = true)
 |    |    |-- payment_method: string (nullable = true)
 |    |    |-- paid_amount: double (nullable = true)



In [0]:
# Target type for the payments array. Casting to it renames the struct fields
# positionally: payment_id -> id, payment_method -> method, paid_amount -> amount.
str_schema = (
    "array<struct<"
    "id:string,"
    "payment_date:date,"
    "method:string,"
    "amount:double"
    ">>"
)

In [0]:
# Cast the payments array to the target type so the struct fields get their
# final names (id, payment_date, method, amount).
new_payments = new_payments.withColumn(
    "payments",
    new_payments.payments.cast(str_schema),
)

In [0]:
# Confirm that the cast renamed the struct fields to id / payment_date / method / amount.
new_payments.printSchema()

root
 |-- id: long (nullable = true)
 |-- payments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- payment_date: date (nullable = true)
 |    |    |-- method: string (nullable = true)
 |    |    |-- amount: double (nullable = true)



In [0]:
# Attach the per-loan payments array (default join type), then drop the
# duplicate key column coming from new_payments.
loan_documents = (
    loan_documents
    .join(new_payments, on=loan_documents.loan_id == new_payments.id)
    .drop('id')
)

In [0]:
# Inspect the loan document schema after installments and payments were attached.
loan_documents.printSchema()

root
 |-- loan_id: string (nullable = true)
 |-- period: string (nullable = true)
 |-- accepted_at: string (nullable = true)
 |-- payday: string (nullable = true)
 |-- interest_rate: string (nullable = true)
 |-- installments: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: long
 |    |    |-- value: timestamp (valueContainsNull = true)
 |-- payments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- payment_date: date (nullable = true)
 |    |    |-- method: string (nullable = true)
 |    |    |-- amount: double (nullable = true)



In [0]:
from pyspark.sql.types import StructType, StructField, ArrayType, BooleanType

In [0]:
#str_schema = StructType([
                         #StructField('latency', ArrayType(BooleanType(), True), True),
                         #StructField('over30', ArrayType(BooleanType(), True) , True)
                         #])

In [0]:
#loan_documents = loan_documents.withColumn('metrics', F.lit(None).cast(str_schema))

In [0]:
# NOTE(review): the cells between the previous printSchema and this one are
# commented out, so this prints the same schema again.
loan_documents.printSchema()

root
 |-- loan_id: string (nullable = true)
 |-- period: string (nullable = true)
 |-- accepted_at: string (nullable = true)
 |-- payday: string (nullable = true)
 |-- interest_rate: string (nullable = true)
 |-- installments: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: long
 |    |    |-- value: timestamp (valueContainsNull = true)
 |-- payments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- payment_date: date (nullable = true)
 |    |    |-- method: string (nullable = true)
 |    |    |-- amount: double (nullable = true)



In [0]:
from pyspark.sql.window import Window

In [0]:
# Per-loan window ordered by payment date; used below to number each loan's payments.
w = Window.partitionBy(payments.loan_id).orderBy('loan_id', 'payment_date')

In [0]:
# Add 'number': the 1-based position of each payment within its loan
# (row_number over the window w), then sort the frame by payment date.
payments = (
    payments
    .withColumn("number", F.row_number().over(w))
    .sort("payment_date")
)

In [0]:
# Expose loan_documents to Spark SQL so the queries in this cell can reference it by name.
loan_documents.createOrReplaceTempView('loan_documents')

# Daily calendar: one row per loan per day, from to_date(accepted_at) through
# current_date(). sequence() builds the array of days; explode() turns it into rows.
# NOTE(review): current_date() makes the result depend on the day the notebook runs.
loan = spark.sql("SELECT loan_id as l_id, sequence(to_date(accepted_at), current_date(), interval 1 day) as i_date from loan_documents").withColumn("i_date", F.explode(F.col("i_date")))

# Left-join installments on (loan id, due_date): only days that are a due date
# get installment values; every other day has nulls in those columns.
loan = loan.join(installments, on=[loan.l_id == installments.loan_id, loan.i_date == installments.due_date], how='left').orderBy('i_date').drop('loan_id', 'installment_value')

# Fill the gaps: keep the row's own value when present, otherwise take the last
# non-null value from a per-loan window ordered by i_date.
# Presumably days before the first due date stay null here (i_number is filled
# with 0 later in this cell) -- TODO confirm.
loan = loan.withColumn('i_number', F.when(loan.number.isNotNull(), loan.number).otherwise(F.last(loan.number, ignorenulls=True).over(Window.partitionBy('l_id').orderBy('i_date')))).drop('number')
loan = loan.withColumn('new_due_date', F.when(loan.due_date.isNotNull(), loan.due_date).otherwise(F.last(loan.due_date, ignorenulls=True).over(Window.partitionBy('l_id').orderBy('i_date'))))

# Replace the sparse due_date column with its filled counterpart.
loan = loan.drop('due_date')
loan = loan.withColumnRenamed('new_due_date', 'due_date')

# Same daily calendar as above, aliased for the payment side: one row per loan
# per day from to_date(accepted_at) through current_date().
loan_payments = spark.sql("SELECT loan_id as p_id, sequence(to_date(accepted_at), current_date(), interval 1 day) as p_date from loan_documents").withColumn("p_date", F.explode(F.col("p_date")))

# Left-join payments on (loan id, payment_date) and drop the payment columns that
# are not needed; days without a payment have null number / payment_date.
loan_payments = loan_payments.join(payments, on=[loan_payments.p_id == payments.loan_id, loan_payments.p_date == payments.payment_date], how='left').orderBy('p_date').drop('loan_id', 'installment_id', 'payment_method', 'payment_id', 'paid_amount')
# Fill the gaps: keep the row's own value when present, otherwise take the last
# non-null value from a per-loan window ordered by p_date.
loan_payments = loan_payments.withColumn('p_number', F.when(loan_payments.number.isNotNull(), loan_payments.number).otherwise(F.last(loan_payments.number, ignorenulls=True).over(Window.partitionBy('p_id').orderBy('p_date')))).drop('number')
loan_payments = loan_payments.withColumn('new_payment_date', F.when(loan_payments.payment_date.isNotNull(), loan_payments.payment_date).otherwise(F.last(loan_payments.payment_date, ignorenulls=True).over(Window.partitionBy('p_id').orderBy('p_date'))))

# Replace the sparse payment_date column with its filled counterpart.
loan_payments = loan_payments.drop('payment_date')
loan_payments = loan_payments.withColumnRenamed('new_payment_date', 'payment_date')

# Combine the installment and payment calendars on (loan id, day), keep a single
# id/date pair, and treat a missing installment or payment number as 0.
df = (
    loan
    .join(
        loan_payments,
        on=[loan.l_id == loan_payments.p_id, loan.i_date == loan_payments.p_date],
        how='inner',
    )
    .orderBy('i_date')
    .drop('p_id', 'p_date')
    .withColumnRenamed('i_date', 'date')
    .withColumnRenamed('l_id', 'id')
    .fillna({'i_number': 0, 'p_number': 0})
)

In [0]:
def calculate_latency(i_number, p_number):
  """Build a boolean Column flagging rows where i_number exceeds p_number.

  Args:
    i_number: Column with the installment number (the caller passes df.i_number).
    p_number: Column with the payment number (the caller passes df.p_number).

  Returns:
    A Column that is True where i_number > p_number and False otherwise.
  """
  is_behind = i_number > p_number
  return F.when(is_behind, True).otherwise(False)

In [0]:
def calculate_over30(i_number, p_number):
  """Build a boolean Column flagging rows more than one installment behind.

  Uses the columns passed in rather than reaching for the global ``df``, so the
  helper works with any DataFrame and does not depend on notebook state.

  Args:
    i_number: Column with the installment number (the caller passes df.i_number).
    p_number: Column with the payment number (the caller passes df.p_number).

  Returns:
    A Column that is True where (i_number - p_number) > 1 and False otherwise.
    Presumably a gap of more than one installment stands for "over 30 days
    late" -- TODO confirm against the metric definition.
  """
  return F.when((i_number - p_number) > 1, True).otherwise(False)

In [0]:
# Plain-Python UDF counterparts of calculate_latency / calculate_over30.
# The Column helpers cannot be called from inside a UDF (they build Spark
# expressions rather than compute values), so the comparisons are restated on
# plain values. Each UDF takes the installment number and the payment number as
# two column arguments; a missing value on either side yields False.
# NOTE(review): the notebook computes the metrics with the Column helpers
# directly, so these UDFs are not referenced by the cells shown here.
latency = F.udf(
    lambda i_number, p_number: (
        i_number is not None and p_number is not None and i_number > p_number
    ),
    BooleanType(),
)
over30 = F.udf(
    lambda i_number, p_number: (
        i_number is not None and p_number is not None and (i_number - p_number) > 1
    ),
    BooleanType(),
)

In [0]:
# Daily 'latency' flag: installment number ahead of payment number.
df = df.withColumn(
    'latency',
    calculate_latency(df['i_number'], df['p_number']),
)

In [0]:
# Daily 'over30' flag, computed by calculate_over30 from the same two counters.
df = df.withColumn(
    'over30',
    calculate_over30(df['i_number'], df['p_number']),
)

In [0]:
# Expose the daily frame to Spark SQL under the name "df" for the metrics query.
df.createOrReplaceTempView("df")

In [0]:
# Build the per-loan metrics struct {latency: [bool], over30: [bool]}.
# collect_list gives no ordering guarantee after the GROUP BY shuffle, so the
# daily flags are collected together with their date, sorted with sort_array,
# and only then split into the two arrays. Sorting one array of
# (date, latency, over30) structs also keeps both arrays index-aligned.
metrics = spark.sql('''
    SELECT id,
           struct(transform(days, d -> d.latency) AS latency,
                  transform(days, d -> d.over30) AS over30) AS metrics
    FROM (
        SELECT id,
               sort_array(collect_list(struct(`date`, latency, over30))) AS days
        FROM df
        GROUP BY id
    ) AS per_loan
''')

In [0]:
# Struct type describing the metrics column: two nullable arrays of booleans.
# NOTE(review): this rebinds str_schema (a type string earlier in the notebook)
# and is not referenced by any later cell shown here.
str_schema = StructType([
    StructField(field_name, ArrayType(BooleanType(), True), True)
    for field_name in ('latency', 'over30')
])

In [0]:
# Attach the metrics struct to each loan document (inner join), then drop the
# duplicate key column coming from metrics.
loan_documents = (
    loan_documents
    .join(metrics, on=loan_documents.loan_id == metrics.id, how='inner')
    .drop('id')
)

In [0]:
#loan_documents.show(5)

In [0]:
# Final document schema: loan columns plus installments, payments and metrics.
loan_documents.printSchema()

root
 |-- loan_id: string (nullable = true)
 |-- period: string (nullable = true)
 |-- accepted_at: string (nullable = true)
 |-- payday: string (nullable = true)
 |-- interest_rate: string (nullable = true)
 |-- installments: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: long
 |    |    |-- value: timestamp (valueContainsNull = true)
 |-- payments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- payment_date: date (nullable = true)
 |    |    |-- method: string (nullable = true)
 |    |    |-- amount: double (nullable = true)
 |-- metrics: struct (nullable = false)
 |    |-- latency: array (nullable = true)
 |    |    |-- element: boolean (containsNull = true)
 |    |-- over30: array (nullable = true)
 |    |    |-- element: boolean (containsNull = true)



In [0]:
# Persist the loan documents as parquet. mode='overwrite' keeps the cell
# re-runnable: the default save mode raises an error when
# output/loan_documents already exists from a previous run.
loan_documents.write.parquet('output/loan_documents', mode='overwrite')

# TODO


1. Create dataframes with the report's data
2. Plot data using the seaborn library
3. Data cleansing (validate data types, empty rows, check for duplicated data) during pre-processing
4. Save parquet file partitioned by the year of accepted_at

