In [82]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import io
import csv
import codecs
import pprint
import re
import xml.etree.cElementTree as ET
from collections import defaultdict
import cerberus
from schema import Schema

In [83]:
# Input extract (swap in "test_file.osm" for a quick run on a small sample).
OSM_PATH = "St. John's_NL_Canada.osm"

# Output CSV files, one per target table.
NODES_PATH = "nodes.csv"
NODE_TAGS_PATH = "nodes_tags.csv"
WAYS_PATH = "ways.csv"
WAY_NODES_PATH = "ways_nodes.csv"
WAY_TAGS_PATH = "ways_tags.csv"

# Matches text that starts with a run of lowercase letters/underscores,
# a colon, and another run of lowercase letters/underscores.
LOWER_COLON = re.compile(r'^([a-z]|_)+:([a-z]|_)+')
# Matches any single character from the listed punctuation/whitespace set.
PROBLEMCHARS = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')

# CSV column order for each file; keep it identical to the column order of
# the corresponding SQL table.
NODE_FIELDS = [
    'id', 'lat', 'lon',
    'user', 'uid', 'version', 'changeset', 'timestamp',
]
NODE_TAGS_FIELDS = [
    'id', 'key', 'value', 'type',
]
WAY_FIELDS = [
    'id',
    'user', 'uid', 'version', 'changeset', 'timestamp',
]
WAY_TAGS_FIELDS = [
    'id', 'key', 'value', 'type',
]
WAY_NODES_FIELDS = [
    'id', 'node_id', 'position',
]

In [84]:
def _required(type_name, coerce=None):
    """Build a required-field rule of the given type, optionally with a coercion callable."""
    rule = {'required': True, 'type': type_name}
    if coerce is not None:
        rule['coerce'] = coerce
    return rule


def _row_list(row_fields):
    """Wrap per-row field rules in a 'list of dicts' rule."""
    return {
        'type': 'list',
        'schema': {
            'type': 'dict',
            'schema': row_fields,
        },
    }


def _tag_row_fields():
    """Field rules for one tag row; returns a fresh dict on every call."""
    return {
        'id': _required('integer', int),
        'key': _required('string'),
        'value': _required('string'),
        'type': _required('string'),
    }


# Validation schema for the dicts produced per element: top-level 'node' and
# 'way' records plus their child row lists.
SCHEMA = {
    'node': {
        'type': 'dict',
        'schema': {
            'id': _required('integer', int),
            'lat': _required('float', float),
            'lon': _required('float', float),
            'user': _required('string'),
            'uid': _required('integer', int),
            'version': _required('string'),
            'changeset': _required('integer', int),
            'timestamp': _required('string'),
        },
    },
    'node_tags': _row_list(_tag_row_fields()),
    'way': {
        'type': 'dict',
        'schema': {
            'id': _required('integer', int),
            'user': _required('string'),
            'uid': _required('integer', int),
            'version': _required('string'),
            'changeset': _required('integer', int),
            'timestamp': _required('string'),
        },
    },
    'way_nodes': _row_list({
        'id': _required('integer', int),
        'node_id': _required('integer', int),
        'position': _required('integer', int),
    }),
    'way_tags': _row_list(_tag_row_fields()),
}



In [85]:
# Street-name clean-up applied to "addr:street" values before they are written
# to the CSV files.

# Matches the last whitespace-delimited word of a string (the street type).
street_type_re = re.compile(r'\b\S+\.?$', re.IGNORECASE)

# Last word of a street name -> its replacement. Lookups are exact
# (case-sensitive) matches on the word found by street_type_re.
# Each key is listed once: a repeated key in a dict literal silently
# overrides the earlier entry.
mapping_street_names = {
    # Abbreviations and misspellings
    "st": "Street",
    "Rd": "Road",
    "Rd.": "Road",
    "Ave": "Avenue",
    'Extention': "Extension",
    # Names recorded without their street type
    'Monkstown': 'Monkstown Road',
    'Harvey': 'Harvey Road',
    'Hayward': 'Hayward Avenue',
    'Larkhall': 'Larkhall Street',
    'Maxse': 'Maxse Street',
    'Williams': 'Williams Heights',
    'catherine': 'catherine Street',
}

def is_street_name(elem):
    """Return True when the element's 'k' attribute is exactly "addr:street"."""
    key = elem.attrib['k']
    return key == "addr:street"


