In [5]:
# import multiprocessing
import os
import re
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Pool

import numpy as np
import pandas as pd
import requests
from fake_useragent import UserAgent
from scrapy.selector import Selector
from sqlalchemy import create_engine


In [6]:
# Database engine shared by every cell that writes to MySQL.
# The connection URL can be supplied through the FANG_DB_URL environment variable so
# real credentials do not have to live in the notebook; when it is unset the original
# local-development URL is used.
# `encoding=` is intentionally not passed: create_engine deprecated it in SQLAlchemy 1.4
# and removed it in 2.0, where passing it raises TypeError.
engine = create_engine(
    os.environ.get("FANG_DB_URL", "mysql+pymysql://root:123456@localhost:3306/fang")
)

In [3]:
def test_pagelist():
    """Return the price bands used to split listing queries.

    Each entry is a two-element list ``[lower, upper]``. The bands are contiguous
    from 0 to 5000 and their width changes by range: one 0-40 band, then widths of
    20, 10, 25, 50, 400 and 2000.
    """
    # (start, stop, width) of every run of equally wide bands.
    band_runs = (
        (0, 40, 40),
        (40, 60, 20),
        (60, 200, 10),
        (200, 250, 25),
        (250, 600, 50),
        (600, 3000, 400),
        (3000, 5000, 2000),
    )
    return [
        [lower, lower + width]
        for start, stop, width in band_runs
        for lower in range(start, stop, width)
    ]


In [4]:
# Module-level settings shared by the scraping cells.
# Earlier fixed User-Agent, kept for reference:
# headers={'User-Agent':'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36'}
ua=UserAgent()
# `ua.random` is read once, here, so every request that uses this dict sends the
# same User-Agent string for the lifetime of the kernel.
headers = { 'User-Agent':ua.random}

# Full district list, currently disabled in favour of the single-district test list:
# districts=['jiangbei','yubei','shapingba','jiulongpo','nanan','dadukou','banan','beibei']
districts_test=['beibei']

In [5]:
#首先分区域，按价格分页面进行查询，确认不存在超出3000套房源的页面
def test_if_more_than_3000(districts,pagelist):
    for district in districts:
        for page in pagelist:
            bp=page[0]
            ep=page[1]
            url=f'https://cq.lianjia.com/ershoufang/{district}/bp{bp}ep{ep}/'
            resp1=requests.get(url,headers =headers)
            find_all_houses=int(Selector(resp1).css('h2.total.fl span::text').extract_first())
            if find_all_houses ==0:
                continue
            if find_all_houses>3000:
                print(f'{district}--{bp}--{ep}--------超出3000套')
        print(district+f'查找完毕!')


In [6]:
def pd_to_sql(dataframe,table_name,engine,if_exists_method='append',index_true_or_false=False):
    """Write ``dataframe`` to the SQL table ``table_name`` through ``engine``.

    Thin wrapper around ``dataframe.to_sql``: ``if_exists_method`` and
    ``index_true_or_false`` are forwarded as its ``if_exists`` and ``index``
    options. The table name and the mode are rendered to text before being passed on.
    """
    target_table = format(table_name)
    write_mode = format(if_exists_method)
    dataframe.to_sql(target_table, con=engine, if_exists=write_mode, index=index_true_or_false)


In [7]:
# Read the number of listings found for a price band and return how many result
# pages that total has to be split into.
def get_total_fang_nums(headers,bp,ep,district,num_per_page):
    """Return the number of result pages for ``district`` within the ``bp``-``ep`` band.

    The band's first page is fetched with ``headers`` and the listing total is read
    from ``h2.total.fl span``. The result is 0 when nothing was found, otherwise the
    total divided by ``num_per_page`` and rounded up.
    """
    band_page = requests.get(
        f'https://cq.lianjia.com/ershoufang/{district}/bp{bp}ep{ep}/',
        headers=headers,
    )
    # Total number of listings reported for this query.
    listing_total = int(Selector(band_page).css('h2.total.fl span::text').extract_first())
    if listing_total == 0:
        return 0
    # A partially filled last page still counts as a page.
    full_pages, leftover = divmod(listing_total, num_per_page)
    if leftover:
        return full_pages + 1
    return full_pages

In [8]:
def get_response_from_district_pg_bp_ep_(district,pg,bp,ep,headers):
    """Fetch result page ``pg`` of ``district`` within the ``bp``-``ep`` price band.

    Returns whatever ``requests.get`` returns for that URL; the status code is not
    checked here.
    """
    return requests.get(
        f'https://cq.lianjia.com/ershoufang/{district}/pg{pg}bp{bp}ep{ep}/',
        headers=headers,
    )

In [9]:
def test_response(response):
    if response.status_code !=200:
        print('ERROR')


In [10]:
# Column order of the per-page listings frame.
_LISTING_COLUMNS = ('community_name', 'page_link', 'location',
                    'total_price', 'unit_price', 'room_nums',
                    'built_area', 'built_time', 'board_time',
                    'last_deal', 'loan_or_not', 'house_type')


