# Data Auditing

In [19]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import xml.etree.ElementTree as ET  # Use cElementTree or lxml if too slow
import pprint
import re
import pprint
from collections import defaultdict


## 1. Create a sample OSM

In [3]:
# Input OSM XML file, and the path the reduced sample file is written to.
OSM_FILE = "HoustonSW.osm" 
SAMPLE_FILE = "sample.osm"

# Sampling stride: only elements at positions 0, k, 2k, ... are kept.
k = 10 # Parameter: take every k-th top level element

def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Lazily yield every completed element whose tag name is in ``tags``.

    Args:
        osm_file: Path or file object handed to ``ET.iterparse``.
        tags: Tag names of the elements to yield.

    Yields:
        Each matching element, at the moment its closing tag has been parsed.

    Reference:
    http://stackoverflow.com/questions/3095434/inserting-newlines-in-xml-file-generated-via-xml-etree-elementtree-in-python
    """
    events = iter(ET.iterparse(osm_file, events=('start', 'end')))
    # The very first event is the start of the document's root element.
    _first_event, root = next(events)
    for kind, node in events:
        if kind != 'end' or node.tag not in tags:
            continue
        yield node
        # Executed when the consumer asks for the next element: drop
        # everything collected under the root so far.
        root.clear()


# Build SAMPLE_FILE out of every k-th top level element of OSM_FILE.
with open(SAMPLE_FILE, 'wb') as sample_out:
    sample_out.write('<?xml version="1.0" encoding="UTF-8"?>\n')
    sample_out.write('<osm>\n  ')

    for index, top_level in enumerate(get_element(OSM_FILE)):
        # Keep only positions 0, k, 2k, ...
        if index % k != 0:
            continue
        sample_out.write(ET.tostring(top_level, encoding='utf-8'))

    sample_out.write('</osm>')

## 2. Check Tag Names

In [4]:
# Specify which file to use
filename=OSM_FILE
#filename=SAMPLE_FILE  # uncomment to run against the smaller sample instead

In [5]:
"""
Print out the tags encountered and how many times each one occurs
"""
def count_tags(filename):
    """Count how many elements of each tag name an OSM XML file contains.

    Args:
        filename: Path of the XML file to scan.

    Returns:
        dict mapping each tag name to the number of elements carrying it.
    """
    tag_counts = {}
    # The ``with`` block closes the file even if parsing fails; binary mode
    # leaves character decoding to the XML parser.
    with open(filename, "rb") as osm_file:
        # Only the tag name is read, and that is already set on "start".
        for _event, elem in ET.iterparse(osm_file, events=("start",)):
            tag_counts[elem.tag] = tag_counts.get(elem.tag, 0) + 1
    return tag_counts
        
def test():
    """Pretty-print the tag counts of the currently selected file."""
    pprint.pprint(count_tags(filename))


if __name__ == "__main__":
    test()

{'bounds': 1,
 'member': 5492,
 'nd': 382655,
 'node': 285952,
 'osm': 1,
 'relation': 560,
 'tag': 330038,
 'way': 57639}


## 3. Check Key Names

In [9]:
# Specify which file to use
filename=OSM_FILE
#filename=SAMPLE_FILE  # uncomment to run against the smaller sample instead
"""
tag categories in a dictionary:
  "lower", for tags that contain only lowercase letters and are valid,
  "lower_colon", for otherwise valid tags with a colon in their names,
  "problemchars", for tags with problematic characters, and
  "other", for other tags that do not fall into the other three categories.
