In [None]:
'''
発火点は
・allScrapings
・reaggregateNotColsedByCsv

allScrapings は実行するだけで Yahoo!映画から全件取得して csv に書き出す.

reaggregate は published_state が上映終了ではない作品の再集計を行う
'''

In [48]:
# -*- coding: UTF-8 -*
import requests
from bs4 import BeautifulSoup as bs
import time
import logging
import copy

logger = logging.getLogger(__name__)

BASE_URL = "https://movies.yahoo.co.jp/movie/"
PAST_QUERY = "?&roadshow_flg=0&img_type=2&sort=year"
LATEST_QUERY = "?&roadshow_flg=0&img_type=2&sort=-year"
# 上映ステータス
CLOSED = 0
ON_AIR = 1
COMING_SOON = 5
NOT_PRODUCT = 100

# ディレイ
REQUEST_DELAY = 1

# デフォルトターゲットファイル
TARGET_FILE = 'yahoo-movies-output.csv'

In [580]:
'''
published_state: [
    0,    #上映終了
    1,    #上映中
    5,    #上映予定
    100,  #未製作
]
余裕があったら必要項目の外出しとかしたい
'''

'\n残りタスク\nページング\n各評価点\n評価数\nキャスト\nスケジュール（何をとるかは決めていない）\n公開ステータス取得\nまず、あらすじがあるかないか判断する\nあらすじがなく、ラベルがない場合は製作だけ決まっているので更新対象\nあらすじがあり、ラベルがない場合は上映が終わっているのでFIX\nラベルがある場合は、ラベルによって更新するものを変更する\npublished_state: [\n    0,    #上映終了\n    1,    #上映中\n    5,    #上映予定\n    100,  #未製作\n]\n余裕があったら必要項目の外出しとかしたい\n'

In [866]:
# scraping 関数
# トップレベルの情報を集めるやつ
def scrapingTop(page, is_past=False):
    # set
    url = BASE_URL
    if is_past:
        url += PAST_QUERY
    else:
        url += LATEST_QUERY
    
    url +=  '&page=' + str(page)
    soup = getSoupWithDelay(url)
    return parseProductWithNest(soup) 

# トップページのサムネ群のパース
# thumbnail--movies 以下が渡って来ないと多分困る
def parseProductWithNest(soup):
    thums = soup.find('ul', class_ = 'thumbnail--movies')
    movies = thums.find_all('li', class_='col')
    
    # id , title, 制作年度, 制作国, ジャンルs, イメージワード, 監督, 原作 をひとまとまりにする.
    products = {}
    # 作品トップ
    for movie in movies:
        movie_id, product, product_soup = parseProductTopByThumbSoup(movie)
        # 未製作以外は詳細情報を集める
        # 上映中と上映終了は同じデータを集める
        # 未上映であれば、情報を絞って集める
        if product['published_state'] < NOT_PRODUCT:
            # 製作が済んでいる
            movie_details = getMovieDetailsOnProduct(product_soup, product['product_url'])
            product.update(movie_details)
            print('ONPURODUCT: ', product['title'])
        else:
            # 未製作映画
            print('未製作映画', product['title'])
        
        _product = {}
        for s, t in product.items():
            _product.update({
                s: convert2Json(t)
            })
        products[movie_id] = _product

    # 次ページがあるか判定
    existNextPage = True
    next_page = soup.find('ul', class_='pagination').find_all('li')[-1]
    try:
        if next_page['class'][0] == 'is-disabled':
            # 最後のページ
            existNextPage = False
        else:
            existNextPage = True
    except:
        existNextPage = True
    # movie 詳細　+ トップページの次のページがあるか
    return products, existNextPage

# 作品トップレベル解析
# 個別の作品のデータ取得
def parseProductTopByThumbSoup(movie):
    # thumbnails 一覧ページでわかる情報
    movie_info = {}
    # ユニークID
    movie_id = movie.get('data-cinema-id')
    # タイトル
    title = movie.h3.get('title')
    # 制作年度
    production_year = int(movie.small.text[1:5])
    
    # 作品トップへ
    target_url = BASE_URL + title + '/' + movie_id + '/'
    soup = getSoupWithDelay(target_url)
    # 作品のステータス判定
    published_state = getPublishedState(soup)
    
    # 一旦まとめて辞書更新
    movie_info.update(
        movie_id = int(movie_id),
        title = title,
        production_year = production_year,
        published_state = published_state,
        product_url = target_url
    )
    
    # 作品トップの情報
    product_info = getProductDetails(soup)
    movie_info.update(product_info)
    
    return movie_id, movie_info, soup

