# TPC-DI ETL using pyspark

please visit http://www.tpc.org/tpcdi/ for more information about TPC-DI and to get a copy of the data.

In [1]:
import os
os.environ["JAVA_HOME"] = "/usr/lib64/jvm/jre-1.8.0-openjdk"
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Project2 - ETL based on TPC-DI") \
    .config("spark.driver.memory", "15g") \
    .getOrCreate()

We will start by defining functions to parse the xml file. We will read the xml file as an rdd then map those functions to said rdd to produce data frames. we had to create a separate function to handle each action type, A function to handle new customers, a function to handle updates, adding accounts and closing accounts.

Each of these functions will produce a seperate rdd that we will later collect into dataframs

In [2]:
from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType, FloatType, DateType, TimestampType

`customer_parser` will handle new customers and will return all customer data and all account data

In [3]:
import xml.etree.ElementTree as ET
def customer_parser(rdd):
    root = ET.fromstring(rdd[0])
    records= []
    for Action in root:
        for Customer in Action:
            ActionType = Action.attrib['ActionType']
            if ActionType == 'NEW':
                record = []
                list_of_attributes = ['C_ID', 'C_TAX_ID', 'C_GNDR', 'C_TIER', 'C_DOB']
                for attribute in list_of_attributes:
                    try:
                        record.append(Customer.attrib[attribute])
                    except:
                        record.append(None)
                for Element in Customer:
                    if Element.tag == 'ContactInfo':
                        for Subelement in Element:
                            if Subelement.tag[:-1] == 'C_PHONE_':
                                phone_number = ''
                                for Subsubelement in Subelement:
                                    if isinstance(Subsubelement.text, str):                                
                                        phone_number += Subsubelement.text + " "
                                if len(phone_number)>1:
                                    phone_number = phone_number[:-1]
                                else:
                                    phone_number = None
                                record.append(phone_number)
                            else:
                                record.append(Subelement.text)
                    elif Element.tag == 'Account':
                        for attribute in Element.attrib.values():
                            record.append(attribute)
                        for Subelement in Element:
                            record.append(Subelement.text)
                    else:
                        for Subelement in Element:
                            record.append(Subelement.text)
                records.append(record)
    return records


`add_account_parser` will handle 'ADDACCT' action type and will return the account data and the customer's ID.

In [4]:
def add_account_parser(rdd):
    root = ET.fromstring(rdd[0])
    records= []
    for Action in root:
        for Customer in Action:
            ActionType = Action.attrib['ActionType']
            if ActionType == 'ADDACCT':
                record = []
                record.append(Customer.attrib['C_ID'])
                for Element in Customer:
                    if Element.tag == 'Account':
                        for attribute in Element.attrib.values():
                            record.append(attribute)
                        for Subelement in Element:
                            record.append(Subelement.text)
                records.append(record)
    return records

`update_customer_parser` will handle 'UPDCUST' actions. We noticed that each update will have only the new fields. for example a customer could update their email but keep every thing else the same. our parser will return every field for each customer but will have None in fields that did not get updated.

In [5]:
def update_customer_parser(rdd):
    root = ET.fromstring(rdd[0])
    records= []
    for Action in root:
        for Customer in Action:
            ActionType = Action.attrib['ActionType']
            if ActionType == 'UPDCUST':
                record = []
                list_of_attributes = ['C_ID', 'C_TAX_ID', 'C_GNDR', 'C_TIER', 'C_DOB']
                for attribute in list_of_attributes:
                    try:
                        record.append(Customer.attrib[attribute])
                    except:
                        record.append(None)
                for Element in Customer:
                    dict={
                    "C_L_NAME":None,
                    "C_F_NAME":None,
                    "C_M_NAME":None,
                    'C_ADLINE1':None,
                    'C_ADLINE2':None,
                    'C_ZIPCODE':None,
                    'C_CITY':None,
                    'C_STATE_PROV':None,
                    'C_CTRY':None,
                    'C_PRIM_EMAIL':None,
                    'C_ALT_EMAIL':None,
                    'C_PHONE_1':None,
                    'C_PHONE_2':None,
                    'C_PHONE_3':None,
                    "C_LCL_TX_ID":None,
                    "C_NAT_TX_ID":None
                    }
                    if Element.tag == 'ContactInfo':
                        for Subelement in Element:
                            if Subelement.tag[:-1] == 'C_PHONE_':
                                phone_number = ''
                                for Subsubelement in Subelement:
                                    if isinstance(Subsubelement.text, str):                                
                                        phone_number += Subsubelement.text + " "
                                if len(phone_number)>1:
                                    phone_number = phone_number[:-1]
                                else:
                                    phone_number = None
                                dict[Subelement.tag] = phone_number
                            else:
                                dict[Subelement.tag] = Subelement.text
                    elif Element.tag == 'Account':
                        continue
                    else:
                        for Subelement in Element:
                            dict[Subelement.tag] = Subelement.text
                records.append(record+list(dict.values()))
    return records

`update_account_parser` will handle 'UPDACCT', no need to return None here because the account data is always complete.

