In [1]:
import pandas as pd
import usaddress
import numpy as np
from typing import Dict, List, Tuple, TypeAlias, Union
from sklearn.model_selection import train_test_split
import spacy
from spacy.tokens import DocBin
import os

In [2]:
# Blank English pipeline; it is called on each example text when the DocBin
# files are built further down.
nlp = spacy.blank("en")

In [3]:
# Raw location records. The cells below read its `street` and `title` columns.
locations = pd.read_csv('../data/raw/urban_renewal_location_photos.csv')

In [4]:
def clean_title(title):
    """
    Normalises a title string: removes every '.' and rewrites every '&' as 'and'.
    """
    # A single translation pass handles both substitutions character by character.
    substitutions = str.maketrans({'.': '', '&': 'and'})
    return title.translate(substitutions)

In [5]:
# Rows that have a value in `street`.
street_mask = locations['street'].notna()

# For those rows, keep the text before the first comma of `street` and
# normalise it the same way titles are normalised.
street_first_part = locations.loc[street_mask, 'street'].str.split(',').str[0]
locations.loc[street_mask, 'street_address'] = street_first_part.apply(clean_title)

# Normalised copy of every title.
locations['title_cleaned'] = locations['title'].apply(clean_title)

In [6]:
# For each row with a street, flag whether the cleaned street address occurs
# verbatim inside the cleaned title.
rows_with_street = locations.loc[street_mask]
title_contains_street = rows_with_street.apply(
    lambda row: row['street_address'] in row['title_cleaned'],
    axis=1,
)

In [7]:
# Rows that have a street AND whose cleaned title contains the cleaned street
# address; replace_street() samples its template titles from this frame.
labeled_examples = locations[street_mask&title_contains_street]
# Last valid positional index of labeled_examples.
# NOTE: np.random.randint excludes its upper bound, so passing this value as
# `high` can never select the final row.
le_last_index = len(labeled_examples) - 1

In [8]:
def construct_street_name(x : Dict[str, str]) -> str:
    """
    Builds a street name out of a mapping of usaddress tags.

    The pre-directional, name, post-type and post-directional components are
    joined with single spaces, in that order; components that are missing or
    empty are left out. When there is no 'StreetName' component at all the
    result is np.nan rather than a string.
    """
    if not x.get('StreetName'):
        return np.nan

    component_order = (
        'StreetNamePreDirectional',
        'StreetName',
        'StreetNamePostType',
        'StreetNamePostDirectional',
    )
    present = [x[tag] for tag in component_order if x.get(tag)]
    return ' '.join(present).strip()
 

In [9]:
# Tag every cleaned street address with usaddress, then rebuild just the
# street-name portion from the tagged components.
tagged_addresses = locations.loc[street_mask, 'street_address'].apply(usaddress.tag)
all_streets = tagged_addresses.str[0].apply(construct_street_name)

# Distinct street names, ignoring addresses that yielded no street name.
street_names = all_streets.dropna().unique()
sn_last_index = len(street_names) - 1

highest_address = 9999

# Block numbers: every multiple of 100 from 100 up to (not including) highest_address.
blocks = [hundred for hundred in range(100, highest_address, 100)]
blocks_last_index = len(blocks) - 1

# Offsets added to a range's start number to get its end number; 2 appears
# three times, so a uniform draw from this list picks it more often.
range_additions = [2, 2, 2, 4, 6, 8, 10, 20]
ra_last_index = len(range_additions) - 1

In [10]:
# (start, end, label): character offsets into the example text plus the entity label.
Label : TypeAlias = Tuple[int, int, str]
# (text, labels): an example string together with the labels that apply to it.
LabeledExample : TypeAlias = Tuple[str, List[Label]]

def replace_street(new_street: str) -> str:
    """
    Builds a synthetic title by swapping the street address of a randomly
    chosen template row for `new_street`.

    A row is drawn uniformly from `labeled_examples`; every occurrence of that
    row's `street_address` inside its `title_cleaned` is replaced.

    Raises ValueError (from np.random.randint) when `labeled_examples` is empty.
    """
    # np.random.randint's upper bound is exclusive, so the bound must be the
    # row count itself; using the last index would never pick the final row
    # and would fail outright for a single-row frame.
    position = np.random.randint(0, len(labeled_examples))
    row = labeled_examples.iloc[position]
    return row.title_cleaned.replace(row.street_address, new_street)

def create_address_label(title : str, street : str) -> Label:
    """
    Returns the ('ADDRESS') label covering the first occurrence of `street`
    in `title`, as (start_offset, end_offset, 'ADDRESS').

    Callers must make sure `street` really occurs in `title`: str.find yields
    -1 when it does not, and the offsets returned here are then meaningless.
    """
    offset = title.find(street)
    return (offset, offset + len(street), 'ADDRESS')

def create_labeled_example(new_street : str, example : Union[str, bool] = False) -> LabeledExample:
    """
    Pairs an example text with the ADDRESS label for `new_street`.

    When `example` is falsy (the default), a text is synthesised by
    replace_street(new_street); otherwise `example` is used as given.
    """
    text = example if example else replace_street(new_street)
    return (text, [create_address_label(text, new_street)])

