In [2]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, col
from pyspark.sql.types import *

# ---------------------------------------------------------------------------
# Dev users loader: read users_data.csv from MinIO (via S3A), convert the
# "$"-prefixed currency columns and the coordinates to DECIMAL, and append the
# result to the PostgreSQL `users` table.
#
# Connection settings can be overridden with environment variables; the
# fallbacks are the local dev-stack values this notebook originally hard-coded.
# NOTE(review): the fallbacks are default dev credentials -- supply real ones
# through the environment for any shared or non-local deployment.
# ---------------------------------------------------------------------------
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "minioadmin")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "minioadmin123")
USERS_CSV_PATH = os.environ.get("USERS_CSV_PATH", "s3a://dev/users_data.csv")
PG_URL = os.environ.get("PG_URL", "jdbc:postgresql://postgres:5432/gold_db")
PG_TABLE = os.environ.get("PG_TABLE", "users")
PG_USER = os.environ.get("PG_USER", "postgres")
PG_PASSWORD = os.environ.get("PG_PASSWORD", "postgres")

# Initialize Spark Session with the S3A filesystem pointed at MinIO
# (path-style access enabled).
spark = (
    SparkSession.builder
    .appName("DevUsersLoader")
    .config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)

# Define schema.
# NOTE(review): Spark's file-based readers generally treat a user-supplied
# schema as nullable, so nullable=False is not enforced when reading the CSV --
# add an explicit check if NOT NULL matters for these columns.
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("current_age", IntegerType(), False),
    StructField("retirement_age", IntegerType(), False),
    StructField("birth_year", IntegerType(), False),
    StructField("birth_month", IntegerType(), False),
    StructField("gender", StringType(), False),
    StructField("address", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    # Monetary columns carry a "$" prefix, so they are read as strings and
    # parsed by _currency_to_decimal below.
    StructField("per_capita_income", StringType(), True),
    StructField("yearly_income", StringType(), True),
    StructField("total_debt", StringType(), True),
    StructField("credit_score", IntegerType(), True),
    StructField("num_credit_cards", IntegerType(), True),
])


def _currency_to_decimal(column_name):
    """Return a Column parsing the currency string column *column_name* as DECIMAL(12, 2).

    Removes "$" and "," (thousands separators) before casting, so both
    "$29278" and "$1,234.50" parse. Values that still cannot be cast become
    NULL (or raise, when spark.sql.ansi.enabled is true).
    """
    return regexp_replace(col(column_name), r"[$,]", "").cast(DecimalType(12, 2))


try:
    # Read CSV from MinIO
    print("Reading users data from MinIO...")
    df = spark.read.option("header", "true").schema(schema).csv(USERS_CSV_PATH)

    # Transform data - strip $ signs and convert to decimal
    print("Transforming data...")
    df_transformed = (
        df
        .withColumn("per_capita_income", _currency_to_decimal("per_capita_income"))
        .withColumn("yearly_income", _currency_to_decimal("yearly_income"))
        .withColumn("total_debt", _currency_to_decimal("total_debt"))
        .withColumn("latitude", col("latitude").cast(DecimalType(10, 6)))
        .withColumn("longitude", col("longitude").cast(DecimalType(10, 6)))
        # Cache so show(), count() and the JDBC write reuse one pass over the
        # CSV instead of each action re-reading it from MinIO.
        .cache()
    )

    # Show sample data
    print("Sample transformed data:")
    df_transformed.show(5, truncate=False)

    # Print schema
    print("Final Schema:")
    df_transformed.printSchema()

    # Write to PostgreSQL
    print("Writing to PostgreSQL...")
    # Count before writing: this fully materialises the cache, so the write
    # normally reads the cached rows and the reported total reflects them.
    row_count = df_transformed.count()
    # NOTE: mode("append") adds rows on every run and nothing here
    # de-duplicates, so re-running this cell will try to insert the same
    # users again.
    (
        df_transformed.write
        .format("jdbc")
        .option("url", PG_URL)
        .option("dbtable", PG_TABLE)
        .option("user", PG_USER)
        .option("password", PG_PASSWORD)
        .option("driver", "org.postgresql.Driver")
        .mode("append")
        .save()
    )

    print(f"Successfully loaded {row_count} users to PostgreSQL!")
finally:
    # Stop Spark session even if reading, transforming or writing failed.
    spark.stop()

Reading users data from MinIO...
Transforming data...
Sample transformed data:
+----+-----------+--------------+----------+-----------+------+------------------------+---------+-----------+-----------------+-------------+----------+------------+----------------+
|id  |current_age|retirement_age|birth_year|birth_month|gender|address                 |latitude |longitude  |per_capita_income|yearly_income|total_debt|credit_score|num_credit_cards|
+----+-----------+--------------+----------+-----------+------+------------------------+---------+-----------+-----------------+-------------+----------+------------+----------------+
|825 |53         |66            |1966      |11         |Female|462 Rose Lane           |34.150000|-117.760000|29278.00         |59696.00     |127613.00 |787         |5               |
|1746|53         |68            |1966      |12         |Female|3606 Federal Boulevard  |40.760000|-73.740000 |37891.00         |77254.00     |191349.00 |701         |5               |
|