# ETL CODE
---

## Extract data from datafiles and API


In [1]:
import requests
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType, FloatType
from pyspark.sql.functions import col, initcap, lower, concat_ws, concat, lit, substring, when, lpad

spark = SparkSession.builder.appName("credit_card").getOrCreate()

Create the schemas for the data

In [2]:
schema_branches = StructType([
                            StructField("BRANCH_CODE", IntegerType(), False),
                            StructField("BRANCH_NAME", StringType()), 
                            StructField("BRANCH_STREET", StringType()),
                            StructField("BRANCH_CITY", StringType()),
                            StructField("BRANCH_STATE", StringType()),
                            StructField("BRANCH_ZIP", StringType()),
                            StructField("BRANCH_PHONE", StringType()),
                            StructField("LAST_UPDATED", TimestampType())
])

schema_customers = StructType([
                            StructField("FIRST_NAME", StringType()),
                            StructField("MIDDLE_NAME", StringType()),
                            StructField("LAST_NAME", StringType()),
                            StructField("SSN", IntegerType(), False),
                            StructField("CREDIT_CARD_NO", StringType()),
                            StructField("APT_NO", StringType()),
                            StructField("STREET_NAME", StringType()),
                            StructField("CUST_CITY", StringType()),
                            StructField("CUST_STATE", StringType()),
                            StructField("CUST_COUNTRY", StringType()),
                            StructField("CUST_ZIP", StringType()),
                            StructField("CUST_PHONE", StringType()),
                            StructField("CUST_EMAIL", StringType()),
                            StructField("LAST_UPDATED", TimestampType())
])

schema_cc_transactions = StructType([
                            StructField("TRANSACTION_ID", IntegerType(), False),
                            StructField("DAY", IntegerType()),
                            StructField("MONTH", IntegerType()),
                            StructField("YEAR", IntegerType()),
                            StructField("CREDIT_CARD_NO", StringType()),
                            StructField("CUST_SSN", StringType()),
                            StructField("BRANCH_CODE", IntegerType()),
                            StructField("TRANSACTION_TYPE", StringType()),
                            StructField("TRANSACTION_VALUE", FloatType())
])

schema_loans = StructType([
                            StructField("Application_ID", StringType()),
                            StructField("Gender", StringType()),
                            StructField("Married", StringType()),
                            StructField("Dependents", StringType()),
                            StructField("Education", StringType()),
                            StructField("Self_Employed", StringType()),
                            StructField("Credit_History", StringType()),
                            StructField("Property_Area", StringType()),
                            StructField("Income", StringType()),
                            StructField("Application_Status", StringType())
])


In [3]:
resp = requests.get("https://raw.githubusercontent.com/platformps/LoanDataset/main/loan_data.json")
customers_df = spark.createDataFrame(resp.json(), schema=schema_loans)
customers_df.show(5)

+--------------+------+-------+----------+------------+-------------+--------------+-------------+------+------------------+
|Application_ID|Gender|Married|Dependents|   Education|Self_Employed|Credit_History|Property_Area|Income|Application_Status|
+--------------+------+-------+----------+------------+-------------+--------------+-------------+------+------------------+
|      LP001002|  Male|     No|         0|    Graduate|           No|             1|        Urban|medium|                 Y|
|      LP001003|  Male|    Yes|         1|    Graduate|           No|             1|        Rural|medium|                 N|
|      LP001005|  Male|    Yes|         0|    Graduate|          Yes|             1|        Urban|   low|                 Y|
|      LP001006|  Male|    Yes|         0|Not Graduate|           No|             1|        Urban|   low|                 Y|
|      LP001008|  Male|     No|         0|    Graduate|           No|             1|        Urban|medium|                 Y|


In [4]:
branches_df = spark.read.json("../data_files/cdw_sapp_branch.json", schema=schema_branches)
branches_df.show(5)

customers_df = spark.read.json("../data_files/cdw_sapp_customer.json", schema=schema_customers)
customers_df.show(5)

cc_transactions_df = spark.read.json("../data_files/cdw_sapp_credit.json", schema=schema_cc_transactions)
cc_transactions_df.show(5)

resp = requests.get("https://raw.githubusercontent.com/platformps/LoanDataset/main/loan_data.json")
loan_df = spark.createDataFrame(resp.json(), schema=schema_loans)
loan_df.show(5)



