# OpenStreet Data Wrangling with Python and SQL
### by Sergei Neviadomski

## Project Summary
### Map area: Pittsburgh, PA, United States

https://mapzen.com/data/metro-extracts/metro/pittsburgh_pennsylvania/

This map is of place where I currently live. I'd like to explore open-source map of this area, reveal some inconsistencies in data and contribute to its improvement on OpenStreetMap.org

## Data auditing and processing

In this section I want to solve typical problem of openstreen data.  That is street abbreviation inconsistency. I used regular expressions to catch this abbreviations and to build mapping for solving this problem. 

In [1]:
# Importing needed libraries
import xml.etree.cElementTree as ET
from collections import defaultdict
import re
import pprint
import csv
import codecs
import cerberus
import schema
import sqlite3


In [2]:
# OSM file
osm_file = "pittsburgh_pennsylvania.osm"

# re function
street_type_re = re.compile(r'\b\S+\.?$', re.IGNORECASE)

# Expected street types list
expected = ["Street", "Avenue", "Boulevard", "Drive", "Court", "Place",
            "Square", "Lane", "Road", "Trail", "Parkway", "Commons", "Alley",
            "Bridge", "Highway", "Circle", "Terrace", "Way"]

# Adding street names in dictionary by type
def audit_street_type(street_types, street_name):
    m = street_type_re.search(street_name)
    if m:
        street_type = m.group()
        if street_type not in expected:
            street_types[street_type].add(street_name)

# Checking tag for street data content 
def is_street_name(elem):
    return (elem.attrib['k'] == "addr:street")

# Auditing file for different abbreviations of same street types 
def audit(osmfile):
    osm_file = open(osmfile, "r")    
    street_types = defaultdict(set)
    for event, elem in ET.iterparse(osm_file, events=("start",)):
        if elem.tag == "node" or elem.tag == "way":
            for tag in elem.iter("tag"):
                if is_street_name(tag):
                    audit_street_type(street_types, tag.attrib['v'])
    osm_file.close()
    return street_types

In [3]:
# Auditing our osm file
st_types = audit(osm_file)

# Printing results 
pprint.pprint(dict(st_types))

