In [1]:
import os; import sys; import re

# common spark import
from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType

# connect to spark if we haven't already
if not 'spark' in locals():
  spark = SparkSession.builder \
      .master("local[*]") \
      .appName('development') \
      .config("spark.sql.debug.maxToStringFields", str(1024 * 1024)) \
      .getOrCreate()
  sc = spark.sparkContext

print("Connected to Spark!")

Connected to Spark!


# Test Data

In [70]:
data = [("Main St","","","99"),
        ("75 Joseph Ave","Apt 205","","75"),
        ("Algonquin Way","","","99"),
        ("2 Muirfield Run","","","02"),
        ("1950 N Point Blvd","","","50"),
        ("N8603 Carper Rd","","","03"),
        ("1234A MAIN ST","","","34"),
        ("HC 1 Box 1264","","","64"),
        ("525 Circle Dr","","","25"),
        ("RR Box AA","","","99"),
        ("A Main St","Apt 12","","99")]

columns = ["address_line_1","address_line_2","dpc","expected_dpc"]
df_test = spark.createDataFrame(data = data, schema = columns)

In [71]:
df_test.show()

+-----------------+--------------+---+------------+
|   address_line_1|address_line_2|dpc|expected_dpc|
+-----------------+--------------+---+------------+
|          Main St|              |   |          99|
|    75 Joseph Ave|       Apt 205|   |          75|
|    Algonquin Way|              |   |          99|
|  2 Muirfield Run|              |   |          02|
|1950 N Point Blvd|              |   |          50|
|             RR 1|              |   |          99|
|  N8603 Carper Rd|              |   |          03|
|    1234A MAIN ST|              |   |          34|
|    HC 1 Box 1264|              |   |          64|
|    525 Circle Dr|              |   |          25|
|        RR Box AA|              |   |          99|
|        A Main St|        Apt 12|   |          99|
+-----------------+--------------+---+------------+



In [72]:
df_test = df_test.withColumn('housenumber', 
                             (f.regexp_extract(f.col('address_line_1'),'(^[0-9]([0-9A-Z.*-]+)?)', 1)))
df_test.show()

+-----------------+--------------+---+------------+-----------+
|   address_line_1|address_line_2|dpc|expected_dpc|housenumber|
+-----------------+--------------+---+------------+-----------+
|          Main St|              |   |          99|           |
|    75 Joseph Ave|       Apt 205|   |          75|         75|
|    Algonquin Way|              |   |          99|           |
|  2 Muirfield Run|              |   |          02|          2|
|1950 N Point Blvd|              |   |          50|       1950|
|             RR 1|              |   |          99|           |
|  N8603 Carper Rd|              |   |          03|           |
|    1234A MAIN ST|              |   |          34|      1234A|
|    HC 1 Box 1264|              |   |          64|           |
|    525 Circle Dr|              |   |          25|        525|
|        RR Box AA|              |   |          99|           |
|        A Main St|        Apt 12|   |          99|           |
+-----------------+--------------+---+--

In [73]:
# rule 1
#df_test = df_test.withColumn('dpc', (f.regexp_extract(f.col('address_line_1'),'(^[0-9]([0-9A-Z.*-]+)?)', 1)))
df_test = df_test.withColumn('dpc', f.when((f.col('address_line_2') == "") &
(f.col('housenumber')).isNotNull() & f.col('housenumber').rlike('^[0-9]*$'),
(f.regexp_extract(f.col('housenumber'),'(\d+)',1)).substr(-2,2)
).otherwise(f.col('dpc')))
df_test.show()

