In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, count, sum as _sum, avg, min as _min, max as _max,
    round as _round, abs as _abs, when, dayofweek, weekofyear,
    year, month, dayofmonth, date_format, current_timestamp,
    countDistinct, first, desc, row_number
)
from pyspark.sql.window import Window
from datetime import datetime
import logging


class Config:
    """Configuration for the Gold Layer ETL job.

    Every setting defaults to the value that used to be hard-coded here and
    can be overridden with an environment variable, so that hosts and
    credentials do not have to be edited in the source.

    Environment variables (default in parentheses):
        GOLD_SILVER_BASE_PATH   (s3a://silver)
        GOLD_POSTGRES_HOST      (postgres)
        GOLD_POSTGRES_PORT      (5432)
        GOLD_POSTGRES_DB        (gold_db)
        GOLD_POSTGRES_USER      (postgres)
        GOLD_POSTGRES_PASSWORD  (postgres)
        GOLD_POSTGRES_TABLE     (daily_transaction_summary)
    """

    def __init__(self):
        # Local import: the file does not import os at module level.
        import os

        env = os.environ.get

        # MinIO/S3 Configuration
        # A trailing slash is dropped so the derived paths never contain "//".
        self.silver_base_path = env("GOLD_SILVER_BASE_PATH", "s3a://silver").rstrip("/")
        self.transactions_path = f"{self.silver_base_path}/transactions"
        self.cards_path = f"{self.silver_base_path}/cards"
        self.users_path = f"{self.silver_base_path}/users"
        self.mcc_codes_path = f"{self.silver_base_path}/mcc_codes"

        # PostgreSQL Configuration
        self.postgres_host = env("GOLD_POSTGRES_HOST", "postgres")
        self.postgres_port = env("GOLD_POSTGRES_PORT", "5432")
        self.postgres_db = env("GOLD_POSTGRES_DB", "gold_db")
        self.postgres_user = env("GOLD_POSTGRES_USER", "postgres")
        self.postgres_password = env("GOLD_POSTGRES_PASSWORD", "postgres")
        self.postgres_table = env("GOLD_POSTGRES_TABLE", "daily_transaction_summary")

        # JDBC URL
        self.jdbc_url = (
            f"jdbc:postgresql://{self.postgres_host}:{self.postgres_port}/{self.postgres_db}"
        )

        # JDBC Properties
        self.jdbc_properties = {
            "user": self.postgres_user,
            "password": self.postgres_password,
            "driver": "org.postgresql.Driver",
        }


class SilverDataReader:
    """Loads the silver-layer Parquet datasets consumed by the gold job."""

    def __init__(self, spark: SparkSession, config: Config):
        self.config = config
        self.spark = spark
        self.logger = logging.getLogger(__name__)

    def _load_parquet(self, label, path):
        """Log the location being read and return the Parquet data at ``path``."""
        self.logger.info(f"Reading {label} from {path}")
        return self.spark.read.parquet(path)

    def read_transactions(self):
        """Return the silver transactions dataset."""
        return self._load_parquet("transactions", self.config.transactions_path)

    def read_cards(self):
        """Return the silver cards dataset."""
        return self._load_parquet("cards", self.config.cards_path)

    def read_users(self):
        """Return the silver users dataset."""
        return self._load_parquet("users", self.config.users_path)

    def read_mcc_codes(self):
        """Return the silver MCC codes dataset."""
        return self._load_parquet("MCC codes", self.config.mcc_codes_path)


