# Wrangle OpenStreetMap Data - Denton, TX

The objective of this project is to obtain the open street map of a location - audit, clean the data and import to an SQL database - and perform additional analysis. I chose to analyze Denton city in the state of Texas.

I used the Open Street Map Export Tool to find the coordinates of Denton, TX and used Overpass_API link to download it.

In [1]:
import xml.etree.cElementTree as ET
import os
import re
from collections import defaultdict



def tags_overview(filename):
    '''Parsing through the OSM data file and creating a dictionary - where the 
    key represents element type and the value represents number of unique 
    occurances - to understand the overall structure.
    '''
    tags = {}
    for event, element in ET.iterparse(filename):
        if element.tag in tags: 
            tags[element.tag] += 1
        else:
            tags[element.tag] = 1
    return tags

xml_data = os.path.join("/Users/Srikanth/Documents/dand/data_wrangling","OSM.xml") #mapping data directory, file location on system 
tags_overview(xml_data)

{'bounds': 1,
 'member': 5235,
 'meta': 1,
 'nd': 293608,
 'node': 256686,
 'note': 1,
 'osm': 1,
 'relation': 141,
 'tag': 112714,
 'way': 23061}

These terms are explained in [Open street map wiki](https://wiki.openstreetmap.org/wiki/Beginners_Guide_1.3). We see that nodes, ways, relations, etc are some of the core elements. For this project I'd explore nodes(dots used to mark locations) and ways (connected line of nodes).

## 1. Auditing Data

Before we process the data and add it into a database, let's check the "k" value for each tag and see if there are any potential problems. I used the following regular expressions to filter these tags:

In [2]:
lower = re.compile(r'^([a-z]|_)*$') #for tags that contain only lowercase letters and are valid
lower_colon = re.compile(r'^([a-z]|_)*:([a-z]|_)*$') #for otherwise valid tags with a colon in their names
problemchars = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]') #for tags with problematic characters

def key_type(element, keys):
    '''Function that checks with the regular expressions'''
    if element.tag == "tag":
        for tag in element.iter('tag'):
            k = tag.get('k')
            if lower.search(k):
                keys['lower'] += 1
            elif lower_colon.search(k):
                keys['lower_colon'] += 1
            elif problemchars.search(k):
                keys['problemchars'] += 1
            else:
                keys['other'] += 1
    return keys


def process_map(filename):
    keys = {"lower": 0, "lower_colon": 0, "problemchars": 0, "other": 0}
    for _, element in ET.iterparse(filename):
        keys = key_type(element, keys)

    return keys

keys = process_map(xml_data)
keys

{'lower': 50383, 'lower_colon': 60470, 'other': 1861, 'problemchars': 0}

Now let's check how many unique users have contributed to this area

In [3]:
def process_map(filename):
    users = set()
    for _, element in ET.iterparse(filename):
        for item in element:
            if 'uid' in item.attrib:
                users.add(item.attrib['uid'])
    return users
unique_users = process_map(xml_data)
len(unique_users)

313

## 2. Cleaning Dataset

Now let's explore some of the common problems encountered:
 - Inconsistent street names
 - Inconsistent postal codes
  
We'll check the dataset and fix these problems. Starting with street names, we can create a list of expected street names.

In [4]:
street_type_re = re.compile(r'\b\S+\.?$', re.IGNORECASE)
osm_file = open(xml_data, "r")
street_types = defaultdict(set)

expected = ["Avenue", "Bend", "Boulevard", "Circle", "Commons", "Court", "Cove", "Crossing", "Drive",  "Hill",
            "Hollow", "Lane", "Loop", "Park", "Parkway", "Pass", "Path", "Place", "Plaza", "Point", "Ridge", "Road", "Row",
            "Run", "Square", "Street", "Trail", "View", "Vista", "Way", "North", "East", "West", "South"]

mapping = { "St": "Street",
            "St.": "Street",
           "street" :"Street",
           "st": "Street",
           "Ave"  : "Avenue",
           "ave"  : "Avenue",
           "Blvd" : "Boulevard",
           "blvd" : "Boulevard",
           "Dr"   : "Drive",
           "dr"   : "Drive",
           "Ln"   : "Lane",
           "ln"   : "Lane",
           "Pkwy" : "Parkway",
           "pkwy" : "Parkway",
           "PKWY" : "Parkway",
           "Rd"   : "Road",
           "Rd."  : "Road",
           "rd"   : "Road",
           "Ct"   : "Court",
           "ct"   : "Court",
           "Cir"  : "Circle",
           "cir"  : "Circle",
           "Hwy"  : "Highway",
           "HWY"  : "Highway",
           "hwy"  : "Highway",
           "Sq"   : "Square",
           "sq"   : "Square",
           "N"    : "North",
           "N."   : "North",
           "E"    : "East",
           "W"    : "West",
           "S"    : "South"}

