In [4]:
# First, import the required libraries
import xml.etree.cElementTree as ET
from collections import Counter
from collections import defaultdict
import re
import csv
import codecs
import pprint
#import schema     
#import cerberus

In [5]:
# Audit helpers: build a dict whose keys are the street types found in the data
# and whose values are the sets of street names carrying that type.
# The pattern matches the trailing run of non-whitespace characters of a name,
# starting at a word boundary and optionally ending with a period.
street_type_re = re.compile(r'\b\S+\.?$', re.IGNORECASE)


def audit_street_type(street_types, street_name):
    """Record *street_name* under the street type found at its end.

    Args:
        street_types: mapping (e.g. ``defaultdict(set)``) from street type to
            the set of street names seen with that type; updated in place.
        street_name: the street name to inspect.

    Names for which the pattern finds no match (such as an empty string) are
    ignored.
    """
    found = street_type_re.search(street_name)
    if found is None:
        return
    street_types[found.group()].add(street_name)

def is_street_name(elem):
    """Return True when the element's ``k`` attribute equals "addr:street".

    Raises KeyError if the element has no ``k`` attribute.
    """
    key = elem.attrib['k']
    return key == "addr:street"

def audit(osmfile):
    """Collect the street names found in an OSM XML file, grouped by street type.

    Args:
        osmfile: path of the OSM XML file to scan.

    Returns:
        A ``defaultdict(set)`` mapping each street type to the set of street
        names (values of ``addr:street`` tags) that carry that type.
    """
    street_types = defaultdict(set)
    # The context manager closes the file even when parsing raises; binary
    # mode hands the raw bytes to the XML parser, which does the decoding.
    with open(osmfile, "rb") as osm_file:
        # Attributes are already populated on "start" events, which is all
        # that is read from each <tag> element here.
        for _, elem in ET.iterparse(osm_file, events=("start",)):
            if elem.tag == 'tag' and is_street_name(elem):
                audit_street_type(street_types, elem.attrib['v'])
    return street_types

In [6]:
#将结果以一一对应的形式打出来观察
for m,n in audit('export.osm').items():
    for i in n:
        print m,i

Boulevard 北环大道 Beihuan Boulevard
Boulevard 深南大道 Shennan Boulevard
Boulevard Shennan Boulevard
Boulevard 滨河大道 Binhe Boulevard
Boulevard 南山大道 Nanshan Boulevard
District Xili Town, Nanshan District
1號 金鸡路1號
Rd 正云路 Zhengyun Rd
Rd Futianxinsha Rd
Rd 玉园路 Yuyuan Rd
Rd Guihua Rd
Rd 正风路 Zhengfeng Rd
Rd 罗芳路 Luofang Rd
Rd 延芳路 Yanfang Rd
Rd 景田北街 Jingtian N Rd
Rd 经二路 Jinger Rd
Rd Gongye 7th Rd
Rd 湖贝路 Hubei Rd
Rd 莲花路 Lianhua Rd
Rd 香梅路 Xiangmei Rd
Rd 布龙路 Bulong Rd
Rd 中山园路 Zhongshanyuan Rd
Rd Zhenghua Rd
Rd 中兴路 Zhongxing Rd
Rd 景田西路 Jingtian W Rd
Rd 景田路 Jingtian Rd
Rd 粤兴三道 Yuexing 3rd Rd
Rd 东门中路 Dongmen Middle Rd
Rd 文锦中路 Wenjin Middle Rd
Rd 新秀路 Xinxiu Rd
Rd 欣园路 Xinyuan Rd
Tin 青山公路 - 新田段 Castle Peak Road - San Tin
Lu 沙河西路 ShaHe Xi Lu
Lu KeYuan Lu
Lu Hua Fa Bei Lu
Lu 桃园路 Taoyuan Lu
Lu Hongbao Lu
S. Gaoxin S.
BLVD Shennan BLVD
Xinsha Xinsha
CoCoPark 星河CoCoPark
Highway 广深公路 Guangzhou-Shenzhen Highway
B座2樓 海岸城中洲控股金融中心B座2樓
518000 1002 Huaqiang N Rd, HuaQiang Bei, Futian Qu, Shenzhen Shi, Guangdong Sheng, Chi