+-----------+------------+-----------------+-----------------+------------+----------+------------+-------------------+
|BRANCH_CODE| BRANCH_NAME|    BRANCH_STREET|      BRANCH_CITY|BRANCH_STATE|BRANCH_ZIP|BRANCH_PHONE|       LAST_UPDATED|
+-----------+------------+-----------------+-----------------+------------+----------+------------+-------------------+
|          1|Example Bank|     Bridle Court|        Lakeville|          MN|     55044|  1234565276|2018-04-18 15:51:47|
|        999|   TEST Bank|      Night Court|         TreeCity|          MM|      null|  1234567890|2018-04-18 15:51:47|
|          2|Example Bank|Washington Street|          Huntley|          IL|     60142|  1234618993|2018-04-18 15:51:47|
|          3|Example Bank|    Warren Street|SouthRichmondHill|          NY|     11419|  1234985926|2018-04-18 15:51:47|
|          4|Example Bank| Cleveland Street|       Middleburg|          FL|     32068|  1234663064|2018-04-18 15:51:47|
+-----------+------------+--------------

## Transformation with PySpark


Transforming customer data

In [5]:
# Convert the first_name to Title case
print("Convert the first_name to Title case")
customers_df = customers_df.withColumn("FIRST_NAME", initcap(col("FIRST_NAME")))
customers_df.show(3)

# Convert the middle name in lower case
print("Convert the middle name in lower case")
customers_df = customers_df.withColumn("MIDDLE_NAME", lower(col("MIDDLE_NAME")))
customers_df.show(3)

# Convert the Last Name in Title Case
print("Convert the Last Name in Title Case")
customers_df = customers_df.withColumn("LAST_NAME", initcap(col("LAST_NAME")))
customers_df.show(3)

# Concatenate Apartment no and Street name of customer's Residence with comma as a seperator (Street, Apartment)
print("Concatenate Apartment no and Street name of customer's Residence with comma as a seperator (Street, Apartment)")
customers_df2 = customers_df.withColumn("STREET_NAME", concat_ws(', ',col("STREET_NAME"),col("APT_NO")))
customers_df2 = customers_df2.withColumnRenamed("STREET_NAME", "FULL_STREET_ADDRESS")
customers_df2 = customers_df2.drop("APT_NO")
customers_df2.show(3)

# Change the format of phone number to (XXX)XXX-XXXX....the data ONLY has 7 digits NOT 10, so changing format to:  XXX-XXXX
customers_df3 = customers_df2.withColumn("CUST_PHONE", concat_ws("-",substring(col("CUST_PHONE"), 0, 3),substring(col("CUST_PHONE"), 4, 7)))
customers_df3.show(3)

# Rearranging columns
customers_df3 = customers_df3.select(
                                    "SSN" ,
                                    "FIRST_NAME" ,
                                    "MIDDLE_NAME" ,
                                    "LAST_NAME" ,
                                    "CREDIT_CARD_NO" ,
                                    "FULL_STREET_ADDRESS",
                                    "CUST_CITY" ,
                                    "CUST_STATE" ,
                                    "CUST_COUNTRY" ,
                                    "CUST_ZIP" ,
                                    "CUST_PHONE" ,
                                    "CUST_EMAIL" ,
                                    "LAST_UPDATED"
)
customers_df3.show(3)


Convert the first_name to Title case
+----------+-----------+---------+---------+----------------+------+-----------------+------------+----------+-------------+--------+----------+-------------------+-------------------+
|FIRST_NAME|MIDDLE_NAME|LAST_NAME|      SSN|  CREDIT_CARD_NO|APT_NO|      STREET_NAME|   CUST_CITY|CUST_STATE| CUST_COUNTRY|CUST_ZIP|CUST_PHONE|         CUST_EMAIL|       LAST_UPDATED|
+----------+-----------+---------+---------+----------------+------+-----------------+------------+----------+-------------+--------+----------+-------------------+-------------------+
|      Alec|         Wm|   Hooper|123456100|4210653310061055|   656|Main Street North|     Natchez|        MS|United States|   39120|   1237818|AHooper@example.com|2018-04-21 11:49:02|
|      Etta|    Brendan|   Holman|123453023|4210653310102868|   829|    Redwood Drive|Wethersfield|        CT|United States|   06109|   1238933|EHolman@example.com|2018-04-21 11:49:02|
|    Wilber|   Ezequiel|   Dunham|1234