def _parse_listing(item, page_link):
    """Build the record for one listing node, fetching its detail page at ``page_link``.

    Raises whatever the lookups raise when an expected field is missing or has an
    unexpected format; the caller decides how to handle that.
    """
    community_name, location = item.css('div.positionInfo a::text').extract()

    total_price = int(float(item.css('div.totalPrice span::text').extract_first()))
    unit_price_text = item.css('div.unitPrice span::text').extract_first()
    unit_price = int(float(re.findall(r'单价(\d+)元', unit_price_text)[0]))

    # houseInfo is a '|'-separated line: rooms first, area second, build year sixth.
    house_fields = item.css('div.houseInfo::text').extract_first().split('|')
    room_nums = house_fields[0].strip()
    # Only the integer part of the area is kept.
    built_area = int(re.match(r'(\d+)(\.\d+)?平米', house_fields[1].strip()).group(1))
    built_time = house_fields[5].strip()
    built_time = re.match(r'(\d+)年建', built_time).group(1) if '年建' in built_time else None

    # The remaining fields only exist on the listing's own page.
    detail = Selector(response=requests.get(page_link, headers=headers))
    board_time = detail.css('span:contains("挂牌时间")+span::text').extract_first()
    last_deal = detail.css('span:contains("上次交易")+span::text').extract_first()
    if last_deal == "暂无数据":
        last_deal = None
    loan_or_not = detail.css('span:contains("抵押信息")+span::text').extract_first().strip()
    # Keep just the character in front of "抵押"; the "no data" placeholder becomes None.
    loan_or_not = None if loan_or_not == "暂无数据" else re.match(r'(\w)抵押.*', loan_or_not).group(1)
    house_type = detail.css('span:contains("房屋用途")+span::text').extract_first().strip()

    return {
        'community_name': community_name,
        'page_link': page_link,
        'location': location,
        'total_price': total_price,
        'unit_price': unit_price,
        'room_nums': room_nums,
        'built_area': built_area,
        'built_time': built_time,
        'board_time': board_time,
        'last_deal': last_deal,
        'loan_or_not': loan_or_not,
        'house_type': house_type,
    }


def get_dataframe_ready(sel):
    """Parse every listing node in ``sel`` into one DataFrame.

    Args:
        sel: iterable of listing nodes from a results page (each must support
            ``.css(...)``). Every listing triggers one extra HTTP request, sent with
            the module-level ``headers``, for its detail page.

    Returns:
        A DataFrame with one row per successfully parsed listing and the columns in
        ``_LISTING_COLUMNS``, or ``None`` when no listing could be parsed (including
        an empty ``sel``), so callers can keep testing ``is None``.

    A listing that fails to parse or download is reported and skipped; it no longer
    discards the rest of the page. (Previously the function returned from inside the
    loop, so at most the first listing of a page was ever returned.)
    """
    records = []
    for item in sel:
        # Bound before the try so the failure message can always be formatted.
        page_link = None
        try:
            page_link = item.css('div.title a::attr(href)').extract_first()
            records.append(_parse_listing(item, page_link))
        except Exception as exc:
            print(f'{page_link}   获取失败！ ({exc!r})')

    if not records:
        return None
    # Build the frame once from all records instead of appending row by row
    # (DataFrame.append no longer exists in pandas 2.x).
    return pd.DataFrame(records, columns=list(_LISTING_COLUMNS))


In [11]:
def main(pagelist):
    """Scrape every district in ``districts_test`` for the given price bands and store the rows.

    Args:
        pagelist: iterable of ``[bp, ep]`` price bands.

    For each district and band the number of result pages is looked up (30 listings
    per page), each page is parsed with ``get_dataframe_ready`` and the resulting
    frame is appended to the SQL table named after the district. Pages that yield no
    frame are skipped. A failed write is reported together with its cause and the
    loop moves on to the next page.
    """
    for district in districts_test:
        for page in pagelist:
            bp, ep = page[0], page[1]
            page_count = get_total_fang_nums(headers, bp, ep, district, 30)
            for pg in range(1, page_count + 1):
                response = get_response_from_district_pg_bp_ep_(district, pg, bp, ep, headers)
                sel = Selector(response).css('li.clear.LOGVIEWDATA')
                dataframe = get_dataframe_ready(sel)
                if dataframe is None:
                    continue
                try:
                    pd_to_sql(dataframe, district, engine, 'append', False)
                except Exception as exc:
                    # Catch Exception rather than everything so Ctrl-C / SystemExit still
                    # stop the run, and show the cause instead of hiding it.
                    print(f'{district}-pg{pg}-bp{bp}--ep{ep}-------------数据写入失败！！！ ({exc!r})')
                else:
                    print(f'{district}-pg{pg}-bp{bp}--ep{ep}数据已写入table{district}')

In [None]:
if __name__ == '__main__':
    # main() iterates over a *list* of [bp, ep] price bands, so each task is given a
    # one-element list. Mapping main over the bare pairs made it index into the
    # integers themselves (TypeError) in every worker.
    price_band_batches = [[price_band] for price_band in test_pagelist()]
    with ThreadPoolExecutor(max_workers=6) as executor:
        # executor.map only re-raises a worker's exception when its result is
        # retrieved, so drain the iterator instead of discarding it.
        list(executor.map(main, price_band_batches))
        
#     pool = Pool(4)
#     pool.map(main,test_pagelist())
#     pool.close()
#     pool.join()

In [14]:
# Utility for wiping the scraped data out of a database table.
def roll_back_table(table_name):
    """Replace ``table_name`` with an empty table that has the listing columns.

    A zero-row frame is written through the module-level ``engine`` with
    ``if_exists='replace'``, after which a confirmation line is printed.
    """
    listing_columns = ('community_name','page_link','location',
                       'total_price','unit_price','room_nums',
                       'built_area','built_time','board_time','last_deal','loan_or_not','house_type')
    empty_listings = pd.DataFrame(columns=listing_columns)
    empty_listings.to_sql(format(table_name), con=engine, if_exists='replace', index=False)
    print(f'table{table_name}已清空还原')

# Destructive: replaces table 'beibei' with an empty one. Run this cell only when the
# scraped data should be discarded; under "Run All" it executes after the scrape.
roll_back_table('beibei')

tablebeibei已清空还原
