In [5]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-

In [6]:
import xml.etree.cElementTree as ET
import pprint
from collections import defaultdict
import re

In [7]:
# Path to the OSM XML extract that the audit and export steps read.
# NOTE(review): machine-specific absolute path; adjust before running elsewhere.
filename=r'C:\Users\linshoubao\Downloads\p3\chicago.osm'
# Matches the trailing run of non-whitespace characters that starts at a word
# boundary (optionally ending in "."), e.g. "St." in "W. Lake St." or "575" in
# "N LaSalle St, #575".
type_re = re.compile(r'\b\S+\.?$', re.IGNORECASE)

通过观察芝加哥的 OSM XML 数据，发现街道名（addr:street）以及 TIGER 标签中的 name_type、name_direction_prefix 存在使用缩写的情况，如下所示：
```
<tag k="tiger:name_type" v="Ave" />
<tag k="tiger:name_direction_prefix" v="S" />
<tag k="tiger:name_direction_prefix" v="W" /> 
```

# 一、数据清洗、整理阶段

## 第一步：我们需要对数据进行审查，找出问题数据。

我们把街道名、TIGER 标签中希望出现的值（即规范的全称写法）放入 expected 中，如下：

In [8]:
# Trailing tokens that are considered already well-formed; audit_type() only
# reports values whose last token is NOT in this list.
# 'Circuit' was previously misspelled as 'Ciruit', so the canonical spelling
# (the one `mapping` produces for 'Cir') was never whitelisted.
expected = [
    "Street", "Avenue", "Boulevard", "Drive", "Court", "Place", "Square", "Lane", "Road",
    "Trail", "Parkway", "Commons", 'Market', 'Plaza', 'Terrace', 'Walk', 'Circuit',
    'West', 'South', 'North', 'Row', 'Clinton', '1',
]

In [9]:
# Audit helper: bucket a value under its trailing token when that token is unexpected.
def audit_type(types, name):
    """Record ``name`` in ``types`` if its trailing token is not in ``expected``.

    Args:
        types: mapping of trailing token -> set of full values (the callers
            pass a ``defaultdict(set)``); mutated in place.
        name: the raw tag value to inspect.
    """
    match = type_re.search(name)
    if not match:
        # No trailing token could be extracted; nothing to audit.
        return
    suffix = match.group()
    if suffix in expected:
        return
    types[suffix].add(name)

In [16]:
# Predicates that identify street-name and TIGER tag elements by their "k" attribute.
def is_street_name(elem):
    """Return True when the element's ``k`` attribute equals ``addr:street``."""
    key = elem.attrib['k']
    return key == "addr:street"
def is_street_type(elem):
    """Return True when the element's ``k`` attribute equals ``addr:street:type``."""
    key = elem.attrib['k']
    return key == 'addr:street:type'
def is_name_type(elem):
    """Return True when the element's ``k`` attribute equals ``tiger:name_type``."""
    key = elem.attrib['k']
    return key == 'tiger:name_type'
def is_name_direction_prefix(elem):
    """Return True when the element's ``k`` attribute equals ``tiger:name_direction_prefix``."""
    key = elem.attrib['k']
    return key == 'tiger:name_direction_prefix'

In [17]:
# Audit driver: scan the OSM file and collect unexpected street / TIGER values.
def audit(osmfile):
    """Collect tag values whose trailing token is not in ``expected``.

    Args:
        osmfile: path to the OSM XML file.

    Returns:
        A 4-tuple ``(street_names, street_types, name_types,
        name_direction_prefix)``; each item is a ``defaultdict(set)`` mapping
        the unexpected trailing token to the set of full values that end in it.
    """
    street_types = defaultdict(set)
    street_names = defaultdict(set)
    name_types = defaultdict(set)
    name_direction_prefix = defaultdict(set)

    # Binary mode leaves decoding and newline handling to the XML parser, and
    # the with-block closes the file even if parsing raises.
    with open(osmfile, "rb") as osm_file:
        # Use "end" events: on "start" the parser has only seen the opening
        # tag, so the <tag> children of a node/way may not exist yet and
        # would be silently skipped.
        for _, elem in ET.iterparse(osm_file, events=("end",)):
            if elem.tag == "node" or elem.tag == "way":
                for tag in elem.iter("tag"):
                    if is_street_name(tag):
                        audit_type(street_names, tag.attrib['v'])
                    elif is_street_type(tag):
                        audit_type(street_types, tag.attrib['v'])
                    elif is_name_type(tag):
                        audit_type(name_types, tag.attrib['v'])
                    elif is_name_direction_prefix(tag):
                        audit_type(name_direction_prefix, tag.attrib['v'])
                # The element has been fully audited; drop its children and
                # attributes so memory does not grow with the file size.
                elem.clear()
    return street_names, street_types, name_types, name_direction_prefix