In [6]:
def update_account_parser(rdd):
    root = ET.fromstring(rdd[0])
    records= []
    for Action in root:
        for Customer in Action:
            ActionType = Action.attrib['ActionType']
            if ActionType == 'UPDACCT':
                record = []
                record.append(Customer.attrib['C_ID'])
                for Account in Customer:
                    record.append(Account.attrib['CA_ID'])
                    try:
                        record.append(Account.attrib['CA_TAX_ST'])
                    except:
                        record.append(None)
                        dict = {
                        "CA_B_ID":None,
                        "CA_NAME":None}
                    for element in Account:
                        dict[element.tag] = element.text
                records.append(record+list(dict.values()))
    return records

We will consider 'INACT' and 'CLOSEACCT' both 'INACT' for simplicity's sake.

In [7]:
def inactive_parser(rdd):
    root = ET.fromstring(rdd[0])
    records= []
    for Action in root:
        for Customer in Action:
            ActionType = Action.attrib['ActionType']
            if ActionType == 'INACT' or ActionType == 'CLOSEACCT':
                records.append(Customer.attrib['C_ID'])
    return records

We Will read the xml file into an rdd

In [8]:
file_rdd = spark.read.text('./Dataset/CustomerMgmt.xml', wholetext=True).rdd

we will create `new_customer_records_rdd` by using `customer_parser`

In [9]:
new_customer_records_rdd = file_rdd.flatMap(customer_parser)

`customer_parser` returns for each new customer, the customer details and their account details. we need to split the resulting dataframe to separate the customers' information from the accounts' information

In [10]:
new_customer_schema = StructType([
    StructField("C_ID", StringType(), False),
    StructField("C_TAX_ID", StringType(), False),
    StructField("C_GNDR", StringType(), True),
    StructField("C_TIER", StringType(), True),
    StructField("C_DOB", StringType(), False),
    StructField("C_L_NAME", StringType(), False),
    StructField("C_F_NAME", StringType(), False),
    StructField("C_M_NAME", StringType(), True),
    StructField("C_ADLINE1", StringType(), False),
    StructField("C_ADLINE2", StringType(), True),
    StructField("C_ZIPCODE", StringType(), False),
    StructField("C_CITY", StringType(), False),
    StructField("C_STATE_PROV", StringType(), False),
    StructField("C_CTRY", StringType(), False),
    StructField("C_PRIM_EMAIL", StringType(), False),
    StructField("C_ALT_EMAIL", StringType(), True),
    StructField("C_PHONE_1", StringType(), True),
    StructField("C_PHONE_2", StringType(), True),
    StructField("C_PHONE_3", StringType(), True),
    StructField("C_LCL_TX_ID", StringType(), False),
    StructField("C_NAT_TX_ID", StringType(), False),
    StructField("CA_ID", StringType(), False),
    StructField("CA_TAX_ST", StringType(), False),
    StructField("CA_B_ID", StringType(), False),
    StructField("CA_NAME", StringType(), True)])
customer_schema = StructType([
    StructField("C_ID", StringType(), True),
    StructField("C_TAX_ID", StringType(), True),
    StructField("C_GNDR", StringType(), True),
    StructField("C_TIER", StringType(), True),
    StructField("C_DOB", StringType(), True),
    StructField("C_L_NAME", StringType(), True),
    StructField("C_F_NAME", StringType(), True),
    StructField("C_M_NAME", StringType(), True),
    StructField("C_ADLINE1", StringType(), True),
    StructField("C_ADLINE2", StringType(), True),
    StructField("C_ZIPCODE", StringType(), True),
    StructField("C_CITY", StringType(), True),
    StructField("C_STATE_PROV", StringType(), True),
    StructField("C_CTRY", StringType(), True),
    StructField("C_PRIM_EMAIL", StringType(), True),
    StructField("C_ALT_EMAIL", StringType(), True),
    StructField("C_PHONE_1", StringType(), True),
    StructField("C_PHONE_2", StringType(), True),
    StructField("C_PHONE_3", StringType(), True),
    StructField("C_LCL_TX_ID", StringType(), True),
    StructField("C_NAT_TX_ID", StringType(), True)])
account_schema = StructType([
    StructField("C_ID", StringType(), True),
    StructField("CA_ID", StringType(), True),
    StructField("CA_TAX_ST", StringType(), True),
    StructField("CA_B_ID", StringType(), True),
    StructField("CA_NAME", StringType(), True)])
new_customer_df = new_customer_records_rdd.toDF(new_customer_schema).select("C_ID", "C_TAX_ID", "C_GNDR", "C_TIER", "C_DOB", "C_L_NAME", "C_F_NAME", "C_M_NAME", "C_ADLINE1", "C_ADLINE2", "C_ZIPCODE", "C_CITY", "C_STATE_PROV", "C_CTRY", "C_PRIM_EMAIL", "C_ALT_EMAIL", "C_PHONE_1", "C_PHONE_2", "C_PHONE_3", "C_LCL_TX_ID", "C_NAT_TX_ID")
new_account_df = new_customer_records_rdd.toDF(new_customer_schema).select("C_ID", "CA_ID", "CA_TAX_ST", "CA_B_ID", "CA_NAME")

We now have two dataframes containing new customers' data and their accounts' data respectively.

In [11]:
new_customer_df.show(5)

