In [1]:
df_transaction = (spark
      .read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("delimiter", "\t")
      .json("s3://aws-project-1-mskish/bronze/transactions-topic/"))


Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.8 
Trying to create a Glue session for the kernel.
Session Type: glueetl
Session ID: 0647b4c3-d5e8-4517-bb36-97a9ed3fb523
Applying the following default arguments:
--glue_kernel_version 1.0.8
--enable-glue-datacatalog true
Waiting for session 0647b4c3-d5e8-4517-bb36-97a9ed3fb523 to get into ready status...
Session 0647b4c3-d5e8-4517-bb36-97a9ed3fb523 has been created.



In [2]:
df_transaction.columns

['amount', 'card_id', 'client_id', 'date', 'errors', 'id', 'mcc', 'merchant_city', 'merchant_id', 'merchant_state', 'use_chip', 'zip']


In [3]:
df_transaction.show(50)

+--------+-------+---------+-------------------+------+--------+----+--------------+-----------+--------------+------------------+-------+
|  amount|card_id|client_id|               date|errors|      id| mcc| merchant_city|merchant_id|merchant_state|          use_chip|    zip|
+--------+-------+---------+-------------------+------+--------+----+--------------+-----------+--------------+------------------+-------+
| $263.43|   3348|      619|2012-11-23 16:03:00|      |11978328|4814|        Austin|      54850|            MN| Swipe Transaction|55912.0|
|  $38.26|   4576|      456|2012-07-07 13:57:00|      |11363233|5411|    Cape Coral|      68135|            FL| Swipe Transaction|33909.0|
|  $52.57|   4676|      209|2010-06-10 16:59:00|      | 8117710|5912|       El Paso|      81833|            TX| Swipe Transaction|79928.0|
|  $40.00|   1133|     1605|2013-04-13 12:08:00|      |12606562|4829|        Amelia|      27092|            OH| Swipe Transaction|45102.0|
|   $4.58|   5247|      144

In [4]:
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType, IntegerType

# 1. Clean 'amount' column
df_transaction = df_transaction.withColumn(
    'amount',
    F.regexp_replace('amount', '[$|]', '').cast(FloatType()))





In [5]:
# 2. Convert 'amount' to float
df_transaction = df_transaction.withColumn('amount',F.col('amount').cast(FloatType()))




In [6]:
# 3. Convert 'zip' to integer, handle empty strings as nulls
df_transaction = df_transaction.withColumn('zip',F.when(F.col('zip') == '', None).otherwise(F.col('zip').cast(IntegerType())))




In [7]:
# 4. Optional: Trim whitespace from string columns
string_cols = ['card_id','client_id','errors','use_chip','merchant_city','merchant_state','merchant_id','mcc']
for col in string_cols:
    df_transaction = df_transaction.withColumn(col, F.trim(F.col(col)))




In [8]:
# 5. Clean 'amount' column
df_transaction = df_transaction.withColumn('amount',F.regexp_replace('amount', '[$|]', ''))




In [9]:
# 6. Convert 'amount' to float
df_transaction = df_transaction.withColumn('amount',F.col('amount').cast(FloatType()))




In [10]:
# 7. Convert 'zip' to integer
df_transaction = df_transaction.withColumn('zip',F.when(F.col('zip') == '', None).otherwise(F.col('zip').cast(IntegerType())))




In [11]:
# 8. Handle null/empty values
df_transaction = df_transaction.withColumn('merchant_state',F.when((F.col('merchant_state').isNull()) | (F.col('merchant_state') == ''), 'NA').otherwise(F.col('merchant_state')))




In [12]:
df_transaction = df_transaction.withColumn('client_id',F.when(F.col('client_id') == 0, 'YTD').otherwise(F.col('client_id')))




In [13]:
df_transaction = df_transaction.withColumn('zip',F.when(F.col('zip').isNull(), 0).otherwise(F.col('zip')))




In [14]:
df_transaction = df_transaction.withColumn('errors',F.when((F.col('errors').isNull()) | (F.col('errors') == ''), 'N').otherwise(F.col('errors')))




In [15]:
df_transaction.show()


+------+-------+---------+-------------------+------+--------+----+-------------+-----------+--------------+------------------+-----+
|amount|card_id|client_id|               date|errors|      id| mcc|merchant_city|merchant_id|merchant_state|          use_chip|  zip|
+------+-------+---------+-------------------+------+--------+----+-------------+-----------+--------------+------------------+-----+
|263.43|   3348|      619|2012-11-23 16:03:00|     N|11978328|4814|       Austin|      54850|            MN| Swipe Transaction|55912|
| 38.26|   4576|      456|2012-07-07 13:57:00|     N|11363233|5411|   Cape Coral|      68135|            FL| Swipe Transaction|33909|
| 52.57|   4676|      209|2010-06-10 16:59:00|     N| 8117710|5912|      El Paso|      81833|            TX| Swipe Transaction|79928|
|  40.0|   1133|     1605|2013-04-13 12:08:00|     N|12606562|4829|       Amelia|      27092|            OH| Swipe Transaction|45102|
|  4.58|   5247|      144|2013-04-18 10:00:00|     N|12628171|

