In [3]:
from pathlib import Path

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, VarcharType, TimestampType, DoubleType
from pyspark.sql.functions import lower,upper,initcap,concat,concat_ws,lit,substring,col,format_string,lpad #Import string transformation functions

In [4]:
def get_spark_session(app_name):
    """Build (or reuse, via getOrCreate) a SparkSession named ``app_name``.

    Args:
        app_name: application name registered with Spark.

    Returns:
        The SparkSession returned by ``getOrCreate()``.
    """
    session_builder = SparkSession.builder.appName(app_name)
    return session_builder.getOrCreate()

In [5]:
def load_file_to_dataframe(spark, file, schema):
    """Read a JSON file into a Spark DataFrame, letting Spark infer its schema.

    Args:
        spark: the SparkSession used for reading.
        file: path of the JSON source file.
        schema: accepted for interface compatibility but NOT applied. Callers
            currently pass MySQL column-type DDL meant for the JDBC writer, which
            lists only a subset of columns, so applying it as a read schema would
            drop columns the transforms depend on.

    Returns:
        The DataFrame produced by ``spark.read.json(file)``.
    """
    return spark.read.json(file)

In [6]:
def transform_customer(dataframe):
    """Clean the customer DataFrame.

    - FIRST_NAME / LAST_NAME: title-cased with ``initcap``.
    - MIDDLE_NAME: lower-cased.
    - APT_NO and STREET_NAME are joined with ',' into FULL_STREET_ADDRESS, and
      both source columns are dropped.
    - CUST_PHONE: reformatted as (XXX)XXX-XXXX from character positions 1-3, 4-6
      and 7-10.

    Prints a sample of rows and the resulting schema before returning.

    Returns:
        The transformed DataFrame.
    """
    # The phone expression reads the original CUST_PHONE column.
    # NOTE(review): unlike transform_branch, the phone is not lpad-ed to 10
    # characters first; values shorter than 10 characters yield a truncated last
    # group. Confirm against the source data.
    raw_phone = dataframe['CUST_PHONE']
    formatted_phone = concat(
        lit('('), substring(raw_phone, 1, 3), lit(')'),
        substring(raw_phone, 4, 3), lit('-'),
        substring(raw_phone, 7, 4),
    )
    street_address = concat_ws(',', dataframe['APT_NO'], dataframe['STREET_NAME'])

    transformed_dataframe = (
        dataframe
        .withColumn('FIRST_NAME', initcap(dataframe['FIRST_NAME']))
        .withColumn('MIDDLE_NAME', lower(dataframe['MIDDLE_NAME']))
        .withColumn('LAST_NAME', initcap(dataframe['LAST_NAME']))
        .withColumn('FULL_STREET_ADDRESS', street_address)
        .drop('APT_NO', 'STREET_NAME')
        .withColumn('CUST_PHONE', formatted_phone)
    )
    transformed_dataframe.show()
    transformed_dataframe.printSchema()
    return transformed_dataframe

In [7]:
def transform_credit(dataframe):
    """Collapse YEAR, MONTH and DAY into a single zero-padded TIMEID (YYYYMMDD).

    Each part is cast to int and formatted with a fixed width (4/2/2 digits).
    The three source columns are dropped. A sample of rows is shown before
    returning.

    Returns:
        The transformed DataFrame.
    """
    year_part = format_string("%04d", col('YEAR').cast('int'))
    month_part = format_string("%02d", col('MONTH').cast('int'))
    day_part = format_string("%02d", col('DAY').cast('int'))

    transformed_dataframe = (
        dataframe
        .withColumn('TIMEID', concat(year_part, month_part, day_part))
        .drop('DAY', 'MONTH', 'YEAR')
    )
    transformed_dataframe.show()
    return transformed_dataframe

In [8]:
def transform_branch(dataframe, default_zip=99999):
    """Clean the branch DataFrame.

    - BRANCH_ZIP: nulls are replaced with ``default_zip``; every value is then
      rendered as a zero-padded string of at least 5 digits (``"%05d"``).
    - BRANCH_PHONE: left-padded to 10 characters with '123', then formatted as
      (XXX)XXX-XXXX.

    A sample of rows is shown before returning.

    Args:
        dataframe: branch DataFrame containing BRANCH_ZIP and BRANCH_PHONE.
        default_zip: fill value for null BRANCH_ZIP. Defaults to 99999 so the
            placeholder fits the 5-digit zip format. The previous hard-coded
            999999 produced a 6-character zip.

    Returns:
        The transformed DataFrame.
    """
    # Use col() rather than dataframe['...'] throughout. col() resolves against
    # the DataFrame at each step of the chain, so the final phone formatting
    # sees the lpad-ed value rather than the original column.
    padded_phone = col('BRANCH_PHONE')
    transformed_dataframe = (
        dataframe
        .fillna(value=default_zip, subset=['BRANCH_ZIP'])
        .withColumn('BRANCH_ZIP', format_string("%05d", col('BRANCH_ZIP')))
        .withColumn('BRANCH_PHONE', lpad(col('BRANCH_PHONE'), 10, '123'))
        .withColumn('BRANCH_PHONE',
                    concat(
                        lit('('), substring(padded_phone, 1, 3), lit(')'),
                        substring(padded_phone, 4, 3), lit('-'),
                        substring(padded_phone, 7, 4),
                    ))
    )
    transformed_dataframe.show()
    return transformed_dataframe

In [9]:
def dataframe_to_temp_view(dataframe, view_name):
    """Register ``dataframe`` as a temporary SQL view called ``view_name``.

    Uses ``createOrReplaceTempView``, so an existing view with the same name is
    replaced. Returns None.
    """
    register_view = dataframe.createOrReplaceTempView
    register_view(view_name)