+----+-----------+------+------+----------+--------+--------+--------+--------------------+---------+---------+----------------+------------+--------------------+--------------------+--------------------+------------------+------------+---------+-----------+-----------+
|C_ID|   C_TAX_ID|C_GNDR|C_TIER|     C_DOB|C_L_NAME|C_F_NAME|C_M_NAME|           C_ADLINE1|C_ADLINE2|C_ZIPCODE|          C_CITY|C_STATE_PROV|              C_CTRY|        C_PRIM_EMAIL|         C_ALT_EMAIL|         C_PHONE_1|   C_PHONE_2|C_PHONE_3|C_LCL_TX_ID|C_NAT_TX_ID|
+----+-----------+------+------+----------+--------+--------+--------+--------------------+---------+---------+----------------+------------+--------------------+--------------------+--------------------+------------------+------------+---------+-----------+-----------+
|   0|923-54-6498|     F|     3|1940-12-02| Joannis|   Adara|    null|     4779 Weller Way|     null|    92624|        Columbus|     Ontario|              Canada|Adara.Joannis@moo...|Adar

In [12]:
new_account_df.show(5)

+----+-----+---------+-------+--------------------+
|C_ID|CA_ID|CA_TAX_ST|CA_B_ID|             CA_NAME|
+----+-----+---------+-------+--------------------+
|   0|    0|        1|  17713|CJlmMuFyibKOmKLHI...|
|   1|    1|        2|    615|BbxTgVGOlgyrYtVRj...|
|   2|    2|        1|   3927|IGzIDNTTRUDKwGaoV...|
|   3|    3|        1|   6256|ZHXwHtCcLZqdWhWOP...|
|   4|    4|        1|   3412|mzlYZlTIDmOGuKQHO...|
+----+-----+---------+-------+--------------------+
only showing top 5 rows



We will create `add_account_records_rdd` using `add_account_parser` and will create another dataframes containing accounts that existing customers added.

In [13]:
add_account_records_rdd = file_rdd.flatMap(add_account_parser)
add_account_df = add_account_records_rdd.toDF(account_schema)
add_account_df.show(5)

+----+-----+---------+-------+--------------------+
|C_ID|CA_ID|CA_TAX_ST|CA_B_ID|             CA_NAME|
+----+-----+---------+-------+--------------------+
|  88|  125|        2|  12126|WUIiLVBcUKKmgobPO...|
|  54|  126|        1|  15244|JEddDkBzL R NXaer...|
|  20|  127|        0|  12314|fFWYxTIiUmlHCHaCc...|
|  21|  128|        1|  25910|PDQySQaONlBUACNtV...|
|  77|  129|        1|  43888|ASPUxCpSHDPGGNQva...|
+----+-----+---------+-------+--------------------+
only showing top 5 rows



We will create `updated_account_rdd` with `update_account_parser` and create a dataframe that contains all the acounts that have been updated

In [14]:
from pyspark.sql.functions import udf
updated_account_rdd = file_rdd.flatMap(update_account_parser)
updated_account_df = updated_account_rdd.toDF(account_schema)

Finally, to collect all the accounts in one dataframe we will do the following:
- We concatenate `new_account_df` and `add_account_df`, to get all the initial accounts data in one dataframe.
- We will do a left anti join between the new accounts and the updated accounts to get only the accounts that have not been updated.
- Then we will concatenate the result with the accounts that have been updated. the resulting dataframe will have all up to date account data.

In [15]:
Accounts = new_account_df.union(add_account_df).join(updated_account_df, on=['C_ID','CA_ID'], how='left_anti').union(updated_account_df)

Finally for the accounts, we will populate the `CA_ST_ID` column

In [16]:
inactive_accounts = file_rdd.flatMap(inactive_parser)
inact_list = inactive_accounts.collect()
inact_func = udf(lambda x: 'INAC' if str(x) in inact_list else 'ACTV')

Accounts = Accounts.withColumn('CA_ST_ID', inact_func(Accounts.C_ID))
Accounts.show(5)

+-----+-----+---------+-------+--------------------+--------+
| C_ID|CA_ID|CA_TAX_ST|CA_B_ID|             CA_NAME|CA_ST_ID|
+-----+-----+---------+-------+--------------------+--------+
|   10| 3429|        1|   4984|NvnqmafKEeRraHJlD...|    INAC|
|10284|20469|        0|  30262|WSrAJPnvZzbENxGPc...|    ACTV|
|10472|20832|        1|   7082|GxHMKBqZhsFTwZxrB...|    ACTV|
|  106|  916|        1|  27782|fXyDCcGSMkKqkcAJD...|    INAC|
|11165|22225|        1|  12877|i HGYvIGvIsbtW KO...|    ACTV|
+-----+-----+---------+-------+--------------------+--------+
only showing top 5 rows



Now back to customers, we will create a dataframe that contains all the customers who updated their data.

In [17]:
update_customer_rdd = file_rdd.flatMap(update_customer_parser)
update_customer_df = update_customer_rdd.toDF(customer_schema)
update_customer_df.show(5)

+----+--------+------+------+-----+--------+--------+--------+---------+---------+---------+------+------------+------+--------------------+--------------------+--------------+--------------+------------------+-----------+-----------+
|C_ID|C_TAX_ID|C_GNDR|C_TIER|C_DOB|C_L_NAME|C_F_NAME|C_M_NAME|C_ADLINE1|C_ADLINE2|C_ZIPCODE|C_CITY|C_STATE_PROV|C_CTRY|        C_PRIM_EMAIL|         C_ALT_EMAIL|     C_PHONE_1|     C_PHONE_2|         C_PHONE_3|C_LCL_TX_ID|C_NAT_TX_ID|
+----+--------+------+------+-----+--------+--------+--------+---------+---------+---------+------+------------+------+--------------------+--------------------+--------------+--------------+------------------+-----------+-----------+
|  69|    null|  null|     2| null|    null|    null|    null|     null|     null|     null|  null|        null|  null|Venus.DiLoreto@ip...|                null|1 889 662-1131|      505-9373|              null|       null|       null|
|   7|    null|  null|     3| null|    null|    null|    nul