See the 'process_map' and 'test' functions for examples of the expected format.
"""

# Key consisting solely of lowercase letters and underscores.
lower = re.compile(r'^([a-z]|_)*$')
# Two such parts joined by exactly one colon, e.g. "addr:street".
lower_colon = re.compile(r'^([a-z]|_)*:([a-z]|_)*$')
# A single character that is considered problematic inside a key.
problemchars = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')


def key_type(element, keys):
    """Classify the ``k`` attribute of a <tag> element and bump its counter.

    Elements whose tag is not "tag" are ignored.

    Args:
        element: Element to inspect; its ``k`` attribute is read when it is
            a <tag> element.
        keys: dict with the counters "lower", "lower_colon", "problemchars"
            and "other"; it is updated in place.

    Returns:
        The same ``keys`` dict that was passed in.
    """
    if element.tag != "tag":
        return keys

    key = element.attrib['k']
    if lower.match(key):
        keys['lower'] += 1
    elif lower_colon.match(key):
        keys['lower_colon'] += 1
    elif problemchars.search(key):
        # search(), not match(): a problematic character can sit anywhere in
        # the key, whereas match() would only ever look at the first one.
        keys['problemchars'] += 1
    else:
        # Show the uncategorised key so it can be inspected by hand.
        print(key)
        keys['other'] += 1
    return keys



def process_map(filename):
    """Tally the key categories over every element of an OSM XML file.

    Args:
        filename: Path or file object handed to ``ET.iterparse``.

    Returns:
        dict with the counters "lower", "lower_colon", "problemchars" and
        "other" as produced by ``key_type``.
    """
    counts = dict.fromkeys(("lower", "lower_colon", "problemchars", "other"), 0)
    for _event, elem in ET.iterparse(filename):
        counts = key_type(elem, counts)
    return counts



def test():
    """Pretty-print the key-category counters of the currently selected file."""
    pprint.pprint(process_map(filename))


if __name__ == "__main__":
    test()

gnis:Class
gnis:County
gnis:ST_num
gnis:ST_alpha
gnis:County_num
gnis:Class
gnis:County
gnis:ST_num
gnis:ST_alpha
gnis:County_num
gnis:Class
gnis:County
gnis:ST_num
gnis:ST_alpha
gnis:County_num
gnis:Class
gnis:County
gnis:ST_num
gnis:ST_alpha
gnis:County_num
gnis:Class
gnis:County
gnis:ST_num
gnis:ST_alpha
gnis:County_num
gnis:Class
gnis:County
gnis:ST_num
gnis:ST_alpha
gnis:County_num
gnis:Class
gnis:County
gnis:ST_num
gnis:ST_alpha
gnis:County_num
gnis:Class
gnis:County
gnis:County_num
gnis:ST_alpha
gnis:ST_num
gnis:Class
gnis:County
gnis:ST_num
gnis:ST_alpha
gnis:County_num
gnis:Class
gnis:County
gnis:ST_num
gnis:ST_alpha
gnis:County_num
gnis:Class
gnis:County
gnis:County_num
gnis:ST_alpha
gnis:ST_num
gnis:Class
gnis:County
gnis:ST_num
gnis:ST_alpha
gnis:County_num
gnis:Class
gnis:County
gnis:ST_num
gnis:ST_alpha
gnis:County_num
gnis:Class
gnis:County
gnis:ST_num
gnis:ST_alpha
gnis:County_num
gnis:Class
gnis:County
gnis:ST_num
gnis:ST_alpha
gnis:County_num
gnis:Class
gnis:County
gn

## 4. Check Users

In [10]:

"""
The function process_map should return a set of unique user IDs ("uid")
"""
# Specify which file to use
filename=OSM_FILE
#filename=SAMPLE_FILE  # uncomment to run against the smaller sample instead

def get_user(element):
    """Return the element's ``uid`` attribute, or None when it has none."""
    return element.attrib.get('uid')



def process_map(filename):
    """Collect the distinct ``uid`` values found in an OSM XML file.

    Args:
        filename: Path or file object handed to ``ET.iterparse``.

    Returns:
        set of the non-empty ``uid`` attribute values of all elements.
    """
    uids = (get_user(elem) for _event, elem in ET.iterparse(filename))
    # Elements without a uid yield None; those and empty values are left out.
    return set(uid for uid in uids if uid)


def test():
    """Report how many distinct user ids the currently selected file contains."""
    unique_users = process_map(filename)
    # The label itself ends in a space, so two spaces precede the count.
    print(' '.join(['Number of Users: ', str(len(unique_users))]))


if __name__ == "__main__":
    test()

Number of Users:  437


## 5. Audit Street Names

In [20]:

"""
Find the street names that are not expected

"""
# Specify which file to use
filename=OSM_FILE
#filename=SAMPLE_FILE  # uncomment to run against the smaller sample instead
# Picks out the trailing run of non-whitespace characters of a street name,
# starting at a word boundary.
street_type_re = re.compile(r'\b\S+\.?$', re.IGNORECASE)

# Street types that are accepted without being reported.
expected = [
    "Street", "Avenue", "Boulevard", "Drive", "Court", "Place", "Square",
    "Lane", "Road", "Trail", "Parkway", "Commons", "Freeway", "Loop",
    "Park", "Way", "Speedway",
]


def audit_street_type(street_types, street_name):
    """Record ``street_name`` under its street type when that type is unexpected.

    Args:
        street_types: Mapping from street type to a set of street names; it
            is indexed with ``[]`` and updated in place.
        street_name: The street name to examine.
    """
    found = street_type_re.search(street_name)
    if found is None:
        return
    suffix = found.group()
    if suffix in expected:
        return
    street_types[suffix].add(street_name)
            
