OpenStreetMap Data Case Study
11/16/2016
Scott Tse

I chose to investigate the OSM data for the city of Portland, OR where I feel very fortunate to live.
https://www.openstreetmap.org/search?query=portland%2C%20or#map=12/45.5234/-122.6762

I extracted the compressed OSM data in XML format (84MB) from MAPZEN, following this link: 
https://mapzen.com/data/metro-extracts/metro/portland_oregon/



In [17]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import xml.etree.cElementTree as ET  # Used cElementTree to speed things up
                                    
OSM_FILE = "portland_oregon.osm"  # unzipped OSM file from MAPZEN
SAMPLE_FILE = "sample.osm" # name of reduced sample file

k = 500 # Parameter: take every k-th top level element
      # initially used k = 100 to reduce size of dataset and parsing time

def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Yield element if it is the right type of tag

    Reference:
    http://stackoverflow.com/questions/3095434/inserting-newlines-in-xml-file-generated-via-xml-etree-elementtree-in-python
    """
    context = iter(ET.iterparse(osm_file, events=('start', 'end')))
    _, root = next(context)
    for event, elem in context:
        if event == 'end' and elem.tag in tags:
            yield elem
            root.clear()


with open(SAMPLE_FILE, 'wb') as output:
    output.write('<?xml version="1.0" encoding="UTF-8"?>\n')
    output.write('<osm>\n  ')

    # Write every kth top level element
    for i, element in enumerate(get_element(OSM_FILE)):
        if i % k == 0:
            output.write(ET.tostring(element, encoding='utf-8'))

    output.write('</osm>')


In [2]:
# use this function to take a look at the first n rows of the SAMPLE FILE, verify that code above ran properly

def file_look(file, rows):
    with open(file, "r") as f:
        for i, row in enumerate(f):
            print row
            if i >= rows:
                break
                       

In [3]:
file_look(SAMPLE_FILE, 50) # look at first 50 rows of sample file

<?xml version="1.0" encoding="UTF-8"?>

<osm>

  <node changeset="7632877" id="27195852" lat="45.5408932" lon="-122.8675556" timestamp="2011-03-21T23:25:58Z" uid="393906" user="Grant Humphries" version="11">

		<tag k="highway" v="traffic_signals" />

	</node>

	<node changeset="8895716" id="27266267" lat="45.5360503" lon="-122.8885243" timestamp="2011-08-01T21:34:27Z" uid="393906" user="Grant Humphries" version="7">

		<tag k="highway" v="traffic_signals" />

	</node>

	<node changeset="9726985" id="27295030" lat="45.5418012" lon="-122.8683008" timestamp="2011-11-03T00:11:51Z" uid="362111" user="Mele Sax-Barnett" version="7">

		<tag k="highway" v="traffic_signals" />

	</node>

	<node changeset="7632877" id="27526582" lat="45.5174292" lon="-122.8026621" timestamp="2011-03-21T23:25:57Z" uid="393906" user="Grant Humphries" version="16" />

	<node changeset="9134483" id="27545312" lat="45.4932047" lon="-122.8324784" timestamp="2011-08-27T02:51:56Z" uid="393906" user="Grant Humphries" ve

In [4]:
"""
Since the OSM data in XML is structured under various "tags", we first investigate which tags exist
as well as the quantity of each. The function below counts types of tags, storing the info in
a dictionay named tags. The get_element function (utilized throughout my data munging code) is used to reduce the memory
footprint using root.clear()
"""

import pprint

def count_tags(filename):
    tags = {}
    
    for element in get_element(filename):
        if element.tag not in tags:
            tags[element.tag] = 1  
                       
        elif element.tag in tags:
            tags[element.tag] += 1
                    
    return tags 
    
def test():

    tags = count_tags(SAMPLE_FILE)
    pprint.pprint(tags)
    

if __name__ == "__main__":
    test()

{'node': 260076, 'relation': 248, 'way': 33432}


In [6]:
import re
"""
The functions below explore the OSM data further. We check the
"k" value for each "<tag>" and see if there are any potential problems using regex.
We would like to change the data model and expand the "addr:street" type of keys to a dictionary like this:
{"address": {"street": "Some value"}} 
So, we have to see if we have such tags, and if we have any tags with
problematic characters, which we will compile in a dictionary as follows:

  "lower", for tags that contain only lowercase letters and are valid,
  "lower_colon", for otherwise valid tags with a colon in their names,
  "problemchars", for tags with problematic characters, and
  "other", for other tags that do not fall into the other three categories.