Transforming branch data

In [6]:
# If the BRANCH_ZIP source value is null load default (99999) value else Direct move
branches_df = branches_df.withColumn("BRANCH_ZIP", when(col("BRANCH_ZIP").isNull(), 99999).otherwise(col("BRANCH_ZIP")))
branches_df.show(5)


# Change the format of phone number to (XXX)XXX-XXXX
branches_df2 = branches_df.withColumn("BRANCH_PHONE", 
                                      concat(
                                            lit("("),
                                            substring(col("BRANCH_PHONE"), 0, 3),
                                            lit(")"),
                                            substring(col("BRANCH_PHONE"), 4, 3),
                                            lit("-"),
                                            substring(col("BRANCH_PHONE"), 7, 4)
                                             ))
branches_df2.show(3)


+-----------+------------+-----------------+-----------------+------------+----------+------------+-------------------+
|BRANCH_CODE| BRANCH_NAME|    BRANCH_STREET|      BRANCH_CITY|BRANCH_STATE|BRANCH_ZIP|BRANCH_PHONE|       LAST_UPDATED|
+-----------+------------+-----------------+-----------------+------------+----------+------------+-------------------+
|          1|Example Bank|     Bridle Court|        Lakeville|          MN|     55044|  1234565276|2018-04-18 15:51:47|
|        999|   TEST Bank|      Night Court|         TreeCity|          MM|     99999|  1234567890|2018-04-18 15:51:47|
|          2|Example Bank|Washington Street|          Huntley|          IL|     60142|  1234618993|2018-04-18 15:51:47|
|          3|Example Bank|    Warren Street|SouthRichmondHill|          NY|     11419|  1234985926|2018-04-18 15:51:47|
|          4|Example Bank| Cleveland Street|       Middleburg|          FL|     32068|  1234663064|2018-04-18 15:51:47|
+-----------+------------+--------------

Transforming credit card transaction data

In [7]:
# Convert DAY, MONTH, and YEAR into a TIMEID (YYYYMMDD)
cc_transactions_df2 = cc_transactions_df.withColumn("DAY", concat("YEAR", lpad("MONTH", 2, '0'), lpad("DAY", 2, '0')))\
                                        .withColumnRenamed("DAY","TIMEID")\
                                        .drop("MONTH")\
                                        .drop("YEAR")
cc_transactions_df2.show(3)

+--------------+--------+----------------+---------+-----------+----------------+-----------------+
|TRANSACTION_ID|  TIMEID|  CREDIT_CARD_NO| CUST_SSN|BRANCH_CODE|TRANSACTION_TYPE|TRANSACTION_VALUE|
+--------------+--------+----------------+---------+-----------+----------------+-----------------+
|             1|20180214|4210653349028689|123459988|        114|       Education|             78.9|
|             2|20180320|4210653349028689|123459988|         35|   Entertainment|            14.24|
|             3|20180708|4210653349028689|123459988|        160|         Grocery|             56.7|
+--------------+--------+----------------+---------+-----------+----------------+-----------------+
only showing top 3 rows



## Load into database

First I updated the create_tables.sql script to reflect the transformations made above

In [12]:
conn_url = "jdbc:mysql://mikey.helioho.st:3306/michaelwschmidt_creditcard_capstone",
table = "cdw_sapp_customer",
conn_mode = "append"
with open('../.secrets', 'r') as f:
    CREDITCARD_CAPSTONE_PASSWORD = f.read()

conn_prop = {
            "driver": "com.mysql.cj.jdbc.Driver",
            "user": "michaelwschmidt_cc_capstone",
            "password": CREDITCARD_CAPSTONE_PASSWORD,
            "dbtable": "michaelwschmidt_creditcard_capstone.cdw_sapp_customer",
            "header": False
}