In [16]:
df_cards = (spark
      .read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("delimiter", "\t")
      .json("s3://aws-project-1-mskish/bronze/cards-topic/"))




In [17]:
df_cards.show()

+--------------+----------+----------------+----------------+---------------+---------+------------+---+-------+--------+----+----------------+---------------------+
|acct_open_date|card_brand|     card_number|card_on_dark_web|      card_type|client_id|credit_limit|cvv|expires|has_chip|  id|num_cards_issued|year_pin_last_changed|
+--------------+----------+----------------+----------------+---------------+---------+------------+---+-------+--------+----+----------------+---------------------+
|       09/2002|      Visa|4344676511950444|              No|          Debit|      825|      $24295|623|12/2022|     YES|4524|               2|                 2008|
|       04/2014|      Visa|4956965974959986|              No|          Debit|      825|      $21968|393|12/2020|     YES|2731|               2|                 2014|
|       07/2003|      Visa|4582313478255491|              No|          Debit|      825|      $46414|719|02/2024|     YES|3701|               2|                 2004|
|   

In [18]:
from pyspark.sql.functions import regexp_replace, col

df_cards = df_cards.withColumn("credit_limit",regexp_replace(col("credit_limit"), "[$]", "").cast("double"))





In [19]:
df_cards = df_cards.fillna({"client_id": 0})




In [20]:
df_cards = df_cards.fillna({"card_type": "NA", "card_brand": "NA"})




In [21]:
df_cards = df_cards.fillna({"has_chip": "NO"})




In [22]:
from pyspark.sql.functions import to_date, unix_timestamp
df_cards = df_cards.withColumn("acct_open_date",to_date(unix_timestamp("acct_open_date", "MM/yyyy").cast("timestamp")))




In [23]:
df_cards = df_cards.withColumn("expires",to_date(unix_timestamp("expires", "MM/yyyy").cast("timestamp")))




In [24]:
df_cards = df_cards.fillna({"cvv": 0})




In [25]:
df_cards = df_cards.fillna({"num_cards_issued": 1, "year_pin_last_changed": 0})




In [26]:
df_cards = df_cards.fillna({"card_on_dark_web": "No"})




In [27]:
df_cards.show()

+--------------+----------+----------------+----------------+---------------+---------+------------+---+----------+--------+----+----------------+---------------------+
|acct_open_date|card_brand|     card_number|card_on_dark_web|      card_type|client_id|credit_limit|cvv|   expires|has_chip|  id|num_cards_issued|year_pin_last_changed|
+--------------+----------+----------------+----------------+---------------+---------+------------+---+----------+--------+----+----------------+---------------------+
|    2002-09-01|      Visa|4344676511950444|              No|          Debit|      825|     24295.0|623|2022-12-01|     YES|4524|               2|                 2008|
|    2014-04-01|      Visa|4956965974959986|              No|          Debit|      825|     21968.0|393|2020-12-01|     YES|2731|               2|                 2014|
|    2003-07-01|      Visa|4582313478255491|              No|          Debit|      825|     46414.0|719|2024-02-01|     YES|3701|               2|         

In [28]:
df_users = (spark
      .read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("delimiter", "\t")
      .json("s3://aws-project-1-mskish/bronze/users-topic/"))




In [29]:
df_users.show()

+--------------------+-----------+----------+------------+-----------+------+----+--------+---------+----------------+-----------------+--------------+----------+-------------+
|             address|birth_month|birth_year|credit_score|current_age|gender|  id|latitude|longitude|num_credit_cards|per_capita_income|retirement_age|total_debt|yearly_income|
+--------------------+-----------+----------+------------+-----------+------+----+--------+---------+----------------+-----------------+--------------+----------+-------------+
|       462 Rose Lane|         11|      1966|         787|         53|Female| 825|   34.15|  -117.76|               5|           $29278|            66|   $127613|       $59696|
|3606 Federal Boul...|         12|      1966|         701|         53|Female|1746|   40.76|   -73.74|               5|           $37891|            68|   $191349|       $77254|
|     766 Third Drive|         11|      1938|         698|         81|Female|1718|   34.02|  -117.89|              

In [30]:
money_cols = ["yearly_income", "total_debt", "per_capita_income"]

for c in money_cols:
    df_users = df_users.withColumn(c, regexp_replace(col(c), "[$]", "").cast("float"))




In [31]:
df_users = df_users.fillna({
    "birth_month": 0,
    "birth_year": 0,
    "credit_score": 0,
    "num_credit_cards": 0})




In [32]:
from pyspark.sql.functions import col, regexp_replace, when
df_users = df_users.withColumn("errors", when(col("yearly_income").isNull(), "Y").otherwise("N"))




In [33]:
df_users.show()

