In [2]:
import xml.etree.cElementTree as ET
import pprint
import re
from collections import defaultdict

In [3]:
tree=ET.parse('./sample.osm')
root = tree.getroot()

In [4]:
root.tag

'osm'

In [5]:
#function to count the # of unique tag types in the file

tags={}

def count_tags(file):
    for child in root:
        if child.tag in tags.keys():
            tags[child.tag]+=1
        else:
            tags[child.tag]=1
    return tags

In [6]:
count_tags(tree)

{'node': 8440, 'way': 1001, 'relation': 10}

In [7]:
del(tree)

In [8]:
#functions for Street Name Audits

#set variables
street_type_re = re.compile(r'\b\S+\.?$', re.IGNORECASE)
street_count = defaultdict(int)

#Main Function - Counts # of instances of street types

def streets_count(osmfile):
    with open(osmfile, "rb") as osm_file:
        for event, elem in ET.iterparse(osm_file):
            if is_street(elem):
                audit_street_count(street_count, elem.attrib['v'])    
        osm_file.close()
    print_sorted_dict(street_count)   

#is street function -- returns true if passed a "street name" element

def is_street(elem):
    return (elem.tag == "tag") and (elem.attrib['k'] == "addr:street")

# audit_street_count function -- looks for variable based on variable set and ups count in dictionary if present
def audit_street_count(street_types, street_name):
    m = street_type_re.search(street_name)
    if m:
        street_type = m.group()
        street_count[street_type] += 1
        
# sets up dictionary to print dictionary created in a columnar view        
def print_sorted_dict(d):
    keys = d.keys()
    keys = sorted(keys, key=lambda s: s.lower())
    for k in keys:
        v = d[k]
        print ("%s: %d" % (k, v) )

In [10]:
streets_count('./sample.osm')

A: 1
Alley: 2
Ave: 18
ave: 1
Avenue: 1648
Boulevard: 3
BowenStreet: 1
Broadway: 11
broadway: 1
Circle: 54
Court: 188
Ct.: 1
Drive: 884
Hill: 2
Lane: 251
Parkway: 3
Pike: 143
PIKE: 1
Pkwy: 2
Place: 26
Plaza: 5
Rd: 4
Road: 667
Sq.: 1
Square: 7
St: 8
st: 1
St.: 1
Street: 2263
street: 1
Terrace: 1
Trail: 9
Way: 118
West: 2
Wy: 2


In [11]:
#function to evaluate specific values from previous data pull
def find_type(filename, search):
    outp = set()
    
    with open(filename, "rb") as osm_file:
        for event, element in ET.iterparse(osm_file):
            for item in element.iter():
                if 'tag' in item.tag:
                    if item.attrib['k'] == "addr:street":
                        match = re.search(r'\b' + search + r'\b',
                                          item.attrib['v'], re.IGNORECASE)
                        if match:
                            outp.add(item.attrib['v'])
    print (outp)

In [12]:
find_type('./sample.osm','pike')

{'Putnam Pike'}


In [13]:
find_type('./sample.osm', 'hill')

{'Brook Hill Drive'}


In [14]:
find_type('./sample.osm','wy')

set()


In [15]:
#additional functions to further evaluate unexpected street types

#set expected street types
expected_street = ['Street', 'Avenue', 'Boulevard', 'Drive', 'Court', 'Place', 'Square', 'Lane', 'Road',
            'Parkway', 'Commons', 'Highway', 'Loop', 'Terrace', 'Trail', 'Way', 'North', 'South',
            'West', 'East', 'Circle', 'Broadway', 'Path', 'View', 'Plaza', 'Pike', 'Hill', 'Alley']

#compare street type to expected types

def audit_street_type(street_types, street_name, expected=expected_street):
    m = street_type_re.search(street_name)
    if m:
        street_type = m.group()
        if street_type not in expected:
            street_types[street_type].add(street_name)
            
# main function - returns a dictionary of unexpected street types           
def audit(osmfile):
    with open(osmfile, "rb") as osm_file:
        street_types = defaultdict(set)
        for event, elem in ET.iterparse(osm_file, events=("start",)):

            if elem.tag == "node" or elem.tag == "way":
                for tag in elem.iter("tag"):
                    if is_street(tag):
                        audit_street_type(street_types, tag.attrib['v'])

    return street_types