We will do a left anti join between `new_customer_df` and `update_customer_df` to get all the customers whose information did not get updated in one dataframe. We will then do an inner join between `new_customer_df` and `update_customer_df`to get all customers whose information has been updated. The resulting datafram will have, for each customer, one value for ID and two values for each other field. one coming from the data before the update and one from after. The data from after the update is incomplete and will contain null values if a certain feild did not get updated.

We will rename the second dataframes columns for clarity by appending '_pdate' to the updated columns. for example "phone1" and "phone1_update"

In [18]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
customers_not_updated = new_customer_df.join(update_customer_df, on=['C_ID'], how='left_anti')
customers_updated = new_customer_df.join(update_customer_df, on=['C_ID'], how='inner')
columns = []
for index, column in enumerate(customers_updated.columns):
    if index <= 20:
        columns.append(column)
    else:
        columns.append(column+'_update')

We now need to iterate over each row to check if each field has been updated (if the new value is not null) and replace the old value with the new value. to achieve this we will convert the dataframe back to rdd then use `customer_updater` to insert the updated values and we will get a dataframe that will have the up to date data.

In [19]:
customers_updated = customers_updated.toDF(*columns).rdd
def customer_updater(row):
    new_row= [row.C_ID]
    for column in columns:
        if column != 'C_ID' and (not '_update' in column):
            if not getattr(row,column+'_update') is None:
                new_row.append(getattr(row,column+'_update'))
            else:
                new_row.append(getattr(row,column))
    return new_row
customers_updated = customers_updated.map(customer_updater).toDF(customer_schema)
customers_updated.show(5)

+-----+-----------+------+------+----------+---------+--------+--------+--------------------+---------+---------+------------+------------+--------------------+--------------------+--------------------+--------------+------------+---------+-----------+-----------+
| C_ID|   C_TAX_ID|C_GNDR|C_TIER|     C_DOB| C_L_NAME|C_F_NAME|C_M_NAME|           C_ADLINE1|C_ADLINE2|C_ZIPCODE|      C_CITY|C_STATE_PROV|              C_CTRY|        C_PRIM_EMAIL|         C_ALT_EMAIL|     C_PHONE_1|   C_PHONE_2|C_PHONE_3|C_LCL_TX_ID|C_NAT_TX_ID|
+-----+-----------+------+------+----------+---------+--------+--------+--------------------+---------+---------+------------+------------+--------------------+--------------------+--------------------+--------------+------------+---------+-----------+-----------+
|10436|925-79-3935|  null|     3|2005-01-15|   Hachey|  Sarath|    null|196 Bellaire Cres...|     null|    19244|Fort Collins|          NM|United States of ...|Sarath.Hachey@icq...|Sarath.Hachey@gmx...|743

Finally we will concatenate the dataframe containing customers who did not get updated and the dataframe containing customers who did get updated.

In [20]:
Customers = customers_not_updated.union(customers_updated)
Customers.show(5)

+-----+-----------+------+------+----------+--------+--------+--------+--------------------+---------+---------+---------------+--------------------+--------------------+--------------------+--------------------+------------------+------------+--------------+-----------+-----------+
| C_ID|   C_TAX_ID|C_GNDR|C_TIER|     C_DOB|C_L_NAME|C_F_NAME|C_M_NAME|           C_ADLINE1|C_ADLINE2|C_ZIPCODE|         C_CITY|        C_STATE_PROV|              C_CTRY|        C_PRIM_EMAIL|         C_ALT_EMAIL|         C_PHONE_1|   C_PHONE_2|     C_PHONE_3|C_LCL_TX_ID|C_NAT_TX_ID|
+-----+-----------+------+------+----------+--------+--------+--------+--------------------+---------+---------+---------------+--------------------+--------------------+--------------------+--------------------+------------------+------------+--------------+-----------+-----------+
|10096|214-25-1030|  null|     3|1997-09-15|  Polder|    Kyla|    null|    15031 Wood South|     null|    29409|          Omaha|                  CA

In [21]:
 schema_TaxRate = StructType([
    StructField("TX_ID", StringType(), False),
    StructField("TX_NAME", StringType(), False),
    StructField("TX_RATE", FloatType(), False)])

df_TaxRate = spark.read\
            .format("csv")\
            .schema(schema_TaxRate)\
            .option("header", "false")\
            .option("sep", "|")\
            .load("./Dataset/TaxRate.txt")
df_TaxRate.show(5)

+-----+--------------------+-------+
|TX_ID|             TX_NAME|TX_RATE|
+-----+--------------------+-------+
|  US1|U.S. Income Tax B...|   0.15|
|  US2|U.S. Income Tax B...|  0.275|
|  US3|U.S. Income Tax B...|  0.305|
|  US4|U.S. Income Tax B...|  0.355|
|  US5|U.S. Income Tax B...|  0.391|
+-----+--------------------+-------+
only showing top 5 rows