In [20]:
def test():
    """Run the audit on ``filename`` and pretty-print each result as a plain dict.

    The four dictionaries are printed in the order returned by ``audit``:
    street names, street types, TIGER name types, TIGER direction prefixes.
    """
    for flagged in audit(filename):
        pprint.pprint(dict(flagged))

运行 test() 方法，获得有问题的数据，以字典形式表示，如下：

In [21]:
# Run the audit and print the flagged values when this cell/script is executed directly.
if __name__ == '__main__':
    test()

{'575': set(['N LaSalle St, #575']),
 'Dr': set(['North Upper Wacker Dr']),
 'St': set(['W 18th St']),
 'St.': set(['W. Lake St.', 'W. Madison St.'])}
{'St': set(['St'])}
{'Ave': set(['Ave']),
 'Ave:Blvd': set(['Ave:Blvd']),
 'Blvd': set(['Blvd']),
 'Blvd:St': set(['Blvd:St']),
 'Cir': set(['Cir']),
 'Ct': set(['Ct']),
 'Dr': set(['Dr']),
 'Dr;Blvd': set(['Dr;Blvd']),
 'Ln': set(['Ln']),
 'Pky': set(['Pky']),
 'Pl': set(['Pl']),
 'Rd': set(['Rd']),
 'Sq': set(['Sq']),
 'St': set(['St']),
 'Ter': set(['Ter'])}
{'N': set(['N']), 'S': set(['S']), 'W': set(['W'])}


## 第二步：创建mapping字典

通过审查可以看到，数据中有23种缩写问题，我们把这种缩写字符串和全称字符串以字典的形式放入mapping中，如下:

In [34]:
# Maps an abbreviated value (either a whole tag value or just its trailing
# token) to the spelled-out replacement used by update_value().
# NOTE(review): 'Cir' is expanded to 'Circuit' here; confirm that 'Circle' is
# not what the source data means.
mapping = {
    # Street-type abbreviations.
    'St': "Street",
    'Ave': 'Avenue',
    'Rd': 'Road',    # the audit reports 'Rd' (no dot), which was previously unmapped
    'Rd.': 'Road',
    'Pky': 'Parkway',
    'Cir': 'Circuit',
    'Dr': 'Drive',
    'Sq': 'Square',
    'Pl': 'Place',
    'Ter': 'Terrace',
    'Blvd': 'Boulevard',
    'Ln': 'Lane',    # was lowercase 'lane', inconsistent with `expected`
    'Ct': 'Court',
    # Compound TIGER name types.
    'Blvd:St': 'Boulevard:Street',
    'Ave:Blvd': 'Avenue:Boulevard',
    'Dr;Blvd': 'Drive;Boulevard',
    # Direction prefixes.
    'W': 'West',
    'S': 'South',
    'N': 'North',
    # Whole street names that need more than a suffix fix.
    'W 18th St': 'West 18th Street',    # was 'West 18th South'
    'W. Lake St.': 'West Lake Street',
    'W. Madison St.': 'West Madison Street',
    'N LaSalle St, #575': 'North LaSalle Street, #575',
}

## 第三步：创建update_value方法，对问题数据进行更新。

In [32]:
def update_value(value, mapping):
    """Return ``value`` with its abbreviation expanded according to ``mapping``.

    If the whole value is a key of ``mapping`` the mapped string is returned.
    Otherwise only the last whitespace-delimited token is looked up and, when
    found, replaced in place; the rest of the string is left untouched.

    Args:
        value: raw tag value, e.g. ``'S State St'``.
        mapping: dict of abbreviation -> full form.

    Returns:
        The cleaned value, or ``value`` unchanged when nothing matches
        (including empty or whitespace-only input).
    """
    if value in mapping:
        return mapping[value]

    tokens = value.split()
    if not tokens:
        # Empty / blank value: nothing to expand (previously an IndexError).
        return value

    last = tokens[-1]
    if last in mapping:
        # Replace only the trailing token. str.replace() would also rewrite
        # the same letters inside earlier words ('S State St' ->
        # 'S Streetate Street').
        start = value.rfind(last)
        value = value[:start] + mapping[last] + value[start + len(last):]
    return value

