In [1]:
import os
import sys

os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk-21'
#os.environ['PYSPARK_HOME'] = r'C:\Users\lpaschoal\pyspark\spark-4.0.1-bin-hadoop3'
os.environ['SPARK_HOME'] = r'C:\Users\lpaschoal\pyspark\spark-4.0.1-bin-hadoop3'
os.environ['HADOOP_HOME'] = r'C:\Users\lpaschoal\pyspark\hadoop\winutils\hadoop-3.3.6'

sys.path.insert(0, os.getenv('SPARK_HOME') + r"\python\pyspark")
sys.path.insert(0, os.getenv('SPARK_HOME') + r"\python\lib\pyspark.zip")
sys.path.insert(0, os.getenv('SPARK_HOME') + r"\python\lib\py4j-0.10.9.9-src.zip")

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Financial Project")\
    .config("spark.driver.memory", "12g") \
    .config("spark.driver.extraJavaOptions", "--add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED") \
    .config("spark.executor.extraJavaOptions", "--add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED") \
    .getOrCreate()

spark.conf.set("spark.sql.execution.pyspark.udf.faulthandler.enabled", "true")


In [2]:
#Reading CSVs
card_data = spark.read.csv(r"C:\Users\lpaschoal\data_src\financial_data\cards_data.csv",header=True, inferSchema=True).createOrReplaceTempView('cards_data')
trans_data = spark.read.csv(r"C:\Users\lpaschoal\data_src\financial_data\transactions_data.csv",header=True, inferSchema=True).createOrReplaceTempView('trans_data')
user_data = spark.read.csv(r"C:\Users\lpaschoal\data_src\financial_data\users_data.csv",header=True, inferSchema=True).createOrReplaceTempView('users_data')

In [3]:
#Reading JSON
#mcc_codes = spark.read.json(r"C:\Users\lpaschoal\data_src\financial_data\mcc_codes.json",multiLine=True)
#fraud_labels = spark.read.json(r"C:\Users\lpaschoal\data_src\financial_data\train_fraud_labels.json",multiLine=True)

In [4]:
spark.sql('SELECT * FROM users_data').show()

+----+-----------+--------------+----------+-----------+------+--------------------+--------+---------+-----------------+-------------+----------+------------+----------------+
|  id|current_age|retirement_age|birth_year|birth_month|gender|             address|latitude|longitude|per_capita_income|yearly_income|total_debt|credit_score|num_credit_cards|
+----+-----------+--------------+----------+-----------+------+--------------------+--------+---------+-----------------+-------------+----------+------------+----------------+
| 825|         53|            66|      1966|         11|Female|       462 Rose Lane|   34.15|  -117.76|           $29278|       $59696|   $127613|         787|               5|
|1746|         53|            68|      1966|         12|Female|3606 Federal Boul...|   40.76|   -73.74|           $37891|       $77254|   $191349|         701|               5|
|1718|         81|            67|      1938|         11|Female|     766 Third Drive|   34.02|  -117.89|           $

In [5]:
spark.sql('SELECT * FROM trans_data').show()

+-------+-------------------+---------+-------+-------+------------------+-----------+---------------+--------------+-------+----+------+
|     id|               date|client_id|card_id| amount|          use_chip|merchant_id|  merchant_city|merchant_state|    zip| mcc|errors|
+-------+-------------------+---------+-------+-------+------------------+-----------+---------------+--------------+-------+----+------+
|7475327|2010-01-01 00:01:00|     1556|   2972|$-77.00| Swipe Transaction|      59935|         Beulah|            ND|58523.0|5499|  NULL|
|7475328|2010-01-01 00:02:00|      561|   4575| $14.57| Swipe Transaction|      67570|     Bettendorf|            IA|52722.0|5311|  NULL|
|7475329|2010-01-01 00:02:00|     1129|    102| $80.00| Swipe Transaction|      27092|          Vista|            CA|92084.0|4829|  NULL|
|7475331|2010-01-01 00:05:00|      430|   2860|$200.00| Swipe Transaction|      27092|    Crown Point|            IN|46307.0|4829|  NULL|
|7475332|2010-01-01 00:06:00|     

In [6]:
spark.sql("""SELECT * FROM cards_data""").show()