+-----------------+--------------+---+------------+-----------+
|   address_line_1|address_line_2|dpc|expected_dpc|housenumber|
+-----------------+--------------+---+------------+-----------+
|          Main St|              |   |          99|           |
|    75 Joseph Ave|       Apt 205|   |          75|         75|
|    Algonquin Way|              |   |          99|           |
|  2 Muirfield Run|              |  2|          02|          2|
|1950 N Point Blvd|              | 50|          50|       1950|
|             RR 1|              |   |          99|           |
|  N8603 Carper Rd|              |   |          03|           |
|    1234A MAIN ST|              |   |          34|      1234A|
|    HC 1 Box 1264|              |   |          64|           |
|    525 Circle Dr|              | 25|          25|        525|
|        RR Box AA|              |   |          99|           |
|        A Main St|        Apt 12|   |          99|           |
+-----------------+--------------+---+--

In [74]:
# rule 2
df_test = df_test.withColumn('dpc', 
                   f.when((f.col('dpc') == "") & (f.col('housenumber') == ""), 
                          f.lit('99'))
                   .otherwise(f.col('dpc'))
                  )
df_test.show()

+-----------------+--------------+---+------------+-----------+
|   address_line_1|address_line_2|dpc|expected_dpc|housenumber|
+-----------------+--------------+---+------------+-----------+
|          Main St|              | 99|          99|           |
|    75 Joseph Ave|       Apt 205|   |          75|         75|
|    Algonquin Way|              | 99|          99|           |
|  2 Muirfield Run|              |  2|          02|          2|
|1950 N Point Blvd|              | 50|          50|       1950|
|             RR 1|              | 99|          99|           |
|  N8603 Carper Rd|              | 99|          03|           |
|    1234A MAIN ST|              |   |          34|      1234A|
|    HC 1 Box 1264|              | 99|          64|           |
|    525 Circle Dr|              | 25|          25|        525|
|        RR Box AA|              | 99|          99|           |
|        A Main St|        Apt 12| 99|          99|           |
+-----------------+--------------+---+--

In [None]:
#if address line 1 does not start with a number, use 99
#from pyspark.sql.functions import col, when

#df2 = df_test.withColumn('dpc', f.when(f.col('address_line_1') == 'Main St', f.lit('99'))
                    #.otherwise(f.col('dpc')))

#df2.show()

In [None]:
#pattern = '^[a-z,A-Z]{2,}\s[a-z,A-Z]+'

#df3 = df_test.withColumn('dpc', 
                    f.when(f.col('address_line_1').rlike(pattern), f.lit('99'))
                    .otherwise(f.col('dpc')))

# Dataframe

In [75]:
df = spark.read.option("header", "true").csv("address-linkage-key/address_link/data/test/*medium*.gz")
df.limit(10).toPandas()

Unnamed: 0,predirection,streetname,streetsuffix,postdirection,unitdesignator,unitdesignatornumber,cityname,state,zipcode,zip_4,dwellingtype,address_line_1,address_line_2,expected_dpc,expected_check_digit
0,,Circle,Dr,,,,Fort Morgan,CO,80701,3419,S,525 Circle Dr,,25,0
1,,Primrose,Ave,,,,Vista,CA,92083,8032,S,2312 Primrose Ave,,12,2
2,,Gardenstone,Cir,,,,Tallmadge,OH,44278,1085,S,849 Gardenstone Cir,,49,8
3,,Briarwood,Dr,,,,Crestwood,KY,40014,9019,S,7511 Briarwood Dr,,11,0
4,S,15th,St,,,,Saint Clair,MI,48079,5203,S,1071 S 15th St,,71,4
5,,Front,St,,Apt,4B,Brooklyn,NY,11201,1223,M,206 Front St,Apt 4B,42,1
6,,Millpond,Rd,,,,Elizabeth Cty,NC,27909,7551,S,1295 Millpond Rd,,95,1
7,,Glenwood,Ln,,,,East Meadow,NY,11554,3719,S,479 Glenwood Ln,,79,8
8,NW,9th,St,,Apt,105,Miami,FL,33125,3443,M,2150 NW 9th St,Apt 105,30,9
9,,10th,St,NE,,,Naples,FL,34120,2057,S,460 10th St NE,,60,0


In [76]:
df = df.withColumn('dpc', f.lit(None))

