In [2]:
import threading
import logging
import time
from PIL import Image
import cv2
import io
import numpy as np
import json
import base64
import sys
import random
from queue import Queue
from redis import Redis
import requests
import traceback
import os

from yolo3.yolo import YOLO
from geeTest import hanzi_click_target_char, hanzi_click_ques

from selenium import webdriver
import time
from selenium.webdriver.support.wait import WebDriverWait
import json
from selenium.webdriver.common import actions
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC

import platform
# Lower-cased OS name ('linux', 'windows', 'darwin', ...); used to pick the
# chromedriver binary and to force headless mode on Linux.
_platform = platform.system().lower()


def now():
    """Return the current wall-clock time as integer milliseconds since the epoch."""
    return int(time.time() * 1000)


# Master switch between production and local development settings.
prod = False

logger = logging.getLogger("avc")

if prod:
    fh = logging.FileHandler('avc_gee_test.log')
    fh.setLevel("INFO")
    fh.setFormatter(logging.Formatter("%(asctime)s-%(name)s-%(levelname)s ~ %(message)s  ~[%(filename)s:%(lineno)d]"))
    logger.addHandler(fh)
    # Without an explicit level the logger inherits the root logger's WARNING
    # threshold, so INFO records would be dropped before reaching the handler.
    logger.setLevel(logging.INFO)
else:
    logging.basicConfig(level = logging.INFO,format = '%(asctime)s-%(name)s-%(levelname)s ~ %(message)s  ~[%(filename)s:%(lineno)d]')

def log_cost(session_id, action, start):
    """Report the milliseconds elapsed since ``start`` for one step of a session.

    Goes to the ``avc`` logger in production and to stdout otherwise.
    """
    if prod:
        logger.info('%s: %s cost %d' % (session_id, action, now() - start))
        return
    print(session_id, action, 'cost', now() - start)

class SlideTracks:
    """Looks up recorded slider tracks in Redis, keyed by slide distance."""

    def __init__(self):
        """Open the Redis connection for the current environment."""
        # Only host and port differ between production and development.
        settings = dict(db=2, password='', socket_timeout=30)
        if prod:
            settings.update(host='', port=9209)
        else:
            settings.update(host='127.0.0.1', port=6379)
        self.reconn = Redis(**settings)
        # One sorted-set key per integer distance.
        self.len_tracks_key_pattern = '__slider_tracks_%d'

    def get_track(self, distance):
        """Return one randomly chosen track for ``distance``, or None if none is stored.

        Stored members are base64-encoded JSON; the decoded JSON value is returned.
        """
        key = self.len_tracks_key_pattern % distance
        # Upper score bound is "now" in milliseconds (+1); presumably scores are
        # insertion timestamps -- confirm against the writer of these keys.
        upper_bound = int(time.time() * 1000) + 1
        candidates = self.reconn.zrevrangebyscore(key, upper_bound, 0)
        if candidates:
            chosen = random.choice(candidates)
            return json.loads(base64.b64decode(chosen).decode())
        return None

    def close(self):
        """Close the Redis connection."""
        self.reconn.close()

