Extract Data 1.1 

For “Credit Card System,” create a Python and PySpark SQL program to read/extract the following JSON files according to the specifications found in the mapping document.

In [None]:
# Import neccessary Pyspark modules
import pyspark
from pyspark.sql import SparkSession 
# Create or retrieve an existing SparkSession.
# This is the entry point to use DataFrame and SQL functionality in PySpark.
# The 'appName' is used to name your Spark application, which helps when monitoring jobs in Spark UI.
spark = SparkSession.builder.appName('Credit Card Data Loader').getOrCreate()
# Load the 'cdw_sapp_branch.json' JSON file into a DataFrame.
# Load the 'cdw_sapp_credit.json' JSON file into a DataFrame.
# Load the 'cdw_sapp_customer.json' JSON file into a DataFrame.
# The 'option("multiline", "true")' tells Spark to handle JSON entries that span multiple lines.
# This is useful when each record is formatted across several lines for readability.
df_branch = spark.read.option("multiline", "true").json(r"C:\Users\malik.alston\Desktop\Data\Credit Card Dataset Overview\cdw_sapp_branch.json")
df_cc = spark.read.option("multiline", "true").json(r"C:\Users\malik.alston\Desktop\Data\Credit Card Dataset Overview\cdw_sapp_credit.json")
df_customer = spark.read.option("multiline", "true").json(r"C:\Users\malik.alston\Desktop\Data\Credit Card Dataset Overview\cdw_sapp_customer.json")
# The below lines are currently commented out but can be used to visually inspect
# The contents of each DataFrame. They display the top 20 rows by default in a tabular format.
# df_customer.show()
# df_branch.show()
# df_cc.show()

Transform Data

In [2]:
from pyspark.sql.functions import initcap, lower, concat_ws, format_string, col, lpad, lit, concat, when
# initcap()	Capitalize first letter of each word, lower() Convert string to lowercase, concat_ws()	Concatenate with separator, format_string()	String formatting
# col()	Reference to a column, lpad() Left-pad a string, lit()	Add a constant/literal column, concat()	Concatenate columns (no separator), when() Conditional logic
df_customer = df_customer.withColumn("FIRST_NAME", initcap("FIRST_NAME"))
df_customer = df_customer.withColumn("LAST_NAME", initcap("LAST_NAME"))
df_customer = df_customer.withColumn("MIDDLE_NAME", lower("MIDDLE_NAME"))
df_customer = df_customer.withColumn("ADDRESS", concat_ws(",", "STREET_NAME", "APT_NO"))
df_customer = df_customer.withColumn("CUST_PHONE",concat(lit("(XXX)"),col("CUST_PHONE").substr(1, 3),lit("-"),col("CUST_PHONE").substr(4, 4)))

df_branch = df_branch.withColumn("BRANCH_PHONE",concat(lit("(XXX)"),col("BRANCH_PHONE").substr(1, 3),lit("-"),col("BRANCH_PHONE").substr(4, 4)))
df_branch = df_branch.withColumn("BRANCH_ZIP", when(col("BRANCH_ZIP").isNull(), lit("99999")).otherwise(col("BRANCH_ZIP")))

df_cc = df_cc.withColumn("TIMEID", format_string("%04d%02d%02d", col("YEAR"), col("MONTH"), col("DAY")))
#df_customer.show()
#df_branch.show()
#df_cc.show()

Load Data 1.2

In [None]:
mysql_url = "jdbc:mysql://localhost:3306/creditcard_capstone"

mysql_config = {
    "user": "root",
    "password": "password",
    "driver": "com.mysql.cj.jdbc.Driver"
}

df_customer.write.jdbc(
    url=mysql_url,
    table="CDW_SAPP_CUSTOMER",
    mode="append",  # use 'append' if you want to preserve existing data
    properties=mysql_config
)

df_branch.write.jdbc(
    url=mysql_url,
    table="CDW_SAPP_BRANCH",
    mode="append",
    properties=mysql_config
)

df_cc.write.jdbc(
    url=mysql_url,
    table="CDW_SAPP_CREDIT_CARD",
    mode="append",
    properties=mysql_config
)

print("All DataFrames transferred to MySQL.")

Py4JJavaError: An error occurred while calling o91.jdbc.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 3) (10.60.50.20 executor driver): java.sql.BatchUpdateException: Duplicate entry '123454500' for key 'cdw_sapp_customer.PRIMARY'
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.mysql.cj.util.Util.handleNewInstance(Util.java:192)
	at com.mysql.cj.util.Util.getInstance(Util.java:167)
	at com.mysql.cj.util.Util.getInstance(Util.java:174)
	at com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:853)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:435)
	at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:794)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:753)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:904)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:903)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1039)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1039)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2433)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '123454500' for key 'cdw_sapp_customer.PRIMARY'
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:117)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1092)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:832)
	... 19 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:1039)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:1037)
	at org.apache.spark.sql.Dataset.$anonfun$foreachPartition$1(Dataset.scala:3516)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.Dataset.$anonfun$withNewRDDExecutionId$1(Dataset.scala:4310)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withNewRDDExecutionId(Dataset.scala:4308)
	at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:3516)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:903)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:70)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:364)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:251)
	at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:766)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.sql.BatchUpdateException: Duplicate entry '123454500' for key 'cdw_sapp_customer.PRIMARY'
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.mysql.cj.util.Util.handleNewInstance(Util.java:192)
	at com.mysql.cj.util.Util.getInstance(Util.java:167)
	at com.mysql.cj.util.Util.getInstance(Util.java:174)
	at com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:853)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:435)
	at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:794)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:753)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:904)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:903)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1039)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1039)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2433)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '123454500' for key 'cdw_sapp_customer.PRIMARY'
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:117)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1092)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:832)
	... 19 more


df_customer.printSchema()
df_branch.printSchema()
df_cc.printSchema()

CREATE TABLE CDW_SAPP_CUSTOMER (
    APT_NO VARCHAR(10),
    CREDIT_CARD_NO VARCHAR(20),
    CUST_CITY VARCHAR(50),
    CUST_COUNTRY VARCHAR(50),
    CUST_EMAIL VARCHAR(100),
    CUST_PHONE VARCHAR(20),
    CUST_STATE VARCHAR(50),
    CUST_ZIP VARCHAR(10),
    FIRST_NAME VARCHAR(50),
    LAST_NAME VARCHAR(50),
    LAST_UPDATED VARCHAR(50),
    MIDDLE_NAME VARCHAR(50),
    SSN BIGINT PRIMARY KEY,
    STREET_NAME VARCHAR(100),
    ADDRESS VARCHAR(200) NOT NULL
);


CREATE TABLE CDW_SAPP_BRANCH (
    BRANCH_CITY VARCHAR(50),
    BRANCH_CODE BIGINT PRIMARY KEY,
    BRANCH_NAME VARCHAR(100),
    BRANCH_PHONE VARCHAR(20),
    BRANCH_STATE VARCHAR(50),
    BRANCH_STREET VARCHAR(100),
    BRANCH_ZIP VARCHAR(10),
    LAST_UPDATED VARCHAR(50)
);


CREATE TABLE CDW_SAPP_CREDIT_CARD (
    BRANCH_CODE BIGINT,
    CREDIT_CARD_NO VARCHAR(20),
    CUST_SSN BIGINT,
    DAY INT,
    MONTH INT,
    TRANSACTION_ID BIGINT PRIMARY KEY,
    TRANSACTION_TYPE VARCHAR(50),
    TRANSACTION_VALUE DOUBLE,
    YEAR INT,
    TIMEID VARCHAR(20) NOT NULL
);