In [77]:
df = df.withColumn('housenumber', (f.regexp_extract(f.col('address_line_1'),'(^[0-9]([0-9A-Z.*-]+)?)', 1)))

In [78]:
df.limit(10).toPandas()

Unnamed: 0,predirection,streetname,streetsuffix,postdirection,unitdesignator,unitdesignatornumber,cityname,state,zipcode,zip_4,dwellingtype,address_line_1,address_line_2,expected_dpc,expected_check_digit,dpc,housenumber
0,,Circle,Dr,,,,Fort Morgan,CO,80701,3419,S,525 Circle Dr,,25,0,,525
1,,Primrose,Ave,,,,Vista,CA,92083,8032,S,2312 Primrose Ave,,12,2,,2312
2,,Gardenstone,Cir,,,,Tallmadge,OH,44278,1085,S,849 Gardenstone Cir,,49,8,,849
3,,Briarwood,Dr,,,,Crestwood,KY,40014,9019,S,7511 Briarwood Dr,,11,0,,7511
4,S,15th,St,,,,Saint Clair,MI,48079,5203,S,1071 S 15th St,,71,4,,1071
5,,Front,St,,Apt,4B,Brooklyn,NY,11201,1223,M,206 Front St,Apt 4B,42,1,,206
6,,Millpond,Rd,,,,Elizabeth Cty,NC,27909,7551,S,1295 Millpond Rd,,95,1,,1295
7,,Glenwood,Ln,,,,East Meadow,NY,11554,3719,S,479 Glenwood Ln,,79,8,,479
8,NW,9th,St,,Apt,105,Miami,FL,33125,3443,M,2150 NW 9th St,Apt 105,30,9,,2150
9,,10th,St,NE,,,Naples,FL,34120,2057,S,460 10th St NE,,60,0,,460


In [79]:
#rule1
df = df.withColumn('dpc', f.when((f.col('address_line_2').isNull()) &
(f.col('housenumber')).isNotNull() & f.col('housenumber').rlike('^[0-9]*$'),
(f.regexp_extract(f.col('housenumber'),'(\d+)',1)).substr(-2,2)
).otherwise(f.col('dpc')))
df.limit(10).toPandas()

Unnamed: 0,predirection,streetname,streetsuffix,postdirection,unitdesignator,unitdesignatornumber,cityname,state,zipcode,zip_4,dwellingtype,address_line_1,address_line_2,expected_dpc,expected_check_digit,dpc,housenumber
0,,Circle,Dr,,,,Fort Morgan,CO,80701,3419,S,525 Circle Dr,,25,0,25.0,525
1,,Primrose,Ave,,,,Vista,CA,92083,8032,S,2312 Primrose Ave,,12,2,12.0,2312
2,,Gardenstone,Cir,,,,Tallmadge,OH,44278,1085,S,849 Gardenstone Cir,,49,8,49.0,849
3,,Briarwood,Dr,,,,Crestwood,KY,40014,9019,S,7511 Briarwood Dr,,11,0,11.0,7511
4,S,15th,St,,,,Saint Clair,MI,48079,5203,S,1071 S 15th St,,71,4,71.0,1071
5,,Front,St,,Apt,4B,Brooklyn,NY,11201,1223,M,206 Front St,Apt 4B,42,1,,206
6,,Millpond,Rd,,,,Elizabeth Cty,NC,27909,7551,S,1295 Millpond Rd,,95,1,95.0,1295
7,,Glenwood,Ln,,,,East Meadow,NY,11554,3719,S,479 Glenwood Ln,,79,8,79.0,479
8,NW,9th,St,,Apt,105,Miami,FL,33125,3443,M,2150 NW 9th St,Apt 105,30,9,,2150
9,,10th,St,NE,,,Naples,FL,34120,2057,S,460 10th St NE,,60,0,60.0,460