In [10]:
def load_dataframe_to_db(clean_dataframe, table_name, schema, mode="overwrite"):
    """Write a DataFrame to the MySQL ``creditcard_capstone`` database over JDBC.

    Credentials come from the project-local ``config.db`` mapping
    (keys DATABASE_USER / DATABASE_PASSWORD).

    Args:
        clean_dataframe: the transformed DataFrame to persist.
        table_name: target table name inside ``creditcard_capstone``.
        schema: MySQL column-type fragment (e.g. "NAME varchar(25), ...") passed
            to the JDBC ``createTableColumnTypes`` option, used when the writer
            creates the table.
        mode: Spark save mode. Defaults to "overwrite" so the notebook can be
            re-run against an existing table.
    """
    from config import db

    db_user = db.get('DATABASE_USER')
    db_pass = db.get('DATABASE_PASSWORD')

    # The save mode must be set with DataFrameWriter.mode(). The former
    # .option("mode", "overwrite") is only an unused data-source option, so the
    # writer fell back to its default mode (fail if the table already exists).
    # With overwrite, truncate=true empties an existing table instead of
    # dropping and recreating it. The former "header" option was removed; it is
    # a CSV option that the JDBC source does not use.
    (
        clean_dataframe.write
        .format("jdbc")
        .mode(mode)
        .option("url", "jdbc:mysql://localhost:3306/creditcard_capstone")
        .option("dbtable", "creditcard_capstone." + table_name)
        .option("user", db_user)
        .option("password", db_pass)
        .option("truncate", "true")
        .option("createTableColumnTypes", schema)
        .save()
    )

In [11]:
spark = get_spark_session('ETL_Loan_Application')

# Source JSON files, keyed by the name of the transform function that cleans them.
file_list = {
    'transform_branch': 'data_sets/cdw_sapp_branch.json',
    'transform_credit': 'data_sets/cdw_sapp_credit.json',
    'transform_customer': 'data_sets/cdw_sapp_customer.json',
}

# MySQL column types passed to the JDBC writer's createTableColumnTypes option.
schema_transform_branch = "BRANCH_NAME varchar(25), BRANCH_STREET varchar(50), BRANCH_CITY varchar(50), BRANCH_STATE varchar(2), BRANCH_PHONE varchar(15), LAST_UPDATED timestamp"

schema_transform_credit = "TIMEID varchar(8), CREDIT_CARD_NO varchar(16), TRANSACTION_TYPE varchar(25)"

schema_transform_customer = "FIRST_NAME varchar(50), MIDDLE_NAME varchar(50), LAST_NAME varchar(50), CREDIT_CARD_NO varchar(16), FULL_STREET_ADDRESS varchar(200), CUST_CITY varchar(50), CUST_STATE varchar(2), CUST_COUNTRY varchar(25), CUST_PHONE varchar(13), CUST_EMAIL varchar(50)"

# Explicit dispatch table (transform function, column types). This replaces the
# former locals()[...] string lookup, which only worked because this cell runs
# at module scope, where locals() is globals().
etl_steps = {
    'transform_branch': (transform_branch, schema_transform_branch),
    'transform_credit': (transform_credit, schema_transform_credit),
    'transform_customer': (transform_customer, schema_transform_customer),
}

for file_transform, file_path in file_list.items():
    transform, schema = etl_steps[file_transform]

    # As written, load_file_to_dataframe ignores its schema argument, and the
    # JSON schema is inferred.
    dataframe = load_file_to_dataframe(spark, file_path, schema)
    clean_dataframe = transform(dataframe)

    # Target table is named after the source file, e.g. 'cdw_sapp_branch'.
    table_name = Path(file_path).stem
    load_dataframe_to_db(clean_dataframe, table_name, schema)

# Notes -
# help(spark.read.json)  # shows the documentation and parameters of a function
# Ctrl+Space gives completion prompts for a function


+-----------------+-----------+------------+-------------+------------+-------------------+----------+--------------------+
|      BRANCH_CITY|BRANCH_CODE| BRANCH_NAME| BRANCH_PHONE|BRANCH_STATE|      BRANCH_STREET|BRANCH_ZIP|        LAST_UPDATED|
+-----------------+-----------+------------+-------------+------------+-------------------+----------+--------------------+
|        Lakeville|          1|Example Bank|(123)456-5276|          MN|       Bridle Court|     55044|2018-04-18T16:51:...|
|          Huntley|          2|Example Bank|(123)461-8993|          IL|  Washington Street|     60142|2018-04-18T16:51:...|
|SouthRichmondHill|          3|Example Bank|(123)498-5926|          NY|      Warren Street|     11419|2018-04-18T16:51:...|
|       Middleburg|          4|Example Bank|(123)466-3064|          FL|   Cleveland Street|     32068|2018-04-18T16:51:...|
|    KingOfPrussia|          5|Example Bank|(123)484-9701|          PA|        14th Street|     19406|2018-04-18T16:51:...|
|       

In [9]:
# file_list is a dict keyed by transform name; file_list[0] raised KeyError: 0.
dataframe_branch = spark.read.json(file_list['transform_branch'])
dataframe_branch.printSchema()
type(dataframe_branch)

root
 |-- BRANCH_CITY: string (nullable = true)
 |-- BRANCH_CODE: long (nullable = true)
 |-- BRANCH_NAME: string (nullable = true)
 |-- BRANCH_PHONE: string (nullable = true)
 |-- BRANCH_STATE: string (nullable = true)
 |-- BRANCH_STREET: string (nullable = true)
 |-- BRANCH_ZIP: long (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)



pyspark.sql.dataframe.DataFrame