------------------
# Extraction

In [91]:
import pyspark
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('customer_etl').getOrCreate()

Let's load cdw_sapp_custmer.json

In [92]:
cust_df = spark.read.json("source_data/cdw_sapp_custmer.json")

--------------------------------
# Exploratory Analysis

In [93]:
cust_df.printSchema()
cust_df.show(10)

root
 |-- APT_NO: string (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- CUST_PHONE: long (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_ZIP: string (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- SSN: long (nullable = true)
 |-- STREET_NAME: string (nullable = true)

+------+----------------+------------+-------------+--------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|APT_NO|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|          CUST_EMAIL|CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|        LAST_UPDATED|MIDDLE_NAME|      SSN|      STREET_NAME|
+------+----------------+------------+---

How many rows do we have in total in this dataframe?

In [94]:
cust_df.count()

952

Let's rearrange the columns so they make a bit more sense when looking at them.

In [95]:
cust_df = cust_df.select('CREDIT_CARD_NO', 'SSN', 'CUST_EMAIL', 'CUST_PHONE',\
            'FIRST_NAME','MIDDLE_NAME', 'LAST_NAME', 'APT_NO', 'STREET_NAME',\
            'CUST_CITY', 'CUST_ZIP', 'CUST_STATE', 'CUST_COUNTRY', 'LAST_UPDATED')

cust_df.show(5)

+----------------+---------+-------------------+----------+----------+-----------+---------+------+-----------------+------------+--------+----------+-------------+--------------------+
|  CREDIT_CARD_NO|      SSN|         CUST_EMAIL|CUST_PHONE|FIRST_NAME|MIDDLE_NAME|LAST_NAME|APT_NO|      STREET_NAME|   CUST_CITY|CUST_ZIP|CUST_STATE| CUST_COUNTRY|        LAST_UPDATED|
+----------------+---------+-------------------+----------+----------+-----------+---------+------+-----------------+------------+--------+----------+-------------+--------------------+
|4210653310061055|123456100|AHooper@example.com|   1237818|      Alec|         Wm|   Hooper|   656|Main Street North|     Natchez|   39120|        MS|United States|2018-04-21T12:49:...|
|4210653310102868|123453023|EHolman@example.com|   1238933|      Etta|    Brendan|   Holman|   829|    Redwood Drive|Wethersfield|   06109|        CT|United States|2018-04-21T12:49:...|
|4210653310116272|123454487|WDunham@example.com|   1243018|    Wilber|

In [96]:
cust_df.describe().show()

+-------+--------------------+--------------------+--------------------+------------------+----------+-----------+---------+------------------+-----------+---------+------------------+----------+-------------+--------------------+
|summary|      CREDIT_CARD_NO|                 SSN|          CUST_EMAIL|        CUST_PHONE|FIRST_NAME|MIDDLE_NAME|LAST_NAME|            APT_NO|STREET_NAME|CUST_CITY|          CUST_ZIP|CUST_STATE| CUST_COUNTRY|        LAST_UPDATED|
+-------+--------------------+--------------------+--------------------+------------------+----------+-----------+---------+------------------+-----------+---------+------------------+----------+-------------+--------------------+
|  count|                 952|                 952|                 952|               952|       952|        952|      952|               952|        952|      952|               952|       952|          952|                 952|
|   mean|4.210653353718597...|1.2345552588130252E8|                null|1239

From the above summary we can see that the credit card numbers are all 16 digits long, SSN numbers are all 9 digits long, zip codes are all 5 digits long, and all the customers are from the United States.  The only issue is that the phone numbers are only 7 digits long but should be 10 digits. 

It seems like the CUST_ZIP and CUST_STATE columns are in string format so the min and max values might only consider the first character when considering order of values.  Lets confirm that the states are 2 characters in length and the zip codes are all 5 digits long.

In [97]:
cust_df.withColumn("zip_len", F.length(cust_df["CUST_ZIP"]))\
    .groupBy("zip_len").count().show()

cust_df.withColumn("state_len", F.length(cust_df["CUST_STATE"]))\
    .groupBy("state_len").count().show()

+-------+-----+
|zip_len|count|
+-------+-----+
|      5|  952|
+-------+-----+

+---------+-----+
|state_len|count|
+---------+-----+
|        2|  952|
+---------+-----+



Lets make sure the emails are all valid. All have a "@" and a "." (for example, ".com") and no blank spaces.

In [98]:
email_filter_df = cust_df.filter(~F.col('CUST_EMAIL').rlike('^\S+@\S+\.\S+$'))
email_filter_df.select('CUST_EMAIL').show()

#^\S+@\S+\.\S+$ checks to see if the string is a valid email address.  the "^\S+" checks
#if there are one or more non-whitespace characters at the start of the string.  Can't have
#blank spaces in an email address. The ^ is necessary because "ex ample@example.com" would be
#a match on "ample@example.com".  We check for "@" with one or more non-whitespace character after it
#followed by a "." (\. needs to be used because just a "." in regex matches to any single character).
#Finished off by "\S+$" which matches one or more non-whitespace characters that end the string.  
#The "$" is necessary for the same reason that the "^" is necessary.  The "~" is like a NOT so only
#the non matches will be shown.

+----------+
|CUST_EMAIL|
+----------+
+----------+



Let's make sure all the CC numbers, SSNs, Phone numbers, and Emails are unique. Remember, there are 952 total entries.

In [99]:
cust_df.select(F.countDistinct("CREDIT_CARD_NO")).show()
cust_df.select(F.countDistinct("SSN")).show()
cust_df.select(F.countDistinct("CUST_PHONE")).show()
cust_df.select(F.countDistinct("CUST_EMAIL")).show()

+------------------------------+
|count(DISTINCT CREDIT_CARD_NO)|
+------------------------------+
|                           952|
+------------------------------+

+-------------------+
|count(DISTINCT SSN)|
+-------------------+
|                952|
+-------------------+

+--------------------------+
|count(DISTINCT CUST_PHONE)|
+--------------------------+
|                       901|
+--------------------------+

+--------------------------+
|count(DISTINCT CUST_EMAIL)|
+--------------------------+
|                       928|
+--------------------------+



Hmmm, the Phone numbers and email don't seem to be unique.  Let's explore further.

In [100]:
cust_df.groupBy('CUST_PHONE').count().orderBy(F.col('count').desc()).show()

+----------+-----+
|CUST_PHONE|count|
+----------+-----+
|   1241898|    3|
|   1236886|    3|
|   1237294|    3|
|   1243382|    2|
|   1239063|    2|
|   1240382|    2|
|   1243093|    2|
|   1243459|    2|
|   1238707|    2|
|   1242999|    2|
|   1235756|    2|
|   1242026|    2|
|   1240817|    2|
|   1235508|    2|
|   1236877|    2|
|   1240229|    2|
|   1243066|    2|
|   1236672|    2|
|   1240046|    2|
|   1242677|    2|
+----------+-----+
only showing top 20 rows



Let's look at a sample phone number to see if any of the other customer info overlaps.

In [101]:
cust_df.where(cust_df['CUST_PHONE'] == '1236886').show()

+----------------+---------+--------------------+----------+----------+-----------+---------+------+--------------+---------+--------+----------+-------------+--------------------+
|  CREDIT_CARD_NO|      SSN|          CUST_EMAIL|CUST_PHONE|FIRST_NAME|MIDDLE_NAME|LAST_NAME|APT_NO|   STREET_NAME|CUST_CITY|CUST_ZIP|CUST_STATE| CUST_COUNTRY|        LAST_UPDATED|
+----------------+---------+--------------------+----------+----------+-----------+---------+------+--------------+---------+--------+----------+-------------+--------------------+
|4210653315752398|123455638|FMeredith@example...|   1236886|   Francis|    Donnell| Meredith|   417|Lincoln Street| Wilmette|   60091|        IL|United States|2018-04-21T12:49:...|
|4210653352401004|123456941|  EWells@example.com|   1236886|     Edwin|      Alice|    Wells|   890|    5th Avenue|Mundelein|   60060|        IL|United States|2018-04-21T12:49:...|
|4210653399859149|123454047| EBeatty@example.com|   1236886|     Emery|    Susanna|   Beatty|  

Seems like 3 different people have the same number. I would consider using an area code that corresponds to the customer's location but in this example two of the customers are in the same state and fall under the same area code.

Let's add an area code to the phone numbers based on the customer's zip code and see if that helps differentiate the data.  First, lets load the area code data into a dataframe.  I cleaned up and created this file in the "area_code_data" notebook file.

In [107]:
area_code_df = spark.read.json("source_data/area_codes.json")

In [108]:
area_code_df.show(10)

+-----------+----------+-------+
|       city|  npa_list|zipCode|
+-----------+----------+-------+
|   Adjuntas|[939, 787]|  00601|
|     Aguada|     [787]|  00602|
|  Aguadilla|     [787]|  00603|
|    Maricao|[939, 787]|  00606|
|     Anasco|     [787]|  00610|
|    Arecibo|     [787]|  00612|
|Barceloneta|     [787]|  00617|
|   Boqueron|     [787]|  00622|
|  Cabo Rojo|     [787]|  00623|
|   Penuelas|     [787]|  00624|
+-----------+----------+-------+
only showing top 10 rows



In [109]:
joined_df = cust_df.join(area_code_df, cust_df.CUST_ZIP == area_code_df.zipCode, "left")
joined_df.show(5)

+----------------+---------+-------------------+----------+----------+-----------+---------+------+-----------------+------------+--------+----------+-------------+--------------------+------------+---------------+-------+
|  CREDIT_CARD_NO|      SSN|         CUST_EMAIL|CUST_PHONE|FIRST_NAME|MIDDLE_NAME|LAST_NAME|APT_NO|      STREET_NAME|   CUST_CITY|CUST_ZIP|CUST_STATE| CUST_COUNTRY|        LAST_UPDATED|        city|       npa_list|zipCode|
+----------------+---------+-------------------+----------+----------+-----------+---------+------+-----------------+------------+--------+----------+-------------+--------------------+------------+---------------+-------+
|4210653310061055|123456100|AHooper@example.com|   1237818|      Alec|         Wm|   Hooper|   656|Main Street North|     Natchez|   39120|        MS|United States|2018-04-21T12:49:...|     Natchez|     [601, 769]|  39120|
|4210653310102868|123453023|EHolman@example.com|   1238933|      Etta|    Brendan|   Holman|   829|    Redwo

In [112]:
counts_df = joined_df.groupBy(joined_df.CUST_PHONE).agg(F.count("*").alias("phone_count"))
result_df = counts_df.where(counts_df.phone_count > 1)
result_df.count()

48

In [None]:
npa_list = []

for row in joined_df.collect():



    
    result_list = area_code_df.select('npa').where(area_code_df.zipCode == row[10]).collect()
    result_list = [row[0] for row in result_list]
    if result_list:
        npa_list.append(result_list[0])
        first_item = result_list.pop(0)
        result_list.append(first_item)
    else:
        npa_list.append("null")

In [105]:
result_list = area_code_df.select('npa').where(area_code_df.zipCode == "39120").collect()
#have to use collect.  collect() returns a list of rows.  you can iterate over this list.
#if you don't use the collect() it will return a column object which can't be iterated over.
result_list = [row[0] for row in result_list]
result_list

[['601', '769']]

In [106]:
npa_list = []

for row in cust_df.collect():
    result_list = area_code_df.select('npa').where(area_code_df.zipCode == row[10]).collect()
    result_list = [row[0] for row in result_list]
    if result_list:
        npa_list.append(result_list[0])
        first_item = result_list.pop(0)
        result_list.append(first_item)
    else:
        npa_list.append("null")

Py4JJavaError: An error occurred while calling o25766.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2879.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2879.0 (TID 2896) (DESKTOP-9ER9NNS executor driver): org.apache.spark.SparkFileNotFoundException: File file:/c:/Users/phil/Desktop/Data Engineering Boot Camp/Data-Engineering-Capstone-Project/source_data/area_codes.json does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:794)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:231)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:290)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1019)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1018)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3997)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4167)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4165)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4165)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3994)
	at sun.reflect.GeneratedMethodAccessor105.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkFileNotFoundException: File file:/c:/Users/phil/Desktop/Data Engineering Boot Camp/Data-Engineering-Capstone-Project/source_data/area_codes.json does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:794)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:231)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:290)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
