In [1]:
# importing the modules that will be needed
from collections import defaultdict
import csv 
import xml.etree.ElementTree as ET
import numpy as np
import pandas as pd
import pprint
import re
import os
import codecs
import schema

In [2]:
# Create Sample File


OSM_FILE = "map_AJ.osm"  # Replace this with your osm file
SAMPLE_FILE = "sample.osm"


#!/usr/bin/env python
# -*- coding: utf-8 -*-

k = 5 # Parameter: take every k-th top level element

def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Yield element if it is the right type of tag

    Reference:
    http://stackoverflow.com/questions/3095434/inserting-newlines-in-xml-file-generated-via-xml-etree-elementtree-in-python
    """
    context = iter(ET.iterparse(osm_file, events=('start', 'end')))
    _, root = next(context)
    for event, elem in context:
        if event == 'end' and elem.tag in tags:
            yield elem
            root.clear()


with open(SAMPLE_FILE, 'wb') as output:
    output.write('<?xml version="1.0" encoding="UTF-8"?>\n')
    output.write('<osm>\n  ')

    # Write every kth top level element
    for i, element in enumerate(get_element(OSM_FILE)):
        if i % k == 0:
            output.write(ET.tostring(element, encoding='utf-8'))

    output.write('</osm>')
    


In [3]:
# Iterative Parsing


# Count Tags
def count_tags(filename):
    tags = {}
    for _, elem in ET.iterparse(filename):
        tag = elem.tag
        if tag not in tags.keys():
            tags[tag] = 1
        else:
            tags[tag] += 1
    return tags


# Test count tags function
def test():
    tags = count_tags(OSM_FILE)
    pprint.pprint(tags)

    
test()

{'bounds': 1,
 'member': 13747,
 'meta': 1,
 'nd': 281901,
 'node': 228786,
 'note': 1,
 'osm': 1,
 'relation': 157,
 'tag': 153689,
 'way': 37136}


In [4]:
# Audit file

# Audit street names------------------------------------------------------------------------------
# Regular expression to check for characters at end of string, including optional period.
# Eg "Street" or "St."

street_type_re = re.compile(r'\S+\.?$', re.IGNORECASE)

# Common street names
expected = ["Street", "Avenue", "Boulevard", "Drive", "Court", "Place", "Square", "Lane", "Road", 
            "Road", "Parkway", "Commons", "Close", "Highway", "Circle", "Trail", "US"]


def audit_street_type(street_types, street_name):
    m = street_type_re.search(street_name)
    if m:
        street_type = m.group()
        if street_type not in expected:
            street_types[street_type].add(street_name)



def is_street_name(elem):
    return (elem.tag == "tag") and (elem.attrib['k'] == "addr:street")

# Iterate over the osmfile and create a dictionary mapping from expected street names
# to collected streets.
def audit(osmfile):
    osm_file = open(osmfile, "r")
    street_types = defaultdict(set)
    for event, elem in ET.iterparse(osm_file, events=("start",)):

        if elem.tag == "node" or elem.tag == "way":
            for tag in elem.iter("tag"):
                if is_street_name(tag):
                    audit_street_type(street_types, tag.attrib['v'])

    osm_file.close()
    return street_types   

            

In [5]:
# Test audit function
audit(OSM_FILE)

defaultdict(set,
            {'60': {'E US Hwy 60', 'East US Highway 60'},
             'Ave': {'E Osage Ave'},
             'Cheshire': {'South Cheshire'},
             'Lansing': {'S Lansing'},
             'Rd.': {'5810 Alameda Rd.'},
             'Saguaro': {'South Camino Saguaro'},
             'St': {'North 99th St'}})

In [6]:
# Mapping for names to be updated
mapping = { "St": "Street",
            "Rd": "Road",
            "Rd.": "Road",
            "Ave": "Avenue",
            "S L": "South L",
            "E ": "East ",
            "5810 A":"A",
            "Hwy ":"Highway "
            }

# Improving Street names
def update_name(name, mapping):
    for key in mapping.iterkeys():
        if re.search(key, name):
            name = re.sub(key, mapping[key], name)

    return name

def improve_street_name():
    st_types = audit(OSM_FILE)
    pprint.pprint(dict(st_types))

    for st_type, ways in st_types.iteritems():
        for name in ways:
            better_name = update_name(name, mapping)            
            print name, "=>", better_name
    
    #Second Pass        
    if "Road." in better_name:
        better_name = better_name.replace("Road.", "Road")
        print name, "=>", better_name    
            

In [7]:
# Clean streets 
improve_street_name()

{'60': set(['E US Hwy 60', 'East US Highway 60']),
 'Ave': set(['E Osage Ave']),
 'Cheshire': set(['South Cheshire']),
 'Lansing': set(['S Lansing']),
 'Rd.': set(['5810 Alameda Rd.']),
 'Saguaro': set(['South Camino Saguaro']),
 'St': set(['North 99th St'])}
S Lansing => South Lansing
East US Highway 60 => East US Highway 60
E US Hwy 60 => East US Highway 60
North 99th St => North 99th Street
South Camino Saguaro => South Camino Saguaro
South Cheshire => South Cheshire
E Osage Ave => East Osage Avenue
5810 Alameda Rd. => Alameda Road.
5810 Alameda Rd. => Alameda Road


In [8]:
# Check Postalcodes for addresses 

# Regular expression to check whether postalcode is in appropriate format
postcode_re = re.compile('^[A-Z]{1,2}[0-9]{1,2}[A-Z]? [0-9][A-Z]{2}$') 

def is_postcode(elem):
    return (elem.tag == "tag") and (elem.attrib['k'] == "addr:postcode")


# Search for postcodes within "way" and "node"
def find_postcode():
    osm_file = open(OSM_FILE, "r")
    postcode_types = set()
    odd_postcode = set()
    for event, elem in ET.iterparse(osm_file, events=("start",)):

        if elem.tag == "node" or elem.tag == "way":
            for tag in elem.iter("tag"):
                if is_postcode(tag):
                    m = postcode_re.search(tag.attrib['v'])
                    if m:
                        postcode_types.add(tag.attrib['v'])  
                    else:
                        odd_postcode.add(tag.attrib['v'])
                        

    osm_file.close()


    return (postcode_types, odd_postcode)



In [9]:
# Audit postal codes
find_postcode()

(set(),
 {'85118',
  '85119',
  '85120',
  '85207',
  '85208',
  '85208-2305',
  '85209',
  '85212',
  '85219',
  '85220',
  '85270'})

In [10]:
# Noted a postalcode with a +4 number, will drop this and maintain only 9-digit postal code

area_postcode_re = re.compile('^[A-Z]{1,2}[0-9]{1,2}[A-Z]? ?[0-9]?$')

def update_postcode(odd_postcode):
    if area_postcode_re.search(odd_postcode):
        postcode = " "
    else:
        postcode = odd_postcode.split("-")[0]
    return postcode


def improve_postcode():
    postcode_all = find_postcode()

    for postcode in postcode_all[1]:
        better_postcode = update_postcode(postcode)
        print postcode, "=>", better_postcode

In [11]:
# Fix postal codes
improve_postcode()

85208 => 85208
85209 => 85209
85220 => 85220
85219 => 85219
85208-2305 => 85208
85212 => 85212
85207 => 85207
85118 => 85118
85120 => 85120
85270 => 85270
85119 => 85119


In [12]:
import xml.etree.cElementTree as ET
import pprint
import re

def get_user(element):
    return

# Generates list of users
def process_users(filename):
    users = set()
    for _, element in ET.iterparse(filename):
        name = element.attrib.get('user')
        if name != None:
            if name not in users:
                users.add(name)
                
    pass
    return users

In [13]:
# Check Users
process_users(OSM_FILE)

{'0xStephen',
 'AJ Riley',
 'Adam Martin',
 'Adam Schneider',
 'Adamant1',
 'Alan Bragg',
 'AndyAyre',
 'Arcticmarine',
 'ArminGh',
 'Baloo Uriza',
 'Bhojaraj',
 'Bopcommander',
 'Bryan_W',
 'Caboosey',
 'CamelCaseNick',
 'Carnildo',
 'CartoCrazy',
 'Cato_d_Ae',
 'Chargerrt28',
 'Chris Bell in California',
 'Chris Lawrence',
 'Chris-Eleatha',
 'ChrisMorris',
 'Conan Brink',
 'Cool_DPS',
 'DannyAiquipa',
 'Daungg',
 'David Maciaszek',
 'David Paleino',
 'Derick Rethans',
 'Dilys',
 'Dr Kludge',
 'DrHog',
 'Duff614',
 'Edward',
 'ErichRitz',
 'Fluffy89502',
 'FvGordon',
 'GREGMAP1',
 'Gerard Jeronowitz',
 'GerdP',
 'Glassman',
 'GoWestTravel',
 'Grant Anderson',
 'GreggTownsend',
 'Guylamar2006',
 'HJUdall',
 'HoloDuke',
 'Hoodzow',
 'Iowa Kid',
 'Iqhra',
 'Jamie Mueller',
 'Jesse Hamlin',
 'JesseFW',
 'Jon Hanson',
 'JulienBalas',
 'KR-KRKR-KR',
 'KinkyKinkles',
 'KripaluShanti',
 'KristenK',
 'LastNameConnors',
 'Luis36995',
 'Map King',
 'MapClick',
 'MatthewAndersonUS80',
 'Megan A',

In [14]:
# Count Tags 
import xml.etree.cElementTree as ET
import pprint
import re

# Regex
lower = re.compile(r'^([a-z]|_)*$')
lower_colon = re.compile(r'^([a-z]|_)*:([a-z]|_)*$')
problemchars = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')

# Checks tags for different kinds of characters and formats
def key_type(element, keys):
    if element.tag == 'tag':
        if lower.search(element.attrib['k']):
            keys['lower'] += 1
        elif lower_colon.search(element.attrib['k']):
            keys['lower_colon'] += 1
        elif problemchars.search(element.attrib['k']):
            keys['problemchars'] += 1
        else:
            keys['other'] += 1
        pass
    
    return keys

# Main function - counts up number of different types of tags
def process_keys(filename):
    keys = {'lower': 0, 'lower_colon': 0, 'problemchars': 0, 'other': 0}
    for _, element in ET.iterparse(filename):
        keys = key_type(element, keys)
        
    return keys

In [15]:
process_keys(OSM_FILE)

{'lower': 92690, 'lower_colon': 57758, 'other': 3241, 'problemchars': 0}

In [16]:
# Data to CSV
import csv
import codecs
import re
import xml.etree.cElementTree as ET
from unittest import TestCase
import cerberus
import schema

# Make sure the fields order in the csvs matches the column order in the
# sql table schema
NODES_PATH = "nodes.csv"
NODE_TAGS_PATH = "nodes_tags.csv"
WAYS_PATH = "ways.csv"
WAY_NODES_PATH = "ways_nodes.csv"
WAY_TAGS_PATH = "ways_tags.csv"

LOWER_COLON = re.compile(r'^([a-z]|_)+:([a-z]|_)+')
PROBLEMCHARS = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')

SCHEMA = schema.schema

# Make sure the fields order in the csvs matches the column order in the sql table schema
NODE_FIELDS = ['id', 'lat', 'lon', 'user', 'uid', 'version', 'changeset', 'timestamp']
NODE_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_FIELDS = ['id', 'user', 'uid', 'version', 'changeset', 'timestamp']
WAY_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_NODES_FIELDS = ['id', 'node_id', 'position']

# looks for the incorrect street types in the street names by comparing them to the "expected" list
# and then puts them in a list called street_types
# uses the regular expression "street_type_re" defined prevously to locate the street type within the street name
def audit_street_type(street_types, street_name):
    m = street_type_re.search(street_name)
    if m:
        street_type = m.group()
        if street_type not in expected:
            street_types[street_type].add(street_name)
            
# finds the street names in the map.xml file
def is_street_name(elem):
    return (elem.attrib['k'] == "addr:street")

# executes the audit_street_type and is_street_name functions to fill the street_types dictionary
def audit(osmfile):
    osm_file = open(osmfile, "r")
    street_types = defaultdict(set)
    for event, elem in ET.iterparse(osm_file, events=("start",)):

        if elem.tag == "node" or elem.tag == "way":
            for tag in elem.iter("tag"):
                if is_street_name(tag):
                    audit_street_type(street_types, tag.attrib['v'])
    osm_file.close()
    return street_types

# fixes the street type in the street name
def update_name(name, mapping):
    m = street_type_re.search(name)
    if m:
        for i in mapping:
            if i == m.group():
                name = re.sub(street_type_re, mapping[i], name)
    return name

# finds the zip codes in the address 
def is_postcode(elem): 
    return (elem.attrib['k'] == "addr:postcode" or elem.attrib['k'] == "postal_code")

# creates a list of zipcodes
def audit_postcode(postcodes, postcode):
    postcodes[postcode].add(postcode)
    return postcodes

# updates/cleans the zipcodes 
def update_postcode(postcode):
    if re.findall(r'^\d{5}$', postcode): # 5 digits
        valid_postcode = postcode
        return valid_postcode  
    else:
        return None

# Shape each element into several data structures
# Clean and shape node or way XML element to Python dict
def shape_element(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS,
                  problem_chars=PROBLEMCHARS, default_tag_type='regular'):
    
    node_attribs = {} 
    way_attribs = {}
    way_nodes = []
    tags = []

    if element.tag == 'node':
        for i in NODE_FIELDS:
            node_attribs[i] = element.attrib[i]
        for tag in element.iter("tag"):  
            problem = PROBLEMCHARS.search(tag.attrib['k'])
            if not problem:
                node_tag = {} 
                node_tag['id'] = element.attrib['id'] 
                node_tag['value'] = tag.attrib['v']  

                match = LOWER_COLON.search(tag.attrib['k'])
                if not match:
                    node_tag['type'] = 'regular'
                    node_tag['key'] = tag.attrib['k']
                else:
                    bef_colon = re.findall('^(.+):', tag.attrib['k'])
                    aft_colon = re.findall('^[a-z|_]+:(.+)', tag.attrib['k'])
                    node_tag['type'] = bef_colon[0]
                    node_tag['key'] = aft_colon[0]
                    if node_tag['type'] == "addr" and node_tag['key'] == "street":
                        # update street name
                        node_tag['value'] = update_name(tag.attrib['v'], mapping) 
                    elif node_tag['type'] == "addr" and node_tag['key'] == "postcode":
                        # update post code
                        node_tag['value'] = update_postcode(tag.attrib['v']) 
            tags.append(node_tag)
        
        return {'node': node_attribs, 'node_tags': tags}
    
    elif element.tag == 'way':
        for i in WAY_FIELDS:
            way_attribs[i] = element.attrib[i]
        for tag in element.iter("tag"):
            problem = PROBLEMCHARS.search(tag.attrib['k'])
            if not problem:
                way_tag = {}
                way_tag['id'] = element.attrib['id'] 
                way_tag['value'] = tag.attrib['v']
                match = LOWER_COLON.search(tag.attrib['k'])
                if not match:
                    way_tag['type'] = 'regular'
                    way_tag['key'] = tag.attrib['k']
                else:
                    bef_colon = re.findall('^(.+?):+[a-z]', tag.attrib['k'])
                    aft_colon = re.findall('^[a-z|_]+:(.+)', tag.attrib['k'])

                    way_tag['type'] = bef_colon[0]
                    way_tag['key'] = aft_colon[0]
                    if way_tag['type'] == "addr" and way_tag['key'] == "street":
                        way_tag['value'] = update_name(tag.attrib['v'], mapping) 
                    elif way_tag['type'] == "addr" and way_tag['key'] == "postcode":
                        way_tag['value'] = update_postcode(tag.attrib['v']) 
            tags.append(way_tag)
        position = 0
        for tag in element.iter("nd"):  
            nd = {}
            nd['id'] = element.attrib['id'] 
            nd['node_id'] = tag.attrib['ref'] 
            nd['position'] = position  
            position += 1
            
            way_nodes.append(nd)
    
        return {'way': way_attribs, 'way_nodes': way_nodes, 'way_tags': tags}




# Helper Functions            

# Yield element if it is the right type of tag
def get_element(osm_file, tags=('node', 'way', 'relation')):
    context = ET.iterparse(osm_file, events=('start', 'end'))
    _, root = next(context)
    for event, elem in context:
        if event == 'end' and elem.tag in tags:
            yield elem
            root.clear()

# Raise ValidationError if element does not match schema
def validate_element(element, validator, schema=SCHEMA):
    if validator.validate(element, schema) is not True:
        field, errors = next(validator.errors.iteritems())
        message_string = "\nElement of type '{0}' has the following errors:\n{1}"
        error_string = pprint.pformat(errors)
        
        raise Exception(message_string.format(field, error_string))

# Extend csv.DictWriter to handle Unicode input
class UnicodeDictWriter(csv.DictWriter, object):
    def writerow(self, row):
        super(UnicodeDictWriter, self).writerow({
            k: (v.encode('utf-8') if isinstance(v, unicode) else v) for k, v in row.iteritems()
        })

    def writerows(self, rows):
        for row in rows:
            self.writerow(row)


# Main Function                    

# Iteratively process each XML element and write to csv(s)
def process_map(file_in, validate):

    with codecs.open(NODES_PATH, 'w') as nodes_file, \
         codecs.open(NODE_TAGS_PATH, 'w') as nodes_tags_file, \
         codecs.open(WAYS_PATH, 'w') as ways_file, \
         codecs.open(WAY_NODES_PATH, 'w') as way_nodes_file, \
         codecs.open(WAY_TAGS_PATH, 'w') as way_tags_file:

        nodes_writer = UnicodeDictWriter(nodes_file, NODE_FIELDS)
        node_tags_writer = UnicodeDictWriter(nodes_tags_file, NODE_TAGS_FIELDS)
        ways_writer = UnicodeDictWriter(ways_file, WAY_FIELDS)
        way_nodes_writer = UnicodeDictWriter(way_nodes_file, WAY_NODES_FIELDS)
        way_tags_writer = UnicodeDictWriter(way_tags_file, WAY_TAGS_FIELDS)

        nodes_writer.writeheader()
        node_tags_writer.writeheader()
        ways_writer.writeheader()
        way_nodes_writer.writeheader()
        way_tags_writer.writeheader()



        for element in get_element(file_in, tags=('node', 'way')):
            el = shape_element(element)
            if el:
                if validate is True:
                    validate_element(el, validator)

                if element.tag == 'node':
                    nodes_writer.writerow(el['node'])
                    node_tags_writer.writerows(el['node_tags'])
                elif element.tag == 'way':
                    ways_writer.writerow(el['way'])
                    way_nodes_writer.writerows(el['way_nodes'])
                    way_tags_writer.writerows(el['way_tags'])


if __name__ == '__main__':
    process_map(OSM_FILE, validate=False)

In [17]:
process_map(OSM_FILE, validate=False)