# Project 3: OpenStreetMap Data Wrangling with SQL


In [5]:

# Importing libraries
import xml.etree.cElementTree as ET
from pprint import pprint
import re
from collections import defaultdict
import csv
import codecs
import cerberus


In [6]:
# %load schema.py
# Note: The schema is stored in a .py file in order to take advantage of the
# int() and float() type coercion functions. Otherwise it could easily stored as
# as JSON or another serialized format.

schema = {
    'node': {
        'type': 'dict',
        'schema': {
            'id': {'required': True, 'type': 'integer', 'coerce': int},
            'lat': {'required': True, 'type': 'float', 'coerce': float},
            'lon': {'required': True, 'type': 'float', 'coerce': float},
            'user': {'required': True, 'type': 'string'},
            'uid': {'required': True, 'type': 'integer', 'coerce': int},
            'version': {'required': True, 'type': 'string'},
            'changeset': {'required': True, 'type': 'integer', 'coerce': int},
            'timestamp': {'required': True, 'type': 'string'}
        }
    },
    'node_tags': {
        'type': 'list',
        'schema': {
            'type': 'dict',
            'schema': {
                'id': {'required': True, 'type': 'integer', 'coerce': int},
                'key': {'required': True, 'type': 'string'},
                'value': {'required': True, 'type': 'string'},
                'type': {'required': True, 'type': 'string'}
            }
        }
    },
    'way': {
        'type': 'dict',
        'schema': {
            'id': {'required': True, 'type': 'integer', 'coerce': int},
            'user': {'required': True, 'type': 'string'},
            'uid': {'required': True, 'type': 'integer', 'coerce': int},
            'version': {'required': True, 'type': 'string'},
            'changeset': {'required': True, 'type': 'integer', 'coerce': int},
            'timestamp': {'required': True, 'type': 'string'}
        }
    },
    'way_nodes': {
        'type': 'list',
        'schema': {
            'type': 'dict',
            'schema': {
                'id': {'required': True, 'type': 'integer', 'coerce': int},
                'node_id': {'required': True, 'type': 'integer', 'coerce': int},
                'position': {'required': True, 'type': 'integer', 'coerce': int}
            }
        }
    },
    'way_tags': {
        'type': 'list',
        'schema': {
            'type': 'dict',
            'schema': {
                'id': {'required': True, 'type': 'integer', 'coerce': int},
                'key': {'required': True, 'type': 'string'},
                'value': {'required': True, 'type': 'string'},
                'type': {'required': True, 'type': 'string'}
            }
        }
    }
}

# Extract Sample File

In [7]:
# Creating sample file as original OSM is 1.28 GB unzipped.
# Parameter: take every k-th top level element
#Sample size is 128.9 MB

OSM_FILE = "new-orleans_louisiana.osm"
SAMPLE_FILE = "new-orleans_louisiana_sample.osm"

k = 10

def get_element(osm_file, tags=('node', 'way', 'relation')):
    '''Yield element if it is the right type of tag

    Reference:
    http://stackoverflow.com/questions/3095434/inserting-newlines-in-xml-file-generated-via-xml-etree-elementtree-in-python
    '''
    context = iter(ET.iterparse(osm_file, events=('start', 'end')))
    _, root = next(context)
    for event, elem in context:
        if event == 'end' and elem.tag in tags:
            yield elem
            root.clear()


with open(SAMPLE_FILE, 'wb') as output:
    output.write(b'<?xml version="1.0" encoding="UTF-8"?>\n')
    output.write(b'<osm>\n  ')

    # Write every kth top level element
    for i, element in enumerate(get_element(OSM_FILE)):
        if i % k == 0:
            output.write(ET.tostring(element, encoding='utf-8'))

    output.write(b'</osm>')

# 1 - Load dataset and search for incorrect street abbreviations.

In [8]:
# Load the sample OSM file as provided in GitHub repository
OSMFILE = "new-orleans_louisiana_sample.osm"
street_type_re = re.compile(r'\b\S+\.?$', re.IGNORECASE)