"""

# regex 
lower = re.compile(r'^([a-z]|_)*$')
lower_colon = re.compile(r'^([a-z]|_)*:([a-z]|_)*$')
problemchars = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')


def key_type(element, keys):
    
    if element.tag == "tag":       
        if lower.search(element.attrib['k']):
            keys["lower"] += 1
        elif lower_colon.search(element.attrib['k']):
            keys["lower_colon"] += 1
        elif problemchars.search(element.attrib['k']):
            keys["problemchars"] += 1
        else:
            keys["other"] += 1
    return keys



def process_map(filename):
    
    keys = {"lower": 0, "lower_colon": 0, "problemchars": 0, "other": 0}
    for element in get_element(filename):
        keys = key_type(element, keys)
    
    return keys



def test():

    keys = process_map(SAMPLE_FILE)
    pprint.pprint(keys)

if __name__ == "__main__":
    test()

{'lower': 0, 'lower_colon': 0, 'other': 0, 'problemchars': 0}


In [7]:
import pprint
""" 
The function process_map returns a set of unique user IDs ("uid") so we can see how many unique users there are.
"""

def get_user(element):
    if 'uid' in element.attrib:
        user = element.attrib['uid']
        return user
    else:
        return None


def process_map(filename):
    users = set()
    for element in get_element(filename):
        if get_user(element):
            users.add(get_user(element))
    return users


def test():

    users = process_map(SAMPLE_FILE)
    print "Number of unique users: " + str(len(users))

if __name__ == "__main__":
    test()

Number of unique users: 567


In [8]:
from collections import defaultdict

osmfile = "sample.osm"
street_type_re = re.compile(r'\b\S+\.?$', re.IGNORECASE)

# 10/31 added Terrace, Circle 
expected = ["Street", "Avenue", "Boulevard", "Drive", "Court", "Place", "Square", "Lane", "Road", 
            "Trail", "Parkway", "Commons", "Terrace", "Circle", "Circus", "Crescent", "Crest", "East", "End",
            "Highway", "Loop", "North", "Run", "Terrace", "Way"]

# Added Ave. and Blvd
mapping = { "St": "Street",
            "St.": "Street",
            "Rd.": "Road",
            "Rd": "Road", 
            "Ave": "Avenue",
            "Ave.": "Avenue",
            "Blvd": "Boulevard",
            "Blvd.": "Boulevard",
            "Dr": "Drive",
            "Dr.": "Drive",
            "Hwy": "Highway"
            }

# this function uses regex to add "unexpected" street types to the street_types set
def audit_street_type(street_types, street_name):
    m = street_type_re.search(street_name)
    if m:
        street_type = m.group() # grabs output of regex (last part of string) and assigns to street_type
        if street_type not in expected:
            street_types[street_type].add(street_name)

# this function returns True if the k element is "addr:street"
def is_street_name(elem):
    return (elem.attrib['k'] == "addr:street")

# this is the main audit function 
# takes osmfile and opens it
# creates street_types dictionary of sets

def audit(osmfile):
    osm_file = open(osmfile, "r")
    street_types = defaultdict(set)
    # for all node and way tags, iterate through the child tags and if the tag is "tag"
    # check if it's a stree name: if yes, then call audit_street_type function
    #for event, elem in ET.iterparse(osm_file, events=("start",)):
    for i, elem in enumerate(get_element(osmfile)):    # inserted this to preserve memory

        if elem.tag == "node" or elem.tag == "way":
            for tag in elem.iter("tag"):
                if is_street_name(tag):
                    audit_street_type(street_types, tag.attrib['v'])
    osm_file.close()
    return street_types
    

def update_name(name, mapping):

    address_list = name.split()
    #print address_list[-1]
    if address_list[-1] in mapping:
        address_list[-1] = mapping[address_list[-1]]
        name = " ".join(str(x) for x in address_list)
    return name   


def test():
    st_types = audit(osmfile)
    #assert len(st_types) == 3
    pprint.pprint(dict(st_types))

    for st_type, ways in st_types.iteritems():
        for name in ways:
            better_name = update_name(name, mapping)    
            print name, "=>", better_name
            
if __name__ == "__main__":
    test()

{'155th': set(['Southwest 155th']),
 '211': set(['Highway 211', 'Southeast Highway 211']),
 '212': set(['Southeast Highway 212']),
 '213': set(['Highway 213', 'South Highway 213']),
 '224': set(['Southeast Highway 224']),
 '47': set(['Southwest Old Highway 47']),
 '99': set(['Northeast Highway 99']),
 '99E': set(['Highway 99E']),
 '99W': set(['Northeast State Highway 99W', 'Southwest Old Highway 99W']),
 'Ave.': set(['NW 19th Ave.', 'Northeast 122ND Ave.']),
 'Blvd.': set(['21200 Northwest Rock Creek Blvd.']),
 'Broadway': set(['Northeast Broadway', 'Southwest Broadway']),
 'Byway': set(['Southwest Kings Byway']),
 'Cervantes': set(['Cervantes']),
 'Churchill': set(['Southwest Churchill']),
 'Curve': set(['Horseshoe Curve']),
 'D': set(['Northeast 82nd Avenue #D']),
 'Downs': set(['Churchill Downs']),
 'Dr': set(['SW Griffith Dr']),
 'Fieldcrest': set(['Southeast Fieldcrest']),
 'Jamaica': set(['Southwest Jamaica']),
 'Miami': set(['Southwest Miami']),
 'Northbound': set(['I5 Freeway N

In [9]:
import codecs
from time import time
import json

# I used functions below to look at all the possible attributes and then select various ones to investigate for issues

ATT_FILE = "sample_attributes"

attrib_set = set() # set of all unique attributes in dataset
attrib_list = set() # for a given attribute, set of all unique values


# Call this function to collect the 'k' attributes as a set
def child_k_info(attrib_set, element):
    for child in element:
        if child.tag == 'tag':
            attrib_set.add(child.attrib['k'])
    return attrib_set         

# Call this function to investigate a particular 'k' attribute 
def att_info(attrib_dict, element, k_attribute):
    for child in element:
        if child.tag == 'tag':
            if child.attrib['k'] == k_attribute:
                attrib_list.add(child.attrib['v'])
    return attrib_list     

k_attribute = 'landuse'
    
# Call the auditing functions
t0 = time()
for i, element in enumerate(get_element(SAMPLE_FILE)):
    attrib_set = child_k_info(attrib_set, element)
    attrib_list = att_info(attrib_set, element, k_attribute)
    element.clear()
print 'Time Taken To Audit Attributes: {} seconds'.format(time()-t0)
    


Time Taken To Audit Attributes: 3.7650001049 seconds


In [None]:
# prints all the attributes
#pprint(attrib_set)

In [10]:
# prin attrib_list generated for particular 'k' attribute in att_info function above
pprint(attrib_list)

set(['basin',
     'brownfield',
     'cemetery',
     'commercial',
     'construction',
     'farm',
     'farmland',
     'farmyard',
     'forest',
     'garages',
     'government',
     'grass',
     'greenhouse_horticulture',
     'industrial',
     'landfill',
     'meadow',
     'military',
     'orchard',
     'plant_nursery',
     'quarry',
     'railway',
     'reservoir',
     'residential',
     'retail',
     'village_green',
     'vineyard'])


Next step is to do specific audit of each of the k attributes and then correct them in the data.

In [11]:
#create mappings for k attribute 'landuse'

landuse_mapping = ["farmland", "farmyard"]

def is_landuse(elem):
    return (elem.attrib['k'] == "landuse")

def update_landuse(name, landuse_mapping):
    if name in landuse_mapping:
        name = "farm"
    return name   


In [12]:
#create mappings for k attribute 'name' Starbucks

starbucks_mapping = ["Starbucks"]

def is_name(elem):
    return (elem.attrib['k'] == "name")

def update_name_starbucks(name, starbucks_mapping):
    if name in starbucks_mapping:
        name = "Starbucks Coffee"
    return name   

In [13]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import csv
import cerberus
import schema

OSM_PATH = "sample.osm"

NODES_PATH = "nodes.csv"
NODE_TAGS_PATH = "nodes_tags.csv"
WAYS_PATH = "ways.csv"
WAY_NODES_PATH = "ways_nodes.csv"
WAY_TAGS_PATH = "ways_tags.csv"

LOWER_COLON = re.compile(r'^([a-z]|_)+:([a-z]|_)+')
PROBLEMCHARS = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')

SCHEMA = schema.schema

NODE_FIELDS = ['id', 'lat', 'lon', 'user', 'uid', 'version', 'changeset', 'timestamp']
NODE_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_FIELDS = ['id', 'user', 'uid', 'version', 'changeset', 'timestamp']
WAY_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_NODES_FIELDS = ['id', 'node_id', 'position']


def shape_element(element, node_attr_fields=NODE_FIELDS, node_tags = NODE_TAGS_FIELDS, way_attr_fields=WAY_FIELDS,
                  problem_chars=PROBLEMCHARS, way_n_fields = WAY_NODES_FIELDS, default_tag_type='regular'):
    
    """Clean and shape node or way XML element to Python dict"""
    node_attribs = {}
    way_attribs = {}
    way_nodes = []
    tags = []  # Handle secondary tags the same way for both node and way elements

    if element.tag == 'node':
        for tag in node_attr_fields:
            node_attribs[tag] = element.get(tag)

        for child in element.iter("tag"):
            # update street names if neccesary
            if is_street_name(child):
                child.attrib['v'] = update_name(child.attrib['v'], mapping)
                
            # update landuse if necessary
            elif is_landuse(child):
                child.attrib['v'] = update_landuse(child.attrib['v'], landuse_mapping)
            
            # update name if Starbucks to Starbucks Coffee
            elif is_name(child):
                child.attrib['v'] = update_name_starbucks(child.attrib['v'], starbucks_mapping)
            
            
            sec_tag = {el:0 for el in node_tags}
            sec_tag['id'] = node_attribs['id']
            
            sec_tag['value'] = child.attrib['v']
            
            if PROBLEMCHARS.search(child.attrib["k"]):
                continue
                
            elif ":" in child.attrib['k']:
                sec_tag['type'] = child.attrib['k'][:child.attrib['k'].find(":")]
                sec_tag['key'] = child.attrib['k'][child.attrib['k'].find(":")+1:]
                
            else:
                sec_tag['type'] = 'regular'
                sec_tag['key'] = child.attrib['k']
                
            tags.append(sec_tag)    
        
        tag_return = {'node': node_attribs, 'node_tags': tags}
        #print tag_return
        return {'node': node_attribs, 'node_tags': tags}  
    
        #print node_attribs
        #print tags
        #print ""
    
    elif element.tag == 'way':
        way_attribs = {el:0 for el in way_attr_fields}
        for tag in way_attr_fields:
            way_attribs[tag] = element.attrib[tag]
        
        # handle way tags
        for child in element.iter("tag"):
            
            # update street names if neccesary
            if is_street_name(child):
                child.attrib['v'] = update_name(child.attrib['v'], mapping)   
                
            # update landuse if necessary
            elif is_landuse(child):
                child.attrib['v'] = update_landuse(child.attrib['v'], landuse_mapping)
             
            # update name if Starbucks to Starbucks Coffee
            elif is_name(child):
                child.attrib['v'] = update_name_starbucks(child.attrib['v'], starbucks_mapping)
            
            sec_tag = {el:0 for el in node_tags}
            sec_tag['id'] = way_attribs['id']
            
            sec_tag['value'] = child.attrib['v']
            
            if PROBLEMCHARS.search(child.attrib["k"]):
                continue
                
            elif ":" in child.attrib['k']:
                sec_tag['type'] = child.attrib['k'][:child.attrib['k'].find(":")]
                sec_tag['key'] = child.attrib['k'][child.attrib['k'].find(":")+1:]
                
            else:
                sec_tag['type'] = 'regular'
                sec_tag['key'] = child.attrib['k']
                
            tags.append(sec_tag)  
    

        # handle way nodes
        position = 0
        for child in element.iter("nd"):
            #way_node = {el:0 for el in way_n_fields}
            way_node = {}
            way_node['id'] = way_attribs['id']
            way_node['node_id'] = child.attrib['ref']
            way_node['position'] = position
            way_nodes.append(way_node)
            position += 1
            
        
        way_return = {'way': way_attribs, 'way_nodes': way_nodes, 'way_tags': tags}
        #print way_return
        return {'way': way_attribs, 'way_nodes': way_nodes, 'way_tags': tags}
        
    


# ================================================== #
#               Helper Functions                     #
# ================================================== #
def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Yield element if it is the right type of tag"""

    context = ET.iterparse(osm_file, events=('start', 'end'))
    _, root = next(context)
    for event, elem in context:
        if event == 'end' and elem.tag in tags:
            yield elem
            root.clear()