+----+---------+----------+---------------+----------------+-------+---+--------+----------------+------------+--------------+---------------------+----------------+
|  id|client_id|card_brand|      card_type|     card_number|expires|cvv|has_chip|num_cards_issued|credit_limit|acct_open_date|year_pin_last_changed|card_on_dark_web|
+----+---------+----------+---------------+----------------+-------+---+--------+----------------+------------+--------------+---------------------+----------------+
|4524|      825|      Visa|          Debit|4344676511950444|12/2022|623|     YES|               2|      $24295|       09/2002|                 2008|              No|
|2731|      825|      Visa|          Debit|4956965974959986|12/2020|393|     YES|               2|      $21968|       04/2014|                 2014|              No|
|3701|      825|      Visa|          Debit|4582313478255491|02/2024|719|     YES|               2|      $46414|       07/2003|                 2004|              No|
|  4

In [7]:
df_trans_init = spark.sql("""SELECT date_format(date, 'yyyy-MM') as trans_date, client_id,
SUM(
    COALESCE(
        TRY_CAST(
            CASE
                WHEN REGEXP_REPLACE(amount, '[^0-9.-]', '') LIKE '%-'
                THEN CONCAT('-', REGEXP_REPLACE(REGEXP_REPLACE(amount, '[^0-9.-]', ''), '-', ''))
                ELSE REGEXP_REPLACE(amount, '[^0-9.-]', '')
            END
            AS FLOAT
        ),
    0
    )
) as sum
FROM trans_data
GROUP BY client_id, date_format(date, 'yyyy-MM')""").createOrReplaceTempView('trans_sum')

In [8]:
# Informations about the transactions total amount per year and month per user between 50 and 60 years 

df_users = spark.sql("SELECT * from users_data \
                    where current_age between 50 and 60 order by current_age").createOrReplaceTempView('df_users')

df_trans_year_month = spark.sql("select client_id, users.gender, users.per_capita_income, sum, trans_date from trans_sum\
                                left join df_users as users on trans_sum.client_id = users.id").createOrReplaceTempView('users_percapta_income')

In [13]:
spark.sql("""SELECT distinct users_percapta_income.client_id, gender, per_capita_income, sum, trans_date, ca.card_brand
          FROM users_percapta_income 
          JOIN cards_data as ca ON users_percapta_income.client_id = ca.client_id
          where gender IS NOT NULL
          """).createOrReplaceTempView('trans_amount_year')

In [15]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

sc = spark.sparkContext
map1 = {"Mastercard":"M", "Visa":"V", "Amex":"A"}

bcv_card = sc.broadcast(map1)

spark.sql("""SELECT distinct client_id, gender, per_capita_income, sum, trans_date, card_brand
          FROM trans_amount_year""")

def convert(card_brand):
    return bcv_card.value.get(str(card_brand), card_brand)  # Force str


convert_status = udf(convert,StringType())
df = spark.sql("""SELECT distinct client_id, gender, per_capita_income, sum, trans_date, card_brand
          FROM trans_amount_year""")
final_df = df.withColumn("card_brand", convert_status(col("card_brand")))

In [16]:
final_df.show()


+---------+------+-----------------+------------------+----------+----------+
|client_id|gender|per_capita_income|               sum|trans_date|card_brand|
+---------+------+-----------------+------------------+----------+----------+
|      216|Female|           $16273|2565.7799944877625|   2010-03|         V|
|      216|Female|           $16273|2565.7799944877625|   2010-03|         M|
|     1782|Female|           $19438|  2920.31004011631|   2010-06|         M|
|     1782|Female|           $19438|  2920.31004011631|   2010-06|         V|
|     1300|  Male|           $11324| 2387.100007534027|   2010-06|         M|
|      375|Female|           $18444|1813.5099789500237|   2010-01|         M|
|     1749|Female|            $8658|1104.5699954032898|   2010-01|         M|
|      100|  Male|           $24005| 2740.220016002655|   2010-02|         M|
|      100|  Male|           $24005| 2740.220016002655|   2010-02|         V|
|      692|Female|           $20942|3608.3399965167046|   2010-0

In [None]:
final_df.count()

69557

: 