def update_name(name, mapping):
    """Replace the trailing word of a street name using `mapping`.

    The last word of `name` (as found by street_type_re) is looked up in
    `mapping`; when present it is substituted by the mapped text. Names with
    no match, or whose last word is not a key of `mapping`, are returned
    unchanged.
    """
    found = street_type_re.search(name)
    if not found:
        return name

    street_type = found.group()
    if street_type not in mapping:
        return name

    return street_type_re.sub(mapping[street_type], name)

In [86]:
# Code to modify post codes if required before saving to CSV files

def is_post_code(elem):
    """Return True when the element's 'k' attribute is exactly "addr:postcode"."""
    key = elem.attrib['k']
    return key == "addr:postcode"

def update_post_code(post_code):
    """Insert a space after the third character of a six-character post code.

    Any value whose length is not exactly six is returned unchanged.
    """
    if len(post_code) != 6:
        return post_code

    head, tail = post_code[:3], post_code[3:]
    return head + " " + tail

In [87]:
# City-name clean-up applied to "addr:city" values before they are written to
# the CSV files.

# Observed spelling -> canonical city name. Lookups are exact matches.
# Each key is listed once: a repeated key in a dict literal silently
# overrides the earlier entry.
mapping_city_name = {
    # Variants of St. John's
    "Saint John's": "St. John's",
    "St john's": "St. John's",
    'St. John': "St. John's",
    "St. John's": "St. John's",
    'St. John´s': "St. John's",
    "st. John's": "St. John's",
    "st. john's": "St. John's",
    # Variants of Portugal Cove-St. Philip's
    "Town of Portugal Cove - St. Philip's": "Portugal Cove-St. Philip's",
    "St. Phillips": "Portugal Cove-St. Philip's",
    "Portugal Cove - St. Philips": "Portugal Cove-St. Philip's",
    "Portugal Cove-St. Philip’s": "Portugal Cove-St. Philip's",
    'PORTUGAL COVE-ST PHILIPS': "Portugal Cove-St. Philip's",
}


def is_city_name(elem):
    """Return True when the element's 'k' attribute is exactly "addr:city"."""
    key = elem.attrib['k']
    return key == "addr:city"


def update_city_names(city_name, mapping):
    """Return the mapped replacement for `city_name`, or `city_name` itself if unmapped."""
    return mapping.get(city_name, city_name)

In [88]:

def _clean_tag_value(tag):
    """Return the tag's 'v' attribute, cleaned when the tag is a city, post code or street."""
    value = tag.attrib["v"]
    if is_city_name(tag):
        return update_city_names(value, mapping_city_name)
    if is_post_code(tag):
        return update_post_code(value)
    if is_street_name(tag):
        return update_name(value, mapping_street_names)
    return value


def _shape_tags(element, attribs, default_tag_type):
    """Build one row dict (id, key, value, type) per <tag> found under `element`."""
    rows = []
    for tag in element.iter('tag'):
        # Split on the FIRST colon only: the text before it is stored as 'key'
        # and everything after it as 'type', so keys with several colons keep
        # their full suffix instead of being truncated to the second segment.
        key, colon, remainder = tag.attrib["k"].partition(":")
        rows.append({
            "id": int(attribs["id"]),
            "key": key,
            "value": _clean_tag_value(tag),
            "type": remainder if colon else default_tag_type,
        })
    return rows


def shape_element(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS,
                  problem_chars=PROBLEMCHARS, default_tag_type='regular'):
    """Clean and shape node or way XML element to Python dict.

    Args:
        element: XML element exposing `.tag`, `.attrib` and `.iter()`.
        node_attr_fields: attribute names copied from a 'node' element.
        way_attr_fields: attribute names copied from a 'way' element.
        problem_chars: accepted for interface compatibility; it is not
            applied to tag keys by this function.
        default_tag_type: value stored in a tag row's 'type' when the tag key
            contains no colon.

    Returns:
        For a 'node' element: {'node': attribs, 'node_tags': tag_rows}.
        For a 'way' element:  {'way': attribs, 'way_nodes': node_rows,
        'way_tags': tag_rows}.
        For any other element: None.

        Attribute values are copied as the strings found in the XML; only the
        'id' of each tag row is converted to int. Attributes missing from the
        element are simply absent from the returned attribs dict.

    Raises:
        KeyError: if a tag lacks 'k' or 'v', an <nd> lacks 'ref', or the
            element has tags/nds but no 'id' attribute.
    """
    source_attribs = element.attrib

    if element.tag == 'node':
        node_attribs = {name: source_attribs[name]
                        for name in node_attr_fields if name in source_attribs}
        return {
            'node': node_attribs,
            'node_tags': _shape_tags(element, node_attribs, default_tag_type),
        }

    if element.tag == 'way':
        way_attribs = {name: source_attribs[name]
                       for name in way_attr_fields if name in source_attribs}
        # 'position' records the order of the <nd> references within the way.
        way_nodes = [
            {"id": way_attribs["id"], "node_id": nd.attrib["ref"], "position": position}
            for position, nd in enumerate(element.iter('nd'))
        ]
        return {
            'way': way_attribs,
            'way_nodes': way_nodes,
            'way_tags': _shape_tags(element, way_attribs, default_tag_type),
        }

    # Neither a node nor a way: nothing to shape.
    return None

