In [1]:
#Import necessary libraries
import xml.etree.cElementTree as ET
import pprint
import re
from collections import defaultdict
import csv
import codecs
import cerberus
import sqlite3
import schema

In [2]:
#!/usr/bin/env python
# Configuration for down-sampling the full OSM extract into a smaller file.

OSM_FILE = "nashville.osm"  # Replace this with your osm file
SAMPLE_FILE = "nashville_sample.osm"  # Output path of the down-sampled extract

k = 15 # Parameter: take every k-th top level element
def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Lazily yield every fully parsed element whose tag is listed in ``tags``.

    Args:
        osm_file: Path or file object accepted by ``ET.iterparse``.
        tags: Element tag names to yield.

    Yields:
        Each matching element, once its closing tag has been parsed.
    """
    events = iter(ET.iterparse(osm_file, events=('start', 'end')))
    root = next(events)[1]  # the very first event carries the document root
    for kind, node in events:
        if kind != 'end' or node.tag not in tags:
            continue
        yield node
        # Detach already-consumed children from the root once the caller resumes us.
        root.clear()
            
# Write a smaller OSM file that keeps one out of every k top-level elements.
with open(SAMPLE_FILE, 'wb') as output:
    output.write('<?xml version="1.0" encoding="UTF-8"?>\n'.encode('utf-8'))
    output.write('<osm>\n  '.encode('utf-8'))

    for i, element in enumerate(get_element(OSM_FILE)):
        if i % k:
            # Not a multiple of k: skip this element.
            continue
        output.write(ET.tostring(element, encoding='utf-8'))

    output.write('</osm>'.encode('utf-8'))

In [3]:
# Path of the sampled OSM extract used by the audit calls in this notebook.
nashville = "nashville_sample.osm"

In [4]:
def count_tags(filename):
    """Count how often each element tag occurs in an XML document.

    Args:
        filename: Path or file object accepted by ``ET.parse``.

    Returns:
        dict mapping tag name -> number of elements with that tag (the root
        element included), in order of first appearance.
    """
    counts = {}
    for node in ET.parse(filename).getroot().iter():
        counts[node.tag] = counts.get(node.tag, 0) + 1
    return counts

In [5]:
# Tally the element types present in the sample file.
count_tags(nashville)

{'osm': 1,
 'node': 22642,
 'tag': 10399,
 'way': 3019,
 'nd': 26064,
 'relation': 34,
 'member': 1914}

Our sample of the Nashville data contains 3,019 ways and 10,399 tags; these are the elements we'll mostly be focusing on.

In [6]:
#regular expressions used to classify tag keys
# lower: the whole key consists only of lowercase ASCII letters and underscores
lower = re.compile(r'^([a-z]|_)*$')
# lower_colon: two runs of lowercase letters/underscores separated by exactly one colon
lower_colon = re.compile(r'^([a-z]|_)*:([a-z]|_)*$')
# problemchars: the key contains at least one of = + / & < > ; ' " ? % # $ @ , . or whitespace
problemchars = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')

#classify one tag key and bump the matching counter
def key_type(element, keys):
    """Increment the counter in ``keys`` for the category of this tag's key.

    Elements that are not ``tag`` elements leave ``keys`` untouched.
    The categories are tried in order: "lower", "lower_colon",
    "problemchars", and finally "other".

    Args:
        element: XML element to inspect.
        keys: dict holding one integer counter per category; updated in place.

    Returns:
        The same ``keys`` dict.
    """
    if element.tag != "tag":
        return keys

    key = element.attrib["k"]
    if lower.search(key):
        bucket = "lower"
    elif lower_colon.search(key):
        bucket = "lower_colon"
    elif problemchars.search(key):
        bucket = "problemchars"
    else:
        bucket = "other"
    keys[bucket] += 1
    return keys

#tally the tag keys that match the lower_colon pattern
def find_colon(element, c_list):
    """Count occurrences of colon-separated tag keys.

    Args:
        element: XML element to inspect; only ``tag`` elements are counted.
        c_list: dict mapping key -> occurrence count; updated in place.

    Returns:
        The same ``c_list`` dict.
    """
    if element.tag == 'tag':
        key = element.attrib["k"]
        if lower_colon.search(key):
            c_list[key] = c_list.get(key, 0) + 1
    return c_list

#collect the key-category totals and the per-key counts of colon keys
def process_map(filename):
    """Audit the tag keys of an OSM file.

    Args:
        filename: Path or file object accepted by ``ET.iterparse``.

    Returns:
        A ``(keys, colon_list)`` tuple: the per-category counters filled in by
        ``key_type`` and the per-key counts filled in by ``find_colon``.
    """
    key_counts = dict.fromkeys(("lower", "lower_colon", "problemchars", "other"), 0)
    colon_keys = {}
    for _event, elem in ET.iterparse(filename):
        key_counts = key_type(elem, key_counts)
        colon_keys = find_colon(elem, colon_keys)
    return key_counts, colon_keys

In [7]:
# process_map returns a tuple: (key-category counters, colon-key counts).
keys = process_map(nashville)
pprint.pprint(keys)

({'lower': 7110, 'lower_colon': 3076, 'other': 213, 'problemchars': 0},
 {'addr:city': 41,
  'addr:country': 1,
  'addr:housenumber': 53,
  'addr:postcode': 51,
  'addr:state': 40,
  'addr:street': 54,
  'addr:suite': 1,
  'addr:unit': 2,
  'brand:wikidata': 26,
  'brand:wikipedia': 26,
  'building:color': 1,
  'building:levels': 17,
  'building:material': 1,
  'building:part': 2,
  'building:roof': 1,
  'building:units': 1,
  'change:lanes': 2,
  'cycleway:left': 2,
  'cycleway:right': 1,
  'demolished:highway': 1,
  'destination:ref': 12,
  'destination:street': 4,
  'generator:source': 1,
  'gnis:county_id': 44,
  'gnis:county_name': 6,
  'gnis:created': 44,
  'gnis:feature_id': 55,
  'gnis:id': 2,
  'gnis:import_uuid': 6,
  'gnis:reviewed': 6,
  'gnis:state_id': 44,
  'hgv:national_network': 1,
  'historic:name': 1,
  'internet_access:fee': 1,
  'internet_access:ssid': 1,
  'is_in:state': 3,
  'junction:ref': 8,
  'lanes:backward': 10,
  'lanes:both_ways': 4,
  'lanes:forward': 10,

Finding no problem characters is a relief, but let's see if we can find other errors in our data. I went through all of the "addr:street" tags looking for errors and found only one: **Rosa L Parks Blvd** uses the abbreviated form of Boulevard. This is easy to correct (even manually!), but we'll continue with our corrective functions to finish out the analysis. The data looks clean overall - postal codes, street names, abbreviations, and street directions all look to be in good shape. The OpenStreetMap contributors for the city of Nashville have been busy!

In [8]:
# Sample OSM file audited by the street-type functions.
osmfile = "nashville_sample.osm"
# Matches the final run of non-whitespace characters in a string, i.e. its last word.
# NOTE(review): re.IGNORECASE appears to have no effect here, since the pattern contains no letters.
street_type_re = re.compile(r'\b\S+\.?$', re.IGNORECASE)

# Last words that are accepted as-is and therefore not reported by the audit.
expected = ["Street", "Avenue", "Boulevard", "Drive", "Court", "Place", "Square", "Circle", "Lane", 
            "Road", "Trail", "Parkway", "Expressway", "Highway", "Tunnel", "Park", "Plaza", "Pike", "Bridge", "School"]

# Maps an abbreviated last word (left) to the full word that replaces it (right)
mapping = { "St": "Street",
            "St.": "Street",
            "Hwy": "Highway",
            "Hwy.": "Highway",
           # This entry should rename Rosa L Parks Blvd => Rosa L Parks Boulevard
            "Blvd": "Boulevard",
            "Blvd.": "Boulevard",
            "Rd.": "Road",
            "Rd": "Road",
            "Pkwy": "Parkway",
            "PKWY": "Parkway",
            "Ave": "Avenue",
            "Ave.": "Avenue",
            "S": "South",
            "N": "North",
            "W": "West",
            "E": "East",
            "S.": "South",
            "N.": "North",
            "W.": "West",
            "E.": "East",
            }


def audit_street_type(street_types, street_name):
    """Record ``street_name`` under its last word when that word is unexpected.

    Args:
        street_types: mapping of last word -> set of names ending with it;
            must create missing entries on access (e.g. ``defaultdict(set)``).
        street_name: the string to inspect.
    """
    match = street_type_re.search(street_name)
    if not match:
        return
    suffix = match.group()
    if suffix in expected:
        return
    street_types[suffix].add(street_name)

def audit(osmfile):
    """Collect the tag values of nodes and ways whose last word is unexpected.

    The original version opened ``osmfile`` but never read from the handle
    (``ET.iterparse`` re-opened the path itself) and leaked the handle if
    parsing raised. The file is now opened once, parsed from that handle, and
    always closed by the ``with`` block.

    Args:
        osmfile: Path of the OSM XML file to audit.

    Returns:
        defaultdict(set) mapping each unexpected last word to the set of tag
        values that end with it.

    Raises:
        OSError: if the file cannot be opened.
    """
    street_types = defaultdict(set)
    with open(osmfile, "rb") as osm_file:
        for _, elem in ET.iterparse(osm_file):
            if elem.tag in ("node", "way"):
                # NOTE(review): every tag value is audited, not only the
                # values of "addr:street" keys -- confirm that this is intended.
                for tag in elem.iter("tag"):
                    audit_street_type(street_types, tag.attrib['v'])
    return street_types

In [9]:
def update_name(name, mapping):
    """Replace the last word of ``name`` when it is a key of ``mapping``.

    Args:
        name: the string to fix.
        mapping: dict of last word -> replacement text.

    Returns:
        The rewritten string, or ``name`` unchanged when its last word is not
        a key of ``mapping``.
    """
    match = street_type_re.search(name)
    if match and match.group() in mapping:
        return street_type_re.sub(mapping[match.group()], name)
    return name

In [10]:
def test():
    """Audit the sample file and print the corrected form of the one known bad name."""
    target = "Rosa L Parks Blvd"
    for names in audit(osmfile).values():
        for name in names:
            if name != target:
                continue
            print(name, "=>", update_name(name, mapping))
            
# Run the check only when this code executes as the main module.
if __name__ == '__main__':
    test()

Rosa L Parks Blvd => Rosa L Parks Boulevard


We can see that **Blvd** has been expanded to **Boulevard**. We have completed our one fix! Hoorah!

In [11]:
#We're using the sample data for faster load and validation times
OSM_PATH = "nashville_sample.osm"

# Output CSV files, one per table
NODES_PATH = "nodes.csv"
NODE_TAGS_PATH = "nodes_tags.csv"
WAYS_PATH = "ways.csv"
WAY_NODES_PATH = "ways_nodes.csv"
WAY_TAGS_PATH = "ways_tags.csv"

# Key starts with lowercase letters/underscores, a colon, then at least one more
# lowercase letter/underscore; the pattern is not anchored at the end of the key.
LOWER_COLON = re.compile(r'^([a-z]|_)+:([a-z]|_)+')
# Key contains at least one of = + / & < > ; ' " ? % # $ @ , . or whitespace
PROBLEMCHARS = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')

# Validation schema provided by the imported `schema` module
SCHEMA = schema.schema

# Make sure the fields order in the csvs matches the column order in the sql table schema
NODE_FIELDS = ['id', 'lat', 'lon', 'user', 'uid', 'version', 'changeset', 'timestamp']
NODE_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_FIELDS = ['id', 'user', 'uid', 'version', 'changeset', 'timestamp']
WAY_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_NODES_FIELDS = ['id', 'node_id', 'position']


def _shape_tags(element, problem_chars, default_tag_type):
    """Build one row dict per usable ``tag`` child of ``element``."""
    rows = []
    for child in element:
        if child.tag != 'tag':
            continue
        key = child.attrib['k']
        if problem_chars.search(key):
            # Keys containing problematic characters are dropped entirely.
            continue

        value = child.attrib['v']
        if key == 'name:en':
            # NOTE(review): only "name:en" values are run through update_name;
            # confirm whether other keys (e.g. "addr:street") should be too.
            value = update_name(value, mapping)

        if LOWER_COLON.search(key):
            # Split at the FIRST colon: the prefix becomes the type and the
            # remainder (which may contain further colons) becomes the key.
            tag_type, _, tag_key = key.partition(':')
        else:
            tag_type, tag_key = default_tag_type, key

        rows.append({
            'id': element.attrib['id'],
            'value': value,
            'key': tag_key,
            'type': tag_type,
        })
    return rows


def shape_element(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS,
                  problem_chars=PROBLEMCHARS, default_tag_type='regular'):
    """Clean and shape node or way XML element to Python dict

    Changes from the previous version:
    * ``default_tag_type`` is now honoured; it used to be ignored in favour of
      a hard-coded ``'regular'`` (the default value is unchanged, so existing
      callers get the same output).
    * The tag-shaping logic that was duplicated for nodes and ways lives in
      the private helper ``_shape_tags``.

    Args:
        element: XML element to shape.
        node_attr_fields: attribute names copied from a ``node`` element.
        way_attr_fields: attribute names copied from a ``way`` element.
        problem_chars: compiled regex; tags whose key matches it are skipped.
        default_tag_type: ``type`` given to tags whose key does not match
            ``LOWER_COLON``.

    Returns:
        ``{'node': ..., 'node_tags': [...]}`` for a node,
        ``{'way': ..., 'way_nodes': [...], 'way_tags': [...]}`` for a way,
        and ``None`` for any other element.
    """
    if element.tag == 'node':
        node_attribs = {name: value for name, value in element.attrib.items()
                        if name in node_attr_fields}
        return {'node': node_attribs,
                'node_tags': _shape_tags(element, problem_chars, default_tag_type)}

    if element.tag == 'way':
        way_attribs = {name: value for name, value in element.attrib.items()
                       if name in way_attr_fields}
        # position is the 0-based index of each <nd> child within the way
        way_nodes = [
            {'id': element.attrib['id'], 'node_id': nd.attrib['ref'], 'position': position}
            for position, nd in enumerate(element.findall('nd'))
        ]
        return {'way': way_attribs,
                'way_nodes': way_nodes,
                'way_tags': _shape_tags(element, problem_chars, default_tag_type)}

    return None


# ================================================== #
#               Helper Functions                     #
# ================================================== #
def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Generate the completed elements whose tag appears in ``tags``."""

    parser = ET.iterparse(osm_file, events=('start', 'end'))
    root = next(parser)[1]  # first event is the start of the document root
    for kind, node in parser:
        if kind != 'end':
            continue
        if node.tag in tags:
            yield node
            # Runs when the consumer asks for the next element: release the
            # children accumulated on the root so far.
            root.clear()


