In [0]:
!pip install geocoder python-Levenshtein

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
Collecting tqdm
  Using cached tqdm-4.66.4-py3-none-any.whl (78 kB)
Installing collected packages: tqdm
Successfully installed tqdm-4.66.4
[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m


In [0]:
import sys, os
sys.path.append(os.path.abspath('/Workspace/Repos/zhastay_yeltay@epam.com/utils/'))

from delta.tables import *
from pyspark.sql import functions as F
import requests
import geocoder
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, LongType

from pyspark.sql.window import Window
from pyspark.sql import DataFrame

import pandas as pd


from init import *
# `init_spark` comes from the project `init` module star-imported above (utils repo on sys.path).
# Presumably it configures the Spark session for this notebook, and `init` presumably also
# provides `catalog_name` / `schema_silver_name` used by the write cell — TODO confirm in init.py.
init_spark()

from util_logger import init_logger
# Databricks text widget "task" (default "test_logger"); its value names the logger used below.
dbutils.widgets.text('task', "test_logger")
logger = init_logger(dbutils.widgets.get('task'))

In [0]:
def get_address_info(address):
    try:
        g = geocoder.bing(address, key='YOUR-API-KEY')
        data = g.json
        return (
            data['lat'] if 'lat' in data else None,
            data['lng'] if 'lng' in data else None,
            data['postal'] if 'postal' in data else None,
            data['raw']['address']['adminDistrict2'] if 'adminDistrict2' in data['raw']['address'] else None,
            data['neighborhood'] if 'neighborhood' in data else None,
            data['address'] if 'address' in data else None
        )
    except Exception as e:
        print(f'address: {address}, {str(e)}')
        return (None, None, None, None, None, None)
    
# Spark return type for get_address_info_udf: one nullable field per element of the tuple
# returned by get_address_info, in the same order.
coord_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("lat", DoubleType()),
        ("lng", DoubleType()),
        ("postal_code", StringType()),
        ("county", StringType()),
        ("neighborhood", StringType()),
        ("completed_address", StringType()),
    )
])

get_address_info_udf = F.udf(get_address_info, coord_schema)

# Smoke-test the lookup on a few known addresses (each call goes through geocoder.bing).
for sample_address in (
    '1 Boren Ave, Port Townsend, Washington, US',
    '4932 20, Port Townsend, Washington',
    '7266 20, Port Townsend, Washington',
    '313782 Us 101 street, Brinnon city, Washington state, US country',
    '304753 Us 101 street, Brinnon city, Washington state, US country',
):
    print(get_address_info(sample_address))



(48.0535013, -122.7730743, '98368', 'Jefferson County', None, '1 N Stromberg Ave, Port Townsend, WA 98368')
(48.028846, -122.824346, '98368', 'Jefferson County', None, '4932 State Route 20, Port Townsend, WA 98368')
(48.062968, -122.818718, '98368', 'Jefferson County', None, '7266 State Route 20, Port Townsend, WA 98368')
(47.6494049, -122.9346286, None, 'Jefferson County', None, 'US Highway 101, Brinnon, WA')
(47.6494049, -122.9346286, None, 'Jefferson County', None, 'US Highway 101, Brinnon, WA')


In [0]:
# Abbreviation -> full word for street directionals and street types.
street_mapping = {
    'N': 'North',
    'S': 'South',
    'W': 'West',
    'E': 'East',
    'NW': 'Northwest',
    'NE': 'Northeast',
    'SW': 'Southwest',
    'SE': 'Southeast',
    'Ave': 'Avenue',
    'St': 'Street',
    'Rd': 'Road',
    'Dr': 'Drive',
    'Pl': 'Place',
    'Ln': 'Lane',
    'Way': 'Way',  # 'Way' does not need abbreviation
    'Blvd': 'Boulevard',
    'Ct': 'Court',
    'Hwy': 'Highway',
    'Pkwy': 'Parkway'
}


def expand_street_names(address):
    """Expand abbreviated street words, e.g. '10319 NE 114th Pl' -> '10319 Northeast 114th Place'.

    Every ' street' substring is removed first (the lowercase qualifier seen in queries such
    as 'Us 101 street'). The text is then split on whitespace and each word is looked up in
    ``street_mapping`` (case-sensitive). A word ending in ',' is looked up without commas and
    gets a single trailing ',' back. Words are re-joined with single spaces.

    Returns None for a None address, so the Spark UDF wrapper tolerates null columns
    (e.g. a missing completed_address) instead of failing the task.
    """
    if address is None:
        return None
    expanded = []
    for word in address.replace(' street', '').split():
        if word.endswith(','):
            cleaned_word = word.replace(',', '').strip()
            expanded.append(street_mapping.get(cleaned_word, cleaned_word) + ',')
        else:
            expanded.append(street_mapping.get(word, word))
    return ' '.join(expanded)