In [7]:
# The expected list holds the normalized street types we want to end up with.
expected = [
    "Street",
    "Avenue",
    "Boulevard",
    "Road",
    "District",
    "Highway",
    "Path",
]

# Built from the audit results above: keys are the street types as found in the
# data, values are the street types they are changed to. Only English text is
# modified, and tokens that are not street types are left untouched.
mapping = {
    "S": "Street",
    "S.": "Street",
    "St": "Street",
    "Lu": "Road",
    "Rd": "Road",
    "road": "Road",
    "BLVD": "Boulevard",
    "DaDao": "Road",
    "Av": "Avenue",
    "Ave": "Avenue",
}

def add_street_type(street_types, street_name):
    """Record *street_name* under its street type unless that type is expected.

    Args:
        street_types: mapping (e.g. ``defaultdict(set)``) from street type to
            the set of street names seen with that type; updated in place.
        street_name: the street name to inspect.

    Names without a match for ``street_type_re``, and names whose street type
    is listed in ``expected``, are ignored.
    """
    found = street_type_re.search(street_name)
    if not found:
        return
    suffix = found.group()
    if suffix in expected:
        return
    street_types[suffix].add(street_name)

# Replace the street type in a street name
def update_name(name, mapping):
    """Return *name* with its trailing street type replaced according to *mapping*.

    Only the final space-delimited token of the name is considered, and it must
    equal a mapping key exactly (case-sensitive). The rest of the name is never
    modified, so an abbreviation that also occurs earlier in the name (e.g. the
    "St" in "Stanford St") is left alone, and at most one replacement is made.

    Args:
        name: the street name to clean.
        mapping: dict from street type as found in the data to its replacement.

    Returns:
        The cleaned name, or *name* unchanged when its last token is not a key
        of *mapping*.
    """
    head, sep, last_token = name.rpartition(' ')
    replacement = mapping.get(last_token)
    if replacement is None:
        return name
    return head + sep + replacement

In [8]:
#呈现变更的成果
st_types = audit('export.osm')
for st_type, ways in st_types.iteritems():
    for name in ways:
        better_name = update_name(name, mapping)
        print name, "=>", better_name

北环大道 Beihuan Boulevard => 北环大道 Beihuan Boulevard
深南大道 Shennan Boulevard => 深南大道 Shennan Boulevard
Shennan Boulevard => Shennan Boulevard
滨河大道 Binhe Boulevard => 滨河大道 Binhe Boulevard
南山大道 Nanshan Boulevard => 南山大道 Nanshan Boulevard
Xili Town, Nanshan District => Xili Town, Nanshan District
金鸡路1號 => 金鸡路1號
正云路 Zhengyun Rd => 正云路 Zhengyun Road
Futianxinsha Rd => Futianxinsha Road
玉园路 Yuyuan Rd => 玉园路 Yuyuan Road
Guihua Rd => Guihua Road
正风路 Zhengfeng Rd => 正风路 Zhengfeng Road
罗芳路 Luofang Rd => 罗芳路 Luofang Road
延芳路 Yanfang Rd => 延芳路 Yanfang Road
景田北街 Jingtian N Rd => 景田北街 Jingtian N Road
经二路 Jinger Rd => 经二路 Jinger Road
Gongye 7th Rd => Gongye 7th Road
湖贝路 Hubei Rd => 湖贝路 Hubei Road
莲花路 Lianhua Rd => 莲花路 Lianhua Road
香梅路 Xiangmei Rd => 香梅路 Xiangmei Road
布龙路 Bulong Rd => 布龙路 Bulong Road
中山园路 Zhongshanyuan Rd => 中山园路 Zhongshanyuan Road
Zhenghua Rd => Zhenghua Road
中兴路 Zhongxing Rd => 中兴路 Zhongxing Road
景田西路 Jingtian W Rd => 景田西路 Jingtian W Road
景田路 Jingtian Rd => 景田路 Jingtian Road
粤兴三道 Yuexing

