In [1]:
from pyspark.sql import SparkSession
import utils as utils
import spark_utils as spark_utils
#.master("spark://spark-master:7077") \

problem_data_path = 'problematic_transactions.csv'

utils.verify_path(problem_data_path)
import os
os.getcwd()

Path exists!


'/home/jovyan/work'

In [2]:
# Load data: 
jdbc_url = "jdbc:postgresql://psql-postgres-1:5432/mydatabase"
table_name = "transactions"
user = "myuser"
password = "mysecretpassword"

psql_df = spark_utils.get_psql_data(jdbc_url, table_name, user, password)
psql_df.show()

csv_file_path = 'problematic_transactions.csv'

csv_df = spark_utils.get_csv_data(csv_file_path)
csv_df.show()


+--------------+-------+------+--------+-------------------+
|transaction_id|user_id|amount|    type|          timestamp|
+--------------+-------+------+--------+-------------------+
|    1704169733|      9|361.83|transfer|2024-01-01 20:28:53|
|    1706848193|     77| 55.28|transfer|2024-02-01 20:29:53|
|    1709353853|     69| 82.96|purchase|2024-03-01 20:30:53|
|    1712028713|     88| 36.10|transfer|2024-04-01 20:31:53|
|    1714620773|     22| 94.44|transfer|2024-05-01 20:32:53|
|    1717299233|     73|408.42|purchase|2024-06-01 20:33:53|
|    1719891293|     64|954.82|transfer|2024-07-01 20:34:53|
|    1722569753|     59|410.94|purchase|2024-08-01 20:35:53|
|    1725248213|     16|797.11|purchase|2024-09-01 20:36:53|
|    1727840273|     42|802.94|purchase|2024-10-01 20:37:53|
|    1730518733|     43| 35.93|transfer|2024-11-01 20:38:53|
|    1733114393|     69|555.90|  refund|2024-12-01 20:39:53|
|    1704170453|     55| 15.16|transfer|2024-01-01 20:40:53|
|    1706848913|     24|

In [33]:
from pyspark.sql.types import StructType, StructField, TimestampType, IntegerType, FloatType, StringType

# Define the expected schema
expected_schema = StructType([
    StructField("transaction_id", IntegerType(), True),
    StructField("user_id", IntegerType(), True),  # corrected "user_d" to "user_id"
    StructField("amount", FloatType(), True),
    StructField("type", StringType(), True),
    StructField("timestamp", TimestampType(), True)
])

# Function to validate schema (only compares column names)
def validate_schema(df, expected_schema):
    actual_columns = df.columns
    expected_columns = [field.name for field in expected_schema]
    
    missing_columns = [col for col in expected_columns if col not in actual_columns]
    extra_columns = [col for col in actual_columns if col not in expected_columns]
    
    if missing_columns:
        print(f"Missing columns: {missing_columns}")
    if extra_columns:
        print(f"Unexpected columns: {extra_columns}")
    
    return not missing_columns and not extra_columns

# Example usage
print("Database Schema Valid:", validate_schema(psql_df, expected_schema))
print("CSV Schema Valid:", validate_schema(csv_df, expected_schema))


Database Schema Valid: True
CSV Schema Valid: True


In [4]:
from pyspark.sql.functions import count, when, col

def check_nulls(df):
    """
    Checks for null or NaN values in a Spark DataFrame and prints the count of such values for each column.
    :param df: Spark DataFrame to check for nulls
    """
    # Identify numeric and non-numeric columns
    numeric_types = {"double", "float"}
    numeric_cols = [f.name for f in df.schema.fields if f.dataType.simpleString() in numeric_types]
    
    # Build the condition for each column
    null_counts = df.select([
        count(
            when(
                col(c).isNull() | (col(c).isNaN() if c in numeric_cols else col(c).isNull()),
                c
            )
        ).alias(c)
        for c in df.columns
    ])
    
    # Show the results
    null_counts.show()

# Assuming psql_df and csv_df are your DataFrames loaded from PostgreSQL and CSV respectively
print("Null/Empty Values in Database Data:")
check_nulls(psql_df)

print("Null/Empty Values in CSV Data:")
check_nulls(csv_df)


Null/Empty Values in Database Data:
+--------------+-------+------+----+---------+
|transaction_id|user_id|amount|type|timestamp|
+--------------+-------+------+----+---------+
|             0|      0|     0|   0|        0|
+--------------+-------+------+----+---------+

Null/Empty Values in CSV Data:
+--------------+-------+------+----+---------+
|transaction_id|user_id|amount|type|timestamp|
+--------------+-------+------+----+---------+
|             0|      5|     9|   0|        0|
+--------------+-------+------+----+---------+



In [5]:
# Null/Missing data handling

In [6]:
# Check for Duplicates
def check_duplicates(df, key_columns):
    duplicate_count = df.groupBy(key_columns).count().filter(col("count") > 1).count()
    print(f"Duplicate Records Found: {duplicate_count}")

print("Database Duplicate Check:")
check_duplicates(psql_df, ["transaction_id"])

print("CSV Duplicate Check:")
check_duplicates(csv_df, ["transaction_id"])


