# from OSM to CSV files

In [6]:
import os, sys

# Directory that holds the OSM extract; the CSV output paths defined further
# down are built from this directory as well.
PROJECT_PATH = "C:\\Users\\TO72078\\Documents\\BIG_DATA\\UDACITY\\projects\\openstreetmap"
# Base name shared by the input "<name>.osm" file and the output CSV files.
OSM_NAME = "toulouse"
# Full path of the OSM XML file to process.
OSM_PATH = os.path.join(PROJECT_PATH, '%s.osm' % OSM_NAME)

In [7]:
import cerberus
import sys
# Make the project directory importable before importing `schema`
# (presumably schema.py lives in PROJECT_PATH -- confirm).
sys.path.append(PROJECT_PATH)
import schema
# Schema used as the default `schema` argument of validate_element().
SCHEMA = schema.schema

In [8]:
# Check the OSM file size:
# - it must be larger than 50 MB, as required by the project specification;
# - it should stay small enough for the sake of project efficiency (e.g. 5 GB).
# os.path.getsize() returns bytes; dividing by 1e6 reports megabytes.
osm_size = os.path.getsize(OSM_PATH)
print 'OSM uncompressed file size is %.0fMb' % (osm_size/1.e6)

OSM uncompressed file size is 460Mb


In [11]:
# Only validate small test files (under 5e6 bytes): validation is costly on
# larger extracts. The comparison already yields a bool.
validateBool = osm_size < 5e6

In [12]:
import csv
import codecs
import pprint
import re
import xml.etree.cElementTree as ET


# Output CSV paths: one file per table, all placed in PROJECT_PATH and
# prefixed with the OSM extract name.
NODES_PATH = os.path.join(PROJECT_PATH, "%s_nodes.csv" % OSM_NAME)
NODE_TAGS_PATH = os.path.join(PROJECT_PATH, "%s_node_tags.csv" % OSM_NAME)
WAYS_PATH = os.path.join(PROJECT_PATH, "%s_ways.csv" % OSM_NAME)
WAY_NODES_PATH = os.path.join(PROJECT_PATH, "%s_way_nodes.csv" % OSM_NAME)
WAY_TAGS_PATH = os.path.join(PROJECT_PATH, "%s_way_tags.csv" % OSM_NAME)

# Matches a string starting with lowercase letters/underscores, a colon, then
# more lowercase letters/underscores. Not referenced in the visible code.
LOWER_COLON = re.compile(r'^([a-z]|_)+:([a-z]|_)+')
# Characters treated as problematic in a tag key; this pattern is the default
# `problem_chars` argument of shape_element().
PROBLEMCHARS = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')

# Make sure the fields order in the csvs matches the column order in the sql table schema
NODE_FIELDS = ['id', 'lat', 'lon', 'user', 'uid', 'version', 'changeset', 'timestamp']
NODE_TAGS_FIELDS = ['id', 'key', 'value', 'type', 'valid']
WAY_FIELDS = ['id', 'user', 'uid', 'version', 'changeset', 'timestamp']
WAY_TAGS_FIELDS = ['id', 'key', 'value', 'type', 'valid']
WAY_NODES_FIELDS = ['id', 'node_id', 'position']


# A tag key qualifies as "<type>:<key>" when it starts with lowercase
# letters/underscores, a colon, and at least one more lowercase letter,
# underscore or colon. Compiled once instead of on every call.
_TYPED_TAG_KEY = re.compile(r'^([a-z_]+):([a-z_:]+)')


def shape_element(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS,
                  problem_chars=PROBLEMCHARS, default_tag_type='regular'):
    """Clean and shape node or way XML element to Python dict.

    Args:
        element: parsed XML element; only 'node' and 'way' tags are shaped.
        node_attr_fields: attribute names copied from a node element.
        way_attr_fields: attribute names copied from a way element.
        problem_chars: compiled regex; a tag key containing a match anywhere
            is flagged with valid='no'.
        default_tag_type: tag type used when the key has no "<type>:" prefix.

    Returns:
        For a node: {'node': attribs, 'node_tags': [tag dicts]}.
        For a way: {'way': attribs, 'way_tags': [tag dicts],
        'way_nodes': [{'id', 'node_id', 'position'}, ...]}.
        None for any other element.

    Raises:
        KeyError: if a requested attribute is missing from the element.
        ValueError: if a numeric attribute cannot be converted.
    """
    if element.tag not in ('node', 'way'):
        return None

    # Converters are looked up by field name rather than by position, so a
    # caller-supplied field list in a different order still gets the right
    # types. Fields without an entry are kept as encoded strings.
    converters = {
        'id': int,
        'lat': float,
        'lon': float,
        'user': str_encode,
        'uid': int,
        'version': str_encode,
        'changeset': int,
        'timestamp': str_encode,
    }

    def convert_attribs(fields):
        """Return {field: converted attribute value} for the given fields."""
        return {
            name: converters.get(name, str_encode)(element.attrib[name])
            for name in fields
        }

    # Secondary tags are handled the same way for both node and way elements.
    tags = []
    for tag in element.iter('tag'):
        raw_key = tag.attrib['k']
        typed = _TYPED_TAG_KEY.match(raw_key)
        if typed:
            tag_type = typed.group(1)
            # Keep everything after the first colon. Taking regex group 2
            # would cut the key at the first character outside [a-z_:]
            # (e.g. 'addr:housenumber2' -> 'housenumber').
            tag_key = raw_key[typed.end(1) + 1:]
        else:
            tag_type = default_tag_type
            tag_key = raw_key
        tags.append({
            'id': int(element.attrib['id']),
            'key': tag_key,
            'value': tag.attrib['v'],
            'type': tag_type,
            # search(), not match(): a problematic character anywhere in the
            # key makes it invalid, not only one in first position.
            'valid': 'no' if problem_chars.search(raw_key) else 'yes',
        })

    if element.tag == 'node':
        return {'node': convert_attribs(node_attr_fields), 'node_tags': tags}

    # Way: record each referenced node together with its 0-based position.
    way_nodes = [
        {
            'id': int(element.attrib['id']),
            'node_id': int(way_node.attrib['ref']),
            'position': position,
        }
        for position, way_node in enumerate(element.iter('nd'))
    ]
    return {'way': convert_attribs(way_attr_fields), 'way_tags': tags,
            'way_nodes': way_nodes}


