In [1]:
!pip install delta-spark==3.1.0

Collecting delta-spark==3.1.0
  Downloading delta_spark-3.1.0-py3-none-any.whl.metadata (1.9 kB)
Collecting py4j==0.10.9.7 (from pyspark<3.6.0,>=3.5.0->delta-spark==3.1.0)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading delta_spark-3.1.0-py3-none-any.whl (21 kB)
Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: py4j, delta-spark
Successfully installed delta-spark-3.1.0 py4j-0.10.9.7


In [3]:
from delta import *
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *


builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [16]:
#check if any duplicate records
df = spark.sql("""
  SELECT originationId, 
      count(originationId) noOfDuplicates, 
      'fact_originations' table
  FROM delta.`/data/delta/fact_originations/` fo
  GROUP BY originationId
  HAVING COUNT(originationId) > 1
  UNION ALL
  SELECT paymentId, 
      count(paymentId) noOfDuplicates, 
      'fact_payments' table
  FROM delta.`/data/delta/fact_payments/`
  GROUP BY paymentId
  HAVING COUNT(paymentId) > 1
""")
df.show()

+--------------------+--------------------+-----------------+
|       originationId|count(originationId)|            table|
+--------------------+--------------------+-----------------+
|1b7d38015747cf95f...|                   2|fact_originations|
|0aa4ed0bd95f5c014...|                   2|    fact_payments|
+--------------------+--------------------+-----------------+



In [13]:
#check if all originations have installments
df = spark.sql("""
  SELECT fo.originationId, fo.clientId, fo.registerDate, fo.value, fi.installmentValue
  FROM delta.`/data/delta/fact_originations/` fo
  LEFT JOIN delta.`/data/delta/fact_installments/` fi
  ON fo.originationId = fi.originationId
  WHERE fi.installmentValue IS NULL
""")
df.show()

+-------------+--------+------------+-----+----------------+
|originationId|clientId|registerDate|value|installmentValue|
+-------------+--------+------------+-----+----------------+
+-------------+--------+------------+-----+----------------+



In [12]:
#check if all origination amount and total installment amount match
df = spark.sql("""
  SELECT fo.originationId, fo.clientId, fo.registerDate, fo.value, SUM(fi.installmentValue) installmentValue
  FROM delta.`/data/delta/fact_originations/` fo
  LEFT JOIN delta.`/data/delta/fact_installments/` fi
  ON fo.originationId = fi.originationId
  GROUP BY fo.originationId, fo.clientId, fo.registerDate, fo.value
  HAVING fo.value != SUM(fi.installmentValue)
""")
df.show()

+--------------------+--------------------+------------+------------+----------------+
|       originationId|            clientId|registerDate|       value|installmentValue|
+--------------------+--------------------+------------+------------+----------------+
|1b7d38015747cf95f...|c67d5489eff1b9adf...|  2021-06-27|149.97000000|    299.94000000|
|1b7d38015747cf95f...|                  55|  2021-06-27|149.97000000|    299.94000000|
+--------------------+--------------------+------------+------------+----------------+