def validate_element(element, validator, schema=SCHEMA):
    """Raise an Exception if element does not match schema

    Args:
        element: the shaped element dict to validate.
        validator: object exposing ``validate(document, schema)`` and an
            ``errors`` dict.
        schema: schema passed through to ``validator.validate``.

    Raises:
        Exception: naming the first failing field and its errors.
    """
    if validator.validate(element, schema) is not True:
        # dict.items() returns a view, not an iterator, so calling next() on it
        # directly raises TypeError and hides the real validation error.
        field, errors = next(iter(validator.errors.items()))
        message_string = "\nElement of type '{0}' has the following errors:\n{1}"
        error_string = pprint.pformat(errors)

        raise Exception(message_string.format(field, error_string))


class UnicodeDictWriter(csv.DictWriter, object):
    """Extend csv.DictWriter to handle Unicode input

    ``bytes`` values are decoded as UTF-8 before being written; every other
    value is passed through unchanged. (The previous code called ``.encode``
    on ``bytes`` objects, which raises AttributeError on Python 3.)
    """

    def writerow(self, row):
        """Write one row dict, decoding any UTF-8 ``bytes`` values to ``str``."""
        return super(UnicodeDictWriter, self).writerow({
            key: (value.decode('utf-8') if isinstance(value, bytes) else value)
            for key, value in row.items()
        })

    def writerows(self, rows):
        """Write every row through ``writerow`` so the decoding is applied."""
        for row in rows:
            self.writerow(row)


