Map Area:
http://www.openstreetmap.org/export#map=11/47.6057/-122.3616

In [1]:
##Import Data
import xml.etree.ElementTree as ET  # Use cElementTree or lxml if too slow
import pprint


OSM_FILE = "seattle_WA"  # full OSM extract that the sample below is drawn from
SAMPLE_FILE = "sample.osm"  # reduced file written by the sampling loop in this cell

k = 10 # Parameter: take every k-th top level element

def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Lazily yield each completed element whose tag is listed in ``tags``.

    The document is parsed incrementally; after every yielded element the
    root element is cleared.

    Reference:
    http://stackoverflow.com/questions/3095434/inserting-newlines-in-xml-file-generated-via-xml-etree-elementtree-in-python
    """
    parse_events = iter(ET.iterparse(osm_file, events=('start', 'end')))
    # The first event is the "start" of the document root; keep a handle on it.
    root = next(parse_events)[1]
    for kind, node in parse_events:
        if kind != 'end' or node.tag not in tags:
            continue
        yield node
        root.clear()


# Build the sample file: an <osm> wrapper around every k-th top-level element.
with open(SAMPLE_FILE, 'wb') as output:
    output.write(b'<?xml version="1.0" encoding="UTF-8"?>\n')
    output.write(b'<osm>\n  ')

    # enumerate() counts from 0, so elements 0, k, 2k, ... are the ones kept
    for i, element in enumerate(get_element(OSM_FILE)):
        if i % k != 0:
            continue
        output.write(ET.tostring(element, encoding='utf-8'))

    output.write(b'</osm>')

In [2]:
# Echo the file object bound by the `with` block in the previous cell (already closed by then).
output

<_io.BufferedWriter name='sample.osm'>

In [3]:
##Parse
def count_tags(filename):
    """Return a dict mapping every element tag found in ``filename`` to its count."""
    counts = {}
    # iterparse is used with its default events, so each element is seen once
    for _, node in ET.iterparse(filename):
        counts[node.tag] = counts.get(node.tag, 0) + 1
    return counts

In [4]:
# Count every element type in the full extract. OSM_FILE is the same path the
# sampling cell reads, so the file name is maintained in one place only.
tags = count_tags(OSM_FILE)
pprint.pprint(tags)

{'member': 28193,
 'meta': 1,
 'nd': 233291,
 'node': 208159,
 'note': 1,
 'osm': 1,
 'relation': 689,
 'tag': 253924,
 'way': 25286}


In [5]:
##Audit
from collections import defaultdict
import re

##Street names
# Matches the run of non-whitespace characters that ends a string,
# i.e. the final word of a street name.
street_type_re = re.compile(r'\S+\.?$', re.IGNORECASE)
# Running tally: street-type token -> number of occurrences.
street_types = defaultdict(int)

def audit_street_type(street_types, street_name):
    """Increment the tally in ``street_types`` for the final token of ``street_name``."""
    found = street_type_re.search(street_name)
    if found is None:
        # no trailing token to count: leave the tally untouched
        return
    street_types[found.group()] += 1

def print_sorted_dict(d):
    """Print one ``key: value`` line per entry of ``d``, keys ordered case-insensitively."""
    ordered = sorted(d.keys(), key=lambda name: name.lower())
    for key in ordered:
        print("%s: %d" % (key, d[key]))

def is_street_name(elem):
    """Return True when ``elem`` is a <tag> element whose ``k`` attribute is "addr:street"."""
    if elem.tag != "tag":
        # other elements are rejected without looking at their attributes
        return False
    return elem.attrib['k'] == "addr:street"

def audit():
    """Tally street types across SAMPLE_FILE into the global ``street_types`` and print them."""
    street_values = (
        elem.attrib['v']
        for _, elem in ET.iterparse(SAMPLE_FILE)
        if is_street_name(elem)
    )
    for street_name in street_values:
        audit_street_type(street_types, street_name)
    print_sorted_dict(street_types)

# Run the street-type audit on the sample file and print the tallies.
audit()


Alley: 5
Ave: 1
Avenue: 625
Broadway: 15
Court: 5
Drive: 18
East: 381
North: 124
Pl: 1
Place: 32
S: 1
South: 377
Southwest: 3
Street: 632
Way: 102
West: 50


In [6]:
##Audit
##Amenity
# Matches the run of non-whitespace characters that ends a string.
amen_type_re = re.compile(r'\S+\.?$', re.IGNORECASE)
# Running tally: amenity token -> number of occurrences.
amen_types = defaultdict(int)

def audit_amen_type(amen_types, amen_t):
    """Increment the tally in ``amen_types`` for the trailing token of ``amen_t``."""
    found = amen_type_re.search(amen_t)
    if found is not None:
        token = found.group()
        amen_types[token] = amen_types[token] + 1

def print_sorted_dict(d):
    """Print each entry of ``d`` as ``key: count``, keys sorted ignoring case.

    Note: this re-defines the identically named helper from the street-name
    audit cell with the same behaviour.
    """
    def lowered(key):
        return key.lower()

    for key in sorted(d.keys(), key=lowered):
        print("%s: %d" % (key, d[key]))

def is_amen_type(elem):
    """Return True when ``elem`` is a <tag> element whose ``k`` attribute is "amenity"."""
    if elem.tag != "tag":
        # other elements are rejected without looking at their attributes
        return False
    return elem.attrib['k'] == "amenity"

def audit():
    """Tally amenity values across SAMPLE_FILE into the global ``amen_types`` and print them."""
    for _, elem in ET.iterparse(SAMPLE_FILE):
        if not is_amen_type(elem):
            continue
        audit_amen_type(amen_types, elem.attrib['v'])
    print_sorted_dict(amen_types)

# Run the amenity audit on the sample file and print the tallies.
audit()

atm: 5
bail_bonds: 1
bank: 10
bar: 8
bbq: 1
bench: 18
bicycle_parking: 122
cafe: 34
car_rental: 2
cinema: 2
clinic: 3
college: 1
courthouse: 1
dentist: 1
doctors: 1
drinking_water: 5
embassy: 1
fast_food: 14
ferry_terminal: 3
fountain: 3
hospital: 2
ice_cream: 1
kindergarten: 2
marketplace: 1
parking: 79
parking_entrance: 10
pharmacy: 3
place_of_worship: 12
post_box: 3
post_office: 1
pub: 6
recycling: 8
restaurant: 49
school: 3
shelter: 1
social_facility: 5
telephone: 3
theatre: 4
toilets: 2
veterinary: 1
waste_basket: 26


In [13]:
##Cleaning
##Street Names

# Street-name endings accepted as-is; audit_street_type only records names whose
# final token is NOT in this list.
expected = ["Street", "Avenue", "Boulevard", "Drive", "Court", "Place", "Square", "Lane", "Road", 
            "Trail", "Parkway", "Commons","West", "North", "South", "East", "Way", "Alley", "Broadway", "Southwest"]

# Abbreviation -> replacement, looked up word-by-word by update_name.
mapping = { "St": "Street", 
            "St.": "Street", 
             "Rd.": "Road", 
             "Ave": "Avenue", 
             "Av": "Avenue", 
             "Ave.": "Avenue", 
             "Baselin": "Baseline", 
             "Blvd": "Boulevard", 
             "Cir": "Circle", 
             "Ct": "Court", 
             "Dr": "Drive", 
             "Ln": "Lane", 
             "Pl": "Place", 
             "Rd": "Road", 
             "dr": "Drive", 
             "trail": "Trail",
              "S": "South",
           "S.": "South",
              "N": "North",
              "W": "West",
              "E": "East"} 



def audit_street_type(street_types, street_name):
    """Record ``street_name`` under its final token when that token is not in ``expected``."""
    found = street_type_re.search(street_name)
    if found is None:
        return
    suffix = found.group()
    if suffix in expected:
        return
    street_types[suffix].add(street_name)


def is_street_name(elem):
    """Tell whether the ``k`` attribute of ``elem`` equals "addr:street"."""
    key = elem.attrib['k']
    return key == "addr:street"


def audit(osmfile):
    """Collect the unexpected street types found in ``osmfile``.

    Args:
        osmfile: path of the OSM XML file to scan (opened as UTF-8 text).

    Returns:
        The ``defaultdict(set)`` filled in by ``audit_street_type`` for every
        ``addr:street`` tag found under a node or way.
    """
    street_types = defaultdict(set)
    # `with` closes the file even if parsing raises part-way through.
    with open(osmfile, "r", encoding='utf-8') as osm_file:
        # Listen for "end" events: at "start" an element's children are not
        # guaranteed to have been parsed yet, so <tag> children could be missed.
        for event, elem in ET.iterparse(osm_file, events=("end",)):
            if elem.tag == "node" or elem.tag == "way":
                for tag in elem.iter("tag"):
                    if is_street_name(tag):
                        audit_street_type(street_types, tag.attrib['v'])
    return street_types


def update_name(name, mapping):
    """Return ``name`` with every space-separated word found in ``mapping`` replaced."""
    pieces = [mapping[word] if word in mapping else word
              for word in name.split(" ")]
    return " ".join(pieces)


def test():
    """Audit SAMPLE_FILE and print each flagged street name next to its cleaned form."""
    flagged = audit(SAMPLE_FILE)
    for names in flagged.values():
        for name in names:
            print(name, "=>", update_name(name, mapping))
           
                


# Run the street-name cleaning check when this code executes as the main module.
if __name__ == '__main__':
    test()

S Bradner Pl => South Bradner Place
15th Ave S => 15th Avenue South
Westlake Ave => Westlake Avenue


In [14]:
##Cleaning
##Amenity
# Amenity values accepted as-is; audit_amen_type only records values NOT in this list.
# NOTE: this rebinds the `expected` / `mapping` names used by the street-name cell.
expected = ["bail_bonds","bar","bbq","bench","bicycle_parking","cafe","car_rental","cinema",
            "clinic","college","courthouse", "dentist","doctors","drinking_water","embassy",
            "ferry_terminal","fountain","hospital","ice_cream","kindergarten","marketplace",
            "parking","parking_entrance","pharmacy","place_of_worship","post_box", "bank",
           "post_office","pub","recycling","school","shelter","social_facility",
            "telephone","theatre","toilets","veterinary","waste_basket", "restaurant"
           ]

# Amenity value -> replacement value, looked up by update_name.
mapping = {"atm": "bank",
          "fast_food": "restaurant"} 



def audit_amen_type(amen_types, amen_t):
    """Record the trailing token of ``amen_t`` in ``amen_types`` when it is not in ``expected``."""
    found = amen_type_re.search(amen_t)
    if found is None:
        return
    token = found.group()
    if token in expected:
        return
    # the token is both the key and the member stored in its set
    amen_types[token].add(token)


def is_amen_type(elem):
    """Tell whether the ``k`` attribute of ``elem`` equals "amenity"."""
    key = elem.attrib['k']
    return key == "amenity"


def audit(osmfile):
    """Collect the unexpected amenity values found in ``osmfile``.

    Args:
        osmfile: path of the OSM XML file to scan (opened as UTF-8 text).

    Returns:
        The ``defaultdict(set)`` filled in by ``audit_amen_type`` for every
        ``amenity`` tag found under a node or way.
    """
    amen_types = defaultdict(set)
    # `with` closes the file even if parsing raises part-way through.
    with open(osmfile, "r", encoding='utf-8') as osm_file:
        # Listen for "end" events: at "start" an element's children are not
        # guaranteed to have been parsed yet, so <tag> children could be missed.
        for event, elem in ET.iterparse(osm_file, events=("end",)):
            if elem.tag == "node" or elem.tag == "way":
                for tag in elem.iter("tag"):
                    if is_amen_type(tag):
                        audit_amen_type(amen_types, tag.attrib['v'])
    return amen_types
def update_name(name, mapping):
    """Swap the trailing token of ``name`` for its ``mapping`` entry, if it has one.

    Everything before the matched token is kept; ``name`` is returned unchanged
    when nothing matches or the token has no entry in ``mapping``.
    """
    found = amen_type_re.search(name)
    if found is None or found.group() not in mapping:
        return name
    return name[:found.start()] + mapping[found.group()]


def test():
    """Audit SAMPLE_FILE and print each flagged amenity value next to its replacement."""
    flagged_by_type = audit(SAMPLE_FILE)
    for flagged in flagged_by_type.values():
        for name in flagged:
            print(name, "=>", update_name(name, mapping))
           
                


# Run the amenity cleaning check when this code executes as the main module.
if __name__ == '__main__':
    test()

fast_food => restaurant
atm => bank


In [15]:
##Prepare for database
import csv
import codecs
import pprint
import re
import xml.etree.cElementTree as ET

import cerberus

import schema

# Input OSM file that process_map is run on at the bottom of this cell.
OSM_PATH = "sample.osm"

# Output CSV files, one per writer opened in process_map.
NODES_PATH = "nodes.csv"
NODE_TAGS_PATH = "nodes_tags.csv"
WAYS_PATH = "ways.csv"
WAY_NODES_PATH = "ways_nodes.csv"
WAY_TAGS_PATH = "ways_tags.csv"

# Lower-case/underscore text, a colon, then more lower-case/underscore text at the start of a string.
LOWER_COLON = re.compile(r'^([a-z]|_)+:([a-z]|_)+')
# Any single character from the listed punctuation/whitespace set; find_tags tests tag keys against it.
PROBLEMCHARS = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')

# Validation schema taken from the local `schema` module; default for validate_element.
SCHEMA = schema.schema

# Make sure the fields order in the csvs matches the column order in the sql table schema
NODE_FIELDS = ['id', 'lat', 'lon', 'user', 'uid', 'version', 'changeset', 'timestamp']
NODE_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_FIELDS = ['id', 'user', 'uid', 'version', 'changeset', 'timestamp']
WAY_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_NODES_FIELDS = ['id', 'node_id', 'position']

def find_tags(elem, id_value):
    """Build the secondary-tag record for one <tag> child of a node or way.

    Args:
        elem: element whose ``attrib`` holds the ``k`` (key) and ``v`` (value) entries.
        id_value: id of the parent element, stored under ``'id'``.

    Returns:
        dict with the keys ``'id'``, ``'key'``, ``'value'`` and ``'type'``.
        For a key such as ``"addr:street"`` the text before the first colon
        becomes ``'type'`` and the remainder becomes ``'key'``.  Keys without a
        colon, and keys containing a PROBLEMCHARS character, are kept whole
        with type ``'regular'``.
    """
    key = elem.attrib['k']
    # Bind the default first: previously a key containing a problem character
    # left `type_field` unassigned and building the record raised UnboundLocalError.
    type_field = 'regular'
    if PROBLEMCHARS.search(key) is None and ':' in key:
        # split on the FIRST colon only, so "a:b:c" -> type "a", key "b:c"
        type_field, _, key = key.partition(':')

    return {
        'id': id_value,
        'key': key,
        'value': elem.attrib['v'],
        'type': type_field,
    }

# Abbreviation -> replacement, applied word-by-word to addr:street and amenity
# values.  Built once at definition time instead of on every shape_element call.
# "S." is included so this table agrees with the street-name audit mapping.
_TAG_VALUE_MAPPING = {
    "St": "Street",
    "St.": "Street",
    "Rd.": "Road",
    "Ave": "Avenue",
    "Av": "Avenue",
    "Ave.": "Avenue",
    "Baselin": "Baseline",
    "Blvd": "Boulevard",
    "Cir": "Circle",
    "Ct": "Court",
    "Dr": "Drive",
    "Ln": "Lane",
    "Pl": "Place",
    "Rd": "Road",
    "dr": "Drive",
    "trail": "Trail",
    "S": "South",
    "S.": "South",
    "N": "North",
    "W": "West",
    "E": "East",
    "atm": "bank",
    "fast_food": "restaurant",
}

# Tag keys whose values are cleaned before being shaped.
_CLEANED_KEYS = ('addr:street', 'amenity')


def _clean_tag_value(value, mapping=_TAG_VALUE_MAPPING):
    """Replace every whitespace-separated word of ``value`` that appears in ``mapping``."""
    words = value.split()
    if not any(word in mapping for word in words):
        # nothing to replace: return the value untouched, original spacing included
        return value
    return " ".join(mapping.get(word, word) for word in words)


def _shape_tags(element):
    """Return the find_tags record of every <tag> child of ``element``, values cleaned first."""
    shaped = []
    for tag_elem in element.findall('tag'):
        if tag_elem.attrib['k'] in _CLEANED_KEYS:
            # find_tags reads the value from the element, so the cleaned value is stored back on it
            tag_elem.attrib['v'] = _clean_tag_value(tag_elem.attrib['v'])
        shaped.append(find_tags(tag_elem, element.attrib['id']))
    return shaped


def shape_element(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS,
                  problem_chars=PROBLEMCHARS, default_tag_type='regular'):
    """Clean and shape node or way XML element to Python dict.

    Args:
        element: parsed XML element; only ``node`` and ``way`` tags are shaped.
        node_attr_fields: attribute names copied from a node element.
        way_attr_fields: attribute names copied from a way element.
        problem_chars: accepted for interface compatibility; not used here.
        default_tag_type: accepted for interface compatibility; not used here.

    Returns:
        ``{'node': ..., 'node_tags': [...]}`` for a node,
        ``{'way': ..., 'way_nodes': [...], 'way_tags': [...]}`` for a way,
        and ``None`` for any other element.

    Raises:
        KeyError: if a listed attribute is missing from ``element``.
    """
    if element.tag == 'node':
        node_attribs = {field: element.attrib[field] for field in node_attr_fields}
        return {'node': node_attribs, 'node_tags': _shape_tags(element)}

    if element.tag == 'way':
        way_attribs = {field: element.attrib[field] for field in way_attr_fields}
        tags = _shape_tags(element)
        # 'position' is the 0-based order of each <nd> reference within the way
        way_nodes = [
            {'id': element.attrib['id'],
             'node_id': nd.attrib['ref'],
             'position': position}
            for position, nd in enumerate(element.findall('nd'))
        ]
        return {'way': way_attribs, 'way_nodes': way_nodes, 'way_tags': tags}

    return None


# ================================================== #
#               Helper Functions                     #
# ================================================== #
def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Generate every fully parsed element of ``osm_file`` whose tag is in ``tags``."""

    stream = ET.iterparse(osm_file, events=('start', 'end'))
    # first event is the "start" of the document root; keep a handle on it
    root = next(stream)[1]
    for event, elem in stream:
        if event != 'end':
            continue
        if elem.tag in tags:
            yield elem
            root.clear()