In [22]:
schema_Prospect = StructType([
    StructField("AgencyID", StringType(), False),
    StructField("LastName", StringType(), False),
    StructField("FirstName", StringType(), False),
    StructField("MiddleInitial", StringType(), False),
    StructField("Gender", StringType(), False),
    StructField("AddressLine1", StringType(), False),
    StructField("AddressLine2", StringType(), False),
    StructField("PostalCode", StringType(), False),
    StructField("City", StringType(), False),
    StructField("State", StringType(), False),
    StructField("Country", StringType(), False),
    StructField("Phone", StringType(), False),
    StructField("Income", IntegerType(), False),
    StructField("NumberCars", IntegerType(), False),
    StructField("NumberChildren", IntegerType(), False),
    StructField("MaritalStatus", StringType(), False),
    StructField("Age", IntegerType(), False),
    StructField("CreditRating", IntegerType(), False),
    StructField("OwnOrRentFlag", StringType(), False),
    StructField("Employer", StringType(), False),
    StructField("NumberCreditCards", IntegerType(), False),
    StructField("NetWorth", IntegerType(), False)])

df_Prospect = spark.read\
            .format("csv")\
            .schema(schema_Prospect)\
            .option("header", "false")\
            .option("sep", ",")\
            .load("./Dataset/Prospect.csv")
df_Prospect.show(5)

+--------+--------+---------+-------------+------+--------------------+------------+----------+---------------+-----+--------------------+--------------+------+----------+--------------+-------------+----+------------+-------------+--------------------+-----------------+--------+
|AgencyID|LastName|FirstName|MiddleInitial|Gender|        AddressLine1|AddressLine2|PostalCode|           City|State|             Country|         Phone|Income|NumberCars|NumberChildren|MaritalStatus| Age|CreditRating|OwnOrRentFlag|            Employer|NumberCreditCards|NetWorth|
+--------+--------+---------+-------------+------+--------------------+------------+----------+---------------+-----+--------------------+--------------+------+----------+--------------+-------------+----+------------+-------------+--------------------+-----------------+--------+
|    KOZ0|  KOZIOL|  Mahmood|         null|  null|17886 st. phillip...|        null|   H6b 1w1|       St. Paul|   TN|              Canada|1-712-522-6088|3687

In [23]:
from pyspark.sql.functions import broadcast
df_TaxRate_broad = broadcast(df_TaxRate)
df_TaxRate_broad.createOrReplaceTempView("TaxRate_broad")

In [24]:
df_Prospect_broad = broadcast(df_Prospect)
df_Prospect_broad.createOrReplaceTempView("Prospect_broad")

In [25]:
schema_Account = StructType([
    StructField("CDC_FLAG", StringType(), False),
    StructField("CDC_DSN", IntegerType(), True),
    StructField("CA_ID", IntegerType(), True),
    StructField("CA_B_ID", IntegerType(), True),
    StructField("CA_C_ID", IntegerType(), True),
    StructField("CA_NAME", StringType(), False),
    StructField("CA_TAX_ST", IntegerType(), True),
    StructField("CA_ST_ID", StringType(), True)])

In [26]:
df_StatusType = spark.read\
            .format("csv")\
            .schema('ST_ID string, ST_NAME string')\
            .option("header", "false")\
            .option("sep", "|")\
            .load("./Dataset/StatusType.txt")

In [27]:
df_StatusType.show(10)

+-----+---------+
|ST_ID|  ST_NAME|
+-----+---------+
| ACTV|   Active|
| CMPT|Completed|
| CNCL| Canceled|
| PNDG|  Pending|
| SBMT|Submitted|
| INAC| Inactive|
+-----+---------+



In [28]:
df_StatusType_broad = broadcast(df_StatusType)

In [29]:
from datetime import datetime

In [30]:
Accounts.createOrReplaceTempView("accounts")
df_StatusType_broad.createOrReplaceTempView("statusType_broad")

In [31]:
dimAccount = spark.sql("\
                       Select CA_ID as AccountID,\
                       C_ID as CustomerID,\
                       ST_NAME as Status,\
                       CA_NAME as AccountDesc,\
                       CA_TAX_ST as TaxStatus,\
                       CAST('True' as BOOLEAN) as IsCurrent,\
                       CAST('1' as INT) as BatchID,\
                       to_date('2015-01-01', 'yyyy-MM-dd') as EffectiveDate,\
                       to_date('9999-12-31', 'yyyy-MM-dd') as EndDate\
                       From accounts join statusType_broad on accounts.CA_ST_ID = statusType_broad.ST_ID")

In [32]:
dimAccount.show(10)

+---------+----------+--------+--------------------+---------+---------+-------+-------------+----------+
|AccountID|CustomerID|  Status|         AccountDesc|TaxStatus|IsCurrent|BatchID|EffectiveDate|   EndDate|
+---------+----------+--------+--------------------+---------+---------+-------+-------------+----------+
|     3429|        10|Inactive|NvnqmafKEeRraHJlD...|        1|     true|      1|   2015-01-01|9999-12-31|
|    20469|     10284|  Active|WSrAJPnvZzbENxGPc...|        0|     true|      1|   2015-01-01|9999-12-31|
|    20832|     10472|  Active|GxHMKBqZhsFTwZxrB...|        1|     true|      1|   2015-01-01|9999-12-31|
|      916|       106|Inactive|fXyDCcGSMkKqkcAJD...|        1|     true|      1|   2015-01-01|9999-12-31|
|    22225|     11165|  Active|i HGYvIGvIsbtW KO...|        1|     true|      1|   2015-01-01|9999-12-31|
|    28436|     11167|  Active|WmgHiXrLf kNTUXSI...|        1|     true|      1|   2015-01-01|9999-12-31|
|    25613|     11338|  Active|ZFqDhUqNLjCjSdC