In [9]:
# shape_element converts an XML element from its document form into plain
# data structures that can be written to CSV.

# Tag keys that start with "<lowercase/underscore>:<lowercase/underscore>".
LOWER_COLON = re.compile(r'^([a-z]|_)+:([a-z]|_)+')
# Characters that make a tag key unusable; tags with such keys are dropped.
PROBLEMCHARS = re.compile(r'[=\+/&<>;\'"\?%#$@\,\. \t\r\n]')

# Column order of each output table.
NODE_FIELDS = ['id', 'lat', 'lon', 'user', 'uid', 'version', 'changeset', 'timestamp']
NODE_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_FIELDS = ['id', 'user', 'uid', 'version', 'changeset', 'timestamp']
WAY_TAGS_FIELDS = ['id', 'key', 'value', 'type']
WAY_NODES_FIELDS = ['id', 'node_id', 'position']


def _shape_tags(element, problem_chars, default_tag_type):
    """Build one row dict per usable direct <tag> child of *element*."""
    element_id = element.get('id')
    rows = []
    for tag in element.findall('tag'):
        key = tag.get('k')
        # Skip tags that have no key at all or whose key contains a
        # problematic character.
        if key is None or problem_chars.search(key):
            continue
        if LOWER_COLON.search(key):
            # Split on the first colon only: the part before it becomes the
            # type, everything after it (further colons included) the key.
            tag_type, _, tag_key = key.partition(':')
        else:
            tag_type, tag_key = default_tag_type, key
        rows.append({
            'id': element_id,
            'key': tag_key,
            'value': tag.get('v'),
            'type': tag_type,
        })
    return rows


def shape_element(element, node_attr_fields=NODE_FIELDS, way_attr_fields=WAY_FIELDS,
                  problem_chars=PROBLEMCHARS, default_tag_type='regular'):
    """Clean and shape node or way XML element to Python dict

    Args:
        element: the XML element to convert.
        node_attr_fields: attribute names copied from a ``node`` element.
        way_attr_fields: attribute names copied from a ``way`` element.
        problem_chars: compiled pattern; tags whose key matches it are skipped.
        default_tag_type: type assigned to tags whose key has no
            ``prefix:`` part recognised by ``LOWER_COLON``.

    Returns:
        For a ``node``: ``{'node': attribs, 'node_tags': [tag rows]}``.
        For a ``way``: ``{'way': attribs, 'way_nodes': [nd rows],
        'way_tags': [tag rows]}``, where each nd row carries the 0-based
        position of the ``nd`` child within the way.
        For any other element: ``None``.
        Attributes missing from the element are stored as ``None``.
    """
    if element.tag == 'node':
        node_attribs = {field: element.get(field) for field in node_attr_fields}
        node_tags = _shape_tags(element, problem_chars, default_tag_type)
        return {'node': node_attribs, 'node_tags': node_tags}

    if element.tag == 'way':
        way_id = element.get('id')
        way_attribs = {field: element.get(field) for field in way_attr_fields}
        way_nodes = [
            {'id': way_id, 'node_id': nd.get('ref'), 'position': position}
            for position, nd in enumerate(element.findall('nd'))
        ]
        way_tags = _shape_tags(element, problem_chars, default_tag_type)
        return {'way': way_attribs, 'way_nodes': way_nodes, 'way_tags': way_tags}

    return None

In [10]:
# Write each shaped data structure to its corresponding .csv file.

#SCHEMA = schema.schema

# Output file for every table produced from the OSM data.
NODES_PATH = 'nodes.csv'
NODE_TAGS_PATH = 'nodes_tags.csv'
WAYS_PATH = 'ways.csv'
WAY_NODES_PATH = 'ways_nodes.csv'
WAY_TAGS_PATH = 'ways_tags.csv'