{'18': set(['PA 18', 'Route 18']),
 '19': set(['Route 19', 'US 19']),
 '202': set(['Wilkins Avenue #202']),
 '217': set(['217']),
 '228': set(['Pennsylvania 228', 'Pennylvania 228', 'State Route 228']),
 '30': set(['Route 30', 'State Route 30', 'U.S. 30']),
 '400': set(['West Kensinger Drive #400']),
 '51': set(['Route 51', 'State Route 51']),
 '519': set(['PA 519', 'Route 519']),
 '8': set(['Route 8']),
 '837': set(['Route 837']),
 '885': set(['Route 885']),
 '910': set(['Route 910']),
 'Allegheny': set(['South Allegheny']),
 'Allies': set(['Boulevard of the Allies']),
 'Automotive': set(['California Automotive']),
 'Av.': set(['Fifth Av.']),
 'Ave': set(['5th Ave',
             '6th Ave',
             'Arlington Ave',
             'Atlantic Ave',
             'Center Ave',
             'Centre Ave',
             'Elizabeth Ave',
             'Fifth Ave',
             'Forbes Ave',
             'Friendship Ave',
             'Glenn Ave',
             'Highland Ave',
             'Libe

After scrutinizing audit results I've built mapping of abbreviations.

In [4]:
mapping = { "St": "Street",
            "St.": "Street",
            "ST": "Street",
            "Ave": "Avenue",
            "Ave.": "Avenue",
            "Av.": "Avenue",
            "Av": "Avenue",
            "Sq": "Square",
            "CT": "Court",
            "Ct": "Court",
            "DR": "Drive",
            "Dr": "Drive",
            "Dr.": "Drive", 
            "Rd.": "Road",
            "Rd": "Road",
            "Pl": "Place",
            "Hwy": "Highway",
            "Ln": "Lane",
            "Blvd": "Boulevard",
            "Blvd.": "Boulevard",
            "Brdg":"Bridge",
            "Ter": "Terrace" 
            }

Now It's time to modify our osm file by replacement of abbreviations by full names.

In [5]:
osm_file_new = "pittsburgh_pennsylvania_new.osm"

# This function replace abbrevition by full name in string
def update_name(name, mapping):
    words = name.split(' ')
    last_word = words[-1]
    if last_word in mapping.keys():
        name2 = name.replace(last_word, mapping[last_word])
        return name2
    return name

# Supporting function to get element from osm file
def get_element(osm_file, tags=('node', 'way', 'relation')):
    context = iter(ET.iterparse(osm_file, events=('start', 'end')))
    _, root = next(context)
    for event, elem in context:
        if event == 'end' and elem.tag in tags:
            yield elem
            root.clear()
            
# This function replace abbreviations in osm file
def modify(old_file, new_file):
    with open(new_file, 'wb') as output:
        output.write('<?xml version="1.0" encoding="UTF-8"?>\n')
        output.write('<osm>\n  ') 
        for i, element in enumerate(get_element(old_file)):
            for tag in element.iter("tag"):
                if is_street_name(tag):
                    tag.set('v',update_name(tag.attrib['v'], mapping))
            output.write(ET.tostring(element, encoding='utf-8'))
        output.write('</osm>')

# Modifying osm file
modify(osm_file, osm_file_new)

## Preparing for SQL database

After auditing is complete the next step is to prepare the data to be inserted into a SQL database. To do so you will parse the elements in the OSM XML file, transforming them from document format to tabular format, thus making it possible to write to .csv files. These csv files can then easily be imported to a SQL database as tables.

In [18]:
# Path to new csv files
NODES_PATH = "nodes.csv"
NODE_TAGS_PATH = "nodes_tags.csv"
WAYS_PATH = "ways.csv"
WAY_NODES_PATH = "ways_nodes.csv"
WAY_TAGS_PATH = "ways_tags.csv"

# Re functions
LOWER_COLON = re.compile(r'^([a-z]|_)+:([a-z]|_)+')
PROBLEMCH = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')

# Importing schema for pransformation from schema.py file
SCHEMA = Schema.schema

# Fields of new csv files
# Make sure the fields order in the csvs matches the column order in the 
# sql table schema
NODE_FIELDS = ['id', 'lat', 'lon', 'user', 'uid', 'version',
               'changeset', 'timestamp']
NODE_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_FIELDS = ['id', 'user', 'uid', 'version', 'changeset', 'timestamp']
WAY_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_NODES_FIELDS = ['id', 'node_id', 'position']


# Main function for transformation of XML data Python dict
def shape_element(element, node_attr_fields=NODE_FIELDS,
                  way_attr_fields=WAY_FIELDS, prob_ch=PROBLEMCH,
                  default_tag_type='regular'):
    
    tag_attribs = {}
    way_nodes = []
    tags = []
    count = 0
    if element.tag == 'node':
        tagfields = node_attr_fields
    elif element.tag == 'way':
        tagfields = way_attr_fields
    
    if element.tag == 'node' or 'way':
        for attrib in element.attrib:
            if attrib in tagfields:
                tag_attribs[attrib] = element.attrib[attrib]
    for subelem in element:
        if subelem.tag == 'tag' and prob_ch.match(subelem.attrib['k']) == None:
            tag = {}
            tag['id'] = tag_attribs['id']
            tag['value'] = subelem.attrib['v']
            key = subelem.attrib['k']
            tag['key'] = key[key.find(':') + 1:]
            if ':' in key:
                tag['type'] = key[:key.find(':')]
            else:
                tag['type'] = default_tag_type
            tags.append(tag)
        elif subelem.tag == 'nd':
            way_node = {}
            way_node['id'] = tag_attribs['id']
            way_node['node_id'] = subelem.attrib['ref']
            way_node['position'] = count
            count += 1
            way_nodes.append(way_node)
    
    if element.tag == 'node':
        return {'node': tag_attribs, 'node_tags': tags}
    elif element.tag == 'way':
        return {'way': tag_attribs, 'way_nodes': way_nodes, 'way_tags': tags}
    
# Validate element to match schema
def validate_element(element, validator, schema=SCHEMA):
    """Raise ValidationError if element does not match schema"""
    if validator.validate(element, schema) is not True:
        field, errors = next(validator.errors.iteritems())
        message_string = "\nElement of type '{0}' has the following errors:\n{1}"
        error_strings = (
            "{0}: {1}".format(k, v if isinstance(v, str) else ", ".join(v))
            for k, v in errors.iteritems()
        )
        raise cerberus.ValidationError(
            message_string.format(field, "\n".join(error_strings))
        )


# Extend csv.DictWriter to handle Unicode input
class UnicodeDictWriter(csv.DictWriter, object):
    
    def writerow(self, row):
        super(UnicodeDictWriter, self).writerow({
            k: (v.encode('utf-8') if isinstance(v, unicode) else v) for k, v in row.iteritems()
        })

    def writerows(self, rows):
        for row in rows:
            self.writerow(row)
            
# Main function to process osm file to 5 csv files
def process_map(file_in, validate):
    """Iteratively process each XML element and write to csv(s)"""

    with codecs.open(NODES_PATH, 'w') as nodes_file, \
         codecs.open(NODE_TAGS_PATH, 'w') as nodes_tags_file, \
         codecs.open(WAYS_PATH, 'w') as ways_file, \
         codecs.open(WAY_NODES_PATH, 'w') as way_nodes_file, \
         codecs.open(WAY_TAGS_PATH, 'w') as way_tags_file:

        nodes_writer = UnicodeDictWriter(nodes_file, NODE_FIELDS)
        node_tags_writer = UnicodeDictWriter(nodes_tags_file, NODE_TAGS_FIELDS)
        ways_writer = UnicodeDictWriter(ways_file, WAY_FIELDS)
        way_nodes_writer = UnicodeDictWriter(way_nodes_file, WAY_NODES_FIELDS)
        way_tags_writer = UnicodeDictWriter(way_tags_file, WAY_TAGS_FIELDS)

        nodes_writer.writeheader()
        node_tags_writer.writeheader()
        ways_writer.writeheader()
        way_nodes_writer.writeheader()
        way_tags_writer.writeheader()

        validator = cerberus.Validator()

        for element in get_element(file_in, tags=('node', 'way')):
            el = shape_element(element)
            if el:
                if validate is True:
                    validate_element(el, validator)

                if element.tag == 'node':
                    nodes_writer.writerow(el['node'])
                    node_tags_writer.writerows(el['node_tags'])
                elif element.tag == 'way':
                    ways_writer.writerow(el['way'])
                    way_nodes_writer.writerows(el['way_nodes'])
                    way_tags_writer.writerows(el['way_tags'])
                    
# Note: Validation is ~ 15X slower. Consider using a small
# sample of the map when validating.
process_map(osm_file_new, validate=True)

## Building SQL database

In this section we will build SQL database and import tables to this database from csv file from previous section. We'll use sqlite3 shell for this purpose. It's needed to read create_db.csv file.
.read create_csv.db
Next command will execute this file:

.read create_csv.db

Content of file:

CREATE TABLE nodes (
    id INTEGER PRIMARY KEY NOT NULL,
    lat REAL,
    lon REAL,
    user TEXT,
    uid INTEGER,
    version INTEGER,
    changeset INTEGER,
    timestamp TEXT
);

CREATE TABLE nodes_tags (
    id INTEGER,
    key TEXT,
    value TEXT,
    type TEXT,
    FOREIGN KEY (id) REFERENCES nodes(id)
);

CREATE TABLE ways (
    id INTEGER PRIMARY KEY NOT NULL,
    user TEXT,
    uid INTEGER,
    version TEXT,
    changeset INTEGER,
    timestamp TEXT
);

CREATE TABLE ways_tags (
    id INTEGER NOT NULL,
    key TEXT NOT NULL,
    value TEXT NOT NULL,
    type TEXT,
    FOREIGN KEY (id) REFERENCES ways(id)
);

CREATE TABLE ways_nodes (
    id INTEGER NOT NULL,
    node_id INTEGER NOT NULL,
    position INTEGER NOT NULL,
    FOREIGN KEY (id) REFERENCES ways(id),
    FOREIGN KEY (node_id) REFERENCES nodes(id)
);


.mode csv

.import nodes.csv nodes

.import ways.csv ways

.import nodes_tags.csv nodes_tags

delete from nodes_tags where id = 'id';

.import ways_tags.csv ways_tags

delete from ways_tags where id = 'id';

.import ways_nodes.csv ways_nodes

delete from ways_nodes where id = 'id';

## Quering SQL database

For quering database I'll use not sqlite3 shell, but sqlite3 Puthon API. 

1) Number of nodes

2) Number of ways

In [5]:
# Importing SQLite3 API
import sqlite3

#Esteblishing connection and cursor
conn = sqlite3.connect("osm.db")
cursor = conn.cursor()

#Executing and printing 
cursor.execute("select count(id) from nodes;")
print 'There are {} nodes in database.'.format(cursor.fetchall()[0][0])
cursor.execute("select count(id) from ways;")
print 'There are {} ways in database.'.format(cursor.fetchall()[0][0])

There are 1911654 nodes in database.
There are 195498 ways in database.


3) Number of unique users

