In [1]:
import pyspark
import os
import json
import argparse
from dotenv import load_dotenv
from pathlib import Path
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *

In [2]:
from pyspark.sql import SparkSession
spark_session = SparkSession.builder\
  .appName('dibimbing_finpro')\
  .master('local')\
  .getOrCreate()
# spark_context_new = spark_session.sparkContext
# spark_context_new

In [3]:
# Created schema for calender.csv and  extract into a dataframe

schema_date = StructType([
    StructField('TransactionID', StringType()),
    StructField('CustomerID', StringType()),
    StructField('CustomerDOB', StringType()),
    StructField('CustGender', StringType()),
    StructField('CustLocation', StringType()),
    StructField('CustAccountBalance', FloatType()),
    StructField('TransactionDate', StringType()),
    StructField('TransactionTime', StringType()),
    StructField('TransactionAmoun', FloatType())
])
df_data = spark_session.read.csv('../..//resources/data/bank_transactions.csv', header=True, schema=schema_date)

In [4]:
df_data.show(5)

+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+----------------+
|TransactionID|CustomerID|CustomerDOB|CustGender|CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmoun|
+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+----------------+
|           T1|  C5841053|    10/1/94|         F|  JAMSHEDPUR|          17819.05|         2/8/16|         143207|            25.0|
|           T2|  C2142763|     4/4/57|         M|     JHAJJAR|           2270.69|         2/8/16|         141858|         27999.0|
|           T3|  C4417068|   26/11/96|         F|      MUMBAI|          17874.44|         2/8/16|         142712|           459.0|
|           T4|  C5342380|    14/9/73|         F|      MUMBAI|          866503.2|         2/8/16|         142714|          2060.0|
|           T5|  C9031234|    24/3/88|         F| NAVI MUMBAI|           6714.43|  

In [5]:
df_data = df_data.withColumn("CustomerDOB", F.expr("replace(CustomerDOB, '/', '-')"))
df_data = df_data.withColumn("TransactionDate", F.expr("replace(TransactionDate, '/', '-')"))

In [6]:
df_data.show(10)

+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+----------------+
|TransactionID|CustomerID|CustomerDOB|CustGender|CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmoun|
+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+----------------+
|           T1|  C5841053|    10-1-94|         F|  JAMSHEDPUR|          17819.05|         2-8-16|         143207|            25.0|
|           T2|  C2142763|     4-4-57|         M|     JHAJJAR|           2270.69|         2-8-16|         141858|         27999.0|
|           T3|  C4417068|   26-11-96|         F|      MUMBAI|          17874.44|         2-8-16|         142712|           459.0|
|           T4|  C5342380|    14-9-73|         F|      MUMBAI|          866503.2|         2-8-16|         142714|          2060.0|
|           T5|  C9031234|    24-3-88|         F| NAVI MUMBAI|           6714.43|  

left_df = df.withColumn("left_text_2", F.expr("substring(text, length(text)-1, 2)"))

(df
    .withColumn('y', F.substring('col', 0, 2).cast('int'))
    .withColumn('y', F
        .when(F.col('y') <= 68, F.col('y') + 2000)
        .otherwise(F.col('y') + 1900)
    )
    .withColumn('t_date', F.concat('y', F.regexp_replace('col', '(\d{2})(\d{2})(\d{2})', '-$2-$3')))
    .show()
)

 Output
 +---+------+----+----------+
 | id|   col|   y|    t_date|
 +---+------+----+----------+
 |  1|210927|2021|2021-09-27|
 |  2|910927|1991|1991-09-27|
 +---+------+----+----------+

In [7]:
df_data = df_data.withColumn("y_DOB", F.expr("substring(CustomerDOB, length(CustomerDOB)-1, 2)").cast('int'))
df_data = df_data.withColumn("y_transac", F.expr("substring(TransactionDate, length(TransactionDate)-1, 2)").cast('int'))

In [8]:
df_data.show(10)

+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+----------------+-----+---------+
|TransactionID|CustomerID|CustomerDOB|CustGender|CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmoun|y_DOB|y_transac|
+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+----------------+-----+---------+
|           T1|  C5841053|    10-1-94|         F|  JAMSHEDPUR|          17819.05|         2-8-16|         143207|            25.0|   94|       16|
|           T2|  C2142763|     4-4-57|         M|     JHAJJAR|           2270.69|         2-8-16|         141858|         27999.0|   57|       16|
|           T3|  C4417068|   26-11-96|         F|      MUMBAI|          17874.44|         2-8-16|         142712|           459.0|   96|       16|
|           T4|  C5342380|    14-9-73|         F|      MUMBAI|          866503.2|         2-8-16|         142714|     