In [16]:
audit('./sample.osm')

defaultdict(set, {'Ave': {'Atwood Ave'}})

In [17]:
#identify if cities other than providence are included in file
bad_city = set()
osm_file = open('./sample.osm', "rb")
for event, element in ET.iterparse(osm_file):
    for item in element.iter():
        if 'tag' in item.tag:
            if item.attrib['k'] == "addr:city":
                match = re.search(r'\bProvidence\b',item.attrib['v'])
                if not match:
                    bad_city.add(item.attrib['v'])
                    #print(item.attrib['k'], item.attrib['v'])
               
osm_file.close()
print (bad_city)

{'Johnston', 'Rumford', 'Seekonk'}


In [18]:
#check zip codes to make sure that Providence ones are within expected range

#expected zip codes
expected_zip = ('02901', '02901', '02902', '02903','02904', '02905', '02906', '02907', '02908', '02909','02910',
               '02911', '02912','02918', '02919', '02940')

#takes tags and puts in a dictionary for easier iteration
def tags2dict(el):
    if el.tag not in ('node', 'way'): return {}
    return {e.attrib['k']: e.attrib['v'] for e in el.findall('tag')}

#main function -- looks at zip codes associated with providence and returns bad zip codes
def audit_zip(file, zip = expected_zip):
    with open(file, "rb") as osm_file:
        bad_zip = set()
        for event, element in ET.iterparse(osm_file):
            if element.tag in ('node', 'way'):
                #Convert the key/value pairs in the `tag`s under this element to a dictionary.
                details = tags2dict(element)
                if 'addr:postcode' in details and 'addr:city' in details:
                    if details['addr:postcode'] not in zip and 'providence' == details['addr:city'].lower():
                        bad_zip.add(details['addr:postcode'])
    print(bad_zip)

In [30]:
audit_zip('./map.osm')

{'4369', '02906-4800', '02903-2996', '02906-1189', '02860', '02093', '02903-4016'}


In [20]:
#set mapping for codification
street_mapping = {"Ave": "Avenue",
           "St" : "Street",
           "St." : "Street",
           "Ct." : "Court",
           "Rd" : "Road",
           "ave" : "Avenue",
           "Ave." : "Avenue",
           "st" : "Street",
           "Pkwy" : "Parkway",
           "Sq" :  "Square",
           "Wy" : "Way",
           "BowenStreet" : "Street",
           "PIKE" : "Pike",
           "street" : "Street",
           "broadway" : 'Broadway'
          }

zip_mapping = {
    '02906-1189' : '02906',
    '02093' : '02903',
    '4369' : '02909',
    '02906-4800' : '02906',
    '02903-4016': '02903'
}

In [21]:
import csv
import codecs
import cerberus
import schema

OSM_PATH = "example.osm"

NODES_PATH = "nodes.csv"
NODE_TAGS_PATH = "nodes_tags.csv"
WAYS_PATH = "ways.csv"
WAY_NODES_PATH = "ways_nodes.csv"
WAY_TAGS_PATH = "ways_tags.csv"

LOWER_COLON = re.compile(r'^([a-z]|_)+:([a-z]|_)+')
                       # r'^([a-z]|_)*:([a-z]|_)*$'
PROBLEMCHARS = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')



# Make sure the fields order in the csvs matches the column order in the sql table schema
NODE_FIELDS = ['id', 'lat', 'lon', 'user', 'uid', 'version', 'changeset', 'timestamp']
NODE_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_FIELDS = ['id', 'user', 'uid', 'version', 'changeset', 'timestamp']
WAY_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_NODES_FIELDS = ['id', 'node_id', 'position']