def validate_element(element, validator, schema=SCHEMA):
    """Raise an Exception if ``element`` does not match ``schema``.

    Args:
        element: shaped element dict to check.
        validator: object providing ``validate(element, schema)`` and an
            ``errors`` mapping (a ``cerberus.Validator`` in ``process_map``).
        schema: schema passed to the validator; defaults to ``SCHEMA``.

    Raises:
        Exception: when validation does not return ``True``; the message names
            the first field in ``validator.errors`` and pretty-prints its errors.
    """
    if validator.validate(element, schema) is not True:
        # dict.iteritems() exists only on Python 2; items() wrapped in iter()
        # yields the same first (field, errors) pair on Python 3.
        field, errors = next(iter(validator.errors.items()))
        message_string = "\nElement of type '{0}' has the following errors:\n{1}"
        error_string = pprint.pformat(errors)

        raise Exception(message_string.format(field, error_string))





# ================================================== #
#               Main Function                        #
# ================================================== #
def process_map(file_in, validate):
    """Stream the nodes and ways of ``file_in`` and write them to the five CSV files.

    When ``validate`` is True every shaped element is checked with
    ``validate_element`` before it is written.
    """

    with codecs.open(NODES_PATH, 'w', encoding='utf8') as nodes_file, \
         codecs.open(NODE_TAGS_PATH, 'w',encoding='utf8') as nodes_tags_file, \
         codecs.open(WAYS_PATH, 'w',encoding='utf8') as ways_file, \
         codecs.open(WAY_NODES_PATH, 'w',encoding='utf8') as way_nodes_file, \
         codecs.open(WAY_TAGS_PATH, 'w',encoding='utf8') as way_tags_file:

        # one DictWriter per output file, using the matching column list
        nodes_writer = csv.DictWriter(nodes_file, NODE_FIELDS)
        node_tags_writer = csv.DictWriter(nodes_tags_file, NODE_TAGS_FIELDS)
        ways_writer = csv.DictWriter(ways_file, WAY_FIELDS)
        way_nodes_writer = csv.DictWriter(way_nodes_file, WAY_NODES_FIELDS)
        way_tags_writer = csv.DictWriter(way_tags_file, WAY_TAGS_FIELDS)

        # header rows first
        for writer in (nodes_writer, node_tags_writer, ways_writer,
                       way_nodes_writer, way_tags_writer):
            writer.writeheader()

        validator = cerberus.Validator()

        for element in get_element(file_in, tags=('node', 'way')):
            shaped = shape_element(element)
            if not shaped:
                continue

            if validate is True:
                validate_element(shaped, validator)

            if element.tag == 'node':
                nodes_writer.writerow(shaped['node'])
                node_tags_writer.writerows(shaped['node_tags'])
            elif element.tag == 'way':
                ways_writer.writerow(shaped['way'])
                way_nodes_writer.writerows(shaped['way_nodes'])
                way_tags_writer.writerows(shaped['way_tags'])


# Convert OSM_PATH to the CSV files, validating every shaped element.
if __name__ == '__main__':
    # Note: Validation is ~ 10X slower. For the project consider using a small
    # sample of the map when validating.
    process_map(OSM_PATH, validate=True)