def create_intersection_example() -> LabeledExample:
    """
    Creates a labeled example whose address is an intersection of two streets,
    written as '<street1> and <street2>'.

    Both streets are drawn independently and uniformly from `street_names`,
    so the same street can be picked twice.
    """
    # np.random.randint excludes its upper bound; use the full length so the
    # last street name is also eligible.
    n_streets = len(street_names)
    street1 = street_names[np.random.randint(0, n_streets)]
    street2 = street_names[np.random.randint(0, n_streets)]
    new_street = f'{street1} and {street2}'
    return create_labeled_example(new_street)
    
def create_block_example() -> LabeledExample:
    """
    Creates a labeled example whose address is a block reference, written as
    '<block> block of <street>'.

    The block number is drawn uniformly from `blocks` and the street from
    `street_names`.
    """
    # np.random.randint excludes its upper bound; use the full lengths so the
    # last block and the last street name are also eligible.
    block = blocks[np.random.randint(0, len(blocks))]
    street = street_names[np.random.randint(0, len(street_names))]
    new_street = f'{block} block of {street}'
    return create_labeled_example(new_street)

def create_range_example() -> LabeledExample:
    """
    Creates a labeled example whose address is a house-number range, written
    as '<start>-<end> <street>'.

    `start` is drawn from [10, highest_address); `end` is `start` plus an
    offset drawn uniformly from `range_additions`. The end number is shortened
    to its last two digits only when start and end share the same hundreds
    prefix (e.g. '1204-08'); otherwise it is written in full (e.g. '98-100'),
    because the two-digit form would be misleading there.
    """
    start = np.random.randint(10, highest_address)
    # np.random.randint excludes its upper bound; use the full lengths so the
    # last offset and the last street name are also eligible.
    end = start + range_additions[np.random.randint(0, len(range_additions))]
    street = street_names[np.random.randint(0, len(street_names))]

    if end // 100 == start // 100:
        end_text = str(end)[-2:]
    else:
        # Range crosses a hundreds boundary: abbreviating would give e.g. '98-00'.
        end_text = str(end)

    new_street = f'{start}-{end_text} {street}'
    return create_labeled_example(new_street)


In [11]:
# Real examples: one per distinct raw title among the rows whose cleaned title
# contains the cleaned street address, labeled with that address.
unique_title_rows = locations[street_mask & title_contains_street].drop_duplicates(subset=['title'])
current_examples = unique_title_rows.apply(
    lambda row: create_labeled_example(row['street_address'], row['title_cleaned']),
    axis=1,
).tolist()

In [12]:
# How many synthetic examples of each kind to generate in the next cell.
n_intersections = 2000  # '<street> and <street>' examples
n_blocks = 200  # '<N> block of <street>' examples
n_street_range = 200  # '<start>-<end> <street>' examples

In [13]:
# Seed NumPy's global RNG so the synthetic examples are identical on every
# run of this cell; the create_*_example helpers all draw from np.random.
np.random.seed(42)

intersection_examples = [create_intersection_example() for _ in range(n_intersections)]
block_examples = [create_block_example() for _ in range(n_blocks)]
range_examples = [create_range_example() for _ in range(n_street_range)]

In [14]:
# Full dataset: the real examples followed by the three kinds of synthetic ones.
all_examples = current_examples + intersection_examples + block_examples + range_examples

In [15]:
# Fixed seed so the train/eval/test membership is the same on every run.
SPLIT_SEED = 42

# 80% train; the remaining 20% is halved into eval and test (10% each).
train_set, remainder = train_test_split(all_examples, test_size=0.2, random_state=SPLIT_SEED)
eval_set, test_set = train_test_split(remainder, test_size=0.5, random_state=SPLIT_SEED)

In [21]:
def examples_to_spacy(dataset, outputfile):
    """
    Converts labeled examples into spaCy Docs and writes them to disk as a DocBin.

    Parameters
    ----------
    dataset : iterable of (text, annotations) pairs, where annotations is an
        iterable of (start, end, label) character-offset triples.
    outputfile : path handed to DocBin.to_disk.

    An example is left out of the DocBin when any of its annotations cannot be
    mapped onto a span, or when spaCy rejects the spans as entities. The
    number of examples left out is printed before the file is written.
    """
    skipped = 0
    db = DocBin()
    for text, annotations in dataset:
        doc = nlp(text)
        ents = [doc.char_span(start, end, label=label) for start, end, label in annotations]

        # char_span returns None when the character offsets do not map to a
        # valid span; such an example cannot be labeled.
        if any(span is None for span in ents):
            skipped += 1
            continue

        try:
            doc.ents = ents
        except ValueError:
            # spaCy refuses entity spans that conflict with each other
            # (e.g. overlapping spans).
            skipped += 1
            continue

        db.add(doc)

    print(skipped)
    db.to_disk(outputfile)

In [22]:

root_path = '../data/processed/address_ner'
os.makedirs(root_path, exist_ok=True)

# Output file name -> split; dict order keeps the train, eval, test sequence.
splits = {
    'train.spacy': train_set,
    'eval.spacy': eval_set,
    'test.spacy': test_set,
}
for outputfile, dataset in splits.items():
    examples_to_spacy(dataset, f'{root_path}/{outputfile}')

27
1
6