# ================================================== #
#               Helper Functions                     #
# ================================================== #
def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Stream the elements of osm_file whose tag is listed in `tags`.

    The file is parsed incrementally. An element is yielded once it is
    complete (its 'end' event), and after the consumer resumes the generator
    the root element is cleared.
    """
    parse_events = ET.iterparse(osm_file, events=('start', 'end'))
    # The first event delivers the document root, kept so it can be cleared.
    first_event = next(parse_events)
    root = first_event[1]
    for kind, node in parse_events:
        if kind != 'end':
            continue
        if node.tag not in tags:
            continue
        yield node
        root.clear()

def validate_element(element, validator, schema=SCHEMA):
    """Check element against schema and raise Exception when it does not conform.

    Only the first entry of validator.errors is reported in the message.
    """
    if validator.validate(element, schema) is True:
        return

    field, errors = next(validator.errors.iteritems())
    raise Exception(
        "\nElement of type '{0}' has the following errors:\n{1}".format(
            field, pprint.pformat(errors)
        )
    )


def str_encode(v):
    """Return v as a byte string: unicode is UTF-8 encoded, anything else goes through str()."""
    if not isinstance(v, unicode):
        return str(v)
    return v.encode('utf-8')

class UnicodeDictWriter(csv.DictWriter, object):
    """csv.DictWriter variant that passes every value through str_encode first"""

    def writerow(self, row):
        # Encode each value before handing the row to the base writer.
        encoded_row = {}
        for field, value in row.iteritems():
            encoded_row[field] = str_encode(value)
        super(UnicodeDictWriter, self).writerow(encoded_row)

    def writerows(self, rows):
        # Route every row through writerow() so each one gets encoded.
        for single_row in rows:
            self.writerow(single_row)


# ================================================== #
#               Main Function                        #
# ================================================== #
def process_map(file_in, validate):
    """Iteratively process each XML element and write to csv(s).

    Streams 'node' and 'way' elements from file_in with get_element(), shapes
    each one with shape_element(), optionally validates it, and writes the
    resulting rows to NODES_PATH, NODE_TAGS_PATH, WAYS_PATH, WAY_NODES_PATH
    and WAY_TAGS_PATH. Every CSV starts with a header row.

    Args:
        file_in: OSM XML source handed to get_element().
        validate: validation runs only when this is exactly True.

    Raises:
        Exception: from validate_element() when a shaped element does not
            match the schema.
    """
    # Open in binary mode: the Python 2 csv writer emits its own '\r\n' row
    # terminators, and a text-mode file on Windows would expand them to
    # '\r\r\n', which shows up as blank rows between records.
    with codecs.open(NODES_PATH, 'wb') as nodes_file, \
         codecs.open(NODE_TAGS_PATH, 'wb') as nodes_tags_file, \
         codecs.open(WAYS_PATH, 'wb') as ways_file, \
         codecs.open(WAY_NODES_PATH, 'wb') as way_nodes_file, \
         codecs.open(WAY_TAGS_PATH, 'wb') as way_tags_file:

        nodes_writer = UnicodeDictWriter(nodes_file, NODE_FIELDS)
        node_tags_writer = UnicodeDictWriter(nodes_tags_file, NODE_TAGS_FIELDS)
        ways_writer = UnicodeDictWriter(ways_file, WAY_FIELDS)
        way_nodes_writer = UnicodeDictWriter(way_nodes_file, WAY_NODES_FIELDS)
        way_tags_writer = UnicodeDictWriter(way_tags_file, WAY_TAGS_FIELDS)

        for writer in (nodes_writer, node_tags_writer, ways_writer,
                       way_nodes_writer, way_tags_writer):
            writer.writeheader()

        validator = cerberus.Validator()

        for element in get_element(file_in, tags=('node', 'way')):
            el = shape_element(element)
            if not el:
                continue
            if validate is True:
                validate_element(el, validator)
            if element.tag == 'node':
                nodes_writer.writerow(el['node'])
                node_tags_writer.writerows(el['node_tags'])
            elif element.tag == 'way':
                ways_writer.writerow(el['way'])
                way_nodes_writer.writerows(el['way_nodes'])
                way_tags_writer.writerows(el['way_tags'])


if __name__ == '__main__':
    # Note: Validation is ~ 10X slower. For the project consider using a small
    # sample of the map when validating.
    # validateBool is set earlier from the OSM file size, so validation only
    # runs for files under 5e6 bytes.
    process_map(OSM_PATH, validate=validateBool)