# 作品URLで解析する
# 作品 dictionary を返す
def parseProductWithNestByUrl(target_url):
    soup = getSoupWithDelay(target_url)
    movie_id, product = parseProductTopByTargetSoup(soup)
    
    if product['published_state'] < NOT_PRODUCT:
        # 製作が済んでいる
        movie_details = getMovieDetailsOnProduct(soup, product['product_url'])
        product.update(movie_details)
        print('ONPURODUCT: ', product['title'])
    else:
        # 未製作映画
        print('未製作映画', product['title'])
    
    return product

# 作品トップレベル解析
# 作品トップの soup を受け取って解析する
def parseProductTopByTargetSoup(soup):
    movie_info = {}
    '''
    # movie_id
    # title
    # production_year
    # の取得
    '''
    published_state = getPublishedState(soup)
    movie_info.update(
        movie_id = int(movie_id),
        title = title,
        production_year = production_year,
        published_state = published_state,
        product_url = target_url
    )
    
    product_info = getProductDetails(soup)
    movie_info.update(product_info)
    
    return movie_id, movie_info

# 映画トップページ Soup を受けとって、公開ステータスを返す.
def getPublishedState(soup):
    #  movie description がない場合は未製作とみなす!! 決定!!
    description = soup.find('section', class_='movie_description')
    if description is None:
        return NOT_PRODUCT
    
    # 製作はされているので、ラベルがあるか判定する
    if soup.find('span', class_='label bgcolor-B') is not None:
        # blue ラベルなので上映予定
        return COMING_SOON
    elif soup.find('p', class_='icon_showing label bgcolor-YR') is not None:
        # 上映中
        return ON_AIR
    
    # 規定ラベルがないので、上映終了と判断
    return CLOSED
        

# 作品トップのプロダクト概要を集める
def getProductDetails(soup):
    # 作品情報解析
    tbody = soup.find('tbody')
    details = tbody.find_all('tr')

    single_target = ['原題', '製作国', '上映時間']
    multi_target = ['ジャンル', '原作', '脚本', '音楽']
    product_info = {}
    
    for detail in details:
        tar = detail.find('th').text

        # 単独であろう項目群
        if tar in single_target:
            value = detail.find('td').text
            product_info[jpn2Eng(tar)] = value
        # 複数ありえる項目群
        elif tar in multi_target:
            value_list = detail.find_all('li')
            values = list(map(lambda s: s.text, value_list))
            product_info[jpn2Eng(tar)] = values
    
    return product_info


# 製作されているっぽいやつ向けの情報収集機
def getMovieDetailsOnProduct(soup, target_url):
    _movie_details = _getMovieDetailsOnProduct(soup, target_url)
    return _movie_details

# まとめて操作するやつ
def _getMovieDetailsOnProduct(soup, target_url):
        _movie_details = {}
        
        # 詳細取得関数を呼んでマージしていく
        # ページ遷移なし
        _movie_details.update(getImageWords(soup))
        _movie_details.update(getReviewPointAvg(soup))
        _movie_details.update(getReviewPointDistribution(soup))
        _movie_details.update(getReviewPointChart(soup))
        
        # ページ遷移あり
        _movie_details.update(getRundown(soup, target_url))
        _movie_details.update(getCredit(soup, target_url))
        
        return _movie_details

'''
ページ遷移なし
'''
# イメージワード
# 作品トップを受け取る
def getImageWords(soup):
    image_word_list = soup.find('ul', class_='image_words list-inline text-xsmall').find_all('li')
    image_words =  list(map(lambda s: s.text, image_word_list))
    return {'image_words': image_words}
# レビューの平均点
# 作品トップを受け取る
def getReviewPointAvg(soup):
    try:
        review_point_avg = float(soup.find('span', itemprop='ratingValue').text)
    except:
        # 未評価がある
        review_point_avg = None
    return {'review_point_avg': review_point_avg}