class BrowserPool:
    """Pool of Selenium Chrome browsers shared between worker threads.

    ``count`` is the number of live browsers owned by the pool, whether idle in
    the queue or currently handed out; it never exceeds ``max_pool_size`` for
    browsers created through ``get``/``close_browser``.
    """

    def __init__(self, pool_size=1, max_pool_size=5):
        """Build the Chrome options and pre-start ``pool_size`` browsers.

        Args:
            pool_size: number of browsers started eagerly.
            max_pool_size: upper bound on live browsers.
        """
        self.pool = Queue()
        self.pool_size = pool_size
        self.max_pool_size = max_pool_size
        self.count = 0
        self.count_lock = threading.Lock()  # guards ``count``
        self.full_lock = threading.Lock()   # serialises "check capacity, then create"
        options = webdriver.ChromeOptions()
        options.add_experimental_option('excludeSwitches', ["enable-automation", "test-type", "disable-default-apps"])
        options.add_experimental_option('w3c', False)
        options.add_argument('--disable-infobars')
        options.add_argument('--disable-sync')
        options.add_argument('--bwsi')
        options.add_argument('--start-maximized')
        options.add_argument('--disable-java')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-pinch')
        options.add_argument('--allow-insecure-websocket-from-https-origin')
        options.add_argument('--disable-web-security')
        options.add_argument("--proxy-server='direct://'")
        options.add_argument("--proxy-bypass-list=*")
        other_arguments = ["--ignore-certificate-errors", "--enable-quic", "--disable-client-side-phishing-detection", "--safebrowsing-disable-download-protection", "--safebrowsing-disable-auto-update"]
        for a in other_arguments:
            options.add_argument(a)
        options.headless = prod or _platform == 'linux'
        self.browser_options = options
        self.browser_pool_generate()
        logger.info('browser pool inited')

    def is_full(self):
        """Return True when no further browser may be created."""
        return self.count >= self.max_pool_size

    def __discard(self, browser):
        """Shut a browser down and give its slot back to the pool."""
        try:
            # quit() also ends the chromedriver process; close() only closes the window.
            browser.quit()
        except Exception:
            pass  # already dead; the slot is released below either way
        with self.count_lock:
            self.count -= 1

    def __get(self):
        """Return an idle, responsive browser without blocking, or None if there is none.

        Browsers that fail the ``about:blank`` health check are discarded (and
        their slot released) instead of being leaked.
        """
        from queue import Empty  # local import: the file only imports Queue
        while True:
            try:
                browser = self.pool.get(block=False)
            except Empty:
                return None
            if not browser:
                continue
            try:
                browser.get('about:blank')
            except Exception:
                logger.error('get a broken browser from pool, %s' % traceback.format_exc())
                self.__discard(browser)
                continue
            return browser

    def get(self, wait_seconds):
        """Borrow a browser, waiting up to ``wait_seconds`` for one to become available.

        An idle browser is preferred; otherwise a new one is started while the
        pool is below ``max_pool_size``; otherwise the call polls every 0.5s
        until the deadline.

        Returns:
            A browser, or None if none became available in time or a new one
            could not be started.
        """
        deadline = time.time() + wait_seconds
        while True:
            browser = self.__get()
            if browser:
                return browser
            with self.full_lock:
                if not self.is_full():
                    return self.browser_generate()
            if time.time() >= deadline:
                return None
            time.sleep(.5)

    def browser_pool_generate(self):
        """Start ``pool_size`` browsers and queue the ones that came up."""
        for _ in range(self.pool_size):
            browser = self.browser_generate()
            if browser:
                self.pool.put(browser)

    def browser_generate(self):
        """Start one Chrome instance and count it.

        Returns:
            The new browser, or None if it could not be started (the error is
            logged and nothing is counted).
        """
        start = int(time.time() * 1000)
        browser = None
        try:
            if prod or _platform == 'linux':
                driver_path = 'chromedriver'
            else:
                driver_path = 'chromedriver.exe'
            browser = webdriver.Chrome(executable_path=driver_path, options=self.browser_options)
            browser.get('about:blank')
            time.sleep(.5)
            with self.count_lock:
                self.count += 1
            return browser
        except Exception:
            logger.error(traceback.format_exc())
            if browser is not None:
                # Chrome started but the first navigation failed: do not leak it.
                try:
                    browser.quit()
                except Exception:
                    pass
            return None
        finally:
            logger.info('generate browser cost %d' % (int(time.time() * 1000) - start))

    def close_browser(self, browser, broken=False):
        """Return a borrowed browser to the pool, replacing it if it is broken.

        Args:
            browser: the browser obtained from ``get``; None is ignored.
            broken: set by the caller when the browser is known to be unusable.
        """
        if not browser:
            return
        if not broken and prod:
            # Production resets the page and uses that as a health check.
            try:
                browser.get('about:blank')
            except Exception:
                broken = True
        if not broken:
            self.pool.put(browser)
            return
        self.__discard(browser)
        with self.full_lock:
            replacement = None if self.is_full() else self.browser_generate()
        if replacement:
            self.pool.put(replacement)

    def close(self):
        """Shut down every idle browser currently in the pool."""
        from queue import Empty  # local import: the file only imports Queue
        while True:
            try:
                browser = self.pool.get(block=False)
            except Empty:
                return
            if browser:
                self.__discard(browser)

class SlideDistanceLocator:
    """Locates the slider piece inside the background image by template matching."""

    def loc(self, big, small, top, d=5):
        """Find the best match of the slider piece within its row of the background.

        Args:
            big: background image (PIL image).
            small: slider piece image (PIL image).
            top: y offset of the piece inside ``big``.
            d: margin trimmed from every side of the piece, and from the top and
                bottom of the background row, before matching.

        Returns:
            The index of the highest matching score in the result matrix, as
            produced by ``np.unravel_index`` (row first, column second).
        """
        row_strip = big.crop((0, top + d, big.size[0], top + small.size[1] - d))
        piece = small.crop((d, d, small.size[0] - d, small.size[1] - d))

        def to_gray(pil_img):
            # Reverse the channel axis, then collapse to a single grey channel.
            bgr = np.array(pil_img)[:, :, ::-1].copy()
            return cv2.cvtColor(bgr, cv2.COLOR_BGR2GRAY)

        piece_gray = to_gray(piece)
        strip_gray = to_gray(row_strip)
        # The piece is matched in inverted brightness.
        inverted_piece = abs(255 - piece_gray)
        scores = cv2.matchTemplate(inverted_piece, strip_gray, cv2.TM_CCOEFF_NORMED)
        return np.unravel_index(scores.argmax(), scores.shape)
            