cust_df.select('FIRST_NAME', 'npa', 'CUST_ZIP').orderBy('CUST_ZIP','FIRST_NAME','npa').show()

+----------+---+--------+
|FIRST_NAME|npa|CUST_ZIP|
+----------+---+--------+
|   Allison|781|   01810|
|  Courtney|781|   01810|
|     Dusty|781|   01810|
|     Elmer|781|   01810|
|  Isabella|781|   01810|
|   Jacinto|781|   01810|
|   Janelle|781|   01810|
|   Liliana|781|   01810|
|  Mckinley|781|   01810|
|   Pearlie|781|   01810|
|    Prince|781|   01810|
|     Tanya|781|   01810|
|      Aron|781|   02127|
|  Bradford|781|   02127|
|       Bud|781|   02127|
|      Russ|781|   02127|
|     Tonya|781|   02127|
|    Alexis|339|   02155|
|    Brooke|339|   02155|
|   Geraldo|339|   02155|
+----------+---+--------+
only showing top 20 rows



In [None]:
#had to use pandas because adding a list in pandas is super easy.  in pyspark I would have to 
#create a dataframe for the python list and add an incrementing index column for both dataframes 
#and then join them on the index column and then drop that column.

# pandas_cust = cust_df.toPandas()
# pandas_cust['npa'] = npa_list
# cust_df = spark.createDataFrame(pandas_cust)
# cust_df.show()

