-
Notifications
You must be signed in to change notification settings - Fork 2
/
网页测试.py
153 lines (132 loc) · 4.61 KB
/
网页测试.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# coding=utf8
from multiprocessing import Process, Pool
import os, threading
import requests
from requests import Request, Session
import ssl
# from Crypto.Cipher import AES
# from bs4 import BeautifulSoup
import urllib, random
# import ConfigParser
# import cookielib
import json
from ast import literal_eval
from pymongo import MongoClient
import zlib
import StringIO
import gzip
import struct
import chardet
import base64
import bz2
ssl._create_default_https_context = ssl._create_unverified_context
# Proxy address list (filled by getTheRemoteAgent)
IPANDPORT = []
# Load the proxy list
def getTheRemoteAgent():
    """Load proxy entries from ``proxy_list.txt`` into the global ``IPANDPORT``.

    Every line of the file is appended unchanged (trailing newline
    included), so consumers have to strip an entry before using it.
    """
    # ``with`` closes the file even if reading fails part-way through.
    with open("proxy_list.txt", "r") as proxy_file:
        IPANDPORT.extend(proxy_file)
# Target endpoints
HOST_1 = 'http://clientapi.medical-lighter.com/user/short_userinfo'
HOST_2 = 'http://api.doctorpda.cn/api/comment/list?app_key=f1c18i2otirc0004&client_id=0bd1agocekr6073a&access_token=c83ad482-b05c-4187-9c07-566454b0b168&net=wifi&versionName=4.2.2&versionCode=85&id=%s&p=1&limit=50&layer=TwoLayer&type=content'
HOST_3 = 'http://api.doctorpda.cn/api/app/client/open?app_key=f1c18i2otirc0004&client_id=0bd1agocekr6073a&access_token=c83ad482-b05c-4187-9c07-566454b0b168&net=wifi&versionName=4.2.2&versionCode=85&loc=0&lon=0&lat=0&cur_channel=36e18idp80ct00e'
# Shared requests session
session = Session()
# Fetch a first-level URL
def _xor_bytes(data, key=0x5A):
    """Return ``data`` with every byte XOR-ed against ``key``.

    Applying the function twice with the same key restores the input.
    """
    return bytes(bytearray(b ^ key for b in bytearray(data)))


def getUrl(target_url, index):
    """Send the obfuscated command payload to ``target_url`` and print the reply.

    The hard-coded JSON command is zlib-compressed, each byte is XOR-ed
    with 0x5A, and the result is sent as the body of a GET request through
    the module-level ``session``.  The reply text is URL-unquoted, XOR-ed
    with the same key, inflated as a raw deflate stream and printed.

    Args:
        target_url: URL the request is sent to.
        index: job number; only printed, to show progress.

    Returns:
        None.  Request and decoding failures are printed, never raised.
    """
    max_attempts = 3
    print(index)
    headers = {'User-Agent': '%E8%BD%BB%E7%9B%88%E5%8C%BB%E5%AD%A6/2.1.4 CFNetwork/758.3.15 Darwin/15.4.0',
               'Accept': '''*/*''',
               # 'Accept-Encoding': 'gzip, deflate',
               'Connection': 'keep-alive',
               'Content-Type': 'application/x-www-form-urlencoded',
               'Accept-Language': 'zh-cn',
               }
    # NOTE(review): the payload embeds a device id and an access token;
    # consider loading them from configuration instead of source code.
    s = '''{\"device\":{\"platform\":\"android\",\"idfa\":\"e721e44d-ba72-3ec7-b90c-cd0d81bd8932\",\"macAddress\":\"9c:c1:72:6f:4d:59\",\"imei\":\"864036028378890\"},\"command\":\"message\\/notification\",\"user\":{\"uid\":\"302142\",\"nickname\":\"unicorn1369\",\"access_token\":\"y7NQT9BV8BB8F~nPdDjlf5ir7amkekGpjKpT7-QVn85sZ4FVertH-2w6EfgnPJIECwDGBGZbKqSAizFe\"},\"soft\":{\"coopId\":\"10020\",\"version\":\"2.1.4\",\"productId\":\"3001\"},\"request\":{\"user_id\":\"302143\"}}'''
    body = _xor_bytes(zlib.compress(s))
    prepare = Request('GET', target_url, headers=headers, data=body).prepare()

    # Bounded retry.  Proxy rotation through IPANDPORT is currently
    # disabled, so every attempt goes out directly.
    result = None
    for _ in range(max_attempts):
        try:
            result = session.send(prepare, timeout=5)
            break
        except Exception as e:
            print('请求失败, 重试...')
            print(e)
    if result is None:
        # Every attempt failed: give up instead of touching an unbound result.
        print('请求三次失败,跳过')
        return
    result.encoding = 'utf8'

    print("=====================")
    try:
        arr = bytearray(urllib.unquote(result.text), 'utf8')
        # Negative wbits: the reply is treated as a raw deflate stream
        # without a zlib header.
        print(zlib.decompress(_xor_bytes(arr), -zlib.MAX_WBITS))
    except Exception as e:
        # Best effort: a reply that cannot be decoded is only reported.
        print(e)
# Store a raw response in MongoDB (originally meant to parse user name, follows, likes and read counts)
def praseJsonForOne(response, index):
    """Store ``response`` under ``index`` in the ``web`` collection of ``zsyxDB``.

    Args:
        response: payload saved in the document's ``data`` field.
        index: value saved in the document's ``index`` field.

    The value returned by the insert call is printed.
    """
    print('+++++++++++++++++++++++++++++++++++++')
    client = MongoClient()
    try:
        # NOTE(review): ``insert`` is deprecated in PyMongo 3 in favour of
        # ``insert_one``; kept because the installed version is unknown.
        print(client['zsyxDB'].web.insert({'index': index, 'data': response}))
    finally:
        # A client is created per call, so release its connections here.
        client.close()
#
# Script entry point.  The guard keeps worker processes (and any importer)
# from re-running the pool setup when this module is loaded again.
if __name__ == '__main__':
    # Proxy loading is currently disabled.
    # getTheRemoteAgent()
    pool = Pool(10)
    for index in range(1):
        pool.apply_async(getUrl, args=(HOST_1, index))
    # No more tasks will be submitted; wait for the workers to finish.
    pool.close()
    pool.join()
    print('All subprocesses done.')
# client = MongoClient()
# myDB = client['zsyxDB']
# json = myDB.web.find_one({'index': 8})
# print json
# data = json['data']
# print data
#
# print '------------'
# arr = bytearray(data, encoding='utf8')
# for i in range(0, len(arr)):
# arr[i] = 0x5A ^ arr[i]
# print arr
#
# from io import StringIO
#
# str_1 = StringIO(data)
# str_2 = StringIO()
# for i in range(len(str_1.getvalue())):
# temp = ord(str_1.read(1)) ^ 0x5A
# print temp
# str_2.write(unichr(temp))
#
#
# import sys
# reload(sys)
# sys.setdefaultencoding('utf8')
#
# try:
# print zlib.decompress(bytes(str_2.getvalue()), -zlib.MAX_WBITS)
# except Exception, e:
# print e