Database Duplicate Check:
Duplicate Records Found: 0
CSV Duplicate Check:
Duplicate Records Found: 30


In [28]:
# Type check

3

In [27]:
# Combine data and run all tests

aligned_csv_df = csv_df.select(psql_df.columns)
concatenated_df = psql_df.union(aligned_csv_df)
concatenated_df = concatenated_df.distinct()
# print(concatenated_df.show())


print("concatenated_df Schema Valid:", validate_schema(concatenated_df, expected_schema))
(check_nulls(concatenated_df))
(check_duplicates(concatenated_df, ["transaction_id"]))

# Handle duplicates
deduplicated_df = concatenated_df.dropDuplicates(["transaction_id"])
(check_duplicates(deduplicated_df, ["transaction_id"]))



# Clean data

# Optimization



concatenated_df Schema Valid: True
+--------------+-------+------+----+---------+
|transaction_id|user_id|amount|type|timestamp|
+--------------+-------+------+----+---------+
|             0|      5|     9|   0|        0|
+--------------+-------+------+----+---------+

Duplicate Records Found: 12
Duplicate Records Found: 0


In [7]:
# Consistency Check
def compare_data(df1, df2, key_column):
    joined_df = df1.join(df2, on=key_column, how="inner")
    mismatched_df = joined_df.filter(df1["amount"] != df2["amount"])
    mismatched_df.show()

print("Consistency Check Between Database and CSV:")
compare_data(psql_df, csv_df, "transaction_id")


Consistency Check Between Database and CSV:
+--------------+-------+------+----+---------+-------+------+----+---------+
|transaction_id|user_id|amount|type|timestamp|user_id|amount|type|timestamp|
+--------------+-------+------+----+---------+-------+------+----+---------+
+--------------+-------+------+----+---------+-------+------+----+---------+



In [8]:
# Post-Ingestion
print("Row Count in Database:", psql_df.count())
print("Row Count in CSV:", csv_df.count())

psql_df.show(5)
csv_df.show(5)


Row Count in Database: 495
Row Count in CSV: 129
+--------------+-------+------+--------+-------------------+
|transaction_id|user_id|amount|    type|          timestamp|
+--------------+-------+------+--------+-------------------+
|    1704169733|      9|361.83|transfer|2024-01-01 20:28:53|
|    1706848193|     77| 55.28|transfer|2024-02-01 20:29:53|
|    1709353853|     69| 82.96|purchase|2024-03-01 20:30:53|
|    1712028713|     88| 36.10|transfer|2024-04-01 20:31:53|
|    1714620773|     22| 94.44|transfer|2024-05-01 20:32:53|
+--------------+-------+------+--------+-------------------+
only showing top 5 rows

+--------------+-------+--------------+--------+--------------------+
|transaction_id|user_id|        amount|    type|           timestamp|
+--------------+-------+--------------+--------+--------------------+
|    1704314682|     45|960.52 dollars|  refund|2024-01-03T12:44:42Z|
|    1706993142|     70|434.41 dollars|purchase|2024-02-03T12:45:42Z|
|    1709498802|     80|856

In [9]:
psql_df.describe().show()
csv_df.describe().show()


+-------+-------------------+------------------+------------------+--------+
|summary|     transaction_id|           user_id|            amount|    type|
+-------+-------------------+------------------+------------------+--------+
|  count|                495|               495|               495|     495|
|   mean|1.718286145418182E9|51.183838383838385|        501.332970|    NULL|
| stddev|  9198036.564008962| 28.80171886413176|277.83947109554646|    NULL|
|    min|         1704169733|                 1|             10.71|purchase|
|    max|         1733203479|               100|            997.41|transfer|
+-------+-------------------+------------------+------------------+--------+

+-------+--------------------+-----------------+------------------+--------+--------------------+
|summary|      transaction_id|          user_id|            amount|    type|           timestamp|
+-------+--------------------+-----------------+------------------+--------+--------------------+
|  count|   

In [10]:
def log_issues(issue_type, details):
    print(f"[LOG] {issue_type}: {details}")

log_issues("Missing Columns", "Transaction_date missing in CSV")
log_issues("Null Values", "Null values in 'amount' column")


[LOG] Missing Columns: Transaction_date missing in CSV
[LOG] Null Values: Null values in 'amount' column


Note: you may need to restart the kernel to use updated packages.


In [18]:
import great_expectations as ge
import pandas as pd

# Convert Spark DataFrame to Pandas DataFrame
psql_df_pd = psql_df.toPandas()

# Create a new Expectations Suite
context = ge.data_context.DataContext("/path/to/great_expectations/directory")
suite = context.create_expectation_suite("my_suite")

# Add expectations
# Example: Expect column values to be in a set
batch = ge.dataset.PandasDataset(psql_df_pd)  # Wrap the pandas DataFrame with a batch
batch.expect_column_values_to_be_in_set("column_name", ["value1", "value2"])

# Validate the dataset
validation_results = batch.validate()
print(validation_results)


  if LooseVersion(pandas.__version__) < LooseVersion(minimum_pandas_version):
  if LooseVersion(pandas.__version__) < LooseVersion(minimum_pandas_version):


AttributeError: module 'great_expectations.data_context' has no attribute 'DataContext'