+----------------+---------+--------------------+----------+----------+-----------+---------+------+-----------------+------------+--------+----------+-------------+--------------------+---+
|  CREDIT_CARD_NO|      SSN|          CUST_EMAIL|CUST_PHONE|FIRST_NAME|MIDDLE_NAME|LAST_NAME|APT_NO|      STREET_NAME|   CUST_CITY|CUST_ZIP|CUST_STATE| CUST_COUNTRY|        LAST_UPDATED|npa|
+----------------+---------+--------------------+----------+----------+-----------+---------+------+-----------------+------------+--------+----------+-------------+--------------------+---+
|4210653310061055|123456100| AHooper@example.com|   1237818|      Alec|         Wm|   Hooper|   656|Main Street North|     Natchez|   39120|        MS|United States|2018-04-21T12:49:...|601|
|4210653310102868|123453023| EHolman@example.com|   1238933|      Etta|    Brendan|   Holman|   829|    Redwood Drive|Wethersfield|   06109|        CT|United States|2018-04-21T12:49:...|860|
|4210653310116272|123454487| WDunham@example.

In [None]:
cust_df.show(950)

+----------------+---------+--------------------+----------+----------+-----------+-------------+------+-------------------+-----------------+--------+----------+-------------+--------------------+
|  CREDIT_CARD_NO|      SSN|          CUST_EMAIL|CUST_PHONE|FIRST_NAME|MIDDLE_NAME|    LAST_NAME|APT_NO|        STREET_NAME|        CUST_CITY|CUST_ZIP|CUST_STATE| CUST_COUNTRY|        LAST_UPDATED|
+----------------+---------+--------------------+----------+----------+-----------+-------------+------+-------------------+-----------------+--------+----------+-------------+--------------------+
|4210653310061055|123456100| AHooper@example.com|   1237818|      Alec|         Wm|       Hooper|   656|  Main Street North|          Natchez|   39120|        MS|United States|2018-04-21T12:49:...|
|4210653310102868|123453023| EHolman@example.com|   1238933|      Etta|    Brendan|       Holman|   829|      Redwood Drive|     Wethersfield|   06109|        CT|United States|2018-04-21T12:49:...|
|421065331

In [None]:
area_code_df.select('npa').where(area_code_df.zipCode == "30024").limit(1).show()

+---+
|npa|
+---+
|770|
+---+



In [None]:
cust_df.withColumn('npa', F.lookup(area_code_df, "zipCode", cust_df['CUST_ZIP'])).show()

AttributeError: module 'pyspark.sql.functions' has no attribute 'lookup'

In [None]:
cust_df.withColumn('CUST_PHONE',
            F.format_string("(%s)%s-%s", area_code_df.select('npa').where(area_code_df.zipCode == cust_df.CUST_ZIP).limit(1),
            cust_df['CUST_PHONE'][0:3], cust_df['CUST_PHONE'][3:4])).show()

AnalysisException: Resolved attribute(s) CUST_ZIP#2001 missing from npa#3591,zipCode#3593 in operator !Filter (zipCode#3593 = CUST_ZIP#2001).;
Project [npa#3591]
+- !Filter (zipCode#3593 = CUST_ZIP#2001)
   +- Project [npa#3591, zipCode#3593]
      +- Relation [city#3589,countryISO#3590,npa#3591,stateISO#3592,zipCode#3593] json