# ================================================== #
#               Main Function                        #
# ================================================== #
def process_map(file_in, validate):
    """Stream the node and way elements of ``file_in`` into the five CSV files.

    Args:
        file_in: OSM XML input passed to ``get_element``.
        validate: when exactly ``True``, every shaped element is checked with
            ``validate_element`` before it is written.
    """

    with codecs.open(NODES_PATH, 'w', encoding="utf-8") as nodes_file, \
        codecs.open(NODE_TAGS_PATH, 'w', encoding="utf-8") as nodes_tags_file, \
        codecs.open(WAYS_PATH, 'w', encoding="utf-8") as ways_file, \
        codecs.open(WAY_NODES_PATH, 'w', encoding="utf-8") as way_nodes_file, \
        codecs.open(WAY_TAGS_PATH, 'w', encoding="utf-8") as way_tags_file:

        nodes_writer = UnicodeDictWriter(nodes_file, NODE_FIELDS)
        node_tags_writer = UnicodeDictWriter(nodes_tags_file, NODE_TAGS_FIELDS)
        ways_writer = UnicodeDictWriter(ways_file, WAY_FIELDS)
        way_nodes_writer = UnicodeDictWriter(way_nodes_file, WAY_NODES_FIELDS)
        way_tags_writer = UnicodeDictWriter(way_tags_file, WAY_TAGS_FIELDS)

        # Every file starts with its header row.
        for writer in (nodes_writer, node_tags_writer, ways_writer,
                       way_nodes_writer, way_tags_writer):
            writer.writeheader()

        validator = cerberus.Validator()

        for element in get_element(file_in, tags=('node', 'way')):
            shaped = shape_element(element)
            if not shaped:
                continue

            if validate is True:
                validate_element(shaped, validator)

            if element.tag == 'node':
                nodes_writer.writerow(shaped['node'])
                node_tags_writer.writerows(shaped['node_tags'])
            elif element.tag == 'way':
                ways_writer.writerow(shaped['way'])
                way_nodes_writer.writerows(shaped['way_nodes'])
                way_tags_writer.writerows(shaped['way_tags'])



