Importing Kaggle Dataset

In [2]:
from kagglehub import dataset_download
#remove warnings
import warnings
warnings.filterwarnings("ignore")

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, to_timestamp, split, when, lower, round as sp_round, from_utc_timestamp, date_format, udf
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType, LongType
import re

path: str = dataset_download("jinquan/cc-sample-data")

print(path)

spark = SparkSession.builder.appName("payNet").getOrCreate()


  from .autonotebook import tqdm as notebook_tqdm


/home/jeevin/.cache/kagglehub/datasets/jinquan/cc-sample-data/versions/1


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/07/21 00:59:04 WARN Utils: Your hostname, jeevin, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/07/21 00:59:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/21 00:59:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Load JSON and clean up Data

In [None]:
import pyspark.sql.functions as sf

# Read the JSON data from the file
df = spark.read.json(path)

def clean_json_string(json_str):
    """
    Clean JSON string by:
    1. Removing all backslashes
    2. Removing quotes around JSON objects (e.g., "{ }" becomes { })
    """
    if json_str is None:
        return None
    
    # Remove all backslashes
    cleaned = json_str.replace("\\", "")
    
    # Remove quotes around JSON objects - pattern: "{ ... }"
    # This regex finds quoted JSON objects and removes the outer quotes
    cleaned = re.sub(r'"\s*\{\s*(.*?)\s*\}\s*"', r'{\1}', cleaned)
    
    return cleaned

# Register UDF
clean_json_udf = udf(clean_json_string, StringType())

# Apply cleaning to the personal_detail column
df_cleaned = df.withColumn("personal_detail", clean_json_udf(sf.col("personal_detail")))