# 二、数据提取、导出阶段

## 第一步：数据提取前期模块、路径等准备。

In [22]:
import csv
import codecs
import cerberus
import schema_test #自行创建的模块

In [23]:
# Output locations of the CSV files written by process_map().
# NOTE(review): machine-specific absolute paths; adjust before running elsewhere.
NODES_PATH = r"C:\Users\linshoubao\Downloads\p3\nodes.csv"
NODE_TAGS_PATH = r"C:\Users\linshoubao\Downloads\p3\nodes_tags.csv"
WAYS_PATH = r"C:\Users\linshoubao\Downloads\p3\ways.csv"
WAY_NODES_PATH = r"C:\Users\linshoubao\Downloads\p3\ways_nodes.csv"
WAY_TAGS_PATH = r"C:\Users\linshoubao\Downloads\p3\ways_tags.csv"

In [24]:
# Column names (and column order) of each CSV file; also used as the
# fieldnames of the corresponding writers in process_map().
NODE_FIELDS = ['id', 'lat', 'lon', 'user', 'uid', 'version', 'changeset', 'timestamp']
NODE_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_FIELDS = ['id', 'user', 'uid', 'version', 'changeset', 'timestamp']
WAY_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_NODES_FIELDS = ['id', 'node_id', 'position']

In [25]:
PROBLEMCHARS = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]') # characters that make a tag key problematic; shape_element() skips tags whose key matches

# Validation schema provided by the project-local schema_test module.
SCHEMA = schema_test.schema

## 第二步：数据提取。

根据要求，需将 xml 文件中的值提取出来，写入 CSV 文件中对应的字段名下，为此创建 shape_element 方法。

In [26]:
def shape_element(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS,
                  problem_chars=PROBLEMCHARS, default_tag_type='regular'):
    """Clean and shape a node or way XML element into Python dicts.

    Args:
        element: an ElementTree element; only ``node`` and ``way`` are shaped.
        node_attr_fields: attribute names copied from a ``node`` element.
        way_attr_fields: attribute names copied from a ``way`` element.
        problem_chars: compiled regex; tags whose key matches are skipped.
        default_tag_type: ``type`` assigned to tags whose key has no colon.

    Returns:
        ``{'node': ..., 'node_tags': [...]}`` for a node,
        ``{'way': ..., 'way_nodes': [...], 'way_tags': [...]}`` for a way,
        and ``None`` for any other element.
    """
    if element.tag == 'node':
        node_attribs = _shape_attribs(element, node_attr_fields)
        tags = _shape_tags(element, node_attribs, problem_chars, default_tag_type)
        return {'node': node_attribs, 'node_tags': tags}

    if element.tag == 'way':
        way_attribs = _shape_attribs(element, way_attr_fields)
        tags = _shape_tags(element, way_attribs, problem_chars, default_tag_type)
        # One row per <nd> child; ``position`` is the 0-based order within the way.
        # NOTE(review): node_id is kept as the raw string from the ``ref``
        # attribute (not converted to int), as before.
        way_nodes = [
            {'id': way_attribs['id'], 'node_id': nd.attrib['ref'], 'position': position}
            for position, nd in enumerate(element.findall('nd'))
        ]
        return {'way': way_attribs, 'way_nodes': way_nodes, 'way_tags': tags}

    return None


def _shape_attribs(element, fields):
    """Copy ``fields`` from the element's attributes, converting numeric ones."""
    attribs = {}
    for field in fields:
        raw = element.attrib[field]
        if field in ('id', 'uid', 'changeset'):
            attribs[field] = int(raw)
        elif field in ('lat', 'lon'):
            attribs[field] = float(raw)
        else:
            attribs[field] = raw
    return attribs


def _shape_tags(element, parent_attribs, problem_chars, default_tag_type):
    """Build the tag rows for the <tag> children of a node or way element."""
    rows = []
    for tag in element.findall('tag'):
        full_key = tag.attrib['k']
        if problem_chars.search(full_key):
            # Keys containing problematic characters are dropped entirely.
            continue

        # Street names and TIGER type/direction values get their abbreviations expanded.
        if (is_street_name(tag) or is_street_type(tag)
                or is_name_type(tag) or is_name_direction_prefix(tag)):
            value = update_value(tag.attrib['v'], mapping)
        else:
            value = tag.attrib['v']

        if ':' in full_key:
            # Split at the FIRST colon: the prefix is the type, the rest is the key.
            tag_type, _, key = full_key.partition(':')
        else:
            tag_type, key = default_tag_type, full_key

        rows.append({
            'id': parent_attribs['id'],
            'value': value,
            'key': key,
            'type': tag_type,
        })
    return rows