class DailyTransactionAggregator:
    """Transforms silver data into the daily transaction summary."""

    # Column order of the DataFrame returned by aggregate_daily_summary().
    _FINAL_COLUMNS = [
        # Date dimensions
        "transaction_date",
        "year",
        "month",
        "week_of_year",
        "day_of_week",
        "is_weekend",

        # Transaction metrics
        "total_transactions",
        "total_revenue",
        "total_revenue_absolute",
        "average_transaction_amt",
        "min_transaction_amt",
        "max_transaction_amt",

        # User demographics
        "avg_customer_age",
        "avg_credit_score",
        "pct_male",
        "pct_female",
        "avg_yearly_income",
        "avg_debt_to_income_ratio",

        # Card insights
        "pct_credit_cards",
        "pct_debit_cards",
        "pct_visa",
        "pct_mastercard",
        "cards_on_dark_web_count",
        "avg_credit_limit",
        "avg_credit_utilization",

        # Merchant category
        "top_mcc_category",
        "top_mcc_revenue",
        "restaurant_revenue",
        "gas_station_revenue",

        # Geographic
        "unique_states",
        "unique_cities",
        "top_state",
        "top_city",

        # Risk indicators
        "high_risk_transactions",
        "swipe_over_1000_count",
        "failed_transactions",

        # Existing metrics
        "active_users",
        "active_cards",
        "unique_merchants",
        "chip_transactions",
        "swipe_transactions",
        "chip_revenue",
        "swipe_revenue",
        "total_errors",
        "error_rate",
        "success_rate",

        # Metadata
        "created_at",
    ]

    def __init__(self, spark: SparkSession):
        self.spark = spark
        self.logger = logging.getLogger(__name__)

    def aggregate_daily_summary(self, transactions_df, users_df, cards_df, mcc_df):
        """
        Aggregate transactions with user, card, and MCC data into daily summary

        The inputs are left-joined on ``client_id == user_id``,
        ``txn_card_id == card_id`` and ``mcc == mcc_code``, so the callers must
        provide DataFrames that already carry those column names.

        Args:
            transactions_df: Transactions DataFrame from silver layer
            users_df: Users DataFrame from silver layer
            cards_df: Cards DataFrame from silver layer
            mcc_df: MCC codes DataFrame from silver layer

        Returns:
            Cached DataFrame with one row per transaction date, ordered by
            date, holding the columns listed in ``_FINAL_COLUMNS``.
        """
        self.logger.info("Starting daily aggregation with all tables")

        # Join all tables
        joined_df = (
            transactions_df
            .join(users_df, transactions_df.client_id == users_df.user_id, "left")
            .join(cards_df, transactions_df.txn_card_id == cards_df.card_id, "left")
            .join(mcc_df, transactions_df.mcc == mcc_df.mcc_code, "left")
        )

        # The joined data feeds four separate aggregations (daily metrics plus
        # the three "top ..." rankings), so it is cached while they run.
        cached_df = joined_df.cache()
        try:
            self.logger.info(f"Enriched dataset created with {cached_df.count()} records")

            enriched_df = self._add_derived_columns(cached_df)
            daily_summary = self._aggregate_by_day(enriched_df)
            daily_summary = self._attach_top_values(daily_summary, enriched_df)
            daily_summary = self._add_rates_and_calendar(daily_summary)

            # Select columns in the correct order, sort by date and cache the
            # (small) result because the caller shows, counts and writes it.
            daily_summary = (
                daily_summary
                .select(*self._FINAL_COLUMNS)
                .orderBy("transaction_date")
                .cache()
            )

            # count() materializes the summary cache before the input cache is
            # released in the finally block below.
            self.logger.info(f"aggregation completed: {daily_summary.count()} daily records")

            return daily_summary
        finally:
            # Release the joined data; previously it stayed cached for the
            # rest of the Spark session. Caches built on top of it (the
            # summary) are not dropped by unpersist().
            cached_df.unpersist()

    @staticmethod
    def _flag(condition):
        """Return an integer column that is 1 where ``condition`` is true, else 0."""
        return when(condition, 1).otherwise(0)

    def _add_derived_columns(self, df):
        """Add the transaction date, 0/1 indicator columns and ratio columns."""
        flag = self._flag
        swipe_over_1000 = (col("is_swipe") == 1) & (_abs(col("amount")) > 1000)

        # Order matters: is_high_risk reads is_swipe and is_dark_web, which are
        # created earlier in this list.
        derived_columns = [
            ("transaction_date", col("date").cast("date")),
            ("is_chip", flag(col("use_chip").contains("Chip"))),
            ("is_swipe", flag(col("use_chip").contains("Swipe"))),
            ("has_error", flag((col("errors").isNotNull()) & (col("errors") != ""))),
            ("is_credit_card", flag(col("card_type").contains("Credit"))),
            ("is_debit_card", flag(col("card_type").contains("Debit"))),
            ("is_visa", flag(col("card_brand") == "Visa")),
            ("is_mastercard", flag(col("card_brand") == "Mastercard")),
            ("is_dark_web", flag(col("card_on_dark_web") == "Yes")),
            ("is_male", flag(col("gender") == "Male")),
            ("is_female", flag(col("gender") == "Female")),
            # Debt to income ratio (0 when income is missing or not positive)
            (
                "debt_to_income_ratio",
                when(col("yearly_income") > 0, col("total_debt") / col("yearly_income"))
                .otherwise(0),
            ),
            # Credit utilization: transaction amount / credit limit, in percent
            (
                "credit_utilization",
                when(col("credit_limit") > 0,
                     _abs(col("amount")) / col("credit_limit") * 100).otherwise(0),
            ),
            # High risk transaction flag (swipe > $1000 or dark web card)
            (
                "is_high_risk",
                when(swipe_over_1000, 1)
                .when(col("is_dark_web") == 1, 1)
                .otherwise(0),
            ),
            # Restaurant (MCC 5812) and gas station (MCC 5541) transactions
            ("is_restaurant", flag(col("mcc") == 5812)),
            ("is_gas_station", flag(col("mcc") == 5541)),
        ]

        for name, expression in derived_columns:
            df = df.withColumn(name, expression)
        return df

    @staticmethod
    def _pct(flag_column):
        """Return the share of rows (in percent, 2 decimals) where the flag is 1."""
        return _round((_sum(flag_column) / count("*")) * 100, 2)

    def _aggregate_by_day(self, enriched_df):
        """Group the enriched transactions by date and compute the daily metrics."""
        pct = self._pct
        return enriched_df.groupBy("transaction_date").agg(
            # ===== BASIC TRANSACTION METRICS =====
            count("*").alias("total_transactions"),
            _sum("amount").alias("total_revenue"),
            _sum(_abs(col("amount"))).alias("total_revenue_absolute"),
            _round(avg("amount"), 2).alias("average_transaction_amt"),
            _min("amount").alias("min_transaction_amt"),
            _max("amount").alias("max_transaction_amt"),

            # ===== USER DEMOGRAPHICS =====
            _round(avg("current_age"), 2).alias("avg_customer_age"),
            _round(avg("credit_score"), 2).alias("avg_credit_score"),
            _round(avg("yearly_income"), 2).alias("avg_yearly_income"),
            _round(avg("debt_to_income_ratio"), 2).alias("avg_debt_to_income_ratio"),
            pct("is_male").alias("pct_male"),
            pct("is_female").alias("pct_female"),

            # ===== CARD INSIGHTS =====
            pct("is_credit_card").alias("pct_credit_cards"),
            pct("is_debit_card").alias("pct_debit_cards"),
            pct("is_visa").alias("pct_visa"),
            pct("is_mastercard").alias("pct_mastercard"),
            _sum("is_dark_web").alias("cards_on_dark_web_count"),
            _round(avg("credit_limit"), 2).alias("avg_credit_limit"),
            _round(avg("credit_utilization"), 2).alias("avg_credit_utilization"),

            # ===== MERCHANT CATEGORY =====
            _sum(when(col("is_restaurant") == 1, col("amount")).otherwise(0)).alias("restaurant_revenue"),
            _sum(when(col("is_gas_station") == 1, col("amount")).otherwise(0)).alias("gas_station_revenue"),

            # ===== GEOGRAPHIC =====
            countDistinct("merchant_state").alias("unique_states"),
            countDistinct("merchant_city").alias("unique_cities"),

            # ===== RISK INDICATORS =====
            _sum("is_high_risk").alias("high_risk_transactions"),
            _sum(
                when((col("is_swipe") == 1) & (_abs(col("amount")) > 1000), 1).otherwise(0)
            ).alias("swipe_over_1000_count"),
            _sum("has_error").alias("failed_transactions"),

            # ===== EXISTING METRICS =====
            countDistinct("client_id").alias("active_users"),
            countDistinct("txn_card_id").alias("active_cards"),
            countDistinct("merchant_id").alias("unique_merchants"),
            _sum("is_chip").alias("chip_transactions"),
            _sum("is_swipe").alias("swipe_transactions"),
            _sum(when(col("is_chip") == 1, col("amount")).otherwise(0)).alias("chip_revenue"),
            _sum(when(col("is_swipe") == 1, col("amount")).otherwise(0)).alias("swipe_revenue"),
            # Same value as failed_transactions; kept because error_rate is
            # derived from this column.
            _sum("has_error").alias("total_errors"),
        )

    @staticmethod
    def _top_per_day(enriched_df, group_column, metric, metric_name):
        """Return, per transaction date, the ``group_column`` value with the highest metric.

        Ties on the metric are broken by the group value in ascending order
        with nulls last, so repeated runs pick the same winner. The result has
        the columns ``transaction_date``, ``group_column`` and ``metric_name``.
        """
        ranking = Window.partitionBy("transaction_date").orderBy(
            desc(metric_name), col(group_column).asc_nulls_last()
        )
        return (
            enriched_df.groupBy("transaction_date", group_column)
            .agg(metric.alias(metric_name))
            .withColumn("_rank", row_number().over(ranking))
            .filter(col("_rank") == 1)
            .drop("_rank")
        )

    def _attach_top_values(self, daily_summary, enriched_df):
        """Join the busiest state/city and highest-revenue MCC category onto each day."""
        # Top state per day (most transactions)
        top_states = self._top_per_day(
            enriched_df, "merchant_state", count("*"), "state_count"
        ).select(
            col("transaction_date"),
            col("merchant_state").alias("top_state"),
        )

        # Top city per day (most transactions)
        top_cities = self._top_per_day(
            enriched_df, "merchant_city", count("*"), "city_count"
        ).select(
            col("transaction_date"),
            col("merchant_city").alias("top_city"),
        )

        # Top MCC category per day (highest summed amount)
        top_mcc = self._top_per_day(
            enriched_df, "description", _sum("amount"), "mcc_revenue"
        ).select(
            col("transaction_date"),
            col("description").alias("top_mcc_category"),
            col("mcc_revenue").alias("top_mcc_revenue"),
        )

        return (
            daily_summary
            .join(top_states, "transaction_date", "left")
            .join(top_cities, "transaction_date", "left")
            .join(top_mcc, "transaction_date", "left")
        )

    @staticmethod
    def _add_rates_and_calendar(daily_summary):
        """Add error/success rates, calendar attributes and the processing timestamp."""
        return (
            daily_summary
            # Error rate and success rate, in percent
            .withColumn(
                "error_rate",
                _round((col("total_errors") / col("total_transactions")) * 100, 2),
            )
            .withColumn("success_rate", _round(100 - col("error_rate"), 2))
            # Date components; month is the abbreviated name (e.g. "Jan")
            .withColumn("year", year(col("transaction_date")))
            .withColumn("month", date_format(col("transaction_date"), "MMM"))
            .withColumn("week_of_year", weekofyear(col("transaction_date")))
            .withColumn("day_of_week", date_format(col("transaction_date"), "EEEE"))
            # dayofweek() returns 1 for Sunday and 7 for Saturday
            .withColumn(
                "is_weekend",
                when(dayofweek(col("transaction_date")).isin([1, 7]), True).otherwise(False),
            )
            # Processing timestamp
            .withColumn("created_at", current_timestamp())
        )