def get_element(osm_file, tags=('node', 'way', 'relation')):
    """Yield element if it is the right type of tag

    Parses *osm_file* incrementally and yields each element whose tag is in
    *tags* once its closing tag has been read. After the consumer is done with
    a yielded element, the root element is cleared.
    """
    parse_events = ET.iterparse(osm_file, events=('start', 'end'))
    # The first event delivers the root element.
    root = next(parse_events)[1]
    for kind, node in parse_events:
        if kind != 'end' or node.tag not in tags:
            continue
        yield node
        root.clear()
            
# Validation was already done in the course exercises, so it is not repeated here
#def validate_element(element, validator, schema=SCHEMA):
#    """Raise ValidationError if element does not match schema"""
#    if validator.validate(element, schema) is not True:
#         field, errors = next(validator.errors.iteritems())
#         message_string = "\nElement of type '{0}' has the following errors:\n{1}"
#         error_string = pprint.pformat(errors)
        
#        raise Exception(message_string.format(field, error_string))
        
class UnicodeDictWriter(csv.DictWriter, object):
    """Extend csv.DictWriter to handle Unicode input"""

    def writerow(self, row):
        """Write *row* after encoding every ``unicode`` value as UTF-8."""
        encoded = {}
        for field, value in row.iteritems():
            if isinstance(value, unicode):
                value = value.encode('utf-8')
            encoded[field] = value
        super(UnicodeDictWriter, self).writerow(encoded)

    def writerows(self, rows):
        """Write each row of *rows* through :meth:`writerow`."""
        for record in rows:
            self.writerow(record)
            
def process_map(file_in, validate):
    """Iteratively process each XML element and write to csv(s)

    Args:
        file_in: OSM XML input handed to ``get_element``.
        validate: accepted but currently unused; the validation code is
            commented out.

    Each ``node`` and ``way`` element is shaped with ``shape_element`` and its
    rows are appended to the matching CSV files, each of which starts with a
    header row.
    """
    with codecs.open(NODES_PATH, 'w') as nodes_file, \
         codecs.open(NODE_TAGS_PATH, 'w') as nodes_tags_file, \
         codecs.open(WAYS_PATH, 'w') as ways_file, \
         codecs.open(WAY_NODES_PATH, 'w') as way_nodes_file, \
         codecs.open(WAY_TAGS_PATH, 'w') as way_tags_file:

        nodes_writer = UnicodeDictWriter(nodes_file, NODE_FIELDS)
        node_tags_writer = UnicodeDictWriter(nodes_tags_file, NODE_TAGS_FIELDS)
        ways_writer = UnicodeDictWriter(ways_file, WAY_FIELDS)
        way_nodes_writer = UnicodeDictWriter(way_nodes_file, WAY_NODES_FIELDS)
        way_tags_writer = UnicodeDictWriter(way_tags_file, WAY_TAGS_FIELDS)

        # Every file starts with its header row.
        for writer in (nodes_writer, node_tags_writer, ways_writer,
                       way_nodes_writer, way_tags_writer):
            writer.writeheader()

        #validator = cerberus.Validator()

        for element in get_element(file_in, tags=('node', 'way')):
            shaped = shape_element(element)
            if not shaped:
                continue
            #if validate is True:
                #validate_element(shaped, validator)
            kind = element.tag
            if kind == 'node':
                nodes_writer.writerow(shaped['node'])
                node_tags_writer.writerows(shaped['node_tags'])
            elif kind == 'way':
                ways_writer.writerow(shaped['way'])
                way_nodes_writer.writerows(shaped['way_nodes'])
                way_tags_writer.writerows(shaped['way_tags'])
                    
if __name__ == '__main__':
    # Convert the OSM extract to CSV files; validation is switched off.
    process_map('export.osm', validate=False)
            