In [1]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import xml.etree.ElementTree as ET

# File can be found courtesy of MapZen
# https://mapzen.com/data/metro-extracts/
# Path of the OSM XML extract that every cell in this notebook reads.
OSM_FILE = "detroit_michigan.osm"
# Alias of OSM_FILE; this is the name the audit and export cells actually pass around.
filename = OSM_FILE

In [2]:
import pprint
import re
def count_tags(filename):
    """Count how often each XML element tag occurs in a file.

    Args:
        filename: path (or file object) accepted by ``ET.iterparse``.

    Returns:
        dict mapping each tag name to the number of elements with that tag.
    """
    tags = {}
    # Default iterparse events are "end" events, so each element is seen once,
    # after it has been fully parsed.
    for _event, elem in ET.iterparse(filename):
        tags[elem.tag] = tags.get(elem.tag, 0) + 1
        # The element has been counted; drop its attributes and children so the
        # in-memory tree does not keep the payload of a multi-million-node file.
        elem.clear()
    return tags
   
# Tally the element tags of the extract and pretty-print the resulting dict.
tags = count_tags(filename)
pprint.pprint(tags)

{'bounds': 1,
 'member': 45327,
 'nd': 4229236,
 'node': 3699172,
 'osm': 1,
 'relation': 5682,
 'tag': 2006880,
 'way': 363821}


In [3]:
# Whole key consists only of lowercase letters and underscores (the empty string matches too).
lower = re.compile(r'^([a-z]|_)*$')
# Whole key is two runs of lowercase letters/underscores joined by exactly one colon
# (either run may be empty).
lower_colon = re.compile(r'^([a-z]|_)*:([a-z]|_)*$')
# Matches a single character from the listed punctuation/whitespace set anywhere in the key.
problemchars = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')

def key_type(element, keys):
    """Classify the ``k`` attribute of a ``<tag>`` element and bump its counter.

    The key is tested against ``lower``, ``lower_colon`` and ``problemchars``
    in that order; the first pattern that matches decides which entry of
    ``keys`` is incremented, and ``'other'`` is used when none match.
    Elements whose tag is not ``"tag"`` leave ``keys`` untouched.

    Args:
        element: an ElementTree element.
        keys: dict with the counters 'lower', 'lower_colon', 'problemchars'
            and 'other'; it is updated in place.

    Returns:
        The same ``keys`` dict.
    """
    if element.tag != "tag":
        return keys

    tag_key = element.get('k')
    if lower.search(tag_key):
        bucket = 'lower'
    elif lower_colon.search(tag_key):
        bucket = 'lower_colon'
    elif problemchars.search(tag_key):
        bucket = 'problemchars'
    else:
        bucket = 'other'

    keys[bucket] += 1
    return keys

def process_map(filename):
    """Tally the tag-key categories found in an OSM file.

    Every element produced by ``ET.iterparse`` is handed to ``key_type``,
    which updates the running counters.

    Note: a later cell in this notebook defines another
    ``process_map(file_in, validate)``; once that cell has run, the name
    refers to that later function instead of this one.

    Args:
        filename: path of the OSM XML file to scan.

    Returns:
        dict with the counters 'lower', 'lower_colon', 'problemchars', 'other'.
    """
    tally = {"lower": 0, "lower_colon": 0, "problemchars": 0, "other": 0}
    for _event, node in ET.iterparse(filename):
        tally = key_type(node, tally)
    return tally

def test():
    """Run the key-category tally on the extract and pretty-print the counts."""
    pprint.pprint(process_map(filename))


test()

{'lower': 909638, 'lower_colon': 967712, 'other': 129527, 'problemchars': 3}


In [4]:
from collections import defaultdict

# Captures the last whitespace-free token of a street name (optionally ending in a period),
# which the audit treats as the street type.
street_type_re = re.compile(r'\b\S+\.?$', re.IGNORECASE)


# Street-name endings that are accepted as-is; anything else is reported by the audit.
expected = ["Street", "Avenue", "Boulevard", "Drive", "Court", "Place", "Square", "Lane", "Road", 
            "Trail", "Parkway", "Commons", "Circle", "Crescent", "Bluff", "North", "South", "East", "West",
           "Grove", "Highway", "Line", "Mall", "Park", "Sideroad", "Triangle", "Way"]

def audit_street_type(street_types, street_name):
    """Record ``street_name`` under its street type when that type is unexpected.

    The street type is whatever ``street_type_re`` matches at the end of the
    name. Names without a match, or whose type is listed in ``expected``,
    are ignored.

    Args:
        street_types: mapping from street type to a set of street names
            (the caller passes a ``defaultdict(set)``); updated in place.
        street_name: the street name to inspect.
    """
    found = street_type_re.search(street_name)
    if found is None:
        return

    suffix = found.group()
    if suffix in expected:
        return

    street_types[suffix].add(street_name)