# Define schema for address (nested within personal_detail) - all strings initially
address_schema = StructType([
    StructField("street", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("zip", StringType(), True)
])

# Define schema for personal_detail - all strings initially
personal_schema = StructType([
    StructField("person_name", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("address", address_schema, True),
    StructField("lat", StringType(), True),
    StructField("long", StringType(), True),
    StructField("city_pop", StringType(), True),
    StructField("job", StringType(), True),
    StructField("dob", StringType(), True)
])

# Parse the cleaned JSON string into proper columns (overwrite the original column)
df_with_parsed_personal = df_cleaned.withColumn("personal_detail", sf.from_json(sf.col("personal_detail"), personal_schema))



df_with_names = df_with_parsed_personal \
    .withColumn("name_parts", sf.split(sf.col("personal_detail.person_name"), ",")) \
    .withColumn("first", 
                # Get the first element if the array is not empty
                sf.when(sf.size(sf.col("name_parts")) >= 1, sf.trim(sf.element_at(sf.col("name_parts"), 1)))
                .otherwise(None)) \
    .withColumn("last", 
                # If there's more than one part, concatenate from the second part onwards
                sf.when(sf.size(sf.col("name_parts")) >= 2, 
                        sf.trim(sf.concat_ws(",", sf.slice(sf.col("name_parts"), 2, sf.size(sf.col("name_parts"))))))
                .otherwise(None)) \
    .drop("name_parts") # Drop the intermediate column used for splitting


# Flatten the personal_detail structure and address structure
df_flattened = df_with_names.select(
    # Original columns in desired order
    sf.col("Unnamed: 0"),
    sf.col("trans_date_trans_time"),
    sf.col("cc_num"),
    sf.col("merchant"),
    sf.col("category"),
    sf.col("amt"),
    
    sf.col("first"),
    sf.col("last"),

    # Personal details
    sf.col("personal_detail.gender").alias("gender"),
    
    # Flattened address details
    sf.col("personal_detail.address.street").alias("street"),
    sf.col("personal_detail.address.city").alias("city"),
    sf.col("personal_detail.address.state").alias("state"),
    sf.col("personal_detail.address.zip").alias("zip"),
    
    # Location and demographic info
    sf.col("personal_detail.lat").alias("lat"),
    sf.col("personal_detail.long").alias("long"),
    sf.col("personal_detail.city_pop").alias("city_pop"),
    sf.col("personal_detail.job").alias("job"),
    sf.col("personal_detail.dob").alias("dob"),
    
    # Transaction details
    sf.col("trans_num"),
    sf.col("merch_lat"),
    sf.col("merch_long"),
    sf.col("is_fraud"),
    sf.col("merch_zipcode"),
    sf.col("merch_last_update_time"),
    sf.col("merch_eff_time"),
    sf.col("cc_bic")
)

# Type conversions and rounding in one operation
df_final = df_flattened.withColumns({
    'Unnamed: 0': sf.col("Unnamed: 0").cast(IntegerType()),
    'trans_date_trans_time': sf.to_timestamp(sf.col("trans_date_trans_time"), "yyyy-MM-dd HH:mm:ss"),
    'amt': sf.round(sf.col("amt").cast(FloatType()), 6),
    'merch_lat': sf.round(sf.col("merch_lat").cast(FloatType()), 6),
    'merch_long': sf.round(sf.col("merch_long").cast(FloatType()), 6),
    'is_fraud': sf.col("is_fraud").cast(IntegerType()),
    'merch_eff_time': sf.col("merch_eff_time").cast(LongType()),
    'merch_last_update_time': sf.col("merch_last_update_time").cast(LongType()),
    'lat': sf.round(sf.col("lat").cast(FloatType()), 6),
    'long': sf.round(sf.col("long").cast(FloatType()), 6),
    'city_pop': sf.col("city_pop").cast(IntegerType())
})

# Handle null values and "NA" strings for all string columns automatically
string_columns = [field.name for field in df_final.schema.fields if field.dataType.typeName() == 'string']

# Create dictionary for null value handling across all string columns
null_handling_dict = {}
for col_name in string_columns:
    null_handling_dict[col_name] = sf.when(
        (sf.lower(sf.col(col_name)) == "na") | 
        (sf.lower(sf.col(col_name)) == "null") | 
        (sf.col(col_name) == ""), 
        None
    ).otherwise(sf.col(col_name))

df_final = df_final.withColumns(null_handling_dict)

# Show final result
df_final.show()

# Show schema to verify structure
df_final.printSchema()

# Optional: Show any conversion failures
print("Checking for conversion failures...")
df_final.filter(sf.col("amt").isNull() | sf.col("lat").isNull() | sf.col("city_pop").isNull()).show()

# Save the cleaned data
# df_final.write.mode("overwrite").json("cleaned_output_path")


                                                                                

+----------+---------------------+-------------------+--------------------+-------------+------+--------------------+----------------+------+--------------------+--------------------+-----+-----+-------+---------+--------+--------------------+----------+--------------------+---------+----------+--------+-------------+----------------------+----------------+-----------+
|Unnamed: 0|trans_date_trans_time|             cc_num|            merchant|     category|   amt|               first|            last|gender|              street|                city|state|  zip|    lat|     long|city_pop|                 job|       dob|           trans_num|merch_lat|merch_long|is_fraud|merch_zipcode|merch_last_update_time|  merch_eff_time|     cc_bic|
+----------+---------------------+-------------------+--------------------+-------------+------+--------------------+----------------+------+--------------------+--------------------+-----+-----+-------+---------+--------+--------------------+----------+--



+----------+---------------------+------+--------+--------+---+-----+----+------+------+----+-----+---+---+----+--------+---+---+---------+---------+----------+--------+-------------+----------------------+--------------+------+
|Unnamed: 0|trans_date_trans_time|cc_num|merchant|category|amt|first|last|gender|street|city|state|zip|lat|long|city_pop|job|dob|trans_num|merch_lat|merch_long|is_fraud|merch_zipcode|merch_last_update_time|merch_eff_time|cc_bic|
+----------+---------------------+------+--------+--------+---+-----+----+------+------+----+-----+---+---+----+--------+---+---+---------+---------+----------+--------+-------------+----------------------+--------------+------+
+----------+---------------------+------+--------+--------+---+-----+----+------+------+----+-----+---+---+----+--------+---+---+---------+---------+----------+--------+-------------+----------------------+--------------+------+



                                                                                

In [10]:
df = spark.read.json(path)

# Initialize Spark session
spark = SparkSession.builder.appName("JSONCleaner").getOrCreate()

def clean_json_string(json_str):
    """
    Clean JSON string by:
    1. Removing all backslashes
    2. Removing quotes around JSON objects (e.g., "{ }" becomes { })
    """
    if json_str is None:
        return None
    
    # Remove all backslashes
    cleaned = json_str.replace("\\", "")
    
    # Remove quotes around JSON objects - pattern: "{ ... }"
    # This regex finds quoted JSON objects and removes the outer quotes
    cleaned = re.sub(r'"\s*\{\s*(.*?)\s*\}\s*"', r'{\1}', cleaned)
    
    return cleaned

# Register UDF
clean_json_udf = udf(clean_json_string, StringType())

# Apply cleaning to the personal_detail column
df_cleaned = df.withColumn("personal_detail", clean_json_udf(col("personal_detail")))

# Define schema for address (nested within personal_detail) - all strings initially
address_schema = StructType([
    StructField("street", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("zip", StringType(), True)
])

# Define schema for personal_detail - all strings initially
personal_schema = StructType([
    StructField("person_name", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("address", address_schema, True),
    StructField("lat", StringType(), True),
    StructField("long", StringType(), True),
    StructField("city_pop", StringType(), True),
    StructField("job", StringType(), True),
    StructField("dob", StringType(), True)
])

# Parse the cleaned JSON string into proper columns (overwrite the original column)
df_with_parsed_personal = df_cleaned.withColumn("personal_detail", from_json(col("personal_detail"), personal_schema))

# Flatten the personal_detail structure and address structure
df_flattened = df_with_parsed_personal.select(
    # Original columns in desired order
    col("Unnamed: 0"),
    col("trans_date_trans_time"),
    col("cc_num"),
    col("merchant"),
    col("category"),
    col("amt"),
    
    # Split person_name into first and last names
    split(col("personal_detail.person_name"), ",")[0].alias("first"),
    split(col("personal_detail.person_name"), ",")[1].alias("last"),
    
    # Personal details
    col("personal_detail.gender").alias("gender"),
    
    # Flattened address details
    col("personal_detail.address.street").alias("street"),
    col("personal_detail.address.city").alias("city"),
    col("personal_detail.address.state").alias("state"),
    col("personal_detail.address.zip").alias("zip"),
    
    # Location and demographic info
    col("personal_detail.lat").alias("lat"),
    col("personal_detail.long").alias("long"),
    col("personal_detail.city_pop").alias("city_pop"),
    col("personal_detail.job").alias("job"),
    col("personal_detail.dob").alias("dob"),
    
    # Transaction details
    col("trans_num"),
    col("merch_lat"),
    col("merch_long"),
    col("is_fraud"),
    col("merch_zipcode"),
    col("merch_last_update_time"),
    col("merch_eff_time"),
    col("cc_bic")
)


# Type conversions and rounding in one operation
df_final = df_flattened.withColumns({
    'Unnamed: 0': col("Unnamed: 0").cast(IntegerType()),
    'trans_date_trans_time': to_timestamp(col("trans_date_trans_time"), "yyyy-MM-dd HH:mm:ss"),
    'amt': sp_round(col("amt").cast(FloatType()), 6),
    'merch_lat': sp_round(col("merch_lat").cast(FloatType()), 6),
    'merch_long': sp_round(col("merch_long").cast(FloatType()), 6),
    'is_fraud': col("is_fraud").cast(IntegerType()),
    'merch_eff_time': col("merch_eff_time").cast(LongType()),
    'merch_last_update_time': col("merch_last_update_time").cast(LongType()),
    'lat': sp_round(col("lat").cast(FloatType()), 6),
    'long': sp_round(col("long").cast(FloatType()), 6),
    'city_pop': col("city_pop").cast(IntegerType())
})

# Handle null values and "NA" strings for all string columns automatically
string_columns = [field.name for field in df_final.schema.fields if field.dataType.typeName() == 'string']

# Create dictionary for null value handling across all string columns
null_handling_dict = {}
for col_name in string_columns:
    null_handling_dict[col_name] = when(
        (lower(col(col_name)) == "na") | 
        (lower(col(col_name)) == "null") | 
        (col(col_name) == ""), 
        None
    ).otherwise(col(col_name))

df_final = df_final.withColumns(null_handling_dict)

# Show final result
df_final.show()

# Show schema to verify structure
df_final.printSchema()

# Optional: Show any conversion failures
print("Checking for conversion failures...")
df_final.filter(col("amt").isNull() | col("lat").isNull() | col("city_pop").isNull()).show()

# Save the cleaned data
# df_final.write.mode("overwrite").json("cleaned_output_path")

25/07/21 01:15:41 ERROR Executor: Exception in task 0.0 in stage 15.0 (TID 101) 
org.apache.spark.SparkArrayIndexOutOfBoundsException: [INVALID_ARRAY_INDEX] The index 1 is out of bounds. The array has 1 elements. Use the SQL function `get()` to tolerate accessing element at invalid index and return NULL instead. SQLSTATE: 22003
== DataFrame ==
"__getitem__" was called from
line 65 in cell [10]

	at org.apache.spark.sql.errors.QueryExecutionErrors$.invalidArrayIndexError(QueryExecutionErrors.scala:233)
	at org.apache.spark.sql.errors.QueryExecutionErrors.invalidArrayIndexError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	at org.a

ArrayIndexOutOfBoundsException: [INVALID_ARRAY_INDEX] The index 1 is out of bounds. The array has 1 elements. Use the SQL function `get()` to tolerate accessing element at invalid index and return NULL instead. SQLSTATE: 22003
== DataFrame ==
"__getitem__" was called from
line 65 in cell [10]


Infer insights from the data