class Predicter:
    def __init__(self, *agrs, **kwargs):
        """Load the recognition models and open the shared browser/Redis resources."""
        # Failure screenshots are written here.
        self.screenshot_save_dir = 'screenshot'
        if not os.path.exists(self.screenshot_save_dir):
            os.mkdir(self.screenshot_save_dir)

        # Load the models for the click-the-characters challenge.
        self.hanzi_click_yolo_model = YOLO(
            model_path='geeTest/hanzi_click_position.h5',
            anchors_path='geeTest/hanzi_click_anchors.txt',
            classes_path='geeTest/hanzi_click_classes.txt',
            score=0.5,
            iou=0.5,
        )
        self.hanzi_click_target_char_predicter = hanzi_click_target_char.Predicter()
        self.hanzi_click_ques_predicter = hanzi_click_ques.Predicter()
        # Half-size, in pixels, of the square cropped around each detected character.
        self.d = 40

        domain = '' if prod else ''
        base_url = 'http://' + domain
        # Verification landing page.
        self.gee_index_url = base_url + '/geeTest/index?challenge=%s&gt=%s&new_captcha=%d&referer=%s'
        # Endpoint the cached results are fetched from.
        self.gee_result_url = base_url + '/geeTest/index/cache?sessionId=%s'

        # Start the browser pool.
        initial_browsers, max_browsers = (2, 5) if prod else (1, 1)
        self.browser_pool = BrowserPool(pool_size=initial_browsers, max_pool_size=max_browsers)

        # Recorded slide tracks.
        self.slide_tracks = SlideTracks()

        # Slide distance computation.
        self.slide_distance_locator = SlideDistanceLocator()
    
    def save_screenshot(self, browser, filename):
        try:
            browser.get_screenshot_as_file('%s/%s' % (self.screenshot_save_dir, filename))
        except:
            logger.error('save screenshot %s error, %s' % (filename, str(traceback.format_exc())))
    
    def predict_captcha(self, params):
        params = base64.b64decode(params).decode()
        params = json.loads(s=params)
        _type = params['type']
        try:
            if 'gee_2' == _type:
                return self.gee_2(params)
            if 'gee_3' == _type:
                return self.gee_3(params)
        except Exception as e:
            logger.error('%s handle %s failed, %s' % (params['session_id'], str(params), str(traceback.format_exc())))
            print(traceback.format_exc())
            return json.dumps({'success': False, 'message': 'error occured'})
        return None
    
    def order_bg(self, bg_byts):
        """Rebuild the 260px-wide background from its shuffled tile sheet.

        The sheet is read as two rows of 26 tiles, each 10px wide on a 12px
        pitch starting at x=1. Tiles are copied into a fresh image in the order
        given by the two permutation tables, with the rows swapped.

        Args:
            bg_byts: raw bytes of the shuffled image.

        Returns:
            A new RGB PIL image of size (260, original height).
        """
        sheet = Image.open(io.BytesIO(bg_byts))
        height = sheet.size[1]
        half_height = height // 2
        tile_w = 10
        lefts = [(tile_w + 2) * idx + 1 for idx in range(26)]
        upper_row = [sheet.crop((x, 0, x + tile_w, half_height)) for x in lefts]
        lower_row = [sheet.crop((x, half_height, x + tile_w, height)) for x in lefts]

        restored = Image.new('RGB', (260, height), (255, 255, 255))
        top_orders = [13, 12, 22, 23, 15, 14, 20, 21, 9, 8, 24, 25, 7, 6, 2, 3, 1, 0, 10, 11, 5, 4, 18, 19, 17, 16]
        down_orders = [12, 13, 23, 22, 14, 15, 21, 20, 8, 9, 25, 24, 6, 7, 3, 2, 0, 1, 11, 10, 4, 5, 19, 18, 16, 17]
        for column, (top_src, down_src) in enumerate(zip(top_orders, down_orders)):
            left = column * tile_w
            # Upper half of the output comes from the sheet's lower row ...
            tile = lower_row[top_src]
            restored.paste(tile, (left, 0, tile.size[0] + left, half_height))
            # ... and the lower half from the sheet's upper row.
            tile = upper_row[down_src]
            restored.paste(tile, (left, half_height, tile.size[0] + left, height))
        return restored
    
    def gee_2(self, params):
        """Solve a slide captcha of type ``gee_2`` in a pooled browser.

        Steps: borrow a browser, open the index page, poll the result cache for
        the ``get`` payload (image locations and ``ypos``), compute the slide
        distance from the two images, replay a stored track of that distance,
        then poll the cache again for the ``result`` payload.

        Args:
            params: decoded request dict; ``session_id``, ``challenge``, ``gt``
                and ``referer`` are read.

        Returns:
            JSON string ``{'valid': ..., 'challenge': ...}`` on success,
            otherwise ``{'success': False, 'message': ...}``.

        Raises:
            Exception: any unexpected error is logged together with a
                screenshot and re-raised. The browser is handed back to the
                pool in every case.
        """
        start_time = now()
        browser = self.browser_pool.get(10)
        if not browser:
            logger.error(params['session_id'] + ' can not get a browser')
            return json.dumps({'success': False, 'message': 'server is busy'})
        log_cost(params['session_id'], 'get browser', start_time)
        start_time = now()
        try:
            logger.info(params['session_id'] + ' start to crack')
            browser.get(self.gee_index_url % (params['challenge'], params['gt'], 0, params['referer']))
            # NOTE(review): with a budget of 1 the body runs exactly once, so the
            # refresh-and-retry branches below are never taken; raise the budget
            # to enable them.
            retry = 1
            track = None
            while retry > 0:
                retry -= 1

                # Phase 1: wait (up to 5s) for the cached get.php payload.
                session_id = browser.execute_script('return (function(){try{return proxy_sessionId;}catch(e){return null;}})();')
                result_url = self.gee_result_url % session_id
                start = start_time = now()
                failed = True
                while int(time.time() * 1000) - start < 5000:
                    r = requests.get(result_url).text
                    if not r or '{}' == r:
                        time.sleep(.5)
                        continue
                    logger.info(params['session_id'] + ' get.php result ' + r)
                    r = json.loads(r)
                    if not 'get' in r:
                        time.sleep(.5)
                        continue
                    r = r['get']
                    # Strip the JSONP wrapper: keep what is between the first '(' and the final ')'.
                    r = r[r.find('(') + 1:-1]
                    r = json.loads(r)
                    if 'status' in r and not 'success' == r['status']:
                        logger.error(params['session_id'] + ' get.php return status not success')
                        return json.dumps({'success': False, 'message': 'load page failed'})
                    if 'data' in r:
                        r = r['data']
                    if not 'slice' in r:
                        time.sleep(.5)
                        continue
                    bg_img_url = ('http://%s/%s' % (r['static_servers'][0], r['bg'])).replace('.jpg', '.webp')
                    slice_img_url = 'http://%s/%s' % (r['static_servers'][0], r['slice'])
                    top = r['ypos']
                    failed = False
                    break
                if failed:
                    logger.error(params['session_id'] + ' get images timeout')
                    return json.dumps({'success': False, 'message': 'load page timeout'})

                # Phase 2: download both images and measure the slide distance.
                slice_img_byts = requests.get(slice_img_url).content
                bg_img_byts = requests.get(bg_img_url).content
                bg_img = self.order_bg(bg_img_byts)
                slice_img = Image.open(io.BytesIO(slice_img_byts))
                distance = self.slide_distance_locator.loc(bg_img, slice_img, top)[1] - 5
                log_cost(params['session_id'], 'get slide distance', start_time)
                start_time = now()

                track = self.slide_tracks.get_track(distance)
                if not track:  # no stored track for this distance: click the refresh button
                    if retry <= 0:
                        self.save_screenshot(browser, '%s.png' % params['session_id'])
                        return json.dumps({'success': False, 'message': 'no match track found over retry 3 times. give up'})
                    logger.info(params['session_id'] + ' no match track, retry')
                    browser.find_elements_by_css_selector('a.gt_refresh_button')[0].click()
                    continue

                # Phase 3: wait until all 52 background slices are shown, then drag.
                def gt_cut_fullbg_slice_loaded_ec(_browser):
                    elms = _browser.find_elements_by_css_selector('div.gt_cut_fullbg_slice')
                    return elms and len(elms) == 52 and len([elm for elm in elms if elm.is_displayed()]) == 52
                WebDriverWait(browser, 5).until(gt_cut_fullbg_slice_loaded_ec)
                slideBtn = browser.find_element_by_css_selector('div.gt_slider div.gt_slider_knob')
                ActionChains(browser).move_to_element_with_offset(slideBtn, 5, 5).perform()
                ActionChains(browser).click_and_hold(slideBtn).perform()
                log_cost(params['session_id'], 'begin to slide', start_time)
                start_time = now()
                # Each track step is used as (dx, dy, pause in milliseconds).
                for t in track:
                    ActionChains(browser).move_by_offset(t[0], t[1]).perform()
                    time.sleep(t[2] / 1000)
                log_cost(params['session_id'], 'slide done ', start_time)
                time.sleep(random.randint(50, 500) / 1000)
                ActionChains(browser).release().perform()

                # Phase 4: wait (up to 5s) for the cached verification result.
                session_id = browser.execute_script('return (function(){try{return proxy_sessionId;}catch(e){return null;}})();')
                result_url = self.gee_result_url % session_id
                start = int(time.time() * 1000)
                failed = False
                # Pre-set so that a poll which times out reaches the
                # "can not handle crack result" answer below instead of raising
                # UnboundLocalError.
                result = None
                challenge = None
                while int(time.time() * 1000) - start < 5000:
                    r = requests.get(result_url).text
                    if not r or '{}' == r:
                        time.sleep(.5)
                        continue
                    logger.info(params['session_id'] + ' crack result ' + r)
                    r = json.loads(r)
                    if not 'result' in r:
                        time.sleep(.5)
                        continue
                    get_r = r['get']
                    r = r['result']
                    r = r[r.find('(') + 1:-1]
                    r = json.loads(r)
                    if r['success'] == 0:
                        failed = True
                    else:
                        result = r['validate']
                        get_r = get_r[get_r.find('(') + 1:-1]
                        get_r = json.loads(get_r)
                        challenge = get_r['challenge']
                    break
                if failed:
                    if retry <= 0:
                        self.save_screenshot(browser, '%s.png' % params['session_id'])
                        return json.dumps({'success': False, 'message': 'try to slide but failed over retry 3 times. give up'})
                    logger.info(params['session_id'] + ' slide failed, retry')
                    def gt_hide_displayed(_browser):
                        elms = _browser.find_elements_by_css_selector('div.gt_hide')
                        return elms and not elms[0].is_displayed()
                    WebDriverWait(browser, 5).until(gt_hide_displayed)
                    browser.find_elements_by_css_selector('a.gt_refresh_button')[0].click()
                    continue
                if not result:
                    return json.dumps({'success': False, 'message': 'can not handle crack result'})
                result = {'valid': result, 'challenge': challenge}
                return json.dumps(result)
        except Exception:
            self.save_screenshot(browser, '%s.png' % params['session_id'])
            logger.error(params['session_id'] + "-->" + str(traceback.format_exc()))
            raise
        finally:
            self.browser_pool.close_browser(browser)
    
    def gee_3(self, params):
        """Solve a captcha of type ``gee_3`` in a pooled browser.

        Opens the index page (with ``new_captcha`` set to 1), clicks the radar
        button, then polls the result cache for up to 5 seconds. The first
        phase either passes outright or names a follow-up challenge, which is
        delegated to ``gee_3_click`` or ``gee_3_slide``.

        Args:
            params: decoded request dict; ``session_id``, ``challenge``, ``gt``
                and ``referer`` are read.

        Returns:
            JSON string ``{'valid': ..., 'challenge': ...}`` on success,
            otherwise ``{'success': False, 'message': ...}``.

        Raises:
            Exception: any unexpected error is logged together with a
                screenshot and re-raised. The browser is handed back to the
                pool in every case.
        """
        start_time = now()
        browser = self.browser_pool.get(10)
        if not browser:
            logger.error(params['session_id'] + ' can not get a browser')
            return json.dumps({'success': False, 'message': 'server is busy'})
        log_cost(params['session_id'], 'get browser', start_time)
        start_time = now()
        try:
            logger.info(params['session_id'] + ' start to crack')
            browser.get(self.gee_index_url % (params['challenge'], params['gt'], 1, params['referer']))

            # Waiting for the button is best effort; the click below is attempted regardless.
            try:
                WebDriverWait(browser, 5).until(EC.visibility_of_element_located((By.CSS_SELECTOR, 'div.geetest_radar_btn')))
            except:
                logger.error('can not locate div.geetest_radar_btn, give it a try any way')
            browser.find_element_by_css_selector('div.geetest_radar_btn').click()
            session_id = browser.execute_script('return (function(){try{return proxy_sessionId;}catch(e){return null;}})();')
            result_url = self.gee_result_url % session_id
            failed = False
            next_phrase = None  # name of the follow-up challenge, if the first phase did not pass
            result = None       # the validate token when the first phase passes outright
            start = now()
            # Poll the cache every 0.5s for at most 5 seconds.
            while int(time.time() * 1000) - start < 5000:
                r = requests.get(result_url).text
                if not r or '{}' == r:
                    time.sleep(.5)
                    continue
                logger.info(params['session_id'] + ' first phrase crack result ' + r)
                r = json.loads(r)
                if not 'get' in r:
                    time.sleep(.5)
                    continue
                get_r = r['get']
                # Strip the JSONP wrapper: keep what is between the first '(' and the final ')'.
                get_r = get_r[get_r.find('(') + 1:-1]
                get_r = json.loads(get_r)
                if 'status' in get_r and not 'success' == get_r['status']:
                    logger.error(params['session_id'] + ' get.php return status not success')
                    return json.dumps({'success': False, 'message': 'load page failed'})
                if not 'result' in r:
                    time.sleep(.5)
                    continue
                get1_r = None
                if 'get1' in r:
                    get1_r = r['get1']
                r = r['result']
                r = r[r.find('(') + 1:-1]
                r = json.loads(r)
                if not r['status'] == 'success':
                    failed = True
                else:
                    r = r['data']
                    phrase_result = r['result']
                    if phrase_result == 'success':
                        result = r['validate']
                    else:
                        next_phrase = phrase_result
                        if 'click' == next_phrase:
                            # The click challenge needs the 'get1' payload; keep polling until it is cached.
                            if not get1_r:
                                time.sleep(.5)
                                continue
                            get1_r = get1_r[get1_r.find('(') + 1:-1]
                            get1_r = json.loads(get1_r)
                            if 'data' in get1_r:
                                get1_r = get1_r['data']
                            pic_type = get1_r['pic_type']
                            # Only 'word' and 'phrase' pictures are handled; anything else is
                            # reported below as an unsupported type.
                            if not pic_type in ['word', 'phrase']:
                                next_phrase = 'click pic type %s' % pic_type
                break
            if failed:
                self.save_screenshot(browser, '%s.png' % params['session_id'])
                return json.dumps({'success': False, 'message': 'crack failed'})
            if next_phrase:
                if next_phrase == 'click':
                    return self.gee_3_click(browser, params)
                elif next_phrase == 'slide':
                    return self.gee_3_slide(browser, params)
                else:
                    return json.dumps({'success': False, 'message': 'unsupport crack type %s' % next_phrase})
            if not result:
                return json.dumps({'success': False, 'message': 'can not handle crack result'})
            logger.info('crack pass on first phrase')
            result = {'valid': result, 'challenge': params['challenge']}
            return json.dumps(result)
        except Exception as e:
            self.save_screenshot(browser, '%s.png' % params['session_id'])
            logger.error(params['session_id'] + "-->" + str(traceback.format_exc()))
            raise e
        finally:
            # Runs after the delegated second phase as well, since that call happens inside the try.
            self.browser_pool.close_browser(browser)
    
    def gee_3_click(self, browser, params):
        """Solve the click-the-characters follow-up challenge.

        Polls the result cache for the ``get1`` payload, downloads the
        challenge picture, recognises the requested characters and the
        characters present in the picture, clicks the matching positions in
        the requested order, submits, and returns the parsed second-phase
        result.

        Args:
            browser: the browser already showing the challenge.
            params: decoded request dict; ``session_id`` is read here.

        Returns:
            JSON string: the outcome of
            ``gee_3_parse_second_phrase_result``, or
            ``{'success': False, 'message': ...}`` when loading or recognition
            fails.
        """
        failed = True
        start = start_time = now()
        session_id = browser.execute_script('return (function(){try{return proxy_sessionId;}catch(e){return null;}})();')
        result_url = self.gee_result_url % session_id
        # Poll the cache every 0.5s for at most 5 seconds.
        while int(time.time() * 1000) - start < 5000:
            r = requests.get(result_url).text
            if not r or '{}' == r:
                time.sleep(.5)
                continue
            logger.info(params['session_id'] + ' get.php result ' + r)
            r = json.loads(r)
            if not 'get1' in r:
                time.sleep(.5)
                continue
            r = r['get1']
            # Strip the JSONP wrapper: keep what is between the first '(' and the final ')'.
            r = r[r.find('(') + 1:-1]
            r = json.loads(r)
            if 'status' in r and not 'success' == r['status']:
                logger.error(params['session_id'] + ' get.php return status not success')
                return json.dumps({'success': False, 'message': 'load page timeout'})
            if 'data' in r:
                r = r['data']
            if not 'pic' in r:
                time.sleep(.5)
                continue
            img_url = ('http://%s/%s' % (r['static_servers'][0], r['pic']))
            failed = False
            break
        if failed:
            logger.error(params['session_id'] + ' get image timeout')
            return json.dumps({'success': False, 'message': 'load page timeout'})

        img_byts = requests.get(img_url).content
        img = Image.open(io.BytesIO(img_byts))
        # The top 344x344 area holds the clickable picture; the strip below it
        # holds the requested characters.
        target = img.crop((0, 0, 344, 344))
        ques = img.crop((0, 344, 116, 384))

        ques_result = self.hanzi_click_ques_predicter.predict_captcha(ques)
        if len(ques_result) < 2:
            return json.dumps(obj={'success': False, 'message': 'click image chars predict failed.'}, ensure_ascii=False)
        logger.info('quest result %s' % ques_result)
        # Round-trip the picture through JPEG before detection.
        buf = io.BytesIO()
        target.save(buf, format='jpeg')
        target = Image.open(buf)
        yolo_rs = self.hanzi_click_yolo_model.detect_image2(target)
        # Sort by element 1, highest first (presumably the detection score -- confirm).
        yolo_rs.sort(key=lambda elm: elm[1], reverse=True)
        target_result = []
        for det in yolo_rs:
            # det[2] and det[3] are used as opposite corners of the detected box.
            center = [(det[3][0] - det[2][0]) // 2 + det[2][0], (det[3][1] - det[2][1]) // 2 + det[2][1]]
            # Square of half-size self.d around the centre, clamped to the picture
            # on all four sides (the right edge was previously clamped using the
            # wrong variable, so it could run past the image width).
            left = max(center[0] - self.d, 0)
            upper = max(center[1] - self.d, 0)
            right = min(center[0] + self.d, target.size[0])
            lower = min(center[1] + self.d, target.size[1])
            split_buf = io.BytesIO()
            target.crop((left, upper, right, lower)).save(split_buf, format='jpeg')
            target_result.append((self.hanzi_click_target_char_predicter.predict_captcha(Image.open(split_buf)), det, split_buf))

        # Group detections by recognised character; remember characters that
        # were recognised but not requested.
        mappered_target_result = {}
        not_in_ques = set()
        for tr in target_result:
            n = tr[0]
            if not n in mappered_target_result:
                mappered_target_result[n] = []
            mappered_target_result[n].append(tr)
            if not n in ques_result:
                not_in_ques.add(n)
        final_target_result = {}
        for n in ques_result:
            if n in mappered_target_result:
                final_target_result[n] = mappered_target_result[n][0]

        # A requested character that was not recognised is assigned one of the
        # unrequested detections as a guess.
        for n in ques_result:
            if n in final_target_result:
                continue
            else:
                if not_in_ques:
                    final_target_result[n] = mappered_target_result[not_in_ques.pop()][0]
                else:
                    # Nothing left to fill the gap with: fail.
                    return json.dumps({'success': False, 'message': 'click image chars predict failed, no enough targets'})
        if None in final_target_result or len(final_target_result) != len(ques_result):
            return json.dumps(obj={'success': False, 'message': 'click postion predict failed.'}, ensure_ascii=False)
        final_target_result = [final_target_result[n] for n in ques_result]
        logger.info('final target result %s' % str(final_target_result))
        final_target_result = [tr[1] for tr in final_target_result]
        # Reduce each detection to the integer centre of its box.
        final_target_result = [([int(r[3][0] - r[2][0]) // 2 + int(r[2][0]), int(r[3][1] - r[2][1]) // 2 + int(r[2][1])]) for r in final_target_result]
        log_cost(params['session_id'], 'predict position', start_time)
        start_time = now()
        # Waiting for the picture element is best effort; the lookup below is attempted regardless.
        try:
            WebDriverWait(browser, 5).until(EC.visibility_of_element_located((By.CSS_SELECTOR, 'img.geetest_item_img')))
        except:
            logger.error('can not locate img.geetest_item_img, give it a try any way')

        img_elm = browser.find_element_by_css_selector('img.geetest_item_img')
        for tr in final_target_result:
            # Approach from a nearby random point, settle on the target, then click,
            # with short random pauses between the steps.
            ActionChains(browser).move_to_element_with_offset(img_elm, tr[0] - random.randint(-10, 10), tr[1] - random.randint(-10, 10)).perform()
            time.sleep(0.03)
            ActionChains(browser).move_to_element_with_offset(img_elm, tr[0], tr[1]).perform()
            time.sleep(random.randint(50, 100) / 1000)
            ActionChains(browser).click().perform()
            time.sleep(random.randint(50, 500) / 1000)
        WebDriverWait(browser, 3).until(EC.element_to_be_clickable((By.CSS_SELECTOR, 'a.geetest_commit')))
        commit_elm = browser.find_element_by_css_selector('a.geetest_commit')
        ActionChains(browser).move_to_element_with_offset(commit_elm, random.randint(3, 10), random.randint(3, 10)).perform()
        time.sleep(0.03)
        commit_elm.click()
        return self.gee_3_parse_second_phrase_result(browser, params)
    
    def gee_3_parse_second_phrase_result(self, browser, params):
        """Poll the result cache for the second-phase verdict and turn it into the reply.

        Polls every 0.5s for at most 5 seconds until the cache holds
        ``result1``.

        Args:
            browser: the browser that submitted the challenge.
            params: decoded request dict; ``session_id`` and ``challenge`` are read.

        Returns:
            JSON string ``{'valid': ..., 'challenge': ...}`` on success,
            otherwise ``{'success': False, 'message': ...}``.
        """
        def unwrap(jsonp):
            # Keep what lies between the first '(' and the final ')' and parse it.
            return json.loads(jsonp[jsonp.find('(') + 1:-1])

        session_id = browser.execute_script('return (function(){try{return proxy_sessionId;}catch(e){return null;}})();')
        result_url = self.gee_result_url % session_id
        failed, result = False, None
        began = now()
        while int(time.time() * 1000) - began < 5000:
            text = requests.get(result_url).text
            if text and text != '{}':
                logger.info(params['session_id'] + ' second phrase crack result ' + text)
                cached = json.loads(text)
                if 'result1' in cached:
                    get_r = cached['get1']
                    verdict = unwrap(cached['result1'])

                    if 'status' in verdict:
                        # Enveloped form: {'status': ..., 'data': {'result': ..., 'validate': ...}}
                        if verdict['status'] != 'success':
                            failed = True
                        else:
                            data = verdict['data']
                            if data['result'] != 'success':
                                result, failed = data['result'], True
                            else:
                                result = data['validate']
                    elif verdict['message'] != 'success':
                        # Flat form: {'message': ..., 'validate': ...}
                        result, failed = verdict['message'], True
                    else:
                        result = verdict['validate']

                    if result and not failed:
                        # Prefer the challenge reported by get1, falling back to the request's own.
                        get_r = unwrap(get_r)
                        if 'data' in get_r:
                            get_r = get_r['data']
                        challenge = get_r['challenge'] if 'challenge' in get_r else params['challenge']
                    break
            # Nothing usable cached yet.
            time.sleep(.5)

        if failed:
            self.save_screenshot(browser, '%s.png' % params['session_id'])
            detail = (', final result ' + result) if result else ''
            return json.dumps({'success': False, 'message': 'crack failed%s' % detail})
        if not result:
            return json.dumps({'success': False, 'message': 'can not handle crack result'})
        return json.dumps({'valid': result, 'challenge': challenge})
    
    def gee_3_slide(self, browser, params):
        """Solve a slide captcha by dragging the slider along a recorded track.

        Steps:
          1. Poll ``self.gee_result_url`` (for up to five seconds) until the
             recorded ``get1`` response exposes the ``slice`` image, then derive
             the background/slice image URLs and the slice ``ypos``.
          2. Download both images and ask ``self.slide_distance_locator`` for
             the horizontal distance to travel.
          3. Fetch a matching track from ``self.slide_tracks`` and replay it on
             ``div.geetest_slider_button`` with ActionChains.
          4. Delegate to ``gee_3_parse_second_phrase_result`` for the verdict.

        Args:
            browser: Selenium WebDriver showing the captcha page.
            params: request dict; ``session_id`` is used for logging and the
                screenshot name.

        Returns:
            A JSON string: either a ``{"success": false, "message": ...}``
            failure or whatever ``gee_3_parse_second_phrase_result`` returns.
        """
        session_id = browser.execute_script('return (function(){try{return proxy_sessionId;}catch(e){return null;}})();')
        result_url = self.gee_result_url % session_id
        start = now()
        failed = True
        while now() - start < 5000:
            try:
                # Without a timeout a stalled connection would block forever and
                # the 5 s polling deadline above would never be enforced.
                r = requests.get(result_url, timeout=5).text
            except requests.RequestException as e:
                # Transient transport errors are retried until the deadline.
                logger.warning('%s poll get.php result failed: %s', params['session_id'], e)
                time.sleep(.5)
                continue
            if not r or '{}' == r:
                time.sleep(.5)
                continue
            logger.info(params['session_id'] + ' get.php result ' + r)
            r = json.loads(r)
            if 'get1' not in r:
                time.sleep(.5)
                continue
            r = r['get1']
            # Strip the callback wrapper: text up to the first '(' and the last char.
            r = r[r.find('(') + 1:-1]
            r = json.loads(r)
            if 'status' in r and not 'success' == r['status']:
                logger.error(params['session_id'] + ' get.php return status not success')
                return json.dumps({'success': False, 'message': 'load page timeout'})
            if 'data' in r:
                r = r['data']
            if 'slice' not in r:
                time.sleep(.5)
                continue
            # The background is requested as .webp instead of the advertised .jpg.
            bg_url = ('http://%s/%s' % (r['static_servers'][0], r['bg'])).replace('.jpg', '.webp')
            slice_url = 'http://%s/%s' % (r['static_servers'][0], r['slice'])
            top = r['ypos']
            failed = False
            break
        if failed:
            logger.error(params['session_id'] + ' get images timeout')
            return json.dumps({'success': False, 'message': 'load page timeout'})

        # Bounded downloads; transport errors still propagate to the caller.
        slice_img_byts = requests.get(slice_url, timeout=5).content
        bg_img_byts = requests.get(bg_url, timeout=5).content
        bg_img = self.order_bg(bg_img_byts)
        slice_img = Image.open(io.BytesIO(slice_img_byts))
        start_time = now()
        # NOTE(review): the fixed -5 correction is not explained in this file — confirm.
        distance = self.slide_distance_locator.loc(bg_img, slice_img, top)[1] - 5
        log_cost(params['session_id'], 'get slide distance %d' % distance, start_time)
        start_time = now()
        track = self.slide_tracks.get_track(distance)
        if not track:
            # The original note here said "click the refresh button"; the code
            # currently gives up instead of refreshing.
            self.save_screenshot(browser, '%s.png' % params['session_id'])
            logger.info(params['session_id'] + ' no match track')
            return json.dumps({'success': False, 'message': 'no match track found. give up'})
        try:
            # until() returns the located element, so no second lookup is needed.
            slideBtn = WebDriverWait(browser, 5).until(EC.visibility_of_element_located((By.CSS_SELECTOR, 'div.geetest_slider_button')))
        except Exception:
            # Best effort: still try a direct lookup. `except Exception` (not a
            # bare except) lets KeyboardInterrupt/SystemExit through.
            logger.error('can not locate div.geetest_slider_button, give it a try any way')
            slideBtn = browser.find_element(By.CSS_SELECTOR, 'div.geetest_slider_button')
        ActionChains(browser).move_to_element_with_offset(slideBtn, 5, 5).perform()
        ActionChains(browser).click_and_hold(slideBtn).perform()
        log_cost(params['session_id'], 'begin to slide', start_time)
        start_time = now()
        for t in track:
            # Each step is used as (x offset, y offset, pause); the pause is
            # divided by 1000 before sleeping, so presumably milliseconds.
            ActionChains(browser).move_by_offset(t[0], t[1]).perform()
            time.sleep(t[2] / 1000)
        log_cost(params['session_id'], 'slide done ', start_time)
        # Random 50-500 ms hold before releasing the button.
        time.sleep(random.randint(50, 500) / 1000)
        ActionChains(browser).release().perform()

        return self.gee_3_parse_second_phrase_result(browser, params)
        
    
    def close(self):
        """Release every resource held by this instance.

        Closes, in order: ``yolo_model`` (via ``close_session``), the two
        hanzi-click predicters, ``browser_pool`` and ``slide_tracks``.

        Every close is attempted even if an earlier one fails, so one broken
        component cannot leak the remaining ones. Failures are logged, and the
        first error encountered is re-raised once all closes have been tried.

        Raises:
            Exception: the first error raised by any of the close calls.
        """
        targets = (
            ('yolo_model', 'close_session'),
            ('hanzi_click_target_char_predicter', 'close'),
            ('hanzi_click_ques_predicter', 'close'),
            ('browser_pool', 'close'),
            ('slide_tracks', 'close'),
        )
        first_error = None
        for attr_name, method_name in targets:
            try:
                # Resolved lazily so a missing attribute on a later component
                # does not stop the earlier ones from being closed.
                getattr(getattr(self, attr_name), method_name)()
            except Exception as e:
                # "avc" is the logger name this file configures at module level.
                logging.getLogger('avc').exception('failed to close %s', attr_name)
                if first_error is None:
                    first_error = e
        if first_error is not None:
            raise first_error


Using TensorFlow backend.


In [2]:
# Shared Predicter instance; the manual test helpers defined later in this
# notebook call p.predict_captcha(...) on it, so this cell must run first.
p = Predicter()

In [1]:

def _run_gee_test(register_url, captcha_type, pause_before_predict=False):
    """Fetch captcha parameters from ``register_url`` and run them through ``p``.

    The JSON returned by the register endpoint is extended with the fixed
    ``referer``, ``type`` and ``session_id`` fields, serialised, base64-encoded
    and handed to ``p.predict_captcha``. The prediction result and the elapsed
    prediction time in seconds are printed.

    Args:
        register_url: endpoint returning the captcha registration JSON.
        captcha_type: value stored under ``type`` (e.g. ``'gee_2'``, ``'gee_3'``).
        pause_before_predict: when true, wait for Enter on stdin before
            predicting; the wait is excluded from the printed timing.
    """
    # Bounded so a dead endpoint cannot hang the notebook cell indefinitely.
    register_text = requests.get(register_url, timeout=10).text
    captcha_params = json.loads(register_text)
    captcha_params['referer'] = 'http://127.0.0.1'
    captcha_params['type'] = captcha_type
    captcha_params['session_id'] = 'test'
    if pause_before_predict:
        input()
    # Timer starts after the optional manual pause so only prediction is measured.
    started = time.time()
    print(p.predict_captcha(base64.b64encode(json.dumps(captcha_params).encode())))
    print(time.time() - started)


def test_gee_2():
    """Manual check of the gee_2 flow against the local register endpoint.

    Waits for Enter on stdin before predicting.
    """
    _run_gee_test('http://127.0.0.1:5000/pc-geetest/register?t=1560842865594', 'gee_2', pause_before_predict=True)


def test_gee_3_slide():
    """Manual check of the gee_3 flow using the ceair register endpoint."""
    _run_gee_test('https://passport.ceair.com/cesso/geet!geetInit.shtml', 'gee_3')


def test_gee_3_click():
    """Manual check of the gee_3 flow using the mucfc register endpoint."""
    _run_gee_test('https://ais.api.mucfc.com/captchaPreProcess.json', 'gee_3')
#test_gee_2()
#test_gee_3_slide()
#test_gee_3_click()