# レビューの星得点割合
# 作品トップを受け取る
def getReviewPointDistribution(soup):
    distribution_list = soup.find(
        'ul', class_='rating-distribution text-xsmall').find_all('div', class_='rating-distribution__cell-point')
    # 星に対応
    rate_index = [5, 4, 3, 2, 1]
    distributions = list(map(lambda s: convertToFloat(s.text[:-1]), distribution_list))
    review_point_rate = dict(zip(rate_index, distributions))
    return {'review_point_distribution': review_point_rate}
# 得点分布
# 作品トップを受け取る
def getReviewPointChart(soup):
    chart = soup.find('canvas', class_='rader-chart__figure')
    label = chart['data-chart-label']
    val = chart['data-chart-val-total']
    review_point_distribution = dict(zip(label.split(','), list(map(float, val.split(',')))))
    return {'review_point_chart': review_point_distribution}

# 数字文字列を受けとって float にして返す
# 空文字列だった場合は None にする
def convertToFloat(num_string):
    if num_string == '':
        return None
    else:
        return float(num_string)

'''
ページ遷移あり
''' 
# 解説, あらすじ
# 作品トップを受け取る
def getRundown(soup, target_url):
    story_url = target_url + 'story'
    soup = getSoupWithDelay(story_url)
    sections = soup.find('div', id='story').find_all('section', class_='section')
    
    rundowns = {}
    for section in sections:
        category = ""
        if section.find('h2', class_='text-middle').text == '解説':
            category = jpn2Eng('解説')
        elif section.find('h2', class_='text-middle').text == 'あらすじ':
            category = jpn2Eng('あらすじ')
        elif section.find('h2', class_='text-middle').text == '映画レポート':
            category = jpn2Eng('映画レポート')
        else:
            raise ValueError("想定外のストーリー内セクション")
        
        rundown = {
            category: section.find('p', class_='text-readable').contents[0].strip()
        }
        rundowns.update(rundown)

    return rundowns

# クレジット
# 作品トップを受け取る
def getCredit(soup, target_url):
    credit_url = target_url + 'credit'
    soup = getSoupWithDelay(credit_url)
    
    # キャスト
    casts = []
    try:
        cast_list = soup.find('div', id='cstl').find_all('h3')
        for cast in cast_list:
            casts.append(cast.text)
    except AttributeError:
        casts = None

    # スタッフ
    staffs = []
    try:
        staff_list = soup.find('div', id='stfl').find_all('div', class_='box__cell pl1em')
        for staff in staff_list:
            name = staff.find('h3', class_='text-middle text-break color-sub').text
            try:
                position = d.find('p', class_='text-xsmall no-space-bottom').text
            except AttributeError:
                position = None
            _staff = {
                'name': name,
                'position': position
            }
            staffs.append(_staff)
    except AttributeError:
        staffs = None
    return {
        'casts': casts,
        'staffs': staffs
    }

# url を引数にしてリクエストを飛ばす、sleep の後 soup を返す
def getSoupWithDelay(target_url):
    # アクセスディレイのため時間計測
    start_time = time.time()
    
    response = requests.get(target_url)
    soup = bs(response.content, 'html.parser')

    # 処理時間含めディレイ
    end_time = time.time()
    sleep_time = REQUEST_DELAY - (end_time - start_time)
    if sleep_time > 0:
        time.sleep(sleep_time)
        
    return soup

In [3]:
# 便利関数置き場
# 日本語を英語に変える関数
def jpn2Eng(text):
    cate_dict = {
        '原題': 'origin_title',
        '製作国': 'made_in',
        '上映時間': 'run_time',
        'ジャンル': 'genre',
        '原作': 'origin',
        '脚本': 'charge_of_scenario',
        '音楽': 'charge_of_misic',
        '解説': 'explanation',
        'あらすじ': 'rundown',
        '映画レポート': 'report'
    }
    return cate_dict[text]


# csv 出力するもの
import csv
fieldnames = [
    'movie_id',
    'title',
    'production_year',
    'published_state',
    'product_url',
    'run_time',
    'made_in',
    'genre',
    'origin',
    'charge_of_scenario',
    'charge_of_misic',
    'image_words',
    'review_point_avg',
    'review_point_distribution',
    'review_point_chart',
    'explanation',
    'rundown',
    'casts',
    'staffs',
    'scraped_at'
]