customers_df3.write.format("jdbc").mode("append")\
                .option("driver", "com.mysql.cj.jdbc.Driver")\
                .option("url", "jdbc:mysql://tommy2.heliohost.org:3306/michaelwschmidt_creditcard_capstone")\
                .option("user", "michaelwschmidt_cc_capstone")\
                .option("dbtable","cdw_sapp_customer")\
                .option("password", CREDITCARD_CAPSTONE_PASSWORD)\
                .option("header", "false")\
                .save()

branches_df2.write.format("jdbc").mode("append")\
                .option("driver", "com.mysql.cj.jdbc.Driver")\
                .option("url", "jdbc:mysql://mikey.helioho.st:3306/michaelwschmidt_creditcard_capstone")\
                .option("user", "michaelwschmidt_cc_capstone")\
                .option("dbtable","cdw_sapp_branch")\
                .option("password", CREDITCARD_CAPSTONE_PASSWORD)\
                .option("header", "false")\
                .save()


cc_transactions_df2.write.format("jdbc").mode("append")\
                .option("driver", "com.mysql.cj.jdbc.Driver")\
                .option("url", "jdbc:mysql://mikey.helioho.st:3306/michaelwschmidt_creditcard_capstone")\
                .option("user", "michaelwschmidt_cc_capstone")\
                .option("dbtable","cdw_sapp_credit_card")\
                .option("password", CREDITCARD_CAPSTONE_PASSWORD)\
                .option("header", "false")\
                .save()


Py4JJavaError: An error occurred while calling o254.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 20.0 failed 1 times, most recent failure: Lost task 1.0 in stage 20.0 (TID 23) (Delltop executor driver): java.sql.SQLNonTransientConnectionException: No operations allowed after connection closed.
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:110)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:89)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:63)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:73)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:73)
	at com.mysql.cj.jdbc.ConnectionImpl.rollback(ConnectionImpl.java:1862)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:775)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:891)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1009)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1009)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2303)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: com.mysql.cj.exceptions.ConnectionIsClosedException: No operations allowed after connection closed.
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:61)
	at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:105)
	at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:151)
	at com.mysql.cj.NativeSession.checkClosed(NativeSession.java:1171)
	at com.mysql.cj.jdbc.ConnectionImpl.checkClosed(ConnectionImpl.java:573)
	at com.mysql.cj.jdbc.ConnectionImpl.rollback(ConnectionImpl.java:1816)
	... 15 more
Caused by: com.mysql.cj.exceptions.CJCommunicationsException: Communications link failure

The last packet successfully received from the server was 1,681 milliseconds ago. The last packet sent successfully to the server was 1,681 milliseconds ago.
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:61)
	at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:105)
	at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:151)
	at com.mysql.cj.exceptions.ExceptionFactory.createCommunicationsException(ExceptionFactory.java:167)
	at com.mysql.cj.protocol.a.NativeProtocol.readMessage(NativeProtocol.java:546)
	at com.mysql.cj.protocol.a.NativeProtocol.checkErrorMessage(NativeProtocol.java:710)
	at com.mysql.cj.protocol.a.NativeProtocol.sendCommand(NativeProtocol.java:649)
	at com.mysql.cj.protocol.a.NativeProtocol.sendQueryPacket(NativeProtocol.java:948)
	at com.mysql.cj.NativeSession.execSQL(NativeSession.java:1075)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:930)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1092)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:832)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:435)
	at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:794)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:735)
	... 14 more
