# Validate Records

In [1]:
import findspark
findspark.init()
import pyspark
import pyspark.sql.functions as F

from pyspark.sql.types import *

import duckdb

In [2]:
spark = pyspark.sql.SparkSession.builder.appName("Python").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/20 16:08:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/08/20 16:08:53 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## Load records CSV

In [3]:
schema = StructType([
    StructField("reference", IntegerType(), False), 
    StructField("account_number", StringType(), False),
    StructField("description", StringType(), False),
    StructField("start_balance", FloatType(), False),
    StructField("mutation", FloatType(), False),
    StructField("end_balance", FloatType(), False)
])

In [4]:
records = (
    spark.read
    .option("encoding", "ISO-8859-1")
    .schema(schema)
    .csv("../assignment/records 1.csv")
)
records.show()

+---------+------------------+--------------------+-------------+--------+-----------+
|reference|    account_number|         description|start_balance|mutation|end_balance|
+---------+------------------+--------------------+-------------+--------+-----------+
|     NULL|    Account Number|         Description|         NULL|    NULL|       NULL|
|   176104|NL32RABO0195610843|Flowers for Peter...|       101.84|   13.76|      115.6|
|   112806|NL69ABNA0433647324|Subscription from...|         10.2|   -47.4|      -37.2|
|   109169|NL91RABO0315273637|Tickets from Vinc...|        80.53|  -23.66|      56.87|
|   156539|NL43AEGO0773393871|Candy for Jan Bakker|        88.22|    3.82|      92.04|
|   112806|NL32RABO0195610843|Flowers for Wille...|        16.59|   -7.37|       9.22|
|   112806|NL69ABNA0433647324|Flowers for Danië...|         37.1|   28.81|      65.91|
|   178261|NL27SNSB0917829871|Subscription from...|        50.75|  -21.65|       29.1|
|   100723|NL93ABNA0585619023|Tickets from 

In [5]:
# step not necessarily needed
records.printSchema()

root
 |-- reference: integer (nullable = true)
 |-- account_number: string (nullable = true)
 |-- description: string (nullable = true)
 |-- start_balance: float (nullable = true)
 |-- mutation: float (nullable = true)
 |-- end_balance: float (nullable = true)



## Validations
1. all transaction references should be unique
2. the end balance needs to be validated

In [6]:
# all transaction references should be unique
reference_count = records.groupby("reference").count().withColumnRenamed("count", "reference_count")

In [7]:
# the end balance needs to be validated
records = records.withColumn("validated_end_balance", F.round(F.col("start_balance") + F.col("mutation"), 2))

In [8]:
records = records.join(reference_count, "reference", "left")

## Connect to Database

In [9]:
con = duckdb.connect("../database/records.db")

### Insert validated records into records table

In [10]:
validated = records.where((records.reference_count == 1) & (records.validated_end_balance == records.end_balance))

# DuckDB Spark API is still in experimental phase, therefore conversion to pandas is needed
validated_df = validated.select("reference", "account_number", "description", "start_balance", "mutation", "end_balance").toPandas()

In [11]:
con.sql("insert into validated_records select * from validated_df on conflict do nothing")
con.sql("select * from validated_records")

┌───────────┬────────────────────┬───────────────────────────────┬───────────────┬──────────┬─────────────┐
│ reference │   account_number   │          description          │ start_balance │ mutation │ end_balance │
│   int32   │      varchar       │            varchar            │     float     │  float   │    float    │
├───────────┼────────────────────┼───────────────────────────────┼───────────────┼──────────┼─────────────┤
│    176104 │ NL32RABO0195610843 │ Flowers for Peter King        │        101.84 │    13.76 │       115.6 │
│    109169 │ NL91RABO0315273637 │ Tickets from Vincent de Vries │         80.53 │   -23.66 │       56.87 │
│    156539 │ NL43AEGO0773393871 │ Candy for Jan Bakker          │         88.22 │     3.82 │       92.04 │
│    178261 │ NL27SNSB0917829871 │ Subscription from Rik Dekker  │         50.75 │   -21.65 │        29.1 │
│    100723 │ NL93ABNA0585619023 │ Tickets from Daniël de Vries  │         77.91 │    47.13 │      125.04 │
│    145361 │ NL93ABNA058561

### Insert failed records into invalid records table

In [12]:
invalid = records.where((records.reference_count > 1) | (records.validated_end_balance != records.end_balance))

# DuckDB Spark API is still in experimental phase, therefore conversion to pandas is needed
invalid_df = invalid.select("reference", "account_number", "description", "start_balance", "mutation", "end_balance").toPandas()

In [13]:
con.sql("insert into invalid_records select * from invalid_df")
con.sql("select * from invalid_records")

┌───────────┬────────────────────┬─────────────────────────────┬───────────────┬──────────┬─────────────┐
│ reference │   account_number   │         description         │ start_balance │ mutation │ end_balance │
│   int32   │      varchar       │           varchar           │     float     │  float   │    float    │
├───────────┼────────────────────┼─────────────────────────────┼───────────────┼──────────┼─────────────┤
│    112806 │ NL69ABNA0433647324 │ Subscription from Rik Theuß │          10.2 │    -47.4 │       -37.2 │
│    112806 │ NL32RABO0195610843 │ Flowers for Willem de Vries │         16.59 │    -7.37 │        9.22 │
│    112806 │ NL69ABNA0433647324 │ Flowers for Daniël de Vries │          37.1 │    28.81 │       65.91 │
└───────────┴────────────────────┴─────────────────────────────┴───────────────┴──────────┴─────────────┘

### Close Database Connection

In [14]:
con.close()

24/08/20 16:09:04 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
