I downloaded the OSM data for the [Denver and Boulder, Colorado](https://mapzen.com/data/metro-extracts/metro/denver-boulder_colorado/) metro area. I had to download [7-Zip](http://www.7-zip.org/) to decompress the file.

Sample OSM area.

Import modules

In [1]:
import xml.etree.cElementTree as ET
from collections import defaultdict
from pprint import pprint
import re

Create sample

In [2]:
# Code provided by Udacity
OSM_FILE = "denver-boulder_colorado.osm"
SAMPLE_FILE = "sample.osm"

k = 15 # Parameter: take every k-th top level element; decrement to sample larger portions of data

def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Yield element if it is the right type of tag

    Reference:
    http://stackoverflow.com/questions/3095434/inserting-newlines-in-xml-file-generated-via-xml-etree-elementtree-in-python
    """
    context = iter(ET.iterparse(osm_file, events=('start', 'end')))
    _, root = next(context)
    for event, elem in context:
        if event == 'end' and elem.tag in tags:
            yield elem
            root.clear()
            
with open(SAMPLE_FILE, 'wb') as output:
    output.write('<?xml version="1.0" encoding="UTF-8"?>\n')
    output.write('<osm>\n  ')

    # Write every kth top level element
    for i, element in enumerate(get_element(OSM_FILE)):
        if i % k == 0:
            output.write(ET.tostring(element, encoding='utf-8'))

    output.write('</osm>')

Auditing and Cleaning

In [5]:
expected_streets = ["Street", "Avenue", "Boulevard", "Drive", "Court", "Place", "Square", "Lane", "Road", 
            "Trail", "Parkway", "Commons"]

def audit_street_type(street_types, street_name):
    """ set, str -> set
    Given a set list of street types and v tag attribute for addr:street element,
    add the attribute to the set and print if it's not in the list of expected
    street types expected_streets.
    
    expected_streets = ["Street"]
    audit_street_type(defaultdict(set), "Parkway")
    
    Parkway
    (["Parkway"])
    """
    street_type_re = re.compile(r'\b\S+\.?$', re.IGNORECASE)
    m = street_type_re.search(street_name)
    if m:
        street_type = m.group()
        if street_type not in expected_streets:
            street_types[street_type].add(street_name)

def audit_postcode(postcodes, postcode):
    """ dict, str -> dict
    Return dictionary of postcodes.
    """
    p = re.match(r'^\d{5}$', postcode)
    if p:
        return postcode
    else:
        postcodes[postcode].add(postcode)
            
def is_street_name(elem):
    "Return whether the element attribute is a street attribute."
    return (elem.attrib['k'] == "addr:street")

def is_postcode(elem):
    "Return whether the element attribute is a postcode attribute."
    return (elem.attrib['k'] == "addr:postcode")
        
def audit(osmfile):
        osm_file = open(osmfile, "r")
        street_types = defaultdict(set)
        postcodes = defaultdict(set)
        for event, elem in ET.iterparse(osm_file, events=("start",)):
            if elem.tag == "node" or elem.tag == "way":
                for tag in elem.iter("tag"):
                    if is_street_name(tag):
                        audit_street_type(street_types, tag.attrib['v'])
                    elif is_postcode(tag):
                        audit_postcode(postcodes, tag.attrib['v'])
        osm_file.close()
        return street_types, postcodes
    
street_type_dict, postcode_dict = audit(SAMPLE_FILE)

print "\n\n Street types \n"
for k, v in street_type_dict.items():
    print k
    pprint(v)
    
print "\n\n Postcodes \n"
for k, v in postcode_dict.items():
    print k
    pprint(v)
    
## Cleaning functions
street_mapping = {
    'Rd': 'Road',
    'Rd.': 'Road',
    'Ave.': 'Avenue',
    'Ave': 'Avenue',
    'Pl': 'Place',
    'Dr': 'Drive',
    'Cir': 'Circle',
    'Blvd': 'Boulevard',
    'St': 'Street',
    'Ct': 'Court'
    }

def update_street_name(name, mapping): # from case study
    ''' str, dict -> str
    
    print update_street_name('West 4th St.', {'St.':'Street'})
    West 4th Street
    
    print update_street_name('North 57th Ct', street_mapping)
    North 57th Court
    '''
    name = name.strip().split()
    if name == []:
        return ""
    if name[-1] in mapping:
        name[-1] = mapping[name[-1]]
    output = " ".join(name)
    return output

def update_postcode(postcode):
    """ str -> str
    Search for a full postcode in a string and return the first 5 digits.
    
    Adapted from: https://stackoverflow.com/questions/7425860/regular-expression-get-us-zip-code
    
    print update_postcode('80113-1525')
    80113
    
    print update_postcode('Golden, CO 80401')
    80401
    
    print update_postcode('CO 80223')
    80223
    """
    match = re.search('(\d{5})([- ])?(\d{4})?', postcode)
    if match: 
        return match.groups()[0]



 Street types 

Colfax
set(['East Colfax'])
Bypass
set(['Wadsworth Bypass'])
Elm
set(['East Elm'])
Ln
set(['Lee Ln'])
West
set(['Park Avenue West', 'South Carr Avenue West'])
Main
set(['Main'])
314
set(['County Road 314'])
Rd
set(['E Arapahoe Rd', 'Pine Valley Rd', 'S Parker Rd', 'S. Golden Rd'])
Appia
set(['Via Appia'])
Way
set(['Aberdeen Way',
     'Airport Way',
     'Castleton Way',
     'Del Corso Way',
     'Durham Way',
     'East 60th Way',
     'Golden Eagle Way',
     'Iliad Way',
     'Kincross Way',
     'Landmark Way',
     'Lioness Way',
     'Mariposa Way',
     'Meredith Way',
     'Nyland Way',
     'Pear Lake Way',
     'Pine Bluffs Way',
     'Pipit Lake Way',
     'Progress Way',
     'Rampart Way',
     'Reed Way',
     'S Cornerstar Way',
     'S Ellipse Way',
     'S Monaco Way',
     'S Oneida Way',
     'South Akron Way',
     'South Alton Way',
     'South Columbine Way',
     'South Fultondale Way',
     'South Gibraltar Way',
     'South Grand Baker Way',


On a future pass, I'd like to standardize names like "S Monaco Way" or "S. Monaco Way" to "South Monaco Way". Similarly, some streets have the direction on the end like "Arapahoe Road E" that should read "Arapahoe Road East". I didn't do that here because of alphabetical streets like "A Street" or "E Streets" that might be incorrectly changed to "East Street".

Create CSVs.

In [5]:
# %load schema.py
# Code from Udacity

schema = {
    'node': {
        'type': 'dict',
        'schema': {
            'id': {'required': True, 'type': 'integer', 'coerce': int},
            'lat': {'required': True, 'type': 'float', 'coerce': float},
            'lon': {'required': True, 'type': 'float', 'coerce': float},
            'user': {'required': True, 'type': 'string'},
            'uid': {'required': True, 'type': 'integer', 'coerce': int},
            'version': {'required': True, 'type': 'string'},
            'changeset': {'required': True, 'type': 'integer', 'coerce': int},
            'timestamp': {'required': True, 'type': 'string'}
        }
    },
    'node_tags': {
        'type': 'list',
        'schema': {
            'type': 'dict',
            'schema': {
                'id': {'required': True, 'type': 'integer', 'coerce': int},
                'key': {'required': True, 'type': 'string'},
                'value': {'required': True, 'type': 'string'},
                'type': {'required': True, 'type': 'string'}
            }
        }
    },
    'way': {
        'type': 'dict',
        'schema': {
            'id': {'required': True, 'type': 'integer', 'coerce': int},
            'user': {'required': True, 'type': 'string'},
            'uid': {'required': True, 'type': 'integer', 'coerce': int},
            'version': {'required': True, 'type': 'string'},
            'changeset': {'required': True, 'type': 'integer', 'coerce': int},
            'timestamp': {'required': True, 'type': 'string'}
        }
    },
    'way_nodes': {
        'type': 'list',
        'schema': {
            'type': 'dict',
            'schema': {
                'id': {'required': True, 'type': 'integer', 'coerce': int},
                'node_id': {'required': True, 'type': 'integer', 'coerce': int},
                'position': {'required': True, 'type': 'integer', 'coerce': int}
            }
        }
    },
    'way_tags': {
        'type': 'list',
        'schema': {
            'type': 'dict',
            'schema': {
                'id': {'required': True, 'type': 'integer', 'coerce': int},
                'key': {'required': True, 'type': 'string'},
                'value': {'required': True, 'type': 'string'},
                'type': {'required': True, 'type': 'string'}
            }
        }
    }
}

In [7]:
import re
import codecs
import cerberus
import csv
import schema

NODES_PATH = "nodes.csv"
NODE_TAGS_PATH = "nodes_tags.csv"
WAYS_PATH = "ways.csv"
WAY_NODES_PATH = "ways_nodes.csv"
WAY_TAGS_PATH = "ways_tags.csv"

LOWER_COLON = re.compile(r'^([a-z]|_)+:([a-z]|_)+')
PROBLEMCHARS = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')

SCHEMA = schema.schema

# Make sure the fields order in the csvs matches the column order in the sql table schema
NODE_FIELDS = ['id', 'lat', 'lon', 'user', 'uid', 'version', 'changeset', 'timestamp']
NODE_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_FIELDS = ['id', 'user', 'uid', 'version', 'changeset', 'timestamp']
WAY_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_NODES_FIELDS = ['id', 'node_id', 'position']


def shape_element(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS,
                  problem_chars=PROBLEMCHARS, default_tag_type='regular'):
    """Clean and shape node or way XML element to Python dict"""

    node_attribs = {}
    way_attribs = {}
    way_nodes = []
    tags = []  # Handle secondary tags the same way for both node and way elements
    
    if element.tag == 'node':
        for node_attribute in NODE_FIELDS:
            node_attribs[node_attribute] = element.attrib[node_attribute]
        for child in element.iter():
            if child.tag == 'tag':
                if PROBLEMCHARS.match(child.attrib['k']) is not None:
                    pass
                else:
                    if create_tag_dict(element, child):
                        tags.append(create_tag_dict(element, child))
                    else:
                        continue
        return {'node': node_attribs, 'node_tags': tags}
    elif element.tag == 'way':
        pos = 0
        for way_attribute in WAY_FIELDS:
            way_attribs[way_attribute] = element.attrib[way_attribute]
        for child in element.iter():
            if child.tag == 'tag':
                if PROBLEMCHARS.match(child.attrib['k']) is not None:
                    pass
                else: 
                    if create_tag_dict(element, child):
                        tags.append(create_tag_dict(element, child))
                    else:
                        continue
            elif child.tag == 'nd':
                nd = {}
                nd['id'] = way_attribs['id']
                nd['node_id'] = child.attrib['ref']
                nd['position'] = pos
                pos += 1
                way_nodes.append(nd)
        return {'way': way_attribs, 'way_nodes': way_nodes, 'way_tags': tags}


# ================================================== #
#               Helper Functions                     #
# ================================================== #

def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Yield element if it is the right type of tag"""

    context = ET.iterparse(osm_file, events=('start', 'end'))
    _, root = next(context)
    for event, elem in context:
        if event == 'end' and elem.tag in tags:
            yield elem
            root.clear()

def create_tag_dict(element, child):
    ''' str, str -> dict
    Given the element of an XML doc and its child, return a dict of child element attribute values.
    '''
    d = {}
    d['id'] = element.attrib['id']
    d['key'] = child.attrib['k']
    if ':' in child.attrib['k']:
        index = child.attrib['k'].index(":") + 1
        d["key"] = child.attrib["k"][index:]
        d["type"] = child.attrib["k"][:index-1]
        if is_street_name(child):
            d['value'] = update_street_name(child.attrib['v'], street_mapping)
        elif is_postcode(child):
            d['value'] = update_postcode(child.attrib['v'])
        else:
            d['value'] = child.attrib['v']
    else:
        d['key'] = child.attrib['k']
        d['type'] = 'regular'
        d['value'] = child.attrib['v']
    return d
               
def validate_element(element, validator, schema=SCHEMA):
    """Raise ValidationError if element does not match schema"""
    if validator.validate(element, schema) is not True:
        field, errors = next(validator.errors.iteritems())
        message_string = "\nElement of type '{0}' has the following errors:\n{1}"
        error_strings = (
            "{0}: {1}".format(k, v if isinstance(v, str) else ", ".join(v))
            for k, v in errors.iteritems()
        )
        raise cerberus.ValidationError(
            message_string.format(field, "\n".join(error_strings))
        )

class UnicodeDictWriter(csv.DictWriter, object):
    """Extend csv.DictWriter to handle Unicode input"""

    def writerow(self, row):
        super(UnicodeDictWriter, self).writerow({
            k: (v.encode('utf-8') if isinstance(v, unicode) else v) for k, v in row.iteritems()
        })

    def writerows(self, rows):
        for row in rows:
            self.writerow(row)


# ================================================== #
#               Main Function                        #
# ================================================== #
def process_map(file_in, validate):
    """Iteratively process each XML element and write to csv(s)"""

    with codecs.open(NODES_PATH, 'w') as nodes_file, \
         codecs.open(NODE_TAGS_PATH, 'w') as nodes_tags_file, \
         codecs.open(WAYS_PATH, 'w') as ways_file, \
         codecs.open(WAY_NODES_PATH, 'w') as way_nodes_file, \
         codecs.open(WAY_TAGS_PATH, 'w') as way_tags_file:

        nodes_writer = UnicodeDictWriter(nodes_file, NODE_FIELDS)
        node_tags_writer = UnicodeDictWriter(nodes_tags_file, NODE_TAGS_FIELDS)
        ways_writer = UnicodeDictWriter(ways_file, WAY_FIELDS)
        way_nodes_writer = UnicodeDictWriter(way_nodes_file, WAY_NODES_FIELDS)
        way_tags_writer = UnicodeDictWriter(way_tags_file, WAY_TAGS_FIELDS)

        nodes_writer.writeheader()
        node_tags_writer.writeheader()
        ways_writer.writeheader()
        way_nodes_writer.writeheader()
        way_tags_writer.writeheader()

        validator = cerberus.Validator()

        for element in get_element(file_in, tags=('node', 'way')):
            el = shape_element(element)
            if el:
                if validate is True:
                    validate_element(el, validator)

                if element.tag == 'node':
                    nodes_writer.writerow(el['node'])
                    node_tags_writer.writerows(el['node_tags'])
                elif element.tag == 'way':
                    ways_writer.writerow(el['way'])
                    way_nodes_writer.writerows(el['way_nodes'])
                    way_tags_writer.writerows(el['way_tags'])


if __name__ == '__main__':
    """ Note: Validation is ~ 10X slower. Run on SAMPLE_FILE. If no error, then set validate=False on OSM_FILE.
    """
    process_map(OSM_FILE, validate=False)

# SQL Analysis

In [8]:
import sqlite3
import csv
from pprint import pprint

## Create Tables

Create nodes_tags table.

In [10]:
sqlite_file = 'mydb.db'
connection = sqlite3.connect(sqlite_file)

cursor = connection.cursor()

# Drop the table if it already exists
cursor.execute("""
    DROP TABLE IF EXISTS nodes_tags
    """)
connection.commit()

cursor.execute("""
    CREATE TABLE nodes_tags(id INTEGER, key TEXT, value TEXT, type TEXT)
    """)
connection.commit()

# Read in data
with open('nodes_tags.csv', 'rb') as f:
    g = csv.DictReader(f)
    to_db = [(i['id'], i['key'], i['value'].decode('utf-8'), i['type']) for i in g]

# Insert data
cursor.executemany("""
    INSERT INTO nodes_tags(id, key, value, type) VALUES (?, ?, ?, ?);""",
    to_db)
connection.commit()


cursor.execute('SELECT * FROM nodes_tags LIMIT 10;')

all_rows = cursor.fetchall()
print('1):')
pprint(all_rows)

connection.close()


1):
[(25689368, u'highway', u'turning_circle', u'regular'),
 (25698637, u'name', u'King Soopers', u'regular'),
 (25698637, u'amenity', u'fuel', u'regular'),
 (25757229, u'name', u'King Soopers', u'regular'),
 (25757229, u'shop', u'supermarket', u'regular'),
 (25757782, u'highway', u'traffic_signals', u'regular'),
 (25758250, u'highway', u'crossing', u'regular'),
 (25765902, u'highway', u'traffic_signals', u'regular'),
 (25765915, u'highway', u'crossing', u'regular'),
 (25765941, u'amenity', u'restaurant', u'regular')]


In [12]:
# Drop the table if it already exists
cursor.execute("""
    DROP TABLE IF EXISTS ways_nodes
    """)
connection.commit()

cursor.execute("""
    CREATE TABLE ways_nodes(id INTEGER, node_id INTEGER, position INTEGER)
    """)
connection.commit()

# Read in data
with open('ways_nodes.csv', 'rb') as f:
    g = csv.DictReader(f)
    to_db = [(i['id'], i['node_id'], i['position']) for i in g]

# Insert data
cursor.executemany("""
    INSERT INTO ways_nodes(id, node_id, position) VALUES (?, ?, ?);""",
    to_db)
connection.commit()

ProgrammingError: Cannot operate on a closed database.