In [81]:
# rule 2
df = df.withColumn('dpc', 
                   f.when((f.col('dpc').isNull()) & (f.col('housenumber').isNull()), 
                          f.lit('99'))
                   .otherwise(f.col('dpc'))
                  )

In [82]:
df.limit(50).toPandas()

Unnamed: 0,predirection,streetname,streetsuffix,postdirection,unitdesignator,unitdesignatornumber,cityname,state,zipcode,zip_4,dwellingtype,address_line_1,address_line_2,expected_dpc,expected_check_digit,dpc,housenumber
0,,Circle,Dr,,,,Fort Morgan,CO,80701,3419,S,525 Circle Dr,,25,0,25.0,525.0
1,,Primrose,Ave,,,,Vista,CA,92083,8032,S,2312 Primrose Ave,,12,2,12.0,2312.0
2,,Gardenstone,Cir,,,,Tallmadge,OH,44278,1085,S,849 Gardenstone Cir,,49,8,49.0,849.0
3,,Briarwood,Dr,,,,Crestwood,KY,40014,9019,S,7511 Briarwood Dr,,11,0,11.0,7511.0
4,S,15th,St,,,,Saint Clair,MI,48079,5203,S,1071 S 15th St,,71,4,71.0,1071.0
5,,Front,St,,Apt,4B,Brooklyn,NY,11201,1223,M,206 Front St,Apt 4B,42,1,,206.0
6,,Millpond,Rd,,,,Elizabeth Cty,NC,27909,7551,S,1295 Millpond Rd,,95,1,95.0,1295.0
7,,Glenwood,Ln,,,,East Meadow,NY,11554,3719,S,479 Glenwood Ln,,79,8,79.0,479.0
8,NW,9th,St,,Apt,105,Miami,FL,33125,3443,M,2150 NW 9th St,Apt 105,30,9,,2150.0
9,,10th,St,NE,,,Naples,FL,34120,2057,S,460 10th St NE,,60,0,60.0,460.0


In [None]:
df = df.withColumn('dpc', 
                    f.when(f.col('address_line_1').rlike(pattern), f.lit('99'))
                    .otherwise(f.col('dpc')))

In [None]:
df.limit(25).toPandas()

In [None]:
df.groupBy('dpc').count().show()

In [None]:
df.filter(df.dpc == '99').limit(20).toPandas()

In [None]:
def apply_rule_2(df):
    # Use 99 when address contains no house number
    df = df.withColumn('dpc',f.when((f.col('dpc').isNull()) & (f.col('housenumber').isNull()),f.lit('99'))
                       .otherwise(f.col('dpc')))
    return df

class TestRule2(SparkTestCase):
    def test(self):
        # Test Case: Rule 2 should use 99 when the address contains no house number.
        # run a query that gives me the input that the function expects and the expected value from that
        # parse operation
        testdf = self.spark.sql("SELECT 'MAIN ST' AS address_line_1, CAST(NULL AS string) AS housenumber, CAST(NULL AS string) AS dpc, '99' AS expected")
        #run the function
        testdf = apply_rule_2(testdf)
        #gather the results [row 0 only] and compare the returned dpc to the expected dpc
        row = testdf.collect()[0]
        self.assertEquals(row.expected, row.dpc, "rule 2 fail; expected does not match dpc")

In [None]:
class TestRule1(SparkTestCase):
    def test(self):
        # Test Case: Rule 1 (General Rule) should provide the last two digits of a primary street number, 
        # post office box, rural route box, or highway contract route number
        # run a query that gives me the input that the function expects and the expected value from that
        # parse operation
        testdf = self.spark.sql("SELECT '1234 MAIN ST' AS address_line_1, CAST(NULL AS string) AS dpc, '34' AS expected")
        #run the function
        testdf = apply_rule_1(testdf)
        #gather the results [row 0 only] and compare the returned dpc to the expected dpc
        row = testdf.collect()[0]
        self.assertEquals(row.expected, row.dpc, "rule 1 created match")