# P3: Wrangle OpenStreetMap Data of Austin, TX

## Part 2. Final code to extract data into csv files

After exploring and auditing the OSM data in the previous notebook, the code that extracts the data from the OSM file into CSV files is presented here.

In [1]:
import xml.etree.cElementTree as ET
import csv
import codecs
import cerberus
import schema
import re

The filenames of the osm extracts were assigned to variables as in Part 1.

In [2]:
# Input OSM extract that is processed in the "Processing the whole file" cells.
OSM_FILE = "austin_texas.osm"
# Second input file name, used for the quicker "sample file" timing runs.
SAMPLE_FILE = "sample.osm"

Variables are assigned to output csv files.

In [3]:
# Output CSV files opened for writing by process_map_db (one file per table).
NODES_PATH = "nodes.csv"
NODE_TAGS_PATH = "nodes_tags.csv"
WAYS_PATH = "ways.csv"
WAY_NODES_PATH = "ways_nodes.csv"
WAY_TAGS_PATH = "ways_tags.csv"

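The following regular expressions classify the `k` attribute of each tag element: `LOWER_COLON` matches keys that contain a colon (for example `addr:street`), and `PROBLEMCHARS` matches keys that contain problematic characters. Some `k` values have a colon and some don't.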

In [4]:
# Matches keys that start with one or more lowercase letters/underscores, a colon,
# and again one or more lowercase letters/underscores (e.g. "addr:street").
# The pattern is not anchored at the end, so keys with further colons also match.
LOWER_COLON = re.compile(r'^([a-z]|_)+:([a-z]|_)+')
# Matches any key that contains one of the listed punctuation or whitespace
# characters; shape_element skips tags whose key matches.
PROBLEMCHARS = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')

The schema for the output dictionaries will follow this structure.

In [5]:
# Validation schema passed to the cerberus validator in validate_element.
# The top-level keys mirror the dictionaries returned by shape_element:
#   'node' / 'way'                         -> one dict of element attributes
#   'node_tags' / 'way_tags' / 'way_nodes' -> a list of dicts, one per child
# Every field is required; the 'coerce' entries name the callable (int or float)
# used to convert the raw attribute value before its type is checked.
SCHEMA = {
    'node': {
        'type': 'dict',
        'schema': {
            'id': {'required': True, 'type': 'integer', 'coerce': int},
            'lat': {'required': True, 'type': 'float', 'coerce': float},
            'lon': {'required': True, 'type': 'float', 'coerce': float},
            'user': {'required': True, 'type': 'string'},
            'uid': {'required': True, 'type': 'integer', 'coerce': int},
            'version': {'required': True, 'type': 'string'},
            'changeset': {'required': True, 'type': 'integer', 'coerce': int},
            'timestamp': {'required': True, 'type': 'string'}
        }
    },
    'node_tags': {
        'type': 'list',
        'schema': {
            'type': 'dict',
            'schema': {
                'id': {'required': True, 'type': 'integer', 'coerce': int},
                'key': {'required': True, 'type': 'string'},
                'value': {'required': True, 'type': 'string'},
                'type': {'required': True, 'type': 'string'}
            }
        }
    },
    'way': {
        'type': 'dict',
        'schema': {
            'id': {'required': True, 'type': 'integer', 'coerce': int},
            'user': {'required': True, 'type': 'string'},
            'uid': {'required': True, 'type': 'integer', 'coerce': int},
            'version': {'required': True, 'type': 'string'},
            'changeset': {'required': True, 'type': 'integer', 'coerce': int},
            'timestamp': {'required': True, 'type': 'string'}
        }
    },
    'way_nodes': {
        'type': 'list',
        'schema': {
            'type': 'dict',
            'schema': {
                'id': {'required': True, 'type': 'integer', 'coerce': int},
                'node_id': {'required': True, 'type': 'integer', 'coerce': int},
                'position': {'required': True, 'type': 'integer', 'coerce': int}
            }
        }
    },
    'way_tags': {
        'type': 'list',
        'schema': {
            'type': 'dict',
            'schema': {
                'id': {'required': True, 'type': 'integer', 'coerce': int},
                'key': {'required': True, 'type': 'string'},
                'value': {'required': True, 'type': 'string'},
                'type': {'required': True, 'type': 'string'}
            }
        }
    }
}

Lists of all the fields:

In [6]:
# Column names, in output order, for each CSV written by process_map_db.
# NODE_FIELDS and WAY_FIELDS are also the default attribute lists that
# shape_element copies from each node / way element.
NODE_FIELDS = ['id', 'lat', 'lon', 'user', 'uid', 'version', 'changeset', 'timestamp']
NODE_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_FIELDS = ['id', 'user', 'uid', 'version', 'changeset', 'timestamp']
WAY_TAGS_FIELDS= ['id', 'key', 'value', 'type']
WAY_NODES_FIELDS = ['id', 'node_id', 'position']

As in Part 1, the get_element function makes it easier to access the XML data: it uses the iterparse method to step through the file iteratively and yields each top-level element as soon as it has been read.

In [7]:
def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Lazily yield every element of ``osm_file`` whose tag is listed in ``tags``.

    The file is parsed incrementally. An element is yielded once its closing
    tag has been read, and the root element is cleared after the consumer has
    finished with each yielded element.
    """
    events = ET.iterparse(osm_file, events=('start', 'end'))
    # The very first event is the start of the document root.
    root = next(events)[1]
    for kind, node in events:
        if kind != 'end' or node.tag not in tags:
            continue
        yield node
        root.clear()

### Subfunctions for data cleaning

The derivation of these functions was shown in Part 1.

#### update_street_name function

In [8]:
def is_street_name(elem):
    """Return True when the tag element's ``k`` attribute is ``addr:street``."""
    key = elem.attrib['k']
    return key == "addr:street"

In [9]:
# Word-level replacement table used by update_name: every whitespace-separated
# word of a street name that appears here as a key is replaced by its value.
# The single-letter keys "N", "C", "I" and "H" get special handling in
# update_name (they are left unchanged when the name starts with "Avenue").
# Several values contain more than one word (e.g. "Farm-to-Market Road").
mapping_street = { "St": "Street",
            "St.": "Street",
            "st": "Street",
            "street": "Street",
            "Street,": "Street",
            "Ave": "Avenue",
            "Ave.": "Avenue",
            "Avene": "Avenue",
            "Blvd": "Boulevard",
            "Blvd.": "Boulevard",
            "Boulevard,": "Boulevard",
            "Blvd,": "Boulevard",
            "Blvd.,": "Boulevard",
            "Dr": "Drive",
            "Dr.": "Drive",
            "Ct": "Court",
            "Ct.": "Court",
            "court": "Court",
            "Cv": "Cove",
            "cove": "Cove",
            "Cc": "Cove",
            "Pl": "Place",
            "Pl.": "Place",
            "lane": "Lane",
            "Ln": "Lane",
            "Rd": "Road", 
            "Rd.": "Road",
            "R": "Road",
            "Trl": "Trail",
            "Tr": "Trail",
            "Pkwy": "Parkway",
            "Hwy": "Highway",
            "HWY": "Highway",
            "Hwy,": "Highway",
            "H": "Highway",
            "I35": "Interstate Highway 35",
            "IH35": "Interstate Highway 35",
            "IH35,": "Interstate Highway 35",
            "IH-35": "Interstate Highway 35",
            "I-35": "Interstate Highway 35",
            "IH": "Interstate Highway",
            "I": "Interstate",
            "35,": "35",
            "main": "Main",
            "N": "North",
            "N.": "North",
            "Ovlk": "Overlook",
            "pass": "Pass",
            "Ps": "Pass",
            "W": "West",
            "W.": "West",
            "E": "East",
            "E.": "East",
            "texas": "Texas",
            "TX": "Texas",
            "FM": "Farm-to-Market Road",
            "F.M.": "Farm-to-Market Road",
            "U.S.": "United States",
            "US": "United States",
            "RM": "Ranch-to-Market Road",
            "RM1431": "Ranch-to-Market Road 1431",
            "RR": "Ranch Road",
            "S": "South",
            "S.": "South",
            "south": "South",
            "Bldg": "Building",
            "Bldg.": "Building",
            "Bld": "Building",
            "Ste": "Suite",
            "Ste,": "Suite",
            "STE": "Suite",
            "Ste.": "Suite",
            "suite": "Suite",
            "C": "Country",
            "church": "Church",
            "brigadoon": "Brigadoon",
            "Drive/Rd": "Drive/Road",
            "Mo-Pac": "MoPac", 
            "Avenue,Ste": "Avenue Suite",
            "suite#L131": "Suite L131"}

##### Appends "Highway" if it is needed:

In [10]:
def append_highway(name):
    """Insert "Highway" after each "Interstate" / "States" word.

    Nothing is inserted when the name already contains the word "Highway".
    The result is re-joined with single spaces.
    """
    words = name.split()
    needs_highway = "Highway" not in words
    rebuilt = []
    for word in words:
        rebuilt.append(word)
        if needs_highway and word in ("Interstate", "States"):
            rebuilt.append("Highway")
    return ' '.join(rebuilt)

##### Updates the FM and RM roads:

In [11]:
def _collapse_to_market(parts, kind):
    """Rewrite "<kind> to Market" as "<kind>-to-Market", adding "Road" if absent.

    ``parts`` is the list of words of the street name and ``kind`` is either
    "Farm" or "Ranch". Words before ``kind`` and after "Market" are kept.
    """
    joined = kind + "-to-Market"
    # Keep the words that precede <kind>. The previous implementation looked
    # them up with parts.index(i) where i was an int, which raised ValueError
    # for every name that did not start with <kind>.
    newname = parts[:parts.index(kind)]
    newname.append(joined)
    newname += parts[parts.index("Market") + 1:]
    if "Road" not in parts:
        newname.insert(newname.index(joined) + 1, "Road")
    return " ".join(newname)


def update_farm_ranch_to_market(name):
    """Normalise Farm/Ranch-to-Market road names.

    * If the name already contains "Farm-to-Market" or "Ranch-to-Market",
      "Road" is inserted right after it unless the name already has "Road"
      (in which case the name is returned untouched).
    * If the name spells the words separately ("Farm to Market ..." or
      "Ranch to Market ..."), they are collapsed into the hyphenated form and
      "Road" is added when missing.
    * Any other name is returned unchanged.

    Args:
        name: street name as a string.

    Returns:
        The updated street name.
    """
    parts = name.split()

    for joined in ("Farm-to-Market", "Ranch-to-Market"):
        if joined in parts:
            if "Road" in parts:
                return name
            parts.insert(parts.index(joined) + 1, "Road")
            return " ".join(parts)

    for kind in ("Farm", "Ranch"):
        if kind in parts and "to" in parts and "Market" in parts:
            return _collapse_to_market(parts, kind)

    return name

##### Updates the name based on mapping dictionary and also fixes other items, such as St., N, C, I , H:

In [12]:
def update_name(name, mapping):
    """Clean a street name word by word using ``mapping``.

    * "St" becomes "Saint" when the name also contains "Rue".
    * The single letters N, C, I and H are left as they are when the first
      word collected so far is "Avenue"; otherwise they are replaced through
      ``mapping``.
    * Every other word is replaced when it is a key of ``mapping``.

    The rebuilt name is then passed through append_highway and
    update_farm_ranch_to_market.
    """
    single_letters = ("N", "C", "I", "H")
    words = name.split()
    fixed = []
    for word in words:
        if word == "St" and "Rue" in words:
            replacement = "Saint"
        elif word in single_letters:
            if fixed and fixed[0] == "Avenue":
                replacement = word
            else:
                replacement = mapping[word]
        else:
            replacement = mapping.get(word, word)
        fixed.append(replacement)
    return update_farm_ranch_to_market(append_highway(' '.join(fixed)))

#### update_phone function

In [13]:
def is_phone(element):
    """Return True when the tag element's ``k`` attribute names a phone number."""
    return element.attrib['k'] in ("phone", "contact:phone")

In [14]:
def update_phone(number):
    """Return ``number`` formatted as ``ddd-ddd-dddd``.

    A number that already has that exact shape is returned untouched.
    Otherwise leading "+" and "1" characters are stripped, the first ten
    digit characters that remain are kept, and they are re-joined with dashes
    after the third and sixth digit.
    """
    already_formatted = re.compile(r'^\d\d\d\-\d\d\d\-\d\d\d\d$').search(number)
    if already_formatted is not None:
        return number

    digits = []
    for ch in number.lstrip("+1"):
        if len(digits) == 10:
            break  # only the first ten digits are used
        try:
            int(ch)
        except ValueError:
            continue  # not a digit: separators, letters, whitespace, ...
        digits.append(ch)

    joined = "".join(digits)
    return "-".join((joined[:3], joined[3:6], joined[6:]))

#### update_postcode function

In [15]:
def is_postcode(element):
    """Return True when the tag element's ``k`` attribute names a postal code."""
    return element.attrib['k'] in ("addr:postcode", "postal_code")

In [16]:
def update_postcode(postcode):
    """Return the first five-digit run starting with 7 found in ``postcode``.

    The string 'None' is returned when no such run exists.
    """
    found = re.search(r'7\d\d\d\d', postcode)
    if found is None:
        return 'None'
    return found.group()

#### update_city function

In [17]:
def is_city(element):
    """Return True when the tag element's ``k`` attribute is ``addr:city``."""
    key = element.attrib['k']
    return key == "addr:city"

In [18]:
# City names that update_city accepts unchanged. Any other value is looked up
# in mapping_city and replaced by the string "None" when it is not found there.
expectedcities = ['Austin', 'Buda', 'Round Rock', 'Leander', 'Lago Vista', 'Kyle', 'Pflugerville',
                  'Cedar Park', 'Manchaca', 'Del Valle', 'Dripping Springs', 'Bee Cave', 'Bastrop', 
                  'Cedar Creek', 'Creedmoor', 'Driftwood', 'Elgin', 'Georgetown', 'Hutto', 
                  'Jonestown', 'Lakeway', 'Lost Pines', 'Manor', 'Maxwell', 'San Marcos', 
                  'Smithville', 'Spicewood', 'Sunset Valley', 'Taylor', 'Webberville', 
                  'West Lake Hills', 'Wimberley']

In [19]:
# Corrections for city values that are not in expectedcities: each key is a raw
# addr:city value and each value is the name update_city substitutes for it.
mapping_city = {
    "austin": "Austin",
    "Austin, TX": "Austin",
    "Cedar Park, TX": "Cedar Park",
    "Round Rock, TX": "Round Rock",
    "Dripping Springs, TX": "Dripping Springs",
    "Westlake Hills, TX": "West Lake Hills",
    "kyle": "Kyle",
    "Austin;TX;USA": "Austin",
    "Barton Creek": "Austin",
    "Austin, Tx": "Austin",
    "Taylor, TX": "Taylor",
    "San Gabriel Village Boulevard": "Georgetown",
    "Spicewood, TX": "Spicewood",
    "Ste 128, Austin": "Austin",
    "Pflugerville, TX": "Pflugerville",
    # Was mapped to the misspelling "Manchacha"; the expected city list
    # spells it "Manchaca".
    "Manchaca,": "Manchaca",
    "Elgin, TX": "Elgin",
    "Kyle, TX": "Kyle",
    "Dripping Springs, Tx": "Dripping Springs",
    "Bastrop, TX": "Bastrop",
    "Georgetown, TX": "Georgetown",
    "Dripping Springs TX": "Dripping Springs",
    "Leander, TX": "Leander",
    "N Austin": "Austin"
}

In [20]:
def update_city(city, expectedcities, mapping_city):
    """Return a cleaned city name.

    A city listed in ``expectedcities`` is returned unchanged. Otherwise its
    replacement from ``mapping_city`` is returned, or the string "None" when
    there is no replacement.
    """
    if city in expectedcities:
        return city
    return mapping_city.get(city, "None")

#### Clean function: combination of all update functions

In [21]:
def clean(value, tag, mapping_street, expectedcities, mapping_city):
    """Clean a tag value according to the kind of tag it belongs to.

    Street names, phone numbers, postcodes and city names are each sent to
    their own update function (checked in that order). Values of any other
    tag are returned unchanged.
    """
    if is_street_name(tag):
        return update_name(value, mapping_street)
    if is_phone(tag):
        return update_phone(value)
    if is_postcode(tag):
        return update_postcode(value)
    if is_city(tag):
        return update_city(value, expectedcities, mapping_city)
    return value

### Functions for extracting data into csv files

In [22]:
def _attr_or_placeholder(attrib, name):
    """Return attrib[name], or the placeholder '999999' when it is an empty string."""
    value = attrib[name]
    return '999999' if value == '' else value


def _shape_tags(element, problem_chars, lower_colon, default_tag_type):
    """Build the list of secondary-tag dicts for a node or way element."""
    shaped = []
    for tag in element.iter("tag"):
        key = tag.attrib['k']
        # Tags whose key contains problematic characters are dropped.
        if problem_chars.search(key) is not None:
            continue

        entry = {'id': _attr_or_placeholder(element.attrib, 'id')}
        if lower_colon.search(key) is not None:
            # The text before the first colon becomes the type; everything
            # after it (further colons included) becomes the key.
            tag_type, _, tag_key = key.partition(":")
            entry['type'] = tag_type
            entry['key'] = tag_key
        else:
            entry['type'] = default_tag_type
            entry['key'] = key

        raw_value = tag.attrib['v']
        if raw_value == '':
            # An empty value is stored as the string 'None'. For node tags
            # this used to be a comparison (==) instead of an assignment,
            # which raised KeyError.
            entry['value'] = 'None'
        else:
            entry['value'] = clean(raw_value, tag, mapping_street, expectedcities, mapping_city)

        shaped.append(entry)
    return shaped


def shape_element(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS,
                  problem_chars=PROBLEMCHARS, lower_colon=LOWER_COLON, default_tag_type='regular'):
    """Clean and shape node or way XML element to Python dict

    Args:
        element: the XML element to shape.
        node_attr_fields: attribute names copied from a node element.
        way_attr_fields: attribute names copied from a way element.
        problem_chars: compiled regex; tags whose key matches are skipped.
        lower_colon: compiled regex; keys that match are split at the first
            colon into a type and a key.
        default_tag_type: type given to tags whose key has no colon prefix.

    Returns:
        For a node: {'node': attributes, 'node_tags': [tag dicts]}.
        For a way: {'way': attributes, 'way_nodes': [node references],
        'way_tags': [tag dicts]}.
        None for any other element.

    Empty attribute values are replaced by the placeholder '999999' and empty
    tag values by the string 'None'.
    """
    if element.tag == 'node':
        node_attribs = {
            field: _attr_or_placeholder(element.attrib, field)
            for field in node_attr_fields
        }
        tags = _shape_tags(element, problem_chars, lower_colon, default_tag_type)
        return {'node': node_attribs, 'node_tags': tags}

    elif element.tag == 'way':
        way_attribs = {
            field: _attr_or_placeholder(element.attrib, field)
            for field in way_attr_fields
        }
        tags = _shape_tags(element, problem_chars, lower_colon, default_tag_type)

        # One entry per <nd> child, numbered from 0 in document order.
        way_nodes = []
        for position, nd in enumerate(element.iter("nd")):
            way_nodes.append({
                'id': _attr_or_placeholder(element.attrib, 'id'),
                'node_id': _attr_or_placeholder(nd.attrib, 'ref'),
                'position': position,
            })

        return {'way': way_attribs, 'way_nodes': way_nodes, 'way_tags': tags}

    return None

####### version provided by Myles

def validate_element(element, validator, schema=SCHEMA):
    """Raise an Exception if element does not match schema.

    NOTE: a later cell defines validate_element again, and that definition
    replaces this one when the notebook is run top to bottom.

    Args:
        element: shaped element dictionary to validate.
        validator: object providing ``validate(element, schema)`` and an
            ``errors`` mapping.
        schema: validation schema, SCHEMA by default.

    Raises:
        Exception: with the first failing field and its pretty-printed errors.
    """
    # pprint is not imported at the top of the notebook; import it here so the
    # error path reports the validation problem instead of raising NameError.
    import pprint

    if validator.validate(element, schema) is not True:
        field, errors = next(validator.errors.iteritems())
        message_string = "\nElement of type '{0}' has the following errors:\n{1}"
        error_strings = pprint.pformat(errors)
        raise Exception(message_string.format(field, error_strings))

In [23]:
def validate_element(element, validator, schema=SCHEMA):
    """Raise ValidationError if element does not match schema

    Only the first entry of ``validator.errors`` is reported. Each of its
    problems is rendered as "key: message" on its own line.
    """
    if validator.validate(element, schema) is True:
        return

    field, errors = next(validator.errors.iteritems())
    lines = []
    for key, problem in errors.iteritems():
        if not isinstance(problem, str):
            problem = ",".join(problem)
        lines.append("{0}: {1}".format(key, problem))

    message_string = "\nElement of type '{0}' has the following errors:\n{1}"
    raise cerberus.ValidationError(message_string.format(field, "\n".join(lines)))

In [24]:
class UnicodeDictWriter(csv.DictWriter, object):
    """csv.DictWriter variant that accepts unicode values.

    Unicode values are encoded as UTF-8 before the row is handed to the
    standard writer; every other value is passed through unchanged.
    """

    def writerow(self, row):
        encoded = {}
        for field, cell in row.iteritems():
            if isinstance(cell, unicode):
                cell = cell.encode('utf-8')
            encoded[field] = cell
        super(UnicodeDictWriter, self).writerow(encoded)

    def writerows(self, rows):
        for record in rows:
            self.writerow(record)

In [25]:
def process_map_db(file_in, validate):
    """Iteratively process each XML element and write to csv(s)

    Every node and way of ``file_in`` is shaped with shape_element and written
    to the five output CSV files. When ``validate`` is True each shaped
    element is checked with validate_element before it is written.
    """
    with codecs.open(NODES_PATH, 'w') as nodes_file, \
            codecs.open(NODE_TAGS_PATH, 'w') as nodes_tags_file, \
            codecs.open(WAYS_PATH, 'w') as ways_file, \
            codecs.open(WAY_NODES_PATH, 'w') as way_nodes_file, \
            codecs.open(WAY_TAGS_PATH, 'w') as way_tags_file:

        outputs = [
            (nodes_file, NODE_FIELDS),
            (nodes_tags_file, NODE_TAGS_FIELDS),
            (ways_file, WAY_FIELDS),
            (way_nodes_file, WAY_NODES_FIELDS),
            (way_tags_file, WAY_TAGS_FIELDS),
        ]
        writers = [UnicodeDictWriter(handle, fields) for handle, fields in outputs]
        for writer in writers:
            writer.writeheader()
        (nodes_writer, node_tags_writer, ways_writer,
         way_nodes_writer, way_tags_writer) = writers

        validator = cerberus.Validator()

        for element in get_element(file_in, tags=('node', 'way')):
            shaped = shape_element(element)
            if not shaped:
                continue

            if validate is True:
                validate_element(shaped, validator)

            if element.tag == 'node':
                nodes_writer.writerow(shaped['node'])
                node_tags_writer.writerows(shaped['node_tags'])
            elif element.tag == 'way':
                ways_writer.writerow(shaped['way'])
                way_nodes_writer.writerows(shaped['way_nodes'])
                way_tags_writer.writerows(shaped['way_tags'])

### Processing the sample file (validation False):

MacBook Pro Intel Core I7 2.66GHz, 8GB 1067 MHz DDR3 (Mid-2010) OS X El Capitan

In [39]:
# Time one pass over SAMPLE_FILE with validation switched off and print the
# elapsed wall-clock seconds rounded to three decimals.
from time import time
t0 = time()
process_map_db(SAMPLE_FILE, validate=False)
print "procesing_time: ", round(time() - t0, 3), "s"

procesing_time:  54.7 s


Dell Studio XPS 435T PC desktop Intel Core I7 920 @ 2.67GHz 8GB, Windows7

In [27]:
# Same timing run over SAMPLE_FILE (validation off) as the previous cell.
from time import time
t0 = time()
process_map_db(SAMPLE_FILE, validate=False)
print "procesing_time: ", round(time() - t0, 3), "s"

procesing_time:  40.114 s


### Processing the whole file:

Dell Studio XPS 435T PC desktop Intel Core I7 920 @ 2.67GHz 8GB, Windows7

validate=False:

In [26]:
# Time one pass over the whole OSM_FILE with validation switched off.
from time import time
t0 = time()
process_map_db(OSM_FILE, validate=False)
print "procesing_time: ", round(time() - t0, 3), "s"

procesing_time:  409.512 s


validate=True

In [28]:
# Time one pass over the whole OSM_FILE with every shaped element validated
# against SCHEMA.
from time import time
t0 = time()
process_map_db(OSM_FILE, validate=True)
print "procesing_time: ", round(time() - t0, 3), "s"

procesing_time:  16756.77 s


Previous run (clean function not correct):

In [53]:
# Timing of the whole OSM_FILE with validation on. This cell does not import
# `time` itself, so it relies on an earlier cell having done so.
t0 = time()
process_map_db(OSM_FILE, validate=True)
print "procesing_time: ", round(time() - t0, 3), "s"

procesing_time:  17157.326 s