In [22]:
schema = {
    'node': {
        'type': 'dict',
        'schema': {
            'id': {'required': True, 'type': 'integer', 'coerce': int},
            'lat': {'required': True, 'type': 'float', 'coerce': float},
            'lon': {'required': True, 'type': 'float', 'coerce': float},
            'user': {'required': True, 'type': 'string'},
            'uid': {'required': True, 'type': 'integer', 'coerce': int},
            'version': {'required': True, 'type': 'string'},
            'changeset': {'required': True, 'type': 'integer', 'coerce': int},
            'timestamp': {'required': True, 'type': 'string'}
        }
    },
    'node_tags': {
        'type': 'list',
        'schema': {
            'type': 'dict',
            'schema': {
                'id': {'required': True, 'type': 'integer', 'coerce': int},
                'key': {'required': True, 'type': 'string'},
                'value': {'required': True, 'type': 'string'},
                'type': {'required': True, 'type': 'string'}
            }
        }
    },
    'way': {
        'type': 'dict',
        'schema': {
            'id': {'required': True, 'type': 'integer', 'coerce': int},
            'user': {'required': True, 'type': 'string'},
            'uid': {'required': True, 'type': 'integer', 'coerce': int},
            'version': {'required': True, 'type': 'string'},
            'changeset': {'required': True, 'type': 'integer', 'coerce': int},
            'timestamp': {'required': True, 'type': 'string'}
        }
    },
    'way_nodes': {
        'type': 'list',
        'schema': {
            'type': 'dict',
            'schema': {
                'id': {'required': True, 'type': 'integer', 'coerce': int},
                'node_id': {'required': True, 'type': 'integer', 'coerce': int},
                'position': {'required': True, 'type': 'integer', 'coerce': int}
            }
        }
    },
    'way_tags': {
        'type': 'list',
        'schema': {
            'type': 'dict',
            'schema': {
                'id': {'required': True, 'type': 'integer', 'coerce': int},
                'key': {'required': True, 'type': 'string'},
                'value': {'required': True, 'type': 'string'},
                'type': {'required': True, 'type': 'string'}
            }
        }
    }
}


SCHEMA = schema

In [23]:
def load_new_tag(elid, tag, default_tag_type):
    """
    Turn tag (a `tag` element) into a dictionary.
    """
    new = {}
    new['id'] = elid
    value = tag.attrib['v']
    
    if ":" in tag.attrib['k']:
        typ, key = tag.attrib['k'].split(':', 1)
    else:
        key = tag.attrib['k']
        typ = default_tag_type
        
    # Fix broken ZIP Code.
    if key == 'postcode' and value in zip_mapping.keys():
        value = f'="zip_mapping[value]"'
    if key == 'postcode':
        value = format_numeric_for_workbook(value)
    
    # Fix broken street names.
    elif is_street(tag): 
        st = get_street_type(value) 
        if st in street_mapping.keys():
            newst = street_mapping['st']
            value = re.sub(street_type_re, newst, value)
        
    new['key'] = key
    new['value'] = value
    new['type'] = typ
    return new

In [24]:
def format_numeric_for_workbook(val):
    """ Given a numeric value, return the special incantations needed to prevent Excel,
    Numbers, or Google Sheets from dropping leading zeros.
    """
    if not val:
        return ""
    return f'="{val}"'

In [25]:
def shape_element(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS,
                  problem_chars=PROBLEMCHARS, default_tag_type='regular'):
    """Clean and shape node or way XML element to Python dict"""

    node_attribs = {}
    way_attribs = {}
    way_nodes = []
    tagels = []  # Handle secondary tags the same way for both node and way elements

    if element.tag == 'node':
        for attrib, value in element.attrib.items():
            if attrib in node_attr_fields:
                node_attribs[attrib] = value
        
        # for elements within the top element
        for tag in element.iter('tag'):
            if problem_chars.match(tag.attrib['k']) is not None:
                continue
            else:
                new = load_new_tag(node_attribs['id'], tag, default_tag_type)
                if new['key'] == 'city' and new['value'].lower() != 'providence':
                    return None
                tagels.append(new)
        outp = {'node': node_attribs, 'node_tags': tagels}
        #print(outp)
        return outp
    elif element.tag == 'way':
        #TODO
        #return False
        for attrib, value in element.attrib.items():
            if attrib in way_attr_fields:
                way_attribs[attrib] = value 
        counter = 0
        for secondary in element.iter():
            if secondary.tag == 'tag':
                if problem_chars.match(secondary.attrib['k']) is not None:
                    continue
                else:
                    new = load_new_tag(element.attrib['id'], secondary, default_tag_type)
                    tagels.append(new)
            if secondary.tag == 'nd':
                newnd = {}
                newnd['id'] = element.attrib['id']
                newnd['node_id'] = secondary.attrib['ref']
                newnd['position'] = counter
                counter += 1
                way_nodes.append(newnd)
        output = {'way': way_attribs, 'way_nodes': way_nodes, 'way_tags':tagels}
        return output