# Note: Validation is ~ 10X slower. For the project consider using a small
# sample of the map when validating.
    
# Convert the sample OSM file to CSV, validating each shaped element along the way.
process_map(OSM_PATH, validate=True)
# OSM_PATH[:-4] drops the 4-character ".osm" extension from the printed name.
print('Successfully Processed {} Map'.format(OSM_PATH[:-4]))

Successfully Processed nashville_sample Map


Now that our five (5) csv files have been written using the provided schema, we can load them into a sqlite database to perform some further analysis.

In [12]:
osmdb = 'nashville_osm.db'

# Load nodes.csv into the `nodes` table of the SQLite database.
connection = sqlite3.connect(osmdb)
try:
    write_cursor = connection.cursor()
    # The database file outlives the kernel, so a plain CREATE TABLE fails on
    # every re-run. Dropping a leftover table first makes this cell idempotent.
    write_cursor.execute('DROP TABLE IF EXISTS nodes')
    write_cursor.execute('''
                    CREATE TABLE nodes(id INTEGER, lat TEXT, lon TEXT, user TEXT, uid INTEGER, version TEXT, changeset TEXT, timestamp TEXT)''')

    connection.commit()

    # newline='' leaves newline handling to the csv module (see the csv docs).
    with open('nodes.csv', 'r', encoding="utf-8", newline='') as csvfile:
        middleman = csv.DictReader(csvfile) # comma is default delimiter
        to_db = [(i['id'], i['lat'], i['lon'], i['user'], i["uid"], i["version"],i["changeset"],i["timestamp"]) for i in middleman]

    write_cursor.executemany("INSERT INTO nodes (id, lat, lon, user, uid, version, changeset, timestamp) VALUES (?,?,?,?,?,?,?,?);", to_db)

    connection.commit()