# Spark UDF wrapper around expand_street_names (string in, string out). Uses the `F.udf`
# alias like get_address_info_udf instead of a bare `udf` name that only exists if some
# star import happens to export it.
expand_street_names_udf = F.udf(expand_street_names, StringType())



# Inverse of street_mapping (full word -> abbreviation), e.g. 'Northeast' -> 'NE'.
street_mapping_reverse = dict((full, short) for short, full in street_mapping.items())


def abbreviate_street_names(address):
    """Abbreviate full street words, e.g. 'Northeast 114th Place' -> 'NE 114th Pl'.

    The address is split on whitespace and each word is looked up in
    ``street_mapping_reverse``. A word ending in ',' is looked up without commas and gets a
    single trailing ',' back. Words are re-joined with single spaces.
    """
    def _shorten(token):
        if token.endswith(','):
            bare = token.replace(',', '').strip()
            return street_mapping_reverse.get(bare, bare) + ','
        return street_mapping_reverse.get(token, token)

    return ' '.join(map(_shorten, address.split()))


print(expand_street_names('10319 NE 114th Pl, Kirkland, WA 98033'))
print(abbreviate_street_names('10319 Northeast 114th Place, Kirkland, Washington 98033'))

10319 Northeast 114th Place, Kirkland, WA 98033
10319 NE 114th Pl, Kirkland, Washington 98033


In [0]:
try:
    logger.info("Loading addressline_enriched Delta table from zhastay_yeltay_02_silver.addressline_enriched.")
    # NOTE(review): this cell reads the hard-coded "hive_metastore.zhastay_yeltay_02_silver"
    # namespace, while the write cell appends to f"{catalog_name}.{schema_silver_name}"
    # (names from `init`). Confirm both name the same table; otherwise the anti-join below
    # never sees previously enriched rows.
    enriched_addresses_old = spark.table(
        "hive_metastore.zhastay_yeltay_02_silver.addressline_enriched"
    )

    # Log processing start
    logger.info("Starting to process addressline entries.")

    # New ids continue after the current maximum; an empty table aggregates to None -> 0.
    max_id = enriched_addresses_old.agg(F.max('id')).head()[0] or 0
    logger.info(f"Current max id in addressline_enriched: {max_id}")

    unique_addresslines = (
        spark.table("hive_metastore.zhastay_yeltay_02_silver.addresses")
        .select("addressline", "country", "state", "city")
        .distinct()
        # Geocoder query text "addressline, city, state, country". The geocoding loop splits
        # it on ", ". Note F.concat returns NULL when any part is NULL.
        .withColumn(
            'full_address',
            F.concat(
                F.col("addressline"),
                F.lit(", "),
                F.col("city"),
                F.lit(", "),
                F.col("state"),
                F.lit(", "),
                F.col("country")
            )
        )
        .alias("new")
        # Keep only addresses that are not enriched yet. eqNullSafe (<=>) lets NULL key parts
        # match each other; with plain == they never match, so such addresses would be
        # re-geocoded and appended again on every run.
        .join(
            enriched_addresses_old.alias("old"),
            F.col("new.addressline").eqNullSafe(F.col("old.addressline"))
            & F.col("new.city").eqNullSafe(F.col("old.city"))
            & F.col("new.state").eqNullSafe(F.col("old.state"))
            & F.col("new.country").eqNullSafe(F.col("old.country")),
            how="leftanti",
        )
    )

    # Collected to the driver for the sequential API loop (assumes the new-address set is small).
    pandas_df = unique_addresslines.toPandas()

    # Log the outcome
    logger.info(f"Processed unique addresslines. New addresses to geocode: {len(pandas_df)}")
except Exception:
    logger.error("An error occurred while processing addressline entries.", exc_info=True)
    raise
    

3429