import os
# 辞書配列からcsv出力する, 既存ファイルがあればヘッダーはつけない.
def writeCsvWithDictList(output, filename=TARGET_FILE):
    # 現在時刻をスクレイピングした日時として追加
    list(map(lambda s: s.update({'scraped_at': getStrTime()}), output))
    
    path = './' + filename
    with open(filename, 'a', newline='') as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames, extrasaction='ignore')
        if not os.path.exitsts(path):
            # 初回の書き込みなのでヘッダーをつける
            writer.writeheader()
        writer.writerows(output)

# list か dict ならJSONにして、違ったらそのまま返す関数
import json
def convert2Json(covar):
    if type(covar) == dict or type(covar) == list:
        return json.dumps(covar, ensure_ascii=False)
    else:
        return covar

# 現在時刻取得
import datetime
import dateutil.parser
def getStrTime():
    TIME_FORMAT = "{0:%Y-%m-%d %H:%M:%S}"
    time_str = TIME_FORMAT.format(datetime.datetime.now())
    return time_str

# datetime object にする場合
# dateutil.parser.parse(time_str)



In [35]:
import pandas as pd

def reaggregateOnAirByCsv(filename):
    url_list = getUrlListByCsv(filename, ON_AIR)
    if len(url_list) == 0:
        print('NotFound: OnAir movies in ', 'filename')
        return False
    reaggregate(url_list)
    return True
def reaggregateComingSoonByCsv(filename):
    url_list = getUrlListByCsv(filename, COMINT_SOON)
    if len(url_list) == 0:
        print('NotFound: ComingSoon movies in ', 'filename')
        return False
    reaggregate(url_list)
    return True
def reaggregateNotProductByCsv(filename):
    url_list = getUrlListByCsv(filename, NOT_PRODUCT)
    if len(url_list) == 0:
        print('NotFound: NotProduct movies in ', 'filename')
        return False
    reaggregate(url_list)
    return True
def reaggregateClosedByCsv(filename):
    url_list = getUrlListByCsv(filename, CLOSED)
    if len(url_list) == 0:
        print('NotFound: Closed movies in ', 'filename')
        return False
    reaggregate(url_list)
    return True

def getUrlListByCsv(filename, target_state):
    # csv から published_state が target_state の product_url をリストにする 
    df = pd.read_csv(TARGET_FILE)
    urls = df[df.published_state == '{}'.format(target_state)]['product_url']
    list(urls)
    return urls

def reaggregate(url_list):
    product_list = []
    for target_url in url_list:
        product = parseProductWithNestByUrl(target_url)
        product_list.append(product)
    
    # 一旦 CSV に出力する
    # DB に入れるなら上書きしていく
    writeCsvWithDictList(products)

In [46]:
# 全件取得
# page 指定で途中からも可能
def allScrapings(page=1):
    is_next = True
    error_count = 0
    while is_next:
        try:
            products, existNextPage = scrapingTop(page, False)
            print('page: ', page, 'isNext: ', existNextPage)
            writeCsvWithDictList(products.values())

            if existNextPage:
                page += 1
            else:
                is_next = False
        except e:
            error_count += 1
        
        if error_count >= 10:
            # Error が10回以上出たらとりあえず止める
            print("ErrorCount: ", error_count)
            break
            

    print('FIIIIIIIINIIIIIIIIIIIISH')
    return True

def reaggregateNotColsedByCsv(filename=TARGET_FILE):
    on_air = reaggregateOnAirByCsv(filename)
    coming_soon = reaggregateComingSoonByCsv(filename)
    not_product = reaggregateNotProductByCsv(filename)
    
    print('OnAir: ', on_air)
    print('ComingSoon: ', coming_soon)
    print('NotProduct: ', not_product)
    
    return True

In [49]:
# 映画の重複削除
# 後更新を優先
def deleteDuplicatesMovie(filename=TARGET_FILE):
    try:
        df.sort_values('scraped_at', ascending=False).drop_duplicates(subset='movie_id')
    except e:
        print(e)
        return False
    return True