In [89]:
# ================================================== #
#               Helper Functions                     #
# ================================================== #
def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Lazily yield each fully parsed element whose tag is listed in `tags`.

    The file is parsed incrementally; after the consumer is done with a
    yielded element the root element is cleared before parsing continues.
    """
    parse_events = ET.iterparse(osm_file, events=('start', 'end'))
    # The first event is the 'start' of the document root; keep a handle on it.
    _, root = next(parse_events)

    for kind, node in parse_events:
        if kind != 'end' or node.tag not in tags:
            continue
        yield node
        root.clear()


def validate_element(element, validator, schema=SCHEMA):
    """Raise an Exception if element does not match schema.

    Args:
        element: the shaped dict to check.
        validator: object exposing `validate(document, schema)` and an
            `errors` mapping of field -> error details.
        schema: validation schema passed through to the validator.

    Raises:
        Exception: when validation does not return True; the message names
            the first failing field and pretty-prints its errors.
    """
    if validator.validate(element, schema) is not True:
        # dict.items() is a view, not an iterator, so it must be wrapped in
        # iter() before next() can pull the first (field, errors) pair.
        field, errors = next(iter(validator.errors.items()))
        message_string = "\nElement of type '{0}' has the following errors:\n{1}"
        error_string = pprint.pformat(errors)

        raise Exception(message_string.format(field, error_string))




In [90]:


# ================================================== #
#               Main Function                        #
# ================================================== #
def process_map(file_in, validate):
    """Iteratively process each XML element and write to csv(s).

    Args:
        file_in: path (or file object) of the OSM XML input.
        validate: when exactly True, every shaped element is checked with
            validate_element before it is written.

    Writes NODES_PATH, NODE_TAGS_PATH, WAYS_PATH, WAY_NODES_PATH and
    WAY_TAGS_PATH, each starting with a header row.
    """
    # All five outputs are opened the same way: UTF-8 text with newline=''
    # so the csv module controls line endings itself.
    with open(NODES_PATH, 'w', encoding='utf-8', newline='') as nodes_file, \
         open(NODE_TAGS_PATH, 'w', encoding='utf-8', newline='') as nodes_tags_file, \
         open(WAYS_PATH, 'w', encoding='utf-8', newline='') as ways_file, \
         open(WAY_NODES_PATH, 'w', encoding='utf-8', newline='') as way_nodes_file, \
         open(WAY_TAGS_PATH, 'w', encoding='utf-8', newline='') as way_tags_file:

        nodes_writer = csv.DictWriter(nodes_file, delimiter=",", fieldnames=NODE_FIELDS)
        node_tags_writer = csv.DictWriter(nodes_tags_file, delimiter=",", fieldnames=NODE_TAGS_FIELDS)
        ways_writer = csv.DictWriter(ways_file, delimiter=",", fieldnames=WAY_FIELDS)
        way_nodes_writer = csv.DictWriter(way_nodes_file, delimiter=",", fieldnames=WAY_NODES_FIELDS)
        way_tags_writer = csv.DictWriter(way_tags_file, delimiter=",", fieldnames=WAY_TAGS_FIELDS)

        for writer in (nodes_writer, node_tags_writer, ways_writer,
                       way_nodes_writer, way_tags_writer):
            writer.writeheader()

        validator = cerberus.Validator()

        for element in get_element(file_in, tags=('node', 'way')):
            el = shape_element(element)
            if not el:
                continue

            if validate is True:
                validate_element(el, validator)

            if element.tag == 'node':
                nodes_writer.writerow(el['node'])
                node_tags_writer.writerows(el['node_tags'])
            elif element.tag == 'way':
                ways_writer.writerow(el['way'])
                way_nodes_writer.writerows(el['way_nodes'])
                way_tags_writer.writerows(el['way_tags'])


if __name__ == '__main__':
    # Validating every element is roughly ten times slower than skipping it,
    # so when validation is on, prefer running against a small sample of the map.
    process_map(file_in=OSM_PATH, validate=True)