Unnamed: 0,addressline,country,state,city,full_address
0,14261 Sargent Place,US,Washington,Burlington,"14261 Sargent Place, Burlington, Washington, US"
1,1111 Northeast Broadway,US,Oregon,Portland,"1111 Northeast Broadway, Portland, Oregon, US"
2,514 East Main Street,US,Oregon,Cottage Grove,"514 East Main Street, Cottage Grove, Oregon, US"
3,1137 Southwest Broadway,US,Oregon,Portland,"1137 Southwest Broadway, Portland, Oregon, US"
4,4208 Mason Road Northeast,US,Washington,Seattle,"4208 Mason Road Northeast, Seattle, Washington..."
...,...,...,...,...,...
59,1500 East Main Street,US,Oregon,Cottage Grove,"1500 East Main Street, Cottage Grove, Oregon, US"
60,481 Forest Dr,US,Washington,Brinnon,"481 Forest Dr, Brinnon, Washington, US"
61,11021 Northeast 123rd Lane,US,Washington,Kirkland,"11021 Northeast 123rd Lane, Kirkland, Washingt..."
62,10802 A Eagle Creek Lane,US,Washington,Leavenworth,"10802 A Eagle Creek Lane, Leavenworth, Washing..."


In [0]:
import time

import Levenshtein  # module provided by the `python-Levenshtein` package installed in the first cell

# Accept a geocoding answer when its abbreviated street part is within this many edits of
# ours; otherwise (or when nothing came back) retry with a more explicit query.
MAX_STREET_DISTANCE = 10
num_retry = 2

processed_list = []

# Keys under which get_address_info's 6-tuple is stored on each record, in tuple order.
GEOCODE_FIELDS = ("lat", "lng", "postal_code", "county", "neighborhood", "completed_address")


def _geocode(query):
    """Geocode ``query`` and score the answer; return ``(fields, distance)``.

    ``fields`` is get_address_info's 6-tuple (completed_address is the last element).
    ``distance`` is the Levenshtein distance between the street parts (text before the first
    ", ") of the query and of the answer, both passed through abbreviate_street_names.
    A missing answer is compared as the string "None" instead of raising.
    """
    fields = get_address_info(query)
    query_street = abbreviate_street_names(query.split(', ')[0])
    answer_street = abbreviate_street_names(str(fields[-1]).split(', ')[0])
    return fields, Levenshtein.distance(query_street, answer_street)


logger.info("Starting to get data from Bing Maps API.")

for it_num, value in enumerate(pandas_df.to_dict('records')):
    if not value['full_address']:
        # F.concat in the loading cell yields NULL when any address part is NULL; nothing to look up.
        value.update(dict.fromkeys(GEOCODE_FIELDS))
        processed_list.append(value)
        continue

    # Source rows spell the highway "Us 101"; Bing resolves "US Highway 101".
    address_line = value['addressline'].replace('Us 101', 'US Highway 101')
    query = value['full_address'].replace('Us 101', 'US Highway 101')
    retry = 0
    fields, distance = _geocode(query)

    while ((not fields[-1]) or distance > MAX_STREET_DISTANCE) and retry < num_retry:
        # Rebuild from the source columns on every retry. Re-splitting the previous query
        # appended " city"/" state" again on each pass, and raised when an addressline itself
        # contained ", ".
        query = f"{address_line}, {value['city']} city, {value['state']} state, United States"
        time.sleep(0.5)
        print(f"\n i: {it_num}, retry: {retry}, distance: {distance}")
        candidate, candidate_distance = _geocode(query)
        # Keep the earlier answer when the retry returned nothing or a worse match, so a
        # transient API error cannot discard a usable first result.
        if candidate[-1] and (not fields[-1] or candidate_distance < distance):
            fields, distance = candidate, candidate_distance
        retry += 1

    value.update(zip(GEOCODE_FIELDS, fields))
    processed_list.append(value)
    print(f"{it_num} / {pandas_df.shape[0]-1}, is None: {value['completed_address'] is None}, retry: {retry}, distance: {distance} \r", end='', flush=True)
    time.sleep(0.1)


logger.info("Finished of getting data from Bing Maps API.")

0 / 63, is None: False, retry: 0, distance: 8 1 / 63, is None: False, retry: 0, distance: 3 2 / 63, is None: False, retry: 0, distance: 8 3 / 63, is None: False, retry: 0, distance: 3 4 / 63, is None: False, retry: 0, distance: 8 5 / 63, is None: False, retry: 0, distance: 9 6 / 63, is None: False, retry: 0, distance: 8 7 / 63, is None: False, retry: 0, distance: 6 8 / 63, is None: False, retry: 0, distance: 6 9 / 63, is None: False, retry: 0, distance: 1 10 / 63, is None: False, retry: 0, distance: 0 11 / 63, is None: False, retry: 0, distance: 7 12 / 63, is None: False, retry: 0, distance: 0 i: 13, retry: 0, distance: 18