finally:
    # Close the connection even when the load fails part-way through.
    connection.close()


In [13]:
# Load nodes_tags.csv into the `nodes_tags` table of the SQLite database.
connection = sqlite3.connect(osmdb)
try:
    write_cursor = connection.cursor()
    # The database file outlives the kernel, so a plain CREATE TABLE fails on
    # every re-run. Dropping a leftover table first makes this cell idempotent.
    write_cursor.execute('DROP TABLE IF EXISTS nodes_tags')
    write_cursor.execute('''
                    CREATE TABLE nodes_tags(id INTEGER, key TEXT, value TEXT, type TEXT)''')

    connection.commit()

    # newline='' leaves newline handling to the csv module (see the csv docs).
    with open('nodes_tags.csv', 'r', encoding="utf-8", newline='') as csvfile:
        middleman = csv.DictReader(csvfile) # comma is default delimiter
        to_db = [(i['id'], i['key'], i['value'], i['type']) for i in middleman]

    write_cursor.executemany("INSERT INTO nodes_tags(id, key, value, type) VALUES (?,?,?,?);", to_db)

    connection.commit()
finally:
    # Close the connection even when the load fails part-way through.
    connection.close()

In [14]:
# Load ways.csv into the `ways` table of the SQLite database.
connection = sqlite3.connect(osmdb)
try:
    write_cursor = connection.cursor()
    # The database file outlives the kernel, so a plain CREATE TABLE fails on
    # every re-run. Dropping a leftover table first makes this cell idempotent.
    write_cursor.execute('DROP TABLE IF EXISTS ways')
    write_cursor.execute('''
                    CREATE TABLE ways(id INTEGER, user TEXT, uid TEXT, version TEXT, changeset TEXT, timestamp TEXT)''')

    connection.commit()

    # newline='' leaves newline handling to the csv module (see the csv docs).
    with open('ways.csv', 'r', encoding="utf-8", newline='') as csvfile:
        middleman = csv.DictReader(csvfile) # comma is default delimiter
        to_db = [(i['id'], i['user'], i['uid'], i['version'], i["changeset"], i["timestamp"]) for i in middleman]

    write_cursor.executemany("INSERT INTO ways (id, user, uid, version, changeset, timestamp) VALUES (?,?,?,?,?,?);", to_db)

    connection.commit()