class PostgreSQLWriter:
    """Persists a DataFrame to the PostgreSQL table named in the job config."""

    def __init__(self, config: Config):
        self.logger = logging.getLogger(__name__)
        self.config = config

    def write_dataframe(self, df, mode="overwrite"):
        """
        Write a DataFrame to the configured PostgreSQL table over JDBC.

        Args:
            df: DataFrame to write
            mode: Spark save mode passed to the JDBC writer (overwrite, append)

        Raises:
            Exception: whatever the JDBC write raised, re-raised after logging.
        """
        cfg = self.config
        # df.count() runs a Spark action of its own before the write starts.
        self.logger.info(f"Writing {df.count()} records to PostgreSQL table: {cfg.postgres_table}")

        try:
            # NOTE(review): Spark's JDBC "overwrite" drops and recreates the
            # table unless the "truncate" option is set - confirm which of the
            # two is intended for this table.
            df.write.jdbc(
                url=cfg.jdbc_url,
                table=cfg.postgres_table,
                mode=mode,
                properties=cfg.jdbc_properties,
            )
        except Exception as exc:
            self.logger.error(f"Failed to write to PostgreSQL: {str(exc)}")
            raise
        else:
            self.logger.info("Successfully wrote data to PostgreSQL")


class GoldLayerETL:
    """Main ETL orchestrator for Gold Layer.

    Reads the silver tables, builds the daily transaction summary and loads
    it into PostgreSQL.
    """

    def __init__(self, app_name="GoldLayerETL"):
        """
        Args:
            app_name: Name given to the Spark application.
        """
        self.app_name = app_name
        self.spark = None
        self.config = None
        self.logger = self._setup_logger()

    def _setup_logger(self):
        """Configure root logging at INFO level and return this module's logger."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        return logging.getLogger(__name__)

    def _create_spark_session(self):
        """Create the Spark session (S3A access plus PostgreSQL JDBC driver)."""
        self.logger.info("Creating Spark session")

        # The application name comes from the constructor argument; it used to
        # be hard-coded here, which silently ignored ``app_name``.
        # NOTE(review): the S3A access/secret keys are embedded in source -
        # consider supplying them through the environment or spark-defaults.
        self.spark = (
            SparkSession.builder
            .appName(self.app_name)
            .master("spark://spark-master:7077")
            .config("spark.executor.memory", "1g")
            .config("spark.executor.cores", "1")
            .config("spark.cores.max", "2")
            .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
            .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
            .config("spark.hadoop.fs.s3a.secret.key", "minioadmin123")
            .config("spark.hadoop.fs.s3a.path.style.access", "true")
            .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
            .config("spark.hadoop.fs.s3a.aws.credentials.provider",
                    "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
            .config("spark.jars.packages",
                    "org.apache.hadoop:hadoop-aws:3.3.4,"
                    "com.amazonaws:aws-java-sdk-bundle:1.12.262,"
                    "org.postgresql:postgresql:42.5.4")
            .getOrCreate()
        )

        self.logger.info(f"Spark session created: {self.spark.version}")

    def _read_silver_tables(self):
        """Read the four silver tables and rename their clashing key columns.

        Returns:
            Tuple ``(transactions_df, users_df, cards_df, mcc_df)``.
        """
        reader = SilverDataReader(self.spark, self.config)

        # Each table's generic "id" gets a table-specific name, and the card
        # reference on transactions / client reference on cards are renamed so
        # the joined data has no duplicate column names for these keys.
        transactions_df = (
            reader.read_transactions()
            .withColumnRenamed("id", "txn_id")
            .withColumnRenamed("card_id", "txn_card_id")
        )
        users_df = reader.read_users().withColumnRenamed("id", "user_id")
        cards_df = (
            reader.read_cards()
            .withColumnRenamed("id", "card_id")
            .withColumnRenamed("client_id", "card_client_id")
        )
        mcc_df = reader.read_mcc_codes()

        self.logger.info(f"Loaded - Transactions: {transactions_df.count()}, "
                         f"Users: {users_df.count()}, "
                         f"Cards: {cards_df.count()}, "
                         f"MCC Codes: {mcc_df.count()}")

        return transactions_df, users_df, cards_df, mcc_df

    def run(self):
        """Execute the complete ETL pipeline.

        Any failure is logged with its traceback and re-raised; the Spark
        session, if one was created, is stopped in every case.
        """
        try:
            start_time = datetime.now()
            self.logger.info("="*70)
            self.logger.info("Starting Gold Layer ETL Job")
            self.logger.info("="*70)

            # Step 1: Initialize
            self.config = Config()
            self._create_spark_session()

            # Step 2: Read ALL data from Silver layer
            transactions_df, users_df, cards_df, mcc_df = self._read_silver_tables()

            # Step 3: Transform data with metrics
            aggregator = DailyTransactionAggregator(self.spark)
            daily_summary_df = aggregator.aggregate_daily_summary(
                transactions_df, users_df, cards_df, mcc_df
            )

            # Step 4: Show sample data
            self.logger.info("Sample aggregated data:")
            daily_summary_df.show(3, truncate=False)

            # Step 5: Write to PostgreSQL, replacing the previous contents
            writer = PostgreSQLWriter(self.config)
            writer.write_dataframe(daily_summary_df, mode="overwrite")

            # Step 6: Summary
            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()

            self.logger.info("="*70)
            self.logger.info("ETL Job Completed Successfully!")
            self.logger.info(f"Total Duration: {duration} seconds")
            self.logger.info(f"Daily Records Created: {daily_summary_df.count()}")
            self.logger.info("="*70)

        except Exception as e:
            self.logger.error(f"ETL Job Failed: {str(e)}", exc_info=True)
            raise

        finally:
            if self.spark:
                self.spark.stop()
                self.logger.info("Spark session stopped")


def main():
    """Build the gold-layer ETL job and run it once."""
    GoldLayerETL(app_name="CreditCard_Gold_ETL").run()


# Run the ETL only when this module is executed directly, not when it is imported.
if __name__ == "__main__":
    main()

2025-12-27 08:28:24,503 - __main__ - INFO - Starting Gold Layer ETL Job
2025-12-27 08:28:24,505 - __main__ - INFO - Creating Spark session
2025-12-27 08:28:30,215 - __main__ - INFO - Spark session created: 3.5.0
2025-12-27 08:28:30,216 - __main__ - INFO - Reading transactions from s3a://silver/transactions
2025-12-27 08:28:47,364 - __main__ - INFO - Reading users from s3a://silver/users
2025-12-27 08:28:50,100 - __main__ - INFO - Reading cards from s3a://silver/cards
2025-12-27 08:28:50,419 - __main__ - INFO - Reading MCC codes from s3a://silver/mcc_codes
2025-12-27 08:28:54,674 - __main__ - INFO - Loaded - Transactions: 13305915, Users: 2000, Cards: 6146, MCC Codes: 109
2025-12-27 08:28:54,676 - __main__ - INFO - Starting daily aggregation with all tables
2025-12-27 08:30:56,381 - __main__ - INFO - Enriched dataset created with 13305915 records
2025-12-27 08:34:59,186 - __main__ - INFO - aggregation completed: 3591 daily records
2025-12-27 08:34:59,190 - __main__ - INFO - Sample aggregated data:

+----------------+----+-----+------------+-----------+----------+------------------+-------------+----------------------+-----------------------+-------------------+-------------------+----------------+----------------+--------+----------+-----------------+------------------------+----------------+---------------+--------+--------------+-----------------------+----------------+----------------------+----------------+---------------+------------------+-------------------+-------------+-------------+---------+--------+----------------------+---------------------+-------------------+------------+------------+----------------+-----------------+------------------+------------+-------------+------------+----------+------------+--------------------------+
|transaction_date|year|month|week_of_year|day_of_week|is_weekend|total_transactions|total_revenue|total_revenue_absolute|average_transaction_amt|min_transaction_amt|max_transaction_amt|avg_customer_age|avg_credit_score|pct_male|pct_female|av

2025-12-27 08:35:01,276 - __main__ - INFO - Writing 3591 records to PostgreSQL table: daily_transaction_summary
2025-12-27 08:35:04,354 - __main__ - INFO - Successfully wrote data to PostgreSQL
2025-12-27 08:35:04,357 - __main__ - INFO - ETL Job Completed Successfully!
2025-12-27 08:35:04,357 - __main__ - INFO - Total Duration: 399.852901 seconds
2025-12-27 08:35:04,831 - __main__ - INFO - Daily Records Created: 3591
2025-12-27 08:35:05,198 - __main__ - INFO - Spark session stopped