In [33]:
dimAccount.printSchema()

root
 |-- AccountID: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- AccountDesc: string (nullable = true)
 |-- TaxStatus: string (nullable = true)
 |-- IsCurrent: boolean (nullable = true)
 |-- BatchID: integer (nullable = true)
 |-- EffectiveDate: date (nullable = true)
 |-- EndDate: date (nullable = true)



In [34]:
Customers.createOrReplaceTempView("customers")

In [35]:
dimCustomer = spark.sql("\
                       Select c.C_ID as CustomerID,\
                       C_TAX_ID as TaxID,\
                       C_L_NAME as LastName,\
                       C_F_NAME as FirstName,\
                       C_M_NAME as MiddleInitial,\
                       C_GNDR as Gender,\
                       C_TIER as Tier,\
                       C_DOB as DOB,\
                       C_ADLINE1 as AddressLine1,\
                       C_ADLINE2 as AddressLine2,\
                       C_ZIPCODE as PostalCode,\
                       C_CITY as City,\
                       C_STATE_PROV as StateProv,\
                       C_CTRY as Country,\
                       C_PHONE_1 as Phone1,\
                       C_PHONE_2 as Phone2,\
                       C_PHONE_3 as Phone3,\
                       C_PRIM_EMAIL as Email1,\
                       C_ALT_EMAIL as Email2,\
                       NAT.TX_NAME as NationalTaxRateDesc,\
                       NAT.TX_RATE as NationalTaxRate,\
                       LCL.TX_NAME as LocalTaxRateDesc,\
                       LCL.TX_RATE as LocalTaxRate,\
                       AgencyID as AgencyID,\
                       CreditRating as CreditRating,\
                       NetWorth as NetWorth,\
                        COALESCE(CASE \
                            WHEN NetWorth > 1000000 THEN 'HighValue+' \
                            ELSE NULL \
                        END,\
                       CASE \
                            WHEN NumberChildren > 3 THEN 'Expenses+' \
                            WHEN NumberCreditCards > 5 THEN 'Expenses+'\
                            ELSE NULL \
                        END,\
                       CASE \
                            WHEN Age > 45 THEN 'Boomer+' \
                            ELSE NULL \
                        END,\
                       CASE \
                            WHEN Income < 50000 THEN 'MoneyAlert+' \
                            WHEN CreditRating < 600 THEN 'MoneyAlert+' \
                            WHEN NetWorth < 100000 THEN 'MoneyAlert+' \
                            ELSE Null \
                        END,\
                       CASE \
                            WHEN NumberCars > 3 THEN 'Spender+' \
                            WHEN NumberCreditCards > 7 THEN 'Spender+' \
                            ELSE Null \
                        END,\
                       CASE \
                            WHEN Age < 25 THEN 'Inherited' \
                            WHEN NetWorth > 100000 THEN 'Inherited' \
                            ELSE Null \
                        END) as MarketingNameplate,\
                       CAST('True' as BOOLEAN) as IsCurrent,\
                       CAST('1' as INT) as BatchID,\
                       to_date('2015-01-01', 'yyyy-MM-dd') as EffectiveDate,\
                       to_date('9999-12-31', 'yyyy-MM-dd') as EndDate\
                       From customers as c \
                       left join TaxRate_broad as NAT on c.C_NAT_TX_ID = NAT.TX_ID\
                       left join TaxRate_broad as LCL on c.C_LCL_TX_ID = LCL.TX_ID\
                       left join Prospect_broad as p on (c.C_L_NAME = p.LastName and c.C_F_NAME = p.FirstName and c.C_ADLINE1 = p.AddressLine1 and c.C_ADLINE2 =  p.AddressLine2 and c.C_ZIPCODE = p.PostalCode)")


In [36]:
file_rdd.unpersist()
new_customer_records_rdd.unpersist()
new_customer_df.unpersist()
new_account_df.unpersist()
add_account_records_rdd.unpersist()
add_account_df.unpersist()
Accounts.unpersist()
inactive_accounts.unpersist()
update_customer_rdd.unpersist()
update_customer_df.unpersist()
customers_not_updated.unpersist()
customers_updated.unpersist()
Customers.unpersist()
df_TaxRate.unpersist()
df_Prospect.unpersist()
df_TaxRate_broad.unpersist()
df_Prospect_broad.unpersist()

DataFrame[AgencyID: string, LastName: string, FirstName: string, MiddleInitial: string, Gender: string, AddressLine1: string, AddressLine2: string, PostalCode: string, City: string, State: string, Country: string, Phone: string, Income: int, NumberCars: int, NumberChildren: int, MaritalStatus: string, Age: int, CreditRating: int, OwnOrRentFlag: string, Employer: string, NumberCreditCards: int, NetWorth: int]

In [37]:
inact_func = udf(lambda x: 'Inactive' if str(x) in inact_list else 'Active')

dimCustomer = dimCustomer.withColumn('Status', inact_func(dimCustomer.CustomerID))
dimCustomer.show(10)
dimCustomer.createOrReplaceTempView("tbldimCustomer")