i: 13, retry: 1, distance: 13

13 / 63, is None: False, retry: 2, distance: 13 14 / 63, is None: False, retry: 0, distance: 0 15 / 63, is None: False, retry: 0, distance: 6 16 / 63, is None: False, retry: 0, distance: 0 17 / 63, is None: False, retry: 0, distance: 3 18 / 63, is None: False, retry: 0, distance: 0 19 / 63, is None: False, retry: 0, 

Status code 500 from http://dev.virtualearth.net/REST/v1/Locations: ERROR - 500 Server Error: Internal Server Error for url: http://dev.virtualearth.net/REST/v1/Locations?q=270+Brighton+Ave%2C+Port+Hadlock%2C+Washington%2C+US&o=json&inclnb=1&key=&maxResults=1


address: 270 Brighton Ave, Port Hadlock, Washington, US, argument of type 'NoneType' is not iterable
i: 42, retry: 0, distance: 13

42 / 63, is None: False, retry: 1, distance: 3 43 / 63, is None: False, retry: 0, distance: 6 44 / 63, is None: False, retry: 0, distance: 8 45 / 63, is None: False, retry: 0, distance: 2 46 / 63, is None: False, retry: 0, distance: 4 47 / 63, is None: False, retry: 0, distance: 8 48 / 63, is None: False, retry: 0, distance: 4 49 / 63, is None: False, retry: 0, distance: 5 50 / 63, is None: False, retry: 0, distance: 2 51 / 63, is None: False, retry: 0, distance: 6 i: 52, retry: 0, distance: 12

i: 52, retry: 1, distance: 12

52 / 63, is None: False, retry: 2, distance: 12 i: 53, retry: 0, distance: 12

i: 53, retry: 1, distance: 12

53 / 63, is None: False, retry: 2, distance: 3 54 / 63, is None: False, retry: 0, distance: 6 55 / 63, is None: False, retry: 0, distance: 2 56 / 63, is None: False, retry: 0, distance: 3 i: 57, retry: 0, distan

In [0]:
# Number of geocoded records that came back without a postal code.
print(sum(1 for record in processed_list if record['postal_code'] is None))

1


In [0]:
# Explicit schema for the geocoded records. Inferring it from a list of dicts fails when a
# column is None in every record (e.g. `neighborhood` was None for all the sample lookups),
# which is likely for small incremental batches.
enriched_records_schema = StructType([
    StructField("addressline", StringType(), True),
    StructField("country", StringType(), True),
    StructField("state", StringType(), True),
    StructField("city", StringType(), True),
    StructField("full_address", StringType(), True),
    StructField("lat", DoubleType(), True),
    StructField("lng", DoubleType(), True),
    StructField("postal_code", StringType(), True),
    StructField("county", StringType(), True),
    StructField("neighborhood", StringType(), True),
    StructField("completed_address", StringType(), True),
])


def _as_float(value):
    """Return ``value`` as a float (Spark's DoubleType rejects Python ints); keep None as None."""
    return None if value is None else float(value)


try:
    logger.info("Starting to process API entries.")

    if len(processed_list) > 0:
        records = [
            {**record, "lat": _as_float(record["lat"]), "lng": _as_float(record["lng"])}
            for record in processed_list
        ]

        processed_df = (
            spark.createDataFrame(records, schema=enriched_records_schema)
            # Diagnostic columns used only by the optional stricter filters commented out
            # below; the final select drops them.
            .withColumn("old_house_number", F.split(F.col("full_address"), " ")[0])
            .withColumn("new_house_number", F.split(F.col("completed_address"), " ")[0])
            .withColumn(
                "levenshtein_distance",
                F.levenshtein(
                    expand_street_names_udf(F.split(F.col("full_address"), ", ")[0]),
                    expand_street_names_udf(F.split(F.col("completed_address"), ", ")[0]),
                ),
            )
            # Drop lookups without a usable result. They stay un-enriched, so the loading
            # cell's anti-join selects them again on the next run.
            .filter(
                F.col("completed_address").isNotNull()
                # & F.col("postal_code").isNotNull()
                & F.col("lat").isNotNull()
                & F.col("lng").isNotNull()
                # & (F.col('levenshtein_distance') <= 5)
                # & (F.col('old_house_number') == F.col('new_house_number'))
            )
            # Sequential ids after the table's current max. The window has no partitionBy, so
            # Spark moves all rows to one partition (fine for small batches only).
            .withColumn("id", max_id + F.row_number().over(Window.orderBy("full_address")))
            .select(
                "id",
                "addressline",
                "country",
                "state",
                "city",
                "full_address",
                "completed_address",
                "lat",
                "lng",
                "postal_code",
                "county",
                "neighborhood",
            )
        )

        # NOTE(review): the loading cell reads "hive_metastore.zhastay_yeltay_02_silver";
        # confirm catalog_name/schema_silver_name resolve to the same table.
        processed_df.write.format("delta").mode("append").saveAsTable(
            f"{catalog_name}.{schema_silver_name}.addressline_enriched"
        )

        # Log the outcome
        logger.info("Processed enriched AddressLines from Bing API.")
    else:
        logger.info("There is no new AddressLines.")