In [26]:
# ================================================== #
#               Helper Functions                     #
# ================================================== #
def get_street_type(street_name):
    m = street_type_re.search(street_name)
    if m:
        return m.group()
    else:
        return None
    
def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Yield element if it is the right type of tag"""

    context = ET.iterparse(osm_file, events=('start', 'end'))
    _, root = next(context)
    for event, elem in context:
        if event == 'end' and elem.tag in tags:
            yield elem
            root.clear()
            
def validate_element(element, validator, schema=SCHEMA):
    """Raise ValidationError if element does not match schema"""
    if validator.validate(element, schema) is not True:
        field, errors = next(iter(validator.errors.items()))
        #Iterator objects: d.iteritems() -> iter(d.items())
        message_string = "\nElement of type '{0}' has the following errors:\n{1}"
        error_string = pprint.pformat(errors)
        print(element)
        raise Exception(message_string.format(field, error_string))



class UnicodeDictWriter(csv.DictWriter, object):
    pass

In [27]:
# ================================================== #
#               Main Function                        #
# ================================================== #
def process_map(file_in, validate):
    """Iteratively process each XML element and write to csv(s)"""

    with codecs.open(NODES_PATH, 'w', encoding='utf-8') as nodes_file, \
         codecs.open(NODE_TAGS_PATH, 'w', encoding='utf-8') as nodes_tags_file, \
         codecs.open(WAYS_PATH, 'w', encoding='utf-8') as ways_file, \
         codecs.open(WAY_NODES_PATH, 'w', encoding='utf-8') as way_nodes_file, \
         codecs.open(WAY_TAGS_PATH, 'w', encoding='utf-8') as way_tags_file:

        nodes_writer = csv.DictWriter(nodes_file, NODE_FIELDS)
        node_tags_writer = csv.DictWriter(nodes_tags_file, NODE_TAGS_FIELDS)
        ways_writer = csv.DictWriter(ways_file, WAY_FIELDS)
        way_nodes_writer = csv.DictWriter(way_nodes_file, WAY_NODES_FIELDS)
        way_tags_writer = csv.DictWriter(way_tags_file, WAY_TAGS_FIELDS)

        nodes_writer.writeheader()
        node_tags_writer.writeheader()
        ways_writer.writeheader()
        way_nodes_writer.writeheader()
        way_tags_writer.writeheader()

        validator = cerberus.Validator(schema)

        for element in get_element(file_in, tags=('node', 'way')):
            el = shape_element(element)
            if el:
                if validate is True:
                    validate_element(el, validator)

                if element.tag == 'node':
                    nodes_writer.writerow(el['node'])
                    node_tags_writer.writerows(el['node_tags'])
                elif element.tag == 'way':
                    ways_writer.writerow(el['way'])
                    way_nodes_writer.writerows(el['way_nodes'])
                    way_tags_writer.writerows(el['way_tags'])

In [28]:
process_map('.\sample.osm', validate=True)

In [1]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import xml.etree.ElementTree as ET  # Use cElementTree or lxml if too slow

OSM_FILE = ".\map.osm"  # Replace this with your osm file
SAMPLE_FILE = "sample.osm"

k = 100 # Parameter: take every k-th top level element

def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Yield element if it is the right type of tag

    Reference:
    http://stackoverflow.com/questions/3095434/inserting-newlines-in-xml-file-generated-via-xml-etree-elementtree-in-python
    """
    context = iter(ET.iterparse(osm_file, events=('start', 'end')))
    _, root = next(context)
    for event, elem in context:
        if event == 'end' and elem.tag in tags:
            yield elem
            root.clear()


with open(SAMPLE_FILE, 'w', encoding = 'utf-8') as output:
    output.write('<?xml version="1.0" encoding="UTF-8"?>\n')
    output.write('<osm>\n  ')

    # Write every kth top level element
    for i, element in enumerate(get_element(OSM_FILE)):
        if i % k == 0:
            output.write(ET.tostring(element, encoding='utf-8').decode('utf-8'))

    output.write('</osm>')