In [6]:
cursor.execute("select count(distinct(uid)) from (select uid from nodes union select uid from ways);")
print 'There are {} uniqe users in database.'.format(cursor.fetchall()[0][0])

There are 1227 uniqe users in database.


4) Way with the biggest nodes count

In [7]:
cursor.execute("select id, count(*) as nodes_count from ways_nodes group by id order by nodes_count desc limit 1;")
way_id, count = cursor.fetchall()[0]
print "There're {} nodes in the biggest way in database. Way id is {}.".format(count, way_id)
cursor.execute("select * from ways_tags where id = {};".format(way_id))
print 'This way is:'
pprint.pprint(cursor.fetchall())

There're 1975 nodes in the biggest way in database. Way id is 186341673.
This way is:
[(186341673, u'name', u'Breakneck Creek', u'regular\r'),
 (186341673, u'source', u'bing', u'regular\r'),
 (186341673, u'waterway', u'stream', u'regular\r'),
 (186341673, u'city', u'Mars', u'addr\r')]


5) Number of bridges in Pittsburgh

In [8]:
cursor.execute("select count(key) from ways_tags where key = 'bridge' and value != 'yes' group by key;")
print "There are {} bridges in Pitt. That's a second Venice.".format(cursor.fetchall()[0][0])

There are 410 bridges in Pitt. That's a second Venice.


6) Top 5 zip codes in Pittsburgh

In [9]:
cursor.execute("select value, count(*) as count from nodes_tags where key = 'postcode' group by value order by count desc limit 5;")
pprint.pprint(cursor.fetchall())

[(u'15206', 6653),
 (u'15044', 5333),
 (u'15025', 4950),
 (u'15216', 4853),
 (u'15017', 3454)]


## Conclusion

OpenStreetMap data is not perfect as any human modified project. It'll take a lot of time to find and clean all human-made errors. But we've made our first step. We modified street names and made them more consistent and uniform. Then we transformed XML to CSV format and imported it into SQL database. And finaly we answered some interesting questions using SQL queries. 