In [22]:
import glob                         
import numpy as np
from datetime import datetime
import findspark
findspark.init() 
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, initcap, lit, lower, substring, when, coalesce, expr, length, to_date
import requests

#Create an active SparkSession 
spark = SparkSession.builder.getOrCreate()

#EXTRACT json file into a dataframe. 
#These json files were downloaded and placed in the folder with VS Code file Jupyter notebook
df_customer = spark.read.json("cdw_sapp_custmer.json")
df_branch = spark.read.json("cdw_sapp_branch.json")
df_cc = spark.read.json("cdw_sapp_credit.json")


#TRANSFORM DF according to "mapping" documentation

df_customer_trans = df_customer.withColumn("FIRST_NAME", initcap(col("FIRST_NAME"))) \
                                .withColumn("MIDDLE_NAME",lower(col("MIDDLE_NAME"))) \
                                .withColumn("LAST_NAME", initcap(col("LAST_NAME"))) \
                                .withColumn("FULL_STREET_ADDRESS", concat(col("APT_NO"), lit(","),col("STREET_NAME"))) \
                                .withColumn(("CUST_PHONE"), concat(lit("(123)"),col("CUST_PHONE"))) \
                                .withColumn("SSN", col("SSN").cast("int")) \
                                .withColumn("CUST_ZIP", col("CUST_ZIP").cast("int")) \
                                .withColumn("LAST_UPDATED", col("LAST_UPDATED").cast("timestamp"))
                                
#Reordering and dropping columns according to mapping document
#By creating a list of mapping order columns and reording dataframe

column_order_cust = ["SSN", "FIRST_NAME", "MIDDLE_NAME", "LAST_NAME", "CREDIT_CARD_NO", "FULL_STREET_ADDRESS", "CUST_CITY", "CUST_STATE", "CUST_COUNTRY", "CUST_ZIP", "CUST_PHONE", "CUST_EMAIL", "LAST_UPDATED"]
df_customer_trans = df_customer_trans.select(column_order_cust)

df_combined = df.withColumn("combined_json", concat(col("json_col1"), lit(delimiter), col("json_col2")))


df_customer_trans.printSchema() 
df_customer_trans.show()

TypeError: 'StructType' object is not callable

In [18]:
df_branch = spark.read.json("cdw_sapp_branch.json")

df_branch_trans = df_branch.withColumn("BRANCH_CODE", col("BRANCH_CODE").cast("int")) \
                                .withColumn("LAST_UPDATED", col("LAST_UPDATED").cast("timestamp")) \
                                .withColumn("BRANCH_ZIP", when(length(col("BRANCH_ZIP")) == 5, col("BRANCH_ZIP").cast("int"))
                                            .otherwise(when(col("BRANCH_ZIP").isNull(), lit(99999))
                                            .otherwise(concat(lit('0'),col("BRANCH_ZIP").cast("string").cast("int")))))\
                                .withColumn("BRANCH_PHONE", 
                                            concat(
                                                lit("("),
                                                substring(col("BRANCH_PHONE"), 1,3),
                                                lit(")"),
                                                substring(col("BRANCH_PHONE"), 4,7)))
                              
                                
df_branch_trans.show()


+-----------------+-----------+------------+------------+------------+-------------------+----------+-------------------+
|      BRANCH_CITY|BRANCH_CODE| BRANCH_NAME|BRANCH_PHONE|BRANCH_STATE|      BRANCH_STREET|BRANCH_ZIP|       LAST_UPDATED|
+-----------------+-----------+------------+------------+------------+-------------------+----------+-------------------+
|        Lakeville|          1|Example Bank|(123)4565276|          MN|       Bridle Court|     55044|2018-04-18 13:51:47|
|          Huntley|          2|Example Bank|(123)4618993|          IL|  Washington Street|     60142|2018-04-18 13:51:47|
|SouthRichmondHill|          3|Example Bank|(123)4985926|          NY|      Warren Street|     11419|2018-04-18 13:51:47|
|       Middleburg|          4|Example Bank|(123)4663064|          FL|   Cleveland Street|     32068|2018-04-18 13:51:47|
|    KingOfPrussia|          5|Example Bank|(123)4849701|          PA|        14th Street|     19406|2018-04-18 13:51:47|
|         Paterson|     

In [21]:
df_cc = spark.read.json("cdw_sapp_credit.json")

df_cc_trans = df_cc.withColumn("CUST_SSN", col("CUST_SSN").cast("int")) \
                    .withColumn("BRANCH_CODE",col("BRANCH_CODE").cast("int")) \
                    .withColumn("CUST_CC_NO",col("CREDIT_CARD_NO")) \
                    .withColumn("TIMEID", to_date(concat(col("YEAR"),col("MONTH"),col("DAY")), 'yyyyMMdd')) \
                    .withColumn("TRANSACTION_ID", col("TRANSACTION_ID").cast("int"))

 
                    
df_cc_trans.printSchema()
df_cc.show()


root
 |-- BRANCH_CODE: integer (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_SSN: integer (nullable = true)
 |-- DAY: long (nullable = true)
 |-- MONTH: long (nullable = true)
 |-- TRANSACTION_ID: integer (nullable = true)
 |-- TRANSACTION_TYPE: string (nullable = true)
 |-- TRANSACTION_VALUE: double (nullable = true)
 |-- YEAR: long (nullable = true)
 |-- CUST_CC_NO: string (nullable = true)
 |-- TIMEID: date (nullable = true)

+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+
|BRANCH_CODE|  CREDIT_CARD_NO| CUST_SSN|DAY|MONTH|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|YEAR|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+
|        114|4210653349028689|123459988| 14|    2|             1|       Education|             78.9|2018|
|         35|4210653349028689|123459988| 20|    3|             2|   Entertainment|            14.24|2018|
|        