+----------+-----------+--------+---------+-------------+------+----+----------+--------------------+------------+----------+---------------+--------------------+--------------------+------------------+------------+--------------+--------------------+--------------------+--------------------+---------------+--------------------+------------+--------+------------+--------+------------------+---------+-------+-------------+----------+--------+
|CustomerID|      TaxID|LastName|FirstName|MiddleInitial|Gender|Tier|       DOB|        AddressLine1|AddressLine2|PostalCode|           City|           StateProv|             Country|            Phone1|      Phone2|        Phone3|              Email1|              Email2| NationalTaxRateDesc|NationalTaxRate|    LocalTaxRateDesc|LocalTaxRate|AgencyID|CreditRating|NetWorth|MarketingNameplate|IsCurrent|BatchID|EffectiveDate|   EndDate|  Status|
+----------+-----------+--------+---------+-------------+------+----+----------+--------------------+-------

In [38]:
schema_Date = StructType([
    StructField("SK_DateID", IntegerType(), False),
    StructField("DateValue", StringType(), False),
    StructField("DateDesc", StringType(), False),
    StructField("CalendarYearID", IntegerType(), False),
    StructField("CalendarYearDesc", StringType(), False),
    StructField("CalendarQtrID", IntegerType(), False),
    StructField("CalendarQtrDesc", StringType(), False),
    
    StructField("CalendarMonthID", IntegerType(), False),
    StructField("CalendarMonthDesc", StringType(), False),
    StructField("CalendarWeekID", IntegerType(), False),
    StructField("CalendarWeekDesc", StringType(), False),
    
    StructField("DayOfWeekNum", IntegerType(), False),
    StructField("DayOfWeekDesc", StringType(), False),
    StructField("FiscalYearID", IntegerType(), False),
    StructField("FiscalYearDesc", StringType(), False),
    
    StructField("FiscalQtrID", IntegerType(), False),
    StructField("FiscalQtrDesc", StringType(), False),
    
    StructField("HolidayFlag", BooleanType(), True)])

In [39]:
df_Date = spark.read\
            .format("csv")\
            .schema(schema_Date)\
            .option("header", "false")\
            .option("sep", "|")\
            .load("./Dataset/Date.txt")

In [40]:
df_Date.show(10)

+---------+----------+----------------+--------------+----------------+-------------+---------------+---------------+-----------------+--------------+----------------+------------+-------------+------------+--------------+-----------+-------------+-----------+
|SK_DateID| DateValue|        DateDesc|CalendarYearID|CalendarYearDesc|CalendarQtrID|CalendarQtrDesc|CalendarMonthID|CalendarMonthDesc|CalendarWeekID|CalendarWeekDesc|DayOfWeekNum|DayOfWeekDesc|FiscalYearID|FiscalYearDesc|FiscalQtrID|FiscalQtrDesc|HolidayFlag|
+---------+----------+----------------+--------------+----------------+-------------+---------------+---------------+-----------------+--------------+----------------+------------+-------------+------------+--------------+-----------+-------------+-----------+
| 19500101|1950-01-01| January 1, 1950|          1950|            1950|        19501|        1950 Q1|          19501|     1950 January|         19501|         1950-W1|           7|       Sunday|        1950|          

In [41]:
df_Date.printSchema()

root
 |-- SK_DateID: integer (nullable = true)
 |-- DateValue: string (nullable = true)
 |-- DateDesc: string (nullable = true)
 |-- CalendarYearID: integer (nullable = true)
 |-- CalendarYearDesc: string (nullable = true)
 |-- CalendarQtrID: integer (nullable = true)
 |-- CalendarQtrDesc: string (nullable = true)
 |-- CalendarMonthID: integer (nullable = true)
 |-- CalendarMonthDesc: string (nullable = true)
 |-- CalendarWeekID: integer (nullable = true)
 |-- CalendarWeekDesc: string (nullable = true)
 |-- DayOfWeekNum: integer (nullable = true)
 |-- DayOfWeekDesc: string (nullable = true)
 |-- FiscalYearID: integer (nullable = true)
 |-- FiscalYearDesc: string (nullable = true)
 |-- FiscalQtrID: integer (nullable = true)
 |-- FiscalQtrDesc: string (nullable = true)
 |-- HolidayFlag: boolean (nullable = true)



In [42]:
df_Date.createOrReplaceTempView("date")

In [43]:
dimDate = spark.sql("\
                       Select SK_DateID,\
                           Cast(DateValue as date),\
                           DateDesc,\
                           CalendarYearID,\
                           CalendarYearDesc,\
                           CalendarQtrID,\
                           CalendarQtrDesc,\
                           CalendarMonthID,\
                           CalendarMonthDesc,\
                           CalendarWeekID,\
                           CalendarWeekDesc,\
                           DayOfWeekNum,\
                           DayOfWeekDesc,\
                           FiscalYearID,\
                           FiscalYearDesc,\
                           FiscalQtrID,\
                           FiscalQtrDesc,\
                           HolidayFlag\
                       From date")

In [44]:
dimDate.show(10)