def audit_street_type(street_types, street_name):
    m = street_type_re.search(street_name)
    if m:
        street_type = m.group()
        if street_type not in expected:
            street_types[street_type].add(street_name)

def is_street_name(elem):
    return (elem.attrib['k'] == "addr:street")

def audit(osmfile):
    for event, elem in ET.iterparse(osm_file, events=("start",)):
        if (elem.tag == "node") or (elem.tag == "way"):
            for tag in elem.iter("tag"):
                if is_street_name(tag):
                    audit_street_type(street_types, tag.attrib['v'])
    osm_file.close()
    return street_types

audit(xml_data)
dict(street_types)

{'2181': {'FM 2181', 'Farm-to-Market Road 2181'},
 '288': {'N. Loop 288', 'S Loop 288', 'South Loop 288'},
 '316': {'Fm 2181 #316'},
 '35': {'North Interstate 35'},
 '377': {'South US Highway 377'},
 '380': {'US 380'},
 '4201': {'4201'},
 'A': {'Avenue A'},
 'B': {'South Avenue B'},
 'Blvd': {'Hickory Creek Blvd'},
 'C': {'Avenue C'},
 'Dr': {'Dallas Dr'},
 'E': {'S Interstate 35 E'},
 'Fwy': {'S stemmons Fwy', 'South Stemmons Fwy'},
 'I-35': {'North I-35', 'South I-35'},
 'W': {'Ganzar Rd W'}}

In [5]:
def update_street_name(name, mapping):
    """ 
        Replaces unexpected street names with better names 

        Args:
            name: An unexpected street name
            mapping: Dictionary of expected street names

        Returns:
            name: The updated street name
    """
    words = name.split()
    for w in range(len(words)):
        if words[w] in mapping:
            words[w] = mapping[words[w]]
    name = " ".join(words)
    return name

for street_type, ways in street_types.items():
    for name in ways:
        better_name = update_street_name(name, mapping)
        print (name, "=>", better_name)

4201 => 4201
Avenue C => Avenue C
Avenue A => Avenue A
US 380 => US 380
Fm 2181 #316 => Fm 2181 #316
S Loop 288 => South Loop 288
N. Loop 288 => North Loop 288
South Loop 288 => South Loop 288
South US Highway 377 => South US Highway 377
Ganzar Rd W => Ganzar Road West
Dallas Dr => Dallas Drive
North Interstate 35 => North Interstate 35
Hickory Creek Blvd => Hickory Creek Boulevard
FM 2181 => FM 2181
Farm-to-Market Road 2181 => Farm-to-Market Road 2181
North I-35 => North I-35
South I-35 => South I-35
South Avenue B => South Avenue B
S Interstate 35 E => South Interstate 35 East
S stemmons Fwy => South stemmons Fwy
South Stemmons Fwy => South Stemmons Fwy


Now let's check zip codes

In [6]:
from collections import defaultdict

def audit_zipcode(invalid_zipcodes, zipcodes):
    if not (re.match(r'^()\d{3}$', zipcodes)): 
        invalid_zipcodes[zipcodes[:2]].add(zipcodes)
        
def is_zipcode(elem):
    return (elem.attrib['k'] == "addr:postcode")

def audit_zip(osmfile):
    osm_file = open(osmfile, "r")
    invalid_zipcodes = defaultdict(set)
    for event, elem in ET.iterparse(osm_file, events=("start",)):
        if elem.tag == "node" or elem.tag == "way":
            for tag in elem.iter("tag"):
                if is_zipcode(tag):
                    audit_zipcode(invalid_zipcodes,tag.attrib['v'])

    return invalid_zipcodes

cal_zipcode = audit_zip(xml_data)
dict(cal_zipcode)

{'75': {'75065', '75077', '75201'},
 '76': {'76201',
  '76203',
  '76205',
  '76207',
  '76208',
  '76209',
  '76209-1540',
  '76210',
  '76226',
  '76227',
  '76249',
  '76259'}}

In [7]:
def update_zipcode(zipcode):
    test = re.findall('[a-zA-z]*', zipcode)
    if test:
        better_zipcode = re.findall(r'\d+', zipcode)
        if better_zipcode:
            if len(better_zipcode) == 2:
                return better_zipcode[0]
            else:
                return better_zipcode

for zipcode, ways in cal_zipcode.items():
    for name in ways:
        better_name = update_zipcode(name)
        print (name, "=>", better_name)