expected = ["Street", "Avenue", "Boulevard", "Drive", "Court", "Place", "Square", "Lane", "Road"]

There were a number of street names that were just inconsistent and had to be changed.

In [9]:
# Create a group of auditing functions for street suffix
def audit_street_type(street_types, street_name):
    """ Checks if street name contains incorrect abbreviations, if so, adds it to the dictionary. """
    m = street_type_re.search(street_name)
    if m:
        street_type = m.group()
        if street_type not in expected:
            street_types[street_type].add(street_name)


def is_street_name(elem):
    """ Returns a Boolean value """
    return (elem.attrib['k'] == "addr:street")


def audit(osmfile):
    """ Iterates through document tags, and returns dictionary
        of incorrect abbreviations (keys) and street names (value) that contain these abbreviations.
    """
    osm_file = open(osmfile, "r")
    street_types = defaultdict(set)
    for event, elem in ET.iterparse(osm_file, events=("start",)):

        if elem.tag == "node" or elem.tag == "way":
            for tag in elem.iter("tag"):
                if is_street_name(tag):
                    audit_street_type(street_types, tag.attrib['v'])
    osm_file.close()
    return street_types

# Run audit and print results
st_types = audit(OSMFILE)
pprint(dict(st_types))

{'1': set(['Highway 1']),
 '10': set(['Interstate 10']),
 '132': set(['Road 132']),
 '134': set(['Road 134']),
 '141': set(['Road 141']),
 '18': set(['LA 18']),
 '204': set(['Manhattan Boulevard Bldg F #204']),
 '232': set(['Rd. 232', 'Road 232']),
 '234': set(['Road 234']),
 '254': set(['Road 254']),
 '259': set(['Road 259']),
 '266': set(['Road 266']),
 '302': set(['Road 302']),
 '307': set(['Road 307']),
 '309': set(['Road 309']),
 '310': set(['Road 310']),
 '326': set(['Road 326']),
 '330': set(['Road 330']),
 '346': set(['Road 346']),
 '357': set(['Road 357']),
 '361': set(['Road 361']),
 '372': set(['Road 372']),
 '373': set(['Road 373']),
 '378': set(['Road 378']),
 '380': set(['Road 380']),
 '4058': set(['Northshore Blvd #4058']),
 '43': set(['Highway 43']),
 '433': set(['Road 433']),
 '49': set(['Highway 49', 'Old Highway 49']),
 '503': set(['Road 503']),
 '528': set(['Road 528']),
 '53': set(['Highway 53']),
 '534': set(['Road 534']),
 '541': set(['Road 541']),
 '546': set(['

In [10]:
# Function to correct street names using wrong suffix
def update_name(name, mapping):
    """ Substitutes incorrect abbreviation with correct one. """
    m = street_type_re.search(name)
    if m:
        street_type = m.group()
        
        temp= 0
        try:
            temp = int(street_type)
        except:
            pass
        
        if street_type not in expected and temp == 0:
            try:
                name = re.sub(street_type_re, mapping[street_type], name)
            except:
                pass
    return name

In [11]:
# Dictionary mapping incorrect abbreviations to correct one.
mapping = { "St": "Street",
            "St.": "Street",
            "ST": "Street",
            "st": "Street",
            "Rd.": "Road",
            "Rd": "Road",
            "RD": "Road",
            "Ave": "Avenue",
            "Ave.": "Avenue",
            "Blvd": "Boulevard",
            "BLVD": "Boulevard",
            "Cir": "Circle",
            "Ct": "Court",
            "Dr": "Drive",
            "Trl": "Trail",
            "Ter": "Terrace",
            "Pl": "Place",
            "Pkwy": "Parkway",
            "Bnd": "Bend",
            "Mnr": "Manor",
            "Ln": "Lane",
            "street": "Street",
            "AVE": "Avenue",
            "Blvd.": "Boulevard",
            "Cirlce": "Circle",
            "DRIVE": "Drive",
            "Cv": "Cove",
            "Dr.": "Drive",
            "Druve": "Drive",
            "Holw": "Hollow",
            "Hwy": "Highway",
            "HWY": "Highway",
            "Pt": "Point",
            "Trce": "Trace",
            "ave": "Avenue",
            "Cres": "Crescent"
            }

In [12]:
# Apply corrections where incorrect detected v. mapping.
for st_type, ways in st_types.iteritems():
    for name in ways:
        better_name = update_name(name, mapping)
        print name, "=>", better_name

Rue La Terre => Rue La Terre
Pine Ridge => Pine Ridge
Westgrove PARK => Westgrove PARK
Drinkwater => Drinkwater
Road 132 => Road 132
Honeymoon Hill => Honeymoon Hill
Pleasant Hill => Pleasant Hill
Chapel Hill => Chapel Hill
Whispering Branch => Whispering Branch
Road 134 => Road 134
Bay Oaks => Bay Oaks
Espana => Espana
Blue Bird => Blue Bird
Lakeshore => Lakeshore
Off Hightower => Off Hightower
Rue Nichole => Rue Nichole
North Cove => North Cove
Brandi Cove => Brandi Cove
Eagle Cove => Eagle Cove
Cardinal Cove => Cardinal Cove
Pecanwood Cove => Pecanwood Cove
Tarpon Cove => Tarpon Cove
Brianna Cove => Brianna Cove
Ol' Mans Cove => Ol' Mans Cove
Pelican Cove => Pelican Cove
Ironwood Cove => Ironwood Cove
Banbury Cove => Banbury Cove
Dundee Cove => Dundee Cove
Magnolia Cove => Magnolia Cove
Palmetto Cove => Palmetto Cove
Kirsten Cove => Kirsten Cove
Beth Cove => Beth Cove
Pine Cove => Pine Cove
Wheaton Cove => Wheaton Cove
Creekside Cove => Creekside Cove
Maple Cove => Maple Cove
Leewar

# Zip Code

In [13]:
# # Create a group of auditing functions for postal codes

def audit_postcode(post_code, digits):
    """ Checks if postal code is incompatible and adds it to the list if so. """
    if len(digits) != 5 or (digits[0:2] != '01' and digits[0:2] != '02'):
        post_code.append(digits)
        
def is_postalcode(elem):
    """ Returns a Boolean value."""
    return (elem.attrib['k'] == "addr:postcode")


def audit(osmfile):
    """ Iterates and returns list of inconsistent postal codes found in the document. """
    osm_file = open(osmfile, "r")
    post_code = []
    for event, elem in ET.iterparse(osm_file, events=("start",)):

        if elem.tag == "node" or elem.tag == "way":
            for tag in elem.iter("tag"):
                if is_postalcode(tag):
                    audit_postcode(post_code, tag.attrib['v'])
    osm_file.close()
    return post_code

In [14]:
# Run audit and print results
postal_codes = audit(OSMFILE)
print postal_codes

['70118', '70122', '70115', '70002', '70119', '70112', '70130', '70002', '70118', '70118', '70115', '70130', '70130', '70403', '70130', '70115', '70115', '70130', '70012', '70072', '70130', '70433', '70447', '70118', '70115', '70115', '70118', '70460', '70002', '70462', '70058', '70130', '70112', '70130', '70130', '70130', '70117', '70032', '70446', '39571', '39503', '39560', '39571', '39571', '39571', '39571', '39571', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', '39503', 

In [15]:
# Apply corrections where incorrect detected (No problems with Zip Code)
#for code in postal_codes:
#    better_code = update_zip(code)
#    print code, "=>", better_code

#  3. Convert XML to CSV format


In [16]:
NODES_PATH = "nodes.csv"
NODE_TAGS_PATH = "nodes_tags.csv"
WAYS_PATH = "ways.csv"
WAY_NODES_PATH = "ways_nodes.csv"
WAY_TAGS_PATH = "ways_tags.csv"

In [17]:
NODE_FIELDS = ['id', 'lat', 'lon', 'user', 'uid', 'version', 'changeset', 'timestamp']
NODE_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_FIELDS = ['id', 'user', 'uid', 'version', 'changeset', 'timestamp']
WAY_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_NODES_FIELDS = ['id', 'node_id', 'position']

In [18]:

# Regular expression compiler patterns.
LOWER_COLON = re.compile(r'^([a-z]|_)+:([a-z]|_)+')
PROBLEMCHARS = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')

SCHEMA = schema

In [19]:
# Check if input element is a "node" or a "way" then clean, shape and parse to corresponding dictionary.

def shape_element(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS,
                  problem_chars=PROBLEMCHARS, default_tag_type='regular'):
    """Clean and shape node or way XML element to Python dict"""

    node_attribs = {}
    way_attribs = {}
    way_nodes = []
    tags = []  # Handle secondary tags the same way for both node and way elements
    
    if element.tag == 'node':
        for field in node_attr_fields:
            node_attribs[field] = element.attrib[field]
    
    if element.tag == 'way':
        for field in way_attr_fields:
            way_attribs[field] = element.attrib[field]
        
        position = 0
        temp = {}
        for tag in element.iter("nd"):
            temp['id'] = element.attrib["id"]
            temp['node_id'] = tag.attrib["ref"]
            temp['position'] = position
            position += 1
            way_nodes.append(temp.copy())

    temp = {}
    for tag in element.iter("tag"):
        temp['id'] = element.attrib["id"]
        if ":" in tag.attrib["k"]:
            newKey = re.split(":",tag.attrib["k"],1)
            temp['key'] = newKey[1]
            elif temp['key'] == 'street':
                temp['value'] = update_name(tag.attrib["v"],mapping)
            else:
                temp['value'] = tag.attrib["v"]
            temp["type"] = newKey[0]
        else:
            temp['key'] = tag.attrib["k"]
            if temp['key'] == 'postcode':
                temp['value'] = update_zip(tag.attrib["v"])
            elif temp['key'] == 'street':
                temp['value'] = update_name(tag.attrib["v"],mapping)
            else:
                temp['value'] = tag.attrib["v"]
            temp["type"] = default_tag_type
        tags.append(temp.copy())  
        
    
    if element.tag == 'node':
        return {'node': node_attribs, 'node_tags': tags}
    elif element.tag == 'way':
        return {'way': way_attribs, 'way_nodes': way_nodes, 'way_tags': tags}

SyntaxError: invalid syntax (<ipython-input-19-aecafea7d6da>, line 35)

In [None]:
# ================================================== #
#               Helper Functions                     #
# ================================================== #
def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Yield element if it is the right type of tag"""

    context = ET.iterparse(osm_file, events=('start', 'end'))
    _, root = next(context)
    for event, elem in context:
        if event == 'end' and elem.tag in tags:
            yield elem
            root.clear()

def validate_element(element, validator, schema=SCHEMA):
    """Raise ValidationError if element does not match schema"""
    if validator.validate(element, schema) is not True:
        field, errors = next(validator.errors.iteritems())
        message_string = "\nElement of type '{0}' has the following errors:\n{1}"
        error_strings = (
            "{0}: {1}".format(k, v if isinstance(v, str) else ", ".join(v))
            for k, v in errors.iteritems()
        )
        raise cerberus.ValidationError(
            message_string.format(field, "\n".join(error_strings))
        )

class UnicodeDictWriter(csv.DictWriter, object):
    """Extend csv.DictWriter to handle Unicode input"""

    def writerow(self, row):
        super(UnicodeDictWriter, self).writerow({
            k: (v.encode('utf-8') if isinstance(v, unicode) else v) for k, v in row.iteritems()
        })

    def writerows(self, rows):
        for row in rows:
            self.writerow(row)

In [None]:
# ================================================== #
#               Main Function                        #
# ================================================== #
def process_map(file_in, validate):
    """Iteratively process each XML element and write to csv(s)"""

    with codecs.open(NODES_PATH, 'w') as nodes_file, \
         codecs.open(NODE_TAGS_PATH, 'w') as nodes_tags_file, \
         codecs.open(WAYS_PATH, 'w') as ways_file, \
         codecs.open(WAY_NODES_PATH, 'w') as way_nodes_file, \
         codecs.open(WAY_TAGS_PATH, 'w') as way_tags_file:

        nodes_writer = UnicodeDictWriter(nodes_file, NODE_FIELDS)
        node_tags_writer = UnicodeDictWriter(nodes_tags_file, NODE_TAGS_FIELDS)
        ways_writer = UnicodeDictWriter(ways_file, WAY_FIELDS)
        way_nodes_writer = UnicodeDictWriter(way_nodes_file, WAY_NODES_FIELDS)
        way_tags_writer = UnicodeDictWriter(way_tags_file, WAY_TAGS_FIELDS)

        nodes_writer.writeheader()
        node_tags_writer.writeheader()
        ways_writer.writeheader()
        way_nodes_writer.writeheader()
        way_tags_writer.writeheader()

        validator = cerberus.Validator()

        for element in get_element(file_in, tags=('node', 'way')):
            el = shape_element(element)
            if el:
                if validate is True:
                    validate_element(el, validator)

                if element.tag == 'node':
                    nodes_writer.writerow(el['node'])
                    node_tags_writer.writerows(el['node_tags'])
                elif element.tag == 'way':
                    ways_writer.writerow(el['way'])
                    way_nodes_writer.writerows(el['way_nodes'])
                    way_tags_writer.writerows(el['way_tags'])

In [None]:
process_map(OSMFILE, validate=False)

In [None]:
import sqlite3
db = sqlite3.connect("Nola.db")
c = db.cursor()

In [None]:
c.execute('''
CREATE TABLE nodes (
    id INTEGER PRIMARY KEY NOT NULL,
    lat REAL,
    lon REAL,
    user TEXT,
    uid INTEGER,
    version INTEGER,
    changeset INTEGER,
    timestamp TEXT
);
''')

c.execute('''
CREATE TABLE nodes_tags (
    id INTEGER,
    key TEXT,
    value TEXT,
    type TEXT,
    FOREIGN KEY (id) REFERENCES nodes(id)
);
''')

c.execute('''
CREATE TABLE ways (
    id INTEGER PRIMARY KEY NOT NULL,
    user TEXT,
    uid INTEGER,
    version TEXT,
    changeset INTEGER,
    timestamp TEXT
);
''')

c.execute('''
CREATE TABLE ways_tags (
    id INTEGER NOT NULL,
    key TEXT NOT NULL,
    value TEXT NOT NULL,
    type TEXT,
    FOREIGN KEY (id) REFERENCES ways(id)
);
''')

c.execute('''
CREATE TABLE ways_nodes (
    id INTEGER NOT NULL,
    node_id INTEGER NOT NULL,
    position INTEGER NOT NULL,
    FOREIGN KEY (id) REFERENCES ways(id),
    FOREIGN KEY (node_id) REFERENCES nodes(id)
);
''')

db.commit()

In [None]:
# Read in the csv file as a dictionary, format the data as a list of tuples:
with open('nodes.csv','rb') as fin:
    dr = csv.DictReader(fin) # comma is default delimiter
    to_db = [(i['id'], i['lat'], i['lon'], i['user'].decode("utf-8"), i['uid'], i['version'], i['changeset'], i['timestamp']) for i in dr]
    
# insert the formatted data
c.executemany("INSERT INTO nodes(id, lat, lon, user, uid, version, changeset, timestamp) VALUES (?, ?, ?, ?, ?, ?, ?, ?);", to_db)
# commit the changes
db.commit()

In [None]:
with open('nodes_tags.csv','rb') as fin:
    dr = csv.DictReader(fin) # comma is default delimiter
    to_db = [(i['id'], i['key'], i['value'].decode("utf-8"), i['type']) for i in dr]
    
# insert the formatted data
c.executemany("INSERT INTO nodes_tags(id, key, value,type) VALUES (?, ?, ?, ?);", to_db)
# commit the changes
db.commit()

In [None]:
with open('ways.csv','rb') as fin:
    dr = csv.DictReader(fin) # comma is default delimiter
    to_db = [(i['id'], i['user'].decode("utf-8"), i['uid'], i['version'], i['changeset'], i['timestamp']) for i in dr]
    
# insert the formatted data
c.executemany("INSERT INTO ways(id, user, uid, version, changeset, timestamp) VALUES (?, ?, ?, ?, ?, ?);", to_db)
# commit the changes
db.commit()

In [None]:

with open('ways_nodes.csv','rb') as fin:
    dr = csv.DictReader(fin) # comma is default delimiter
    to_db = [(i['id'], i['node_id'], i['position']) for i in dr]
    
# insert the formatted data
c.executemany("INSERT INTO ways_nodes(id, node_id, position) VALUES (?, ?, ?);", to_db)
# commit the changes
db.commit()

In [None]:

with open('ways_tags.csv','rb') as fin:
    dr = csv.DictReader(fin) # comma is default delimiter
    to_db = [(i['id'], i['key'], i['value'].decode("utf-8"), i['type']) for i in dr]
    
# insert the formatted data
c.executemany("INSERT INTO ways_tags(id, key, value, type) VALUES (?, ?, ?, ?);", to_db)
# commit the changes
db.commit()

# Overview of Data

<b>File sizes</b>
<br>new-orleans_louisiana.osm .............. 424 MB</br>
<br>new-orleans_louisiana_sample.osm .. 129.6 MB</br>
<br>Nola.db ................................... 66.9 MB</br>
<br>nodes.csv ............................... 52.9 MB</br>
<br>nodes_tags.csv .......................... 7 MB</br>
<br>ways.csv ................................. 2.3 MB</br>
<br>ways_nodes.csv ......................... 17 MB</br>
<br>ways_tags.csv .......................... 4.9 MB</br>

In [None]:
import sqlite3
db = sqlite3.connect("Nola.db")
c = db.cursor()

# Number of Ways

In [None]:
query = "SELECT count(*) FROM ways;"
c.execute(query)
c.fetchall()[0][0]

# Number of Common Way Tags (Top 5)

In [None]:
query = "SELECT key, count(*) FROM ways_tags GROUP BY 1 ORDER BY count(*) DESC LIMIT 5;"
c.execute(query)
c.fetchall()

# Number of Nodes

In [None]:
query = "SELECT count(*) FROM nodes;"
c.execute(query)
c.fetchall()[0][0]

# Number of Common Node Tags (Top 5)

In [None]:
query = "SELECT key,count(*) FROM nodes_tags GROUP BY 1 ORDER BY count(*) DESC LIMIT 5;"
c.execute(query)
c.fetchall()

# Contributors

In [None]:
query = "SELECT temp.user, count(*) as posts FROM (SELECT user, uid FROM ways UNION ALL SELECT user, uid FROM nodes) as temp \
GROUP BY temp.user ORDER BY posts DESC LIMIT 10;"
c.execute(query)
c.fetchall()

# Top Users

In [None]:
query = "SELECT count(DISTINCT(temp.uid)) FROM (SELECT user, uid FROM ways UNION ALL SELECT user, uid FROM nodes) as temp;"
c.execute(query)
c.fetchall()[0][0]

# Top 10 Amenities

In [None]:
query = "SELECT temp.value, count(*) as num \
FROM (SELECT key,value FROM ways_tags UNION ALL SELECT key,value FROM nodes_tags) as temp \
WHERE temp.key='amenity' GROUP BY temp.value ORDER BY num DESC LIMIT 10;"
c.execute(query)
c.fetchall()

# Cuisine

In [None]:
query = "SELECT temp.value, count(*) as num \
FROM (SELECT key,value FROM ways_tags UNION ALL SELECT key,value FROM nodes_tags) as temp \
WHERE temp.key='cuisine' GROUP BY temp.value ORDER BY num DESC LIMIT 10;"
c.execute(query)
c.fetchall()