def validate_element(element, validator, schema=SCHEMA):
    """Raise ValidationError if element does not match schema"""
    if validator.validate(element, schema) is not True:
        field, errors = next(validator.errors.iteritems())
        message_string = "\nElement of type '{0}' has the following errors:\n{1}"
        error_strings = (
            "{0}: {1}".format(k, v if isinstance(v, str) else ", ".join(v))
            for k, v in errors.iteritems()
        )
        raise cerberus.ValidationError(
            message_string.format(field, "\n".join(error_strings))
        )


class UnicodeDictWriter(csv.DictWriter, object):
    """Extend csv.DictWriter to handle Unicode input"""

    def writerow(self, row):
        super(UnicodeDictWriter, self).writerow({
            k: (v.encode('utf-8') if isinstance(v, unicode) else v) for k, v in row.iteritems()
        })

    def writerows(self, rows):
        for row in rows:
            self.writerow(row)


# ================================================== #
#               Main Function                        #
# ================================================== #
def process_map(file_in, validate):
    """Iteratively process each XML element and write to csv(s)"""

    with codecs.open(NODES_PATH, 'w') as nodes_file, \
         codecs.open(NODE_TAGS_PATH, 'w') as nodes_tags_file, \
         codecs.open(WAYS_PATH, 'w') as ways_file, \
         codecs.open(WAY_NODES_PATH, 'w') as way_nodes_file, \
         codecs.open(WAY_TAGS_PATH, 'w') as way_tags_file:

        nodes_writer = UnicodeDictWriter(nodes_file, NODE_FIELDS)
        node_tags_writer = UnicodeDictWriter(nodes_tags_file, NODE_TAGS_FIELDS)
        ways_writer = UnicodeDictWriter(ways_file, WAY_FIELDS)
        way_nodes_writer = UnicodeDictWriter(way_nodes_file, WAY_NODES_FIELDS)
        way_tags_writer = UnicodeDictWriter(way_tags_file, WAY_TAGS_FIELDS)

        nodes_writer.writeheader()
        node_tags_writer.writeheader()
        ways_writer.writeheader()
        way_nodes_writer.writeheader()
        way_tags_writer.writeheader()

        validator = cerberus.Validator()

        for element in get_element(file_in, tags=('node', 'way')):
            el = shape_element(element)
            if el:
                if validate is True:
                    validate_element(el, validator)

                if element.tag == 'node':
                    nodes_writer.writerow(el['node'])
                    node_tags_writer.writerows(el['node_tags'])
                elif element.tag == 'way':
                    ways_writer.writerow(el['way'])
                    way_nodes_writer.writerows(el['way_nodes'])
                    way_tags_writer.writerows(el['way_tags'])


if __name__ == '__main__':
    # Note: Validation is ~ 10X slower. For the project consider using a small
    # sample of the map when validating.
    process_map(OSM_PATH, validate=True)