finally:
    # Close the connection even when the load fails part-way through.
    connection.close()

In [15]:
# Load ways_tags.csv into the `ways_tags` table of the SQLite database.
connection = sqlite3.connect(osmdb)
try:
    write_cursor = connection.cursor()
    # The database file outlives the kernel, so a plain CREATE TABLE fails on
    # every re-run. Dropping a leftover table first makes this cell idempotent.
    write_cursor.execute('DROP TABLE IF EXISTS ways_tags')
    write_cursor.execute('''
                    CREATE TABLE ways_tags(id INTEGER, key TEXT, value TEXT, type TEXT)''')

    connection.commit()

    # newline='' leaves newline handling to the csv module (see the csv docs).
    with open('ways_tags.csv', 'r', encoding="utf-8", newline='') as csvfile:
        middleman = csv.DictReader(csvfile) # comma is default delimiter
        to_db = [(i['id'], i['key'], i['value'], i['type']) for i in middleman]

    write_cursor.executemany("INSERT INTO ways_tags(id, key, value, type) VALUES (?,?,?,?);", to_db)

    connection.commit()
finally:
    # Close the connection even when the load fails part-way through.
    connection.close()

In [16]:
# Load ways_nodes.csv into the `ways_nodes` table of the SQLite database.
connection = sqlite3.connect(osmdb)
try:
    write_cursor = connection.cursor()
    # The database file outlives the kernel, so a plain CREATE TABLE fails on
    # every re-run. Dropping a leftover table first makes this cell idempotent.
    write_cursor.execute('DROP TABLE IF EXISTS ways_nodes')
    write_cursor.execute('''
                    CREATE TABLE ways_nodes(id INTEGER, node_id INTEGER, position INTEGER)''')

    connection.commit()

    # newline='' leaves newline handling to the csv module (see the csv docs).
    with open('ways_nodes.csv', 'r', encoding="utf-8", newline='') as csvfile:
        middleman = csv.DictReader(csvfile) # comma is default delimiter
        to_db = [(i['id'], i['node_id'], i['position']) for i in middleman]

    write_cursor.executemany("INSERT INTO ways_nodes(id, node_id, position) VALUES (?,?,?);", to_db)

    connection.commit()