def is_street_name(elem):
    """Tell whether the element's ``k`` attribute equals "addr:street"."""
    key = elem.attrib['k']
    return key == "addr:street"

def audit(osmfile):
    """Collect the unexpected street types used by the nodes and ways of a file.

    Args:
        osmfile: Path of the OSM XML file to audit.

    Returns:
        defaultdict(set) filled by ``audit_street_type`` with the value of
        every <tag> below a node or way that ``is_street_name`` accepts.
    """
    street_types = defaultdict(set)
    # The ``with`` block closes the file even if parsing or auditing raises;
    # binary mode leaves character decoding to the XML parser.
    with open(osmfile, "rb") as osm_file:
        # Listen for "end", not "start": at a "start" event the children of
        # an element may not have been parsed yet, so its <tag> children
        # could be silently missed.
        for _event, elem in ET.iterparse(osm_file, events=("end",)):
            if elem.tag in ("node", "way"):
                for tag in elem.iter("tag"):
                    if is_street_name(tag):
                        audit_street_type(street_types, tag.attrib['v'])
    return street_types

def test():
    """Pretty-print the street audit result of the currently selected file."""
    pprint.pprint(dict(audit(filename)))


if __name__ == '__main__':
    test()


{'110': set(['Memorial Drive, Ste 110']),
 '125': set(['798 Sorella Court Suite 125']),
 '240': set(['Bissonnet St #240']),
 '300': set(['Town & Country Blvd #300']),
 '502': set(['Northwest Freeway #502']),
 '59': set(['Southwest Freeway 59']),
 '704': set(['Memorial City Way #704']),
 '77027': set(['77027']),
 '77096': set(['Meyerland Plaza, Houston, TX 77096']),
 '90a': set(['Hwy 90a']),
 '925': set(['Katy Freeway Suite 925']),
 'Ave': set(['7828 N 19th Ave',
             'Richmond Ave',
             'S. Rice Ave',
             'W Bellfort Ave',
             'Washington Ave']),
 'Ave.': set(['Bertner Ave.']),
 'B': set(['Richmond Ave, Ste B', 'W Holcombe Blvd #B']),
 'Beechnut': set(['Beechnut']),
 'Blossom': set(['Blossom']),
 'Blvd': set(['John Freeman Blvd', 'Post Oak Blvd']),
 'Blvd.': set(['Bellaire Blvd.', 'Post Oak Blvd.']),
 'Dr': set(['1111 Upland Dr',
            'Portway Dr',
            'Post Oak Place Dr',
            'S Wilcrest Dr',
            'Waugh Dr']),
 'Driscol

## 6. Audit Postal Codes

In [13]:
# Five digits, optionally followed by a hyphen and four more digits.
zipcode_re = re.compile(r'^\d{5}(-\d{4})?$')


def audit_post_code(wrongcodes, postcode):
    """Append ``postcode`` to ``wrongcodes`` when it does not match ``zipcode_re``.

    Args:
        wrongcodes: List that collects the rejected postcodes; updated in place.
        postcode: The postcode string to check.
    """
    if zipcode_re.search(postcode) is None:
        wrongcodes.append(postcode)
    
def is_post_code(elem):
    """Tell whether the element's ``k`` attribute equals "addr:postcode"."""
    key = elem.attrib['k']
    return key == "addr:postcode"

def audit_zipcode(osmfile):
    """Collect the postcodes of nodes and ways that ``audit_post_code`` rejects.

    Args:
        osmfile: Path of the OSM XML file to audit.

    Returns:
        list filled by ``audit_post_code`` from the value of every <tag>
        below a node or way that ``is_post_code`` accepts.
    """
    wrongcodes = []
    # The ``with`` block closes the file even if parsing or auditing raises;
    # binary mode leaves character decoding to the XML parser.
    with open(osmfile, "rb") as osm_file:
        # Listen for "end", not "start": at a "start" event the children of
        # an element may not have been parsed yet, so its <tag> children
        # could be silently missed.
        for _event, elem in ET.iterparse(osm_file, events=("end",)):
            if elem.tag in ("node", "way"):
                for tag in elem.iter("tag"):
                    if is_post_code(tag):
                        audit_post_code(wrongcodes, tag.attrib['v'])
    return wrongcodes

def test():
    """Print the postcodes that ``audit_zipcode`` reports for the selected file."""
    bad_codes = audit_zipcode(filename)
    print("Incorrect ZipCode: ")
    print(bad_codes)


if __name__ == '__main__':
    test()

Incorrect ZipCode: 
['Weslayan Street', '7-']