+---------+----------+----------------+--------------+----------------+-------------+---------------+---------------+-----------------+--------------+----------------+------------+-------------+------------+--------------+-----------+-------------+-----------+
|SK_DateID| DateValue|        DateDesc|CalendarYearID|CalendarYearDesc|CalendarQtrID|CalendarQtrDesc|CalendarMonthID|CalendarMonthDesc|CalendarWeekID|CalendarWeekDesc|DayOfWeekNum|DayOfWeekDesc|FiscalYearID|FiscalYearDesc|FiscalQtrID|FiscalQtrDesc|HolidayFlag|
+---------+----------+----------------+--------------+----------------+-------------+---------------+---------------+-----------------+--------------+----------------+------------+-------------+------------+--------------+-----------+-------------+-----------+
| 19500101|1950-01-01| January 1, 1950|          1950|            1950|        19501|        1950 Q1|          19501|     1950 January|         19501|         1950-W1|           7|       Sunday|        1950|          

In [45]:
dimDate.printSchema()

root
 |-- SK_DateID: integer (nullable = true)
 |-- DateValue: date (nullable = true)
 |-- DateDesc: string (nullable = true)
 |-- CalendarYearID: integer (nullable = true)
 |-- CalendarYearDesc: string (nullable = true)
 |-- CalendarQtrID: integer (nullable = true)
 |-- CalendarQtrDesc: string (nullable = true)
 |-- CalendarMonthID: integer (nullable = true)
 |-- CalendarMonthDesc: string (nullable = true)
 |-- CalendarWeekID: integer (nullable = true)
 |-- CalendarWeekDesc: string (nullable = true)
 |-- DayOfWeekNum: integer (nullable = true)
 |-- DayOfWeekDesc: string (nullable = true)
 |-- FiscalYearID: integer (nullable = true)
 |-- FiscalYearDesc: string (nullable = true)
 |-- FiscalQtrID: integer (nullable = true)
 |-- FiscalQtrDesc: string (nullable = true)
 |-- HolidayFlag: boolean (nullable = true)



In [46]:
schema_CashTrans = StructType([
    StructField("CDC_FLAG", StringType(), False),
    StructField("CDC_DSN", IntegerType(), False),
    StructField("CT_CA_ID", IntegerType(), False),
    StructField("CT_DTS", TimestampType(), False),
    StructField("CT_AMT", FloatType(), False),
    StructField("CT_NAME", StringType(), False)])

In [47]:
df_CashTrans = spark.read\
            .format("csv")\
            .schema(schema_CashTrans)\
            .option("header", "false")\
            .option("sep", "|")\
            .load("./Dataset/Batch2/CashTransaction.txt")

In [48]:
df_CashTrans.show(10)

+--------+-------+--------+-------------------+----------+--------------------+
|CDC_FLAG|CDC_DSN|CT_CA_ID|             CT_DTS|    CT_AMT|             CT_NAME|
+--------+-------+--------+-------------------+----------+--------------------+
|       I|4937695|    6507|2017-07-08 10:16:09|   5519.45|AYJRCJpzLBMJUWKjS...|
|       I|4938687|    1571|2017-07-08 15:07:22| 623871.06|jPmEvxgxaeaq Uxqu...|
|       I|4938975|   11708|2017-07-08 17:55:33|   8712.61|BIOgCYoEPlRuRUiMG...|
|       I|4940035|   12403|2017-07-08 22:55:52|-516203.06|          GO ERHHSVO|
|       I|4941475|    4189|2017-07-08 21:37:17|  -3084.95|FIgutPGqjffXeBwvY...|
|       I|4941627|    4212|2017-07-08 04:46:55|-216073.48|mTyTAMFpoVUAuNfxs...|
|       I|4942163|    6390|2017-07-08 14:43:48| -58952.78|GAuMSSixIeaqGjZZL...|
|       I|4942839|    9836|2017-07-08 19:28:33|   4761.51|         sqTyZKkSGVG|
|       I|4943923|    5689|2017-07-08 05:35:45|  -3700.86|fdMCOdoGxgfVCMpgu...|
|       I|4944143|    5689|2017-07-08 07

In [49]:
df_CashTrans.createOrReplaceTempView("cashTrans")
dimDate.createOrReplaceTempView("dimDate_tbl")
dimAccount.createOrReplaceTempView("dimAccount_tbl")

In [50]:
factCashBalances = spark.sql("\
                       Select CustomerID,\
                           AccountID,\
                           SK_DateID,\
                           sum(CT_AMT) as Cash,\
                           CAST('1' as INT) as BatchID\
                       From cashTrans join dimAccount_tbl as ac on (CT_CA_ID =ac.AccountID)\
                       join dimDate_tbl as dt on dt.DateValue = Date(CT_DTS)\
                       Group by AccountID, CustomerID, SK_DateID")

In [51]:
factCashBalances.show(5)

+----------+---------+---------+------------------+-------+
|CustomerID|AccountID|SK_DateID|              Cash|BatchID|
+----------+---------+---------+------------------+-------+
|      1337|     2562| 20170708|    -87205.7421875|      1|
|      1514|     9587| 20170708|1647.3900146484375|      1|
|       383|    11259| 20170708|-2605.860107421875|      1|
|      4858|     9618| 20170708| -3108.56005859375|      1|
|      1356|     3627| 20170708|   8159.0498046875|      1|
+----------+---------+---------+------------------+-------+
only showing top 5 rows



In [52]:
dimAccount.write.parquet("dimAccount.parquet")

In [53]:
dimDate.write.parquet("dimDate.parquet")

In [54]:
dimCustomer.write.parquet("dimCustomer.parquet")

In [55]:
factCashBalances.write.parquet("factCashBalances.parquet")