76205 => ['76205']
76259 => ['76259']
76249 => ['76249']
76207 => ['76207']
76208 => ['76208']
76227 => ['76227']
76201 => ['76201']
76226 => ['76226']
76209 => ['76209']
76209-1540 => 76209
76203 => ['76203']
76210 => ['76210']
75077 => ['75077']
75065 => ['75065']
75201 => ['75201']


Now that the auditing is complete the next step is to prepare the data to be inserted into a SQL database. To do so we need to parse the elements in the OSM XML file, transforming them from document format to tabular format, thus making it possible to write to .csv files. These csv files can then easily be imported to a SQL database as tables.

The process for this transformation is as follows:
 - Use iterparse to iteratively step through each top level element in the XML
 - Shape each element into several data structures using a custom function
 - Utilize a schema and validation library to ensure the transformed data is in the correct format
 - Write each data structure to the appropriate .csv files

In [8]:
!pip install cerberus
import csv
import codecs
import re
import xml.etree.cElementTree as ET
import cerberus
import schema
import unicodecsv


OSM_PATH = "OSM.xml"

NODES_PATH = "nodes.csv"
NODE_TAGS_PATH = "nodes_tags.csv"
WAYS_PATH = "ways.csv"
WAY_NODES_PATH = "ways_nodes.csv"
WAY_TAGS_PATH = "ways_tags.csv"

LOWER_COLON = re.compile(r'^([a-z]|_)+:([a-z]|_)+')
PROBLEMCHARS = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')

SCHEMA = schema.schema

NODE_FIELDS = ['id', 'lat', 'lon', 'user', 'uid', 'version', 'changeset', 'timestamp']
NODE_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_FIELDS = ['id', 'user', 'uid', 'version', 'changeset', 'timestamp']
WAY_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_NODES_FIELDS = ['id', 'node_id', 'position']

def shape_element(element, node_attr_fields = NODE_FIELDS, way_attr_fields = WAY_FIELDS,
                  problem_chars = PROBLEMCHARS, default_tag_type = 'regular'):
    """Clean and shape node or way XML element to Python dict"""

    node_attribs = {}
    way_attribs = {}
    way_nodes = []
    tags = []  # Handle secondary tags the same way for both node and way elements

    if element.tag == 'node':
        for node in NODE_FIELDS:
            node_attribs[node] = element.attrib[node]
        for child in element:
            tag = {}
            if PROBLEMCHARS.search(child.attrib["k"]):
                continue
        
            elif LOWER_COLON.search(child.attrib["k"]):
                tag_type = child.attrib["k"].split(':',1)[0]
                tag_key = child.attrib["k"].split(':',1)[1]
                tag["key"] = tag_key
                if tag_type:
                    tag["type"] = tag_type
                else:
                    tag["type"] = 'regular'
            
                tag["id"] = element.attrib["id"]
                tag["value"] = child.attrib["v"]
            else:
                tag["value"] = child.attrib["v"]
                tag["key"] = child.attrib["k"]
                tag["type"] = "regular"
                tag["id"] = element.attrib["id"]
            if tag:
                tags.append(tag)
        return {'node': node_attribs, 'node_tags': tags}
    elif element.tag == 'way':
        for way in WAY_FIELDS:
            way_attribs[way] = element.attrib[way]
        for child in element:
            nd = {}
            tag = {}
            if child.tag == 'tag':
                if PROBLEMCHARS.search(child.attrib["k"]):
                    continue
                elif LOWER_COLON.search(child.attrib["k"]):
                    tag_type = child.attrib["k"].split(':',1)[0]
                    tag_key = child.attrib["k"].split(':',1)[1]
                    tag["key"] = tag_key
                    if tag_type:
                        tag["type"] = tag_type
                    else:
                        tag["type"] = 'regular'
                    tag["id"] = element.attrib["id"]
                    tag["value"] = child.attrib["v"]
    
                else:
                    tag["value"] = child.attrib["v"]
                    tag["key"] = child.attrib["k"]
                    tag["type"] = "regular"
                    tag["id"] = element.attrib["id"]
                if tag:
                    tags.append(tag)
                    
            elif child.tag == 'nd':
                nd['id'] = element.attrib["id"]
                nd['node_id'] = child.attrib["ref"]
                nd['position'] = len(way_nodes)
            
                if nd:
                    way_nodes.append(nd)
            else:
                continue
        return {'way': way_attribs, 'way_nodes': way_nodes, 'way_tags': tags}
    