+--------------------+-----------+----------+------------+-----------+------+----+--------+---------+----------------+-----------------+--------------+----------+-------------+------+
|             address|birth_month|birth_year|credit_score|current_age|gender|  id|latitude|longitude|num_credit_cards|per_capita_income|retirement_age|total_debt|yearly_income|errors|
+--------------------+-----------+----------+------------+-----------+------+----+--------+---------+----------------+-----------------+--------------+----------+-------------+------+
|       462 Rose Lane|         11|      1966|         787|         53|Female| 825|   34.15|  -117.76|               5|          29278.0|            66|  127613.0|      59696.0|     N|
|3606 Federal Boul...|         12|      1966|         701|         53|Female|1746|   40.76|   -73.74|               5|          37891.0|            68|  191349.0|      77254.0|     N|
|     766 Third Drive|         11|      1938|         698|         81|Female|171

In [36]:
import boto3
from awsglue.context import GlueContext
from pyspark.context import SparkContext

# ----------------------------
# Initialize Spark & Glue
# ----------------------------
sc = SparkContext.getOrCreate()   # Use getOrCreate to avoid multiple SparkContext errors
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# ----------------------------
# S3 paths
# ----------------------------
bronze_bucket = "s3://aws-project-1-mskish/bronze"
silver_bucket = "s3://aws-project-1-mskish/silver"

# Paths for parquet write
silver_transactions_path = f"{silver_bucket}/transactions-topic/"
silver_users_path = f"{silver_bucket}/users-topic/"
silver_cards_path = f"{silver_bucket}/cards-topic/"

# ----------------------------
# 1️⃣ Write cleaned DataFrames to Silver in Parquet
# ----------------------------
df_transaction.write.mode("overwrite").parquet(silver_transactions_path)
df_users.write.mode("overwrite").parquet(silver_users_path)
df_cards.write.mode("overwrite").parquet(silver_cards_path)
print("Cleaned CSV files written to Silver in Parquet format.")

# ----------------------------
# 2️⃣ Copy JSON files from Bronze → Silver
# ----------------------------
s3 = boto3.client('s3')
files_to_copy = {
    "mcc-topic/combined_mcc-topic.json": "mcc-topic/combined_mcc-topic.json",
    "train_fraud_labels_topic/train_fraud_labels.json": "train_fraud_labels_topic/train_fraud_labels.json"
}

for source_key, dest_key in files_to_copy.items():
    s3.copy_object(
        Bucket="aws-project-1-mskish",
        CopySource={"Bucket": "aws-project-1-mskish", "Key": source_key},
        Key=f"silver/{dest_key}"
    )
    print(f"{source_key} copied to silver/{dest_key}")

print("All files are now in Silver layer.")


NoSuchKey: An error occurred (NoSuchKey) when calling the CopyObject operation: The specified key does not exist.


In [37]:
s3 = boto3.client('s3')

files_to_copy = {
    "bronze/mcc-topic/combined_mcc-topic.json": "silver/mcc-topic/combined_mcc-topic.json",
    "bronze/train_fraud_labels_topic/train_fraud_labels.json": "silver/train_fraud_labels_topic/train_fraud_labels.json"
}

for source_key, dest_key in files_to_copy.items():
    # Strip the "bronze/" prefix from source_key for CopySource
    bucket_name = "aws-project-1-mskish"
    s3.copy_object(
        Bucket=bucket_name,
        CopySource={"Bucket": bucket_name, "Key": source_key},
        Key=dest_key
    )
    print(f"{source_key} copied to {dest_key}")


{'ResponseMetadata': {'RequestId': 'A8A8ZA7H72B2CFYB', 'HostId': 'X9M/r5sQd6jLFoFmCBiUdw4ZMa+eSnis1+i1b4vh+4nHYYFuXTqmS+LhCmaysLzaZafoLpTuyCE=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'X9M/r5sQd6jLFoFmCBiUdw4ZMa+eSnis1+i1b4vh+4nHYYFuXTqmS+LhCmaysLzaZafoLpTuyCE=', 'x-amz-request-id': 'A8A8ZA7H72B2CFYB', 'date': 'Sat, 20 Sep 2025 13:44:42 GMT', 'x-amz-server-side-encryption': 'AES256', 'content-type': 'application/xml', 'content-length': '263', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'ServerSideEncryption': 'AES256', 'CopyObjectResult': {'ETag': '"3225250abd99fdd1666ed9e9edcd9cc7"', 'LastModified': datetime.datetime(2025, 9, 20, 13, 44, 42, tzinfo=tzlocal()), 'ChecksumCRC32': 'TNyOmQ=='}}
bronze/mcc-topic/combined_mcc-topic.json copied to silver/mcc-topic/combined_mcc-topic.json
{'ResponseMetadata': {'RequestId': 'A8A903DWA39J9T6V', 'HostId': 'eZYkHk1mOLUu9oJYmw/dC/KYehmsADk/by3pTZbS9FqHjj0HkO14r5jLqSQFOl24d5iJIZLTTdk=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-