finally:
    # Close the connection even when the load fails part-way through.
    connection.close()

Now let's explore our newly created database (nashville_osm.db) and some of our new tables we created.

**File Sizes:**<br>
nashville.osm => 75,813 KB<br>
nashville_sample.osm => 5,104 KB<br>
nodes.csv => 1,880 KB<br>
nodes_tags.csv => 60 KB<br>
ways.csv => 179 KB<br>
ways_nodes.csv => 614 KB<br>
ways_tags.csv => 292 KB



**Number of Nodes**<br>
sqlite > SELECT COUNT(*) FROM NODES;<br>
***22642***<br>
<br>
**Number of Ways**<br>
sqlite > SELECT COUNT(*) FROM WAYS;<br>
***3019***<br>
<br>

**Number of Unique Users**<br>
sqlite > SELECT COUNT(DISTINCT(e.uid))<br>
FROM (SELECT uid FROM nodes UNION ALL SELECT uid FROM ways) e;<br>
***1044***

**Top 10 Contributing Users**<br>
sqlite> SELECT e.user, COUNT(*) as num<br>
FROM (SELECT user FROM nodes UNION ALL SELECT user FROM ways) e<br>
GROUP BY e.user<br>
ORDER BY num DESC<br>
LIMIT 10;<br>
<br>
>Gedwards724 | 10706<br>
>wward | 3135<br>
>woodpeck_fixbot | 726<br>
>bobby22 | 637<br>
>Tom_Holland | 629<br>
>greggerm | 381<br>
>ChesterKiwi | 361<br>
>maxerickson | 360<br>
>42429 | 312<br>
>Rub21 | 254<br>

**Top 10 Amenities in Nashville**<br>
sqlite> SELECT value, COUNT(*) as num<br>FROM nodes_tags<br>WHERE key='amenity'<br>GROUP BY value<br>ORDER BY num DESC<br> LIMIT 10;<br>
<br>
> place_of_worship | 40 <= Not a surprise! <br>
> restaurant | 10<br>
> school | 8<br>
> cafe | 7<br>
> fast_food | 6<br>
> fountain | 5<br>
> bench | 5<br>
> bar | 5<br>
> fuel | 4<br>
> pub | 2<br>

### References:
https://stackoverflow.com/questions/15356641/how-to-write-xml-declaration-using-xml-etree-elementtree<br>
https://stackoverflow.com/questions/30418481/error-dict-object-has-no-attribute-iteritems<br>
https://stackoverflow.com/questions/53089403/how-to-fix-python3-errror-name-unicode-is-not-defined<br>
https://stackoverflow.com/questions/28583565/str-object-has-no-attribute-decode-python-3-error<br>
https://medium.com/ibm-data-science-experience/markdown-for-jupyter-notebooks-cheatsheet-386c05aeebed