In [9]:
df_data = df_data.withColumn("f_CustomerDOB", 
                             F.when(F.col("y_DOB") <=  30, F.col('y_DOB') + 2000)
                             .otherwise(F.col('y_DOB') + 1900))

df_data = df_data.withColumn("f_TransactionDate", 
                             F.when(F.col("y_transac") <=  30, F.col('y_transac') + 2000)
                             .otherwise(F.col('y_transac') + 1900))

In [10]:
df_data.show(10)

+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+----------------+-----+---------+-------------+-----------------+
|TransactionID|CustomerID|CustomerDOB|CustGender|CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmoun|y_DOB|y_transac|f_CustomerDOB|f_TransactionDate|
+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+----------------+-----+---------+-------------+-----------------+
|           T1|  C5841053|    10-1-94|         F|  JAMSHEDPUR|          17819.05|         2-8-16|         143207|            25.0|   94|       16|         1994|             2016|
|           T2|  C2142763|     4-4-57|         M|     JHAJJAR|           2270.69|         2-8-16|         141858|         27999.0|   57|       16|         1957|             2016|
|           T3|  C4417068|   26-11-96|         F|      MUMBAI|          17874.44|         2-8-16|        

In [11]:
df_data = df_data.withColumn("f_CustomerDOB", F.concat(F.expr("substring(CustomerDOB, 1, length(CustomerDOB)-2)"), df_data.f_CustomerDOB))

df_data = df_data.withColumn("f_TransactionDate", F.concat(F.expr("substring(TransactionDate, 1, length(TransactionDate)-2)"), df_data.f_TransactionDate))

In [13]:
df_data = df_data.withColumn("TransactionTime", 
                             F.expr("concat(substring(TransactionTime, 1, 2), ':', substring(TransactionTime, 3, 2), ':', substring(TransactionTime, 5, 2))"))

In [14]:
df_data.show(10)

+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+----------------+-----+---------+-------------+-----------------+
|TransactionID|CustomerID|CustomerDOB|CustGender|CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmoun|y_DOB|y_transac|f_CustomerDOB|f_TransactionDate|
+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+----------------+-----+---------+-------------+-----------------+
|           T1|  C5841053|    10-1-94|         F|  JAMSHEDPUR|          17819.05|         2-8-16|       14:32:07|            25.0|   94|       16|    10-1-1994|         2-8-2016|
|           T2|  C2142763|     4-4-57|         M|     JHAJJAR|           2270.69|         2-8-16|       14:18:58|         27999.0|   57|       16|     4-4-1957|         2-8-2016|
|           T3|  C4417068|   26-11-96|         F|      MUMBAI|          17874.44|         2-8-16|       1

In [15]:
df_data = df_data.drop("CustomerDOB","TransactionDate", "y_DOB","y_transac")

In [17]:
df_data.show(5)

+-------------+----------+----------+------------+------------------+---------------+----------------+-------------+-----------------+
|TransactionID|CustomerID|CustGender|CustLocation|CustAccountBalance|TransactionTime|TransactionAmoun|f_CustomerDOB|f_TransactionDate|
+-------------+----------+----------+------------+------------------+---------------+----------------+-------------+-----------------+
|           T1|  C5841053|         F|  JAMSHEDPUR|          17819.05|       14:32:07|            25.0|    10-1-1994|         2-8-2016|
|           T2|  C2142763|         M|     JHAJJAR|           2270.69|       14:18:58|         27999.0|     4-4-1957|         2-8-2016|
|           T3|  C4417068|         F|      MUMBAI|          17874.44|       14:27:12|           459.0|   26-11-1996|         2-8-2016|
|           T4|  C5342380|         F|      MUMBAI|          866503.2|       14:27:14|          2060.0|    14-9-1973|         2-8-2016|
|           T5|  C9031234|         F| NAVI MUMBAI|     

In [18]:
df_dim_transaction = df_data.select("TransactionID","f_TransactionDate", "TransactionTime", "TransactionAmoun")

In [22]:
df_dim_Customer = df_data.select("CustomerID","CustGender", "CustLocation", "CustAccountBalance", "f_CustomerDOB")\
                         .dropDuplicates()

In [26]:
df_fact_transac = df_data.select("TransactionID", "CustomerID")

In [27]:
# Stop the Spark session
spark_session.stop()