In [27]:
def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Yield each fully parsed element whose tag name is listed in ``tags``.

    After the consumer is done with a yielded element, the root element is
    cleared before parsing continues.
    """
    parser = ET.iterparse(osm_file, events=('start', 'end'))
    # The first event delivers the root element; keep a handle for clearing.
    root = next(parser)[1]
    for kind, node in parser:
        if kind != 'end' or node.tag not in tags:
            continue
        yield node
        root.clear()

In [28]:
def validate_element(element, validator, schema=SCHEMA):
    """Raise ``Exception`` if ``element`` does not validate against ``schema``.

    The message names the first field reported in ``validator.errors`` and
    includes its pretty-printed errors.
    """
    if validator.validate(element, schema) is True:
        return

    field, errors = next(validator.errors.iteritems())
    template = "\nElement of type '{0}' has the following errors:\n{1}"
    raise Exception(template.format(field, pprint.pformat(errors)))

## 第三步：保存数据。

这里我们需要创建 UnicodeDictWriter 类，而不直接使用 csv 模块中的 DictWriter 类，是因为 xml 文件中存在非 ASCII 字符：Python 2 的 csv 模块无法直接写入这类 unicode 字符串，需要先将其编码为 UTF-8。

In [29]:
class UnicodeDictWriter(csv.DictWriter, object):
    """``csv.DictWriter`` variant that UTF-8 encodes ``unicode`` values before writing."""

    def writerow(self, row):
        """Write one row, encoding every ``unicode`` value to UTF-8 bytes first."""
        encoded = {}
        for key, val in row.iteritems():
            encoded[key] = val.encode('utf-8') if isinstance(val, unicode) else val
        super(UnicodeDictWriter, self).writerow(encoded)

    def writerows(self, rows):
        """Write each row through ``writerow`` so the encoding step is applied."""
        for record in rows:
            self.writerow(record)

In [30]:
def process_map(file_in, validate):
    """Iteratively process each node/way XML element and write it to the CSV files.

    Args:
        file_in: path (or file object) of the OSM XML input.
        validate: when exactly ``True``, every shaped element is checked
            against ``SCHEMA`` before it is written; a failure raises.
    """
    # Python 2's csv module expects binary-mode files: it emits "\r\n" itself,
    # and a text-mode file on Windows would turn that into "\r\r\n", which
    # shows up as a blank line after every row.
    with codecs.open(NODES_PATH, 'wb') as nodes_file, \
         codecs.open(NODE_TAGS_PATH, 'wb') as nodes_tags_file, \
         codecs.open(WAYS_PATH, 'wb') as ways_file, \
         codecs.open(WAY_NODES_PATH, 'wb') as way_nodes_file, \
         codecs.open(WAY_TAGS_PATH, 'wb') as way_tags_file:

        nodes_writer = UnicodeDictWriter(nodes_file, NODE_FIELDS)
        node_tags_writer = UnicodeDictWriter(nodes_tags_file, NODE_TAGS_FIELDS)
        ways_writer = UnicodeDictWriter(ways_file, WAY_FIELDS)
        way_nodes_writer = UnicodeDictWriter(way_nodes_file, WAY_NODES_FIELDS)
        way_tags_writer = UnicodeDictWriter(way_tags_file, WAY_TAGS_FIELDS)

        nodes_writer.writeheader()
        node_tags_writer.writeheader()
        ways_writer.writeheader()
        way_nodes_writer.writeheader()
        way_tags_writer.writeheader()

        validator = cerberus.Validator()

        for element in get_element(file_in, tags=('node', 'way')):
            el = shape_element(element)
            if el:
                if validate is True:
                    validate_element(el, validator)

                if element.tag == 'node':
                    nodes_writer.writerow(el['node'])
                    node_tags_writer.writerows(el['node_tags'])
                elif element.tag == 'way':
                    ways_writer.writerow(el['way'])
                    way_nodes_writer.writerows(el['way_nodes'])
                    way_tags_writer.writerows(el['way_tags'])

## 第四步：导出数据。

因为数据集大小为104M，比较大，导出时间至少需要30分钟。

In [35]:
# Export the OSM data to the CSV files, validating every element against SCHEMA.
if __name__ == '__main__':
    process_map(filename, validate=True)