# ================================================== #
#               Helper Functions                     #
# ================================================== #
def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Yield element if it is the right type of tag"""

    context = iter(ET.iterparse(osm_file, events=('start', 'end')))
    _, root = next(context)
    for event, elem in context:
        if event == 'end' and elem.tag in tags:
            yield elem
            root.clear()


def validate_element(element, validator, schema=SCHEMA):
    """Raise ValidationError if element does not match schema"""
    if validator.validate(element, schema) is not True:
        field, errors = next(validator.errors.items())
        message_string = "\nElement of type '{0}' has the following errors:\n{1}"
        error_strings = (
            "{0}: {1}".format(k, v if isinstance(v, str) else ", ".join(v))
            for k, v in errors.items()
        )
        raise cerberus.ValidationError(
            message_string.format(field, "\n".join(error_strings))
        )


class UnicodeDictWriter(csv.DictWriter, object):
    """Extend csv.DictWriter to handle Unicode input"""

    def writerow(self, row):
        super(UnicodeDictWriter, self).writerow({
            k: (v.encode('utf-8') if isinstance(v, str) else v) for k, v in row.items()
        })

    def writerows(self, rows):
        for row in rows:
            self.writerow(row)

def process_map(file_in, validate):
    """Iteratively process each XML element and write to csv(s)"""

    with codecs.open(NODES_PATH, 'w') as nodes_file, \
         codecs.open(NODE_TAGS_PATH, 'w') as nodes_tags_file, \
         codecs.open(WAYS_PATH, 'w') as ways_file, \
         codecs.open(WAY_NODES_PATH, 'w') as way_nodes_file, \
         codecs.open(WAY_TAGS_PATH, 'w') as way_tags_file:

        nodes_writer = UnicodeDictWriter(nodes_file, NODE_FIELDS)
        node_tags_writer = UnicodeDictWriter(nodes_tags_file, NODE_TAGS_FIELDS)
        ways_writer = UnicodeDictWriter(ways_file, WAY_FIELDS)
        way_nodes_writer = UnicodeDictWriter(way_nodes_file, WAY_NODES_FIELDS)
        way_tags_writer = UnicodeDictWriter(way_tags_file, WAY_TAGS_FIELDS)

        nodes_writer.writeheader()
        node_tags_writer.writeheader()
        ways_writer.writeheader()
        way_nodes_writer.writeheader()
        way_tags_writer.writeheader()

        validator = cerberus.Validator()

        for element in get_element(file_in, tags=('node', 'way')):
            el = shape_element(element)
            if el:
                if validate is True:
                    validate_element(el, validator)

                if element.tag == 'node':
                    nodes_writer.writerow(el['node'])
                    node_tags_writer.writerows(el['node_tags'])
                elif element.tag == 'way':
                    ways_writer.writerow(el['way'])
                    way_nodes_writer.writerows(el['way_nodes'])
                    way_tags_writer.writerows(el['way_tags'])

process_map(OSM_PATH, validate=True)



In [18]:
# Create a new db
import sqlite3
import pandas as pd

db = sqlite3.connect("/Users/Srikanth/Documents/dand/data_wrangling/OpenStreetMap_Denton.db")
db.text_factory = str
c = db.cursor()

In [19]:
csv_files = ['nodes', 'nodes_tags', 'ways', 'ways_tags', 'ways_nodes']
for file in csv_files:
    df = pd.read_csv(file + '.csv')
    df.to_sql(file, db, if_exists='append', index=False)
db.commit()

In [134]:
# !pip install ipython-sql
# !pip install pymysql
# %reload_ext sql
%sql sqlite:///OpenStreetMap_Denton.db
%config SqlMagic.feedback = False

sqlite_file = '/Users/Srikanth/Documents/dand/data_wrangling/OpenStreetMap_Denton.db'
conn = sqlite3.connect(sqlite_file)
c = conn.cursor()

%sql SELECT COUNT(*) as Nodes FROM nodes

Nodes
256686


In [136]:
%sql SELECT COUNT(*) as Ways FROM ways 

Ways
23061


In [190]:
%sql SELECT COUNT(DISTINCT(user.uid)) as 'Number of unique users' FROM \
          (SELECT uid FROM nodes UNION ALL SELECT uid FROM ways) as user

(sqlite3.OperationalError) no such column: uid [SQL: "SELECT COUNT(DISTINCT(user.uid)) as 'Number of unique users' FROM           (SELECT uid FROM nodes UNION ALL SELECT uid FROM ways) as user"]


In [187]:
%sql SELECT COUNT(DISTINCT('user.uid')) as 'Number of unique users' FROM \
          (SELECT 'uid' FROM nodes UNION ALL SELECT 'uid' FROM ways) as user

Number of unique users
1
