-
Notifications
You must be signed in to change notification settings - Fork 224
/
phone.py
127 lines (105 loc) · 4.11 KB
/
phone.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# -*- coding: utf-8 -*-
import os
import struct
import sys
__author__ = 'lovedboy'
if sys.version_info > (3, 0):
def get_record_content(buf, start_offset):
end_offset = buf.find(b'\x00', start_offset)
return buf[start_offset:end_offset].decode()
else:
def get_record_content(buf, start_offset):
end_offset = buf.find('\x00', start_offset)
return buf[start_offset:end_offset]
class Phone(object):
def __init__(self, dat_file=None):
if dat_file is None:
dat_file = os.path.join(os.path.dirname(__file__), "phone.dat")
self.dat_file = dat_file
with open(self.dat_file, 'rb') as f:
self.buf = f.read()
self.head_fmt = "<4si"
self.phone_fmt = "<iiB"
self.head_fmt_length = struct.calcsize(self.head_fmt)
self.phone_fmt_length = struct.calcsize(self.phone_fmt)
self.version, self.first_phone_record_offset = struct.unpack(
self.head_fmt, self.buf[:self.head_fmt_length])
self.phone_record_count = (len(
self.buf) - self.first_phone_record_offset) // self.phone_fmt_length
def get_phone_dat_msg(self):
print("版本号:{}".format(self.version))
print("总记录条数:{}".format(self.phone_record_count))
print("dat_file:{}".format(self.dat_file))
@staticmethod
def get_phone_no_type(no):
if no == 4:
return "电信虚拟运营商"
if no == 5:
return "联通虚拟运营商"
if no == 6:
return "移动虚拟运营商"
if no == 3:
return "电信"
if no == 2:
return "联通"
if no == 1:
return "移动"
if no == 7:
return "广电"
if no == 8:
return "广电虚拟运营商"
@staticmethod
def _format_phone_content(phone_num, record_content, phone_type):
province, city, zip_code, area_code = record_content.split('|')
return {
"phone": phone_num,
"province": province,
"city": city,
"zip_code": zip_code,
"area_code": area_code,
"phone_type": Phone.get_phone_no_type(phone_type)
}
def _lookup_phone(self, phone_num):
phone_num = str(phone_num)
assert 7 <= len(phone_num) <= 11
int_phone = int(str(phone_num)[0:7])
left = 0
right = self.phone_record_count
buflen = len(self.buf)
while left <= right:
middle = (left + right) // 2
current_offset = (self.first_phone_record_offset +
middle * self.phone_fmt_length)
if current_offset >= buflen:
return
buffer = self.buf[current_offset: current_offset +
self.phone_fmt_length]
cur_phone, record_offset, phone_type = struct.unpack(self.phone_fmt,
buffer)
if cur_phone > int_phone:
right = middle - 1
elif cur_phone < int_phone:
left = middle + 1
else:
record_content = get_record_content(self.buf, record_offset)
return Phone._format_phone_content(phone_num, record_content,
phone_type)
def find(self, phone_num):
return self._lookup_phone(phone_num)
@staticmethod
def human_phone_info(phone_info):
if not phone_info:
return ''
return "{}|{}|{}|{}|{}|{}".format(phone_info['phone'],
phone_info['province'],
phone_info['city'],
phone_info['zip_code'],
phone_info['area_code'],
phone_info['phone_type'])
def test(self):
self.get_phone_dat_msg()
for i in range(1529900, 1529999):
print(self.human_phone_info(self.find(i)))
if __name__ == "__main__":
phone = Phone()
phone.test()