except Exception:
    logger.error("An error occurred while processing enriched AddressLines entries.", exc_info=True)
    raise


id,addressline,country,state,city,full_address,completed_address,lat,lng,postal_code,county,neighborhood
3430,1 Boren Ave,US,Washington,Port Townsend,"1 Boren Ave, Port Townsend, Washington, US","1 N Stromberg Ave, Port Townsend, WA 98368",48.0535013,-122.7730743,98368,Jefferson County,
3431,1 East Main Street,US,Washington,Auburn,"1 East Main Street, Auburn, Washington, US","1 Washington St, Auburn, MA 01501",42.21281596,-71.80338702,01501,Worcester County,
3432,1001 North Broadway Avenue,US,Washington,Everett,"1001 North Broadway Avenue, Everett, Washington, US","1001 N Broadway, Everett, WA 98201",48.004985,-122.195625,98201,Snohomish County,Northwest Everett
3433,1008 72nd Street East,US,Washington,Tacoma,"1008 72nd Street East, Tacoma, Washington, US","1008 E 72nd St, Tacoma, WA 98404",47.19169948,-122.41668081,98404,Pierce County,Eastside
3434,10210 Northeast Points Drive,US,Washington,Kirkland,"10210 Northeast Points Drive, Kirkland, Washington, US","10210 Points Dr, Kirkland, WA 98033",47.643529,-122.203722,98033,King County,Lakeview
3435,10429 Alderbrook Place Northwest,US,Washington,Seattle,"10429 Alderbrook Place Northwest, Seattle, Washington, US","10429 12th Ave NW, Seattle, WA 98177",47.70542224,-122.37151058,98177,King County,North Beach-Blue Ridge
3436,10802 A Eagle Creek Lane,US,Washington,Leavenworth,"10802 A Eagle Creek Lane, Leavenworth, Washington, US","10802 Eagle Creek Rd, Leavenworth, WA 98826",47.6359275,-120.6099657,98826,Chelan County,
3437,10915 Northeast 123rd Lane,US,Washington,Kirkland,"10915 Northeast 123rd Lane, Kirkland, Washington, US","10915 123rd Ln NE, Kirkland, WA 98033",47.69792087,-122.17632431,98033,King County,North Rose Hill
3438,11021 Northeast 123rd Lane,US,Washington,Kirkland,"11021 Northeast 123rd Lane, Kirkland, Washington, US","11021 123rd Ln NE, Kirkland, WA 98033",47.6998583,-122.1776417,98033,King County,North Rose Hill
3439,1111 Northeast Broadway,US,Oregon,Portland,"1111 Northeast Broadway, Portland, Oregon, US","1111 NE Broadway St, Portland, OR 97232",45.53527,-122.654353,97232,Multnomah County,Hollywood


In [0]:
# Completeness check on the same 4-column key the loading cell's anti-join uses. Comparing
# a row count with distinct addresslines alone misreports when one addressline appears in
# several cities/states. Lookups dropped by the write cell's filter count as missing.
address_key = ["addressline", "country", "state", "city"]

enriched_keys = (
    spark.table("hive_metastore.zhastay_yeltay_02_silver.addressline_enriched")
    .select(*address_key)
    .distinct()
)
source_keys = (
    spark.table("hive_metastore.zhastay_yeltay_02_silver.addresses")
    .select(*address_key)
    .distinct()
)

addressline_enriched_count = enriched_keys.count()
addresses_count = source_keys.count()
# subtract() is EXCEPT DISTINCT, which treats NULLs as equal, so null key parts still match.
missing_count = source_keys.subtract(enriched_keys).count()

logger.info(f"Enriched address keys: {addressline_enriched_count}, source address keys: {addresses_count}, missing: {missing_count}")
logger.info(f'All data processed: {missing_count == 0}')

All data processed: True