def is_street_name(elem):
    """Return True when the element's ``k`` attribute is exactly ``addr:street``.

    Raises:
        KeyError: if the element has no ``k`` attribute.
    """
    tag_key = elem.attrib['k']
    return tag_key == "addr:street"


def audit(filename):
    """Collect street names whose street type is not in ``expected``.

    Every ``<node>`` and ``<way>`` in the file is scanned for
    ``addr:street`` tags, and each street value is passed to
    ``audit_street_type``.

    Args:
        filename: path of the OSM XML file to audit.

    Returns:
        defaultdict(set) mapping each unexpected street type to the set of
        street names that end with it.
    """
    street_types = defaultdict(set)
    # Listen for "end" events: on a "start" event iterparse only guarantees the
    # element's own attributes, so its child <tag> elements may not be attached
    # yet and street names could be silently skipped.
    for _event, elem in ET.iterparse(filename, events=("end",)):
        if elem.tag in ("node", "way"):
            for tag in elem.iter("tag"):
                if is_street_name(tag):
                    audit_street_type(street_types, tag.attrib['v'])
    return street_types
# Audit the street names of the extract; the returned mapping is the cell's displayed result.
audit(filename)

defaultdict(set,
            {'1': {'Concession Road 1', 'Kent Road 1'},
             '10': {'Concession Road 10', 'Sideroad 9 & 10'},
             '100': {'Orchard Hill Place, Suite 100'},
             '11': {'Concession Road 11', 'County Road 11'},
             '12': {'Concession Road 12',
              'County Road 12',
              'East Old US 12',
              'East Old US Hwy 12',
              'US 12'},
             '13': {'Concession Road 13', 'Sideroad 12 & 13'},
             '14': {'Concession Road 14',
              'County Road 14',
              'Sideroad 13 & 14'},
             '15': {'County Road 15'},
             '17': {'Sideroad 16 & 17'},
             '18': {'County Road 11 & 18', 'County Road 18'},
             '19': {'Sideroad 18 & 19'},
             '1a': {'Concession Road 1a'},
             '2': {'Concession Road 2', 'County Road 2'},
             '20': {'County Road 20'},
             '202': {'Wilshire Drive, Suite 202'},
             '203': {'North Center St

In [6]:
# Abbreviated street-name endings (with and without a trailing period) and the
# full word that update_name substitutes for each of them.
mapping = { "St": "Street",
            "St.": "Street",
            "Ave": "Avenue",
            "Ave.": "Avenue",
            "Rd": "Road",
            "Rd.": "Road",
            "Ln.": "Lane",
            "Ln": "Lane",
            "N.": "North",
            "N" : "North",
            "S.": "South",
            "S": "South",
            "E.": "East",
            "E": "East",
            "W.": "West",
            "W": "West",
            "Pkwy": "Parkway",
            "Pkwy.": "Parkway",
            "Hwy": "Highway",
            "Hwy.": "Highway",
            "Blvd.": "Boulevard",
            "Blvd": "Boulevard",
            "Dr": "Drive",
            "Dr.": "Drive",
            "Pl" : "Place",
            "Pl." : "Place",
            "Cir" : "Circle",
            "Cir." : "Circle",
            "Ct." : "Court",
            "Ct" : "Court",
            }

def update_name(name, mapping):
    """Expand an abbreviated street type at the end of a street name.

    The trailing token found by ``street_type_re`` is looked up in
    ``mapping``; when present, only that trailing token is replaced.

    Args:
        name: the street name to clean.
        mapping: dict from abbreviated street type to its full form.

    Returns:
        The street name with its trailing street type expanded, or the
        original name when there is nothing to expand.
    """
    m = street_type_re.search(name)
    if m:
        street_type = m.group()
        if street_type in mapping:
            # Splice by position instead of re.sub: the street type is plain
            # text, not a pattern ("St." would otherwise match "St" followed by
            # any character), and only the trailing occurrence may change
            # ("State St" must not become "Streetate Street").
            name = name[:m.start()] + mapping[street_type] + name[m.end():]
    return name
 

def test():
    st_types = audit(filename)
    pprint.pprint(dict(st_types))

    for st_type, ways in st_types.iteritems():
        for name in ways:
            better_name = update_name(name, mapping)
            print name, "=>", better_name
            
test()

{'1': set(['Concession Road 1', 'Kent Road 1']),
 '10': set(['Concession Road 10', 'Sideroad 9 & 10']),
 '100': set(['Orchard Hill Place, Suite 100']),
 '11': set(['Concession Road 11', 'County Road 11']),
 '12': set(['Concession Road 12',
            'County Road 12',
            'East Old US 12',
            'East Old US Hwy 12',
            'US 12']),
 '13': set(['Concession Road 13', 'Sideroad 12 & 13']),
 '14': set(['Concession Road 14', 'County Road 14', 'Sideroad 13 & 14']),
 '15': set(['County Road 15']),
 '17': set(['Sideroad 16 & 17']),
 '18': set(['County Road 11 & 18', 'County Road 18']),
 '19': set(['Sideroad 18 & 19']),
 '1a': set(['Concession Road 1a']),
 '2': set(['Concession Road 2', 'County Road 2']),
 '20': set(['County Road 20']),
 '202': set(['Wilshire Drive, Suite 202']),
 '203': set(['North Center Street, Suite 203']),
 '22': set(['County Road 22']),
 '223': set(['New US 223']),
 '23': set(['County Road 23', 'Old US Highway 23']),
 '3': set(['8th Concession Road,

In [7]:
# Distinct values seen for addr:city and addr:postcode tags in the extract.
city = set()
postcode = set()

# Every <tag> element produces its own "end" event, so it is inspected directly.
# Walking element.iter('tag') on each event would revisit the same tags again
# from their parent and once more from the root, without changing the result.
for _, element in ET.iterparse(filename, events=('end',)):
    if element.tag != 'tag':
        continue

    if re.search(r'addr:city$', element.attrib['k']):
        city.add(element.attrib['v'])

    elif re.search(r'addr:postcode$', element.attrib['k']):
        postcode.add(element.attrib['v'])


print("city: {}\n".format(city))

print("postcode: {}\n".format(postcode))

city: set(['Ypsilanti', 'Trowbridge', 'Tecumseh', 'Maybee', 'Northville Township', 'Casco', 'Dundee', 'brownstown township', 'troy', 'Wayne', 'Lennon', 'Clyde Township', 'lincoln park', 'Detroit, MI', 'Romeo', 'Shelby Township', 'Milford', 'Highland', 'Madison Heights', 'Goodrich', 'Whitmore Lake', 'Leonard', 'highland Park', 'Ann Arbor', 'Lapeer', 'Flint', 'Pinckney', 'Rochester Hills', 'Municipality of Leamington', 'Dearborn', 'Bloomfield Hills', 'Belle River', 'Sarnia 45', 'Montrose', 'Garden City', 'New Hudson', 'Dryden', 'Port Huron', 'Burtchville', 'flint', 'City of Sault Ste. Marie', 'Lincoln Park', 'hamtramck', 'Chesterfield', 'Pittsfield Charter Township', 'Inkster', 'Kimball', 'Erie', 'Washington', 'White Lake Township', 'Township of St. Clair', 'Harrow', 'Center Line', 'Redford', 'Troy', 'Livonia', 'Manchester', 'Columbiaville', 'Lambertville', 'Hazel Park', 'Ray', 'Saint Clair Shores', 'Windsor', 'Warren', 'Fraser', 'Rochester', 'Novi', 'Metamora', 'Commerce Charter Townshi

In [14]:
def fix_postal_code(postcode):
    """Normalise a raw postal-code string.

    Steps, in order:
      1. trim surrounding whitespace;
      2. keep only the part before the first '-' (if any);
      3. remove every "MI" substring;
      4. if the code contains "N", insert a space after the third character;
      5. collapse runs of whitespace to single spaces and trim again.

    Args:
        postcode: the raw postal-code text.

    Returns:
        The cleaned postal code (unchanged apart from trimming when none of
        the rules apply).
    """
    # partition() returns the whole string as the head when there is no '-'.
    cleaned = postcode.strip().partition('-')[0].strip()

    # replace() is a no-op when "MI" is absent.
    cleaned = cleaned.replace("MI", "").strip()

    if "N" in cleaned:
        cleaned = cleaned[:3] + ' ' + cleaned[3:]

    # split()/join drops leading/trailing whitespace and removes the doubled
    # space produced when an "N" code already had one.
    return " ".join(cleaned.split())

In [15]:
# Sample raw postal codes used to eyeball the behaviour of fix_postal_code.
check = [
    'MI64433-4233',
    ' N64431-4233',
    'MI74213',
    '  MI64434  ',
    '  N64434  ',
    ' N64 434',
]

for entry in check:
    print(fix_postal_code(entry))

64433
N64 431
74213
64434
N64 434
N64 434


In [10]:
def fix_cities(city):
    """Upper-case the first letter of every word in a city name.

    The name is split on whitespace, so surrounding whitespace is removed and
    inner runs of whitespace collapse to a single space. Letters after the
    first one of each word are left as they are.

    Args:
        city: the raw city name.

    Returns:
        The city name with each word's first character upper-cased.
    """
    return " ".join(word[:1].upper() + word[1:] for word in city.split())

In [16]:
import csv
import codecs
import copy
import cerberus
import schema

# Input OSM extract: the same path the earlier cells use through `filename`.
OSM_PATH = filename

# Output CSV files opened for writing by process_map.
NODES_PATH = "nodes.csv"
NODE_TAGS_PATH = "nodes_tags.csv"
WAYS_PATH = "ways.csv"
WAY_NODES_PATH = "ways_nodes.csv"
WAY_TAGS_PATH = "ways_tags.csv"

# Key starts with lowercase/underscore text, a colon, then more lowercase/underscore text
# (not anchored at the end, so further colons may follow).
LOWER_COLON = re.compile(r'^([a-z]|_)+:([a-z]|_)+')
# Tags whose key contains any of these characters are skipped by shape_element.
PROBLEMCHARS = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')

# Validation schema taken from the imported `schema` module.
SCHEMA = schema.schema

# Field names for each CSV writer; the *_FIELDS for nodes and ways are also the
# XML attributes shape_element copies from each element.
NODE_FIELDS = ['id', 'lat', 'lon', 'user', 'uid', 'version', 'changeset', 'timestamp']
NODE_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_FIELDS = ['id', 'user', 'uid', 'version', 'changeset', 'timestamp']
WAY_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_NODES_FIELDS = ['id', 'node_id', 'position']


def shape_element(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS,
                  problem_chars=PROBLEMCHARS, default_tag_type='regular'):
    """Clean and shape node or way XML element to Python dict.

    Args:
        element: an ElementTree element.
        node_attr_fields: XML attributes copied from a ``<node>``.
        way_attr_fields: XML attributes copied from a ``<way>``.
        problem_chars: compiled regex; tags whose key matches it are skipped.
        default_tag_type: tag type used for keys that do not match
            ``LOWER_COLON`` (i.e. have no usable ``prefix:`` part).

    Returns:
        For a node: ``{'node': attribs, 'node_tags': tags}``.
        For a way: ``{'way': attribs, 'way_nodes': way_nodes, 'way_tags': tags}``.
        For any other element: ``None``.

        Each tag is a dict with 'id', 'key', 'value' and 'type'. Values of
        ``addr:street``, ``addr:city`` and ``addr:postcode`` tags are cleaned
        with ``update_name``, ``fix_cities`` and ``fix_postal_code``.

    Raises:
        KeyError: if a requested attribute, or a tag's ``k``/``v`` attribute,
            is missing from the XML.
    """
    if element.tag == 'node':
        attr_fields = node_attr_fields
    elif element.tag == 'way':
        attr_fields = way_attr_fields
    else:
        # Only nodes and ways are shaped; anything else has no id to attach
        # its tags to.
        return None

    attribs = {}
    for field in attr_fields:
        attribs[field] = element.attrib[field]

    tags = []
    for tag in element.iter("tag"):
        key = tag.attrib['k']
        if problem_chars.search(key):
            continue

        value = tag.attrib['v']
        # Clean the address values that were audited earlier in the notebook.
        if key == 'addr:street':
            value = update_name(value, mapping)
        elif key == 'addr:city':
            value = fix_cities(value)
        elif key == 'addr:postcode':
            value = fix_postal_code(value)

        if LOWER_COLON.search(key):
            # The text before the first colon is the type; everything after
            # it (further colons included) stays in the key.
            tag_type, tag_key = key.split(':', 1)
        else:
            tag_type, tag_key = default_tag_type, key

        tags.append({
            'id': attribs['id'],
            'key': tag_key,
            'value': value,
            'type': tag_type,
        })

    if element.tag == 'node':
        return {'node': attribs, 'node_tags': tags}

    # Remaining case is a way: list its node references in document order.
    way_nodes = []
    for position, nd in enumerate(element.iter("nd")):
        way_nodes.append({
            'id': attribs['id'],
            'node_id': nd.attrib['ref'],
            'position': position,
        })
    return {'way': attribs, 'way_nodes': way_nodes, 'way_tags': tags}


# ================================================== #
#               Helper Functions                     #
# ================================================== #
def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Yield each fully parsed element whose tag is listed in ``tags``.

    Args:
        osm_file: path or file object accepted by ``ET.iterparse``.
        tags: element tag names to yield.

    Yields:
        Matching elements, each at its "end" event. After the consumer asks
        for the next element, the root element is cleared, so a yielded
        element should be used before advancing the generator.
    """
    parse_events = ET.iterparse(osm_file, events=('start', 'end'))
    # The first event is the start of the root element; keep a handle on it.
    root = next(parse_events)[1]
    for kind, node in parse_events:
        if kind != 'end' or node.tag not in tags:
            continue
        yield node
        root.clear()


def validate_element(element, validator, schema=SCHEMA):
    """Raise ValidationError if element does not match schema.

    Args:
        element: the shaped element dict to validate.
        validator: object exposing ``validate(element, schema)`` and an
            ``errors`` mapping (a ``cerberus.Validator`` in this notebook).
        schema: validation schema handed to the validator.

    Raises:
        cerberus.ValidationError: describing the first field reported in
            ``validator.errors`` when validation does not return True.
    """
    if validator.validate(element, schema) is True:
        return

    # Only the first failing field is reported.
    field, errors = next(validator.errors.iteritems())

    error_lines = []
    for k, v in errors.iteritems():
        detail = v if isinstance(v, str) else ", ".join(v)
        error_lines.append("{0}: {1}".format(k, detail))

    message_string = "\nElement of type '{0}' has the following errors:\n{1}"
    raise cerberus.ValidationError(
        message_string.format(field, "\n".join(error_lines))
    )


class UnicodeDictWriter(csv.DictWriter, object):
    """Extend csv.DictWriter to handle Unicode input"""

    def writerow(self, row):
        """Write one row, UTF-8 encoding every ``unicode`` value first."""
        encoded_row = {}
        for field, value in row.iteritems():
            if isinstance(value, unicode):
                value = value.encode('utf-8')
            encoded_row[field] = value
        super(UnicodeDictWriter, self).writerow(encoded_row)

    def writerows(self, rows):
        """Write each row through ``writerow`` so the encoding step applies."""
        for record in rows:
            self.writerow(record)


# ================================================== #
#               Main Function                        #
# ================================================== #
def process_map(file_in, validate):
    """Iteratively process each XML element and write to csv(s)

    Nodes and ways from ``file_in`` are shaped with ``shape_element`` and
    written to the five CSV files named by the ``*_PATH`` constants, each
    starting with a header row.

    Args:
        file_in: path of the OSM XML file to convert.
        validate: when exactly ``True``, every shaped element is checked with
            ``validate_element`` before it is written.
    """
    with codecs.open(NODES_PATH, 'w') as nodes_file, \
         codecs.open(NODE_TAGS_PATH, 'w') as nodes_tags_file, \
         codecs.open(WAYS_PATH, 'w') as ways_file, \
         codecs.open(WAY_NODES_PATH, 'w') as way_nodes_file, \
         codecs.open(WAY_TAGS_PATH, 'w') as way_tags_file:

        nodes_writer = UnicodeDictWriter(nodes_file, NODE_FIELDS)
        node_tags_writer = UnicodeDictWriter(nodes_tags_file, NODE_TAGS_FIELDS)
        ways_writer = UnicodeDictWriter(ways_file, WAY_FIELDS)
        way_nodes_writer = UnicodeDictWriter(way_nodes_file, WAY_NODES_FIELDS)
        way_tags_writer = UnicodeDictWriter(way_tags_file, WAY_TAGS_FIELDS)

        # Header rows, in the same order the writers were created.
        for writer in (nodes_writer, node_tags_writer, ways_writer,
                       way_nodes_writer, way_tags_writer):
            writer.writeheader()

        validator = cerberus.Validator()

        for element in get_element(file_in, tags=('node', 'way')):
            shaped = shape_element(element)
            if not shaped:
                continue

            if validate is True:
                validate_element(shaped, validator)

            if element.tag == 'node':
                nodes_writer.writerow(shaped['node'])
                node_tags_writer.writerows(shaped['node_tags'])
            elif element.tag == 'way':
                ways_writer.writerow(shaped['way'])
                way_nodes_writer.writerows(shaped['way_nodes'])
                way_tags_writer.writerows(shaped['way_tags'])


# Entry point: convert the extract at OSM_PATH to the CSV files, validating every element.
if __name__ == '__main__':
    # Note: Validation is ~ 10X slower. For the project consider using a small
    # sample of the map when validating.
    process_map(OSM_PATH, validate=True)