Caused by: java.net.SocketException: Software caused connection abort: recv failed
	at java.net.SocketInputStream.socketRead0(Native Method)
	at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
	at java.net.SocketInputStream.read(SocketInputStream.java:171)
	at java.net.SocketInputStream.read(SocketInputStream.java:141)
	at com.mysql.cj.protocol.ReadAheadInputStream.fill(ReadAheadInputStream.java:107)
	at com.mysql.cj.protocol.ReadAheadInputStream.readFromUnderlyingStreamIfNecessary(ReadAheadInputStream.java:150)
	at com.mysql.cj.protocol.ReadAheadInputStream.read(ReadAheadInputStream.java:180)
	at java.io.FilterInputStream.read(FilterInputStream.java:133)
	at com.mysql.cj.protocol.FullReadInputStream.readFully(FullReadInputStream.java:64)
	at com.mysql.cj.protocol.a.SimplePacketReader.readHeader(SimplePacketReader.java:63)
	at com.mysql.cj.protocol.a.SimplePacketReader.readHeader(SimplePacketReader.java:45)
	at com.mysql.cj.protocol.a.TimeTrackingPacketReader.readHeader(TimeTrackingPacketReader.java:52)
	at com.mysql.cj.protocol.a.TimeTrackingPacketReader.readHeader(TimeTrackingPacketReader.java:41)
	at com.mysql.cj.protocol.a.MultiPacketReader.readHeader(MultiPacketReader.java:54)
	at com.mysql.cj.protocol.a.MultiPacketReader.readHeader(MultiPacketReader.java:44)
	at com.mysql.cj.protocol.a.NativeProtocol.readMessage(NativeProtocol.java:540)
	... 24 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:1009)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:1007)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:890)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:70)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.sql.SQLNonTransientConnectionException: No operations allowed after connection closed.
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:110)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:89)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:63)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:73)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:73)
	at com.mysql.cj.jdbc.ConnectionImpl.rollback(ConnectionImpl.java:1862)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:775)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:891)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1009)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1009)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2303)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: com.mysql.cj.exceptions.ConnectionIsClosedException: No operations allowed after connection closed.
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:61)
	at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:105)
	at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:151)
	at com.mysql.cj.NativeSession.checkClosed(NativeSession.java:1171)
	at com.mysql.cj.jdbc.ConnectionImpl.checkClosed(ConnectionImpl.java:573)
	at com.mysql.cj.jdbc.ConnectionImpl.rollback(ConnectionImpl.java:1816)
	... 15 more
Caused by: com.mysql.cj.exceptions.CJCommunicationsException: Communications link failure

The last packet successfully received from the server was 1,681 milliseconds ago. The last packet sent successfully to the server was 1,681 milliseconds ago.
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:61)
	at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:105)
	at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:151)
	at com.mysql.cj.exceptions.ExceptionFactory.createCommunicationsException(ExceptionFactory.java:167)
	at com.mysql.cj.protocol.a.NativeProtocol.readMessage(NativeProtocol.java:546)
	at com.mysql.cj.protocol.a.NativeProtocol.checkErrorMessage(NativeProtocol.java:710)
	at com.mysql.cj.protocol.a.NativeProtocol.sendCommand(NativeProtocol.java:649)
	at com.mysql.cj.protocol.a.NativeProtocol.sendQueryPacket(NativeProtocol.java:948)
	at com.mysql.cj.NativeSession.execSQL(NativeSession.java:1075)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:930)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1092)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:832)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:435)
	at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:794)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:735)
	... 14 more
Caused by: java.net.SocketException: Software caused connection abort: recv failed
	at java.net.SocketInputStream.socketRead0(Native Method)
	at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
	at java.net.SocketInputStream.read(SocketInputStream.java:171)
	at java.net.SocketInputStream.read(SocketInputStream.java:141)
	at com.mysql.cj.protocol.ReadAheadInputStream.fill(ReadAheadInputStream.java:107)
	at com.mysql.cj.protocol.ReadAheadInputStream.readFromUnderlyingStreamIfNecessary(ReadAheadInputStream.java:150)
	at com.mysql.cj.protocol.ReadAheadInputStream.read(ReadAheadInputStream.java:180)
	at java.io.FilterInputStream.read(FilterInputStream.java:133)
	at com.mysql.cj.protocol.FullReadInputStream.readFully(FullReadInputStream.java:64)
	at com.mysql.cj.protocol.a.SimplePacketReader.readHeader(SimplePacketReader.java:63)
	at com.mysql.cj.protocol.a.SimplePacketReader.readHeader(SimplePacketReader.java:45)
	at com.mysql.cj.protocol.a.TimeTrackingPacketReader.readHeader(TimeTrackingPacketReader.java:52)
	at com.mysql.cj.protocol.a.TimeTrackingPacketReader.readHeader(TimeTrackingPacketReader.java:41)
	at com.mysql.cj.protocol.a.MultiPacketReader.readHeader(MultiPacketReader.java:54)
	at com.mysql.cj.protocol.a.MultiPacketReader.readHeader(MultiPacketReader.java:44)
	at com.mysql.cj.protocol.a.NativeProtocol.readMessage(NativeProtocol.java:540)
	... 24 more
