In [1]:
from bs4 import BeautifulSoup
from tqdm import tqdm
from time import sleep
from multiprocessing.dummy import Pool, Queue, Lock
import re
import requests
import sys
import gzip
import json
import codecs

In [2]:
# Guards the stderr prints in log_error() and error() so that messages from
# concurrent worker threads do not interleave. The name is rebound to the
# tqdm lock right before the thread pool is started further down.
lock = Lock()

def log_error(status_code):
    """Print the symbolic name of an HTTP status code to stderr.

    Args:
        status_code: numeric HTTP status code of a failed response.

    The name is looked up in the status table of ``requests``. Codes that are
    missing from that table are reported by their number instead of raising
    ``KeyError``, so that logging a failure can never crash a worker.
    """
    # NOTE(review): `_codes` is a private table of requests; it is kept so the
    # logged text stays the same as before (e.g. 'ERROR: not_found').
    names = requests.status_codes._codes.get(status_code)
    name = names[0] if names else str(status_code)
    with lock:
        print('ERROR: ' + name, file=sys.stderr)

def error(message):
    """Write ``message`` to stderr while holding the shared ``lock``."""
    lock.acquire()
    try:
        print(message, file=sys.stderr)
    finally:
        lock.release()

# Шаг 1. Соберем ссылки на игры

In [3]:
# Site root; relative links scraped from the pages are appended to it.
host = 'https://gg.deals'
# Listing page the crawl starts from (query string taken over unchanged).
main_url = f'{host}/games/?sort=metascore&type=1'

In [4]:
def get_page_content(url, n_attempts=5, t_sleep=1, **kwargs):
    """Download ``url`` and return the raw response body, retrying on failure.

    Args:
        url: address to fetch.
        n_attempts: maximum number of requests to make.
        t_sleep: pause in seconds between two attempts.
        **kwargs: forwarded to ``requests.get`` (headers, params, ...).

    Returns:
        The response body as ``bytes``, or ``None`` when every attempt failed.
    """
    # Without a timeout a stalled connection would block the calling thread
    # forever; callers can still override the value explicitly.
    kwargs.setdefault('timeout', 30)

    for attempt in range(n_attempts):
        try:
            r = requests.get(url, **kwargs)
        except requests.RequestException as exc:
            # Connection errors / timeouts are retried like bad status codes
            # instead of propagating and aborting the whole crawl.
            error(f'ERROR: {exc}')
        else:
            if r.ok:
                return r.content
            log_error(r.status_code)

        # No point in waiting after the final attempt.
        if attempt < n_attempts - 1:
            sleep(t_sleep)

    return None

In [5]:
def get_urls(n_pages):
    """Collect the relative game links from the first ``n_pages`` listing pages.

    Args:
        n_pages: number of listing pages to walk, starting from page 1.

    Returns:
        A list with the ``href`` of every anchor whose CSS class starts with
        ``full-link``, in page order, or ``None`` as soon as one page could
        not be downloaded.
    """
    # The listing root itself is not requested: only the numbered pages carry
    # the links, and the result of that extra request was never used.
    links = []
    for page in tqdm(range(1, n_pages + 1)):
        content = get_page_content(f'{main_url}&page={page}')
        if content is None:
            return None
        soup = BeautifulSoup(content, 'lxml')
        # href=True skips anchors without a target instead of raising KeyError.
        anchors = soup.find_all(
            'a', href=True, class_=lambda s: s and s.startswith('full-link')
        )
        links.extend(anchor['href'] for anchor in anchors)
    return links

In [6]:
# Number of games to collect.
top = 300
# Presumably the number of games on one listing page -- TODO confirm.
GAMES_PER_PAGE = 48
# Ceiling division: just enough pages to cover `top` games.
n_pages = -(-top // GAMES_PER_PAGE)

urls = get_urls(n_pages)
if urls is None:
    # get_urls returns None when a listing page could not be downloaded.
    raise RuntimeError('Could not download the game listing pages')
urls = urls[:top]
print(urls[:10])

100%|████████████████████████████████████████████████████████████████████████████████████| 7/7 [00:06<00:00,  1.13it/s]

['/game/disco-elysium/', '/game/grand-theft-auto-v/', '/game/half-life-2/', '/game/half-life/', '/game/bioshock/', '/game/elden-ring/', '/game/portal-2/', '/game/divinity-original-sin-enhanced-edition/', '/game/the-elder-scrolls-iv-oblivion-game-of-the-year-edition/', '/game/mass-effect-2/']





# Шаг 2. Соберем информацию про игры

In [7]:
def get_name(soup, url):
    """Return the text of the ``itemprop="name"`` span inside the link to ``url``."""
    game_link = soup.find('a', href=url)
    title_span = game_link.find('span', itemprop='name')
    return title_span.text

In [8]:
def get_img(soup):
    """Extract the cover image and the resolved store link of a game page.

    Args:
        soup: parsed game page.

    Returns:
        A dict with up to two keys:
        ``image`` -- ``src`` of the ``img.image-game`` tag, when present;
        ``market_url`` -- final URL reached after following the
        ``a.game-link-widget`` link, when the link exists and the request
        succeeds.
    """
    info = {}

    # The image tag may be absent; indexing the result of find() directly
    # would raise TypeError on None.
    img_tag = soup.find('img', class_='image-game')
    image = img_tag.get('src') if img_tag is not None else None
    if image:
        info['image'] = image

    link_widget = soup.find('a', class_='game-link-widget')
    widget_href = link_widget.get('href') if link_widget is not None else None
    if widget_href:
        try:
            # The request is made only to learn where the link redirects to.
            response = requests.get(widget_href, timeout=30)
        except requests.RequestException as exc:
            # Best effort: a broken store link must not lose the whole record.
            error(f'ERROR: could not resolve market url {widget_href}: {exc}')
        else:
            info['market_url'] = response.url

    return info

In [9]:
def get_counts(soup):
    """Read the counters shown in the ``game-action-wrap`` blocks of a page.

    Args:
        soup: parsed game page.

    Returns:
        A dict mapping ``wishlist_count``, ``alert_count`` and
        ``owners_count`` to the text of the matching ``span.count``. The
        names are assigned by block position; a block without a count span is
        left out instead of raising ``AttributeError``.
    """
    names = ['wishlist_count', 'alert_count', 'owners_count']
    wraps = soup.find_all('div', class_='game-action-wrap')

    counts = {}
    # Pair names with blocks first so that a block with a missing counter
    # does not shift the following values onto the wrong name.
    for key, wrap in zip(names, wraps):
        count_span = wrap.find('span', class_='count')
        if count_span is not None:
            counts[key] = count_span.text

    return counts

In [10]:
def get_other_info(soup):
    """Collect release data, scores, review summary, tags and platforms.

    Args:
        soup: parsed game page.

    Returns:
        A dict that may contain ``release``, ``developer``, one entry per
        score column (keyed by the column label), ``review_label``,
        ``review_positive_pctg``, ``review_count``, ``genres``, ``tags``,
        ``features`` and always contains ``platforms`` (possibly empty).
        Every part of the page is optional: a missing element only leaves out
        the matching key instead of raising.
    """
    def class_starts_with(prefix):
        # Matcher for find(): true when the CSS class begins with `prefix`.
        return lambda css_class: css_class and css_class.startswith(prefix)

    info = {}
    game_info = soup.find('div', class_='game-info-content')
    game_ratings = soup.find('div', class_='game-info-details-content')

    # Release date and developer.
    if game_info is not None:
        for word in ['release', 'developer']:
            section = game_info.find(
                'div', class_=class_starts_with(f'game-info-details-section-{word}')
            )
            paragraph = section.find('p') if section is not None else None
            if paragraph is not None and paragraph.text:
                info[word] = paragraph.text

    # Score columns, keyed by their label.
    for block in soup.find_all('div', class_='score-col'):
        # Stop at the first column that carries an additional CSS class.
        # NOTE(review): presumably that column is not a plain score -- confirm.
        if len(block['class']) > 1:
            break
        score = block.find('span', class_='overlay')
        title = block.find('div', class_='score-label')
        if score is not None and title is not None:
            info[title.text] = score.text

    # Review summary: label text, percentage from the title attribute, count.
    steam = None
    if game_ratings is not None:
        steam = game_ratings.find('a', class_=class_starts_with('score-grade'))
    label = None
    if steam is not None:
        label = steam.find('span', class_=class_starts_with('reviews-label'))
    if label is not None:
        contents = label.contents
        if contents and contents[0]:
            info['review_label'] = contents[0]
        title_words = (label.get('title') or '').split()
        if title_words:
            info['review_positive_pctg'] = title_words[0]
        if len(contents) > 1 and contents[1]:
            # Drops the first two and the last character around the number.
            info['review_count'] = contents[1].text[2:-1]

    # Genre / tag / feature link lists.
    if game_info is not None:
        for word in ['genres', 'tags', 'features']:
            tags = game_info.find('div', id=f'game-info-{word}')
            if tags is not None:
                info[word] = [el.text for el in tags.find_all('a')]

    # Platform names come from the title attribute of the svg icons.
    info['platforms'] = []
    platform_link = None
    if game_info is not None:
        platform_link = game_info.find('div', class_=class_starts_with('platform-link'))
    if platform_link is not None:
        for platform in platform_link.find_all('svg'):
            title = platform.get('title')
            if title is not None:
                info['platforms'].append(title)

    return info

In [11]:
def get_dlcs_and_packs(soup):
    """Return absolute links to the DLCs and packs listed on a game page.

    The result has a ``dlcs`` and/or ``packs`` key only when the matching
    ``section#game-<kind>`` exists and contains at least one anchor whose
    CSS class starts with ``full-link``.
    """
    def _is_full_link(css_class):
        return css_class and css_class.startswith('full-link')

    related = {}
    for kind in ('dlcs', 'packs'):
        section = soup.find('section', id=f'game-{kind}')
        if not section:
            continue
        anchors = section.find_all('a', class_=_is_full_link)
        if not anchors:
            continue
        related[kind] = [host + anchor['href'] for anchor in anchors]

    return related
    

In [12]:
def get_price_history(soup):
    """Download the historical price chart referenced by a game page.

    Args:
        soup: parsed game page.

    Returns:
        ``{'price_history': [...]}`` where every point has its ``x`` value
        divided by 1000 stored as ``ts`` and its ``y`` value stored as
        ``price`` (a ``name`` key, if any, is dropped), or ``None`` when the
        page has no chart container, the chart URL is missing, the download
        fails or the response does not have the expected shape.
    """
    headers = {'x-requested-with': 'XMLHttpRequest'}
    table = soup.find('div', id='historical-chart-container')
    if table is None:
        return None

    chart_path = table.get('data-without-keyshops-url')
    if not chart_path:
        return None

    # Go through the shared downloader so this request gets the same retry
    # and status handling as every other page fetch.
    content = get_page_content(host + chart_path, headers=headers)
    if content is None:
        return None

    try:
        prices = json.loads(content)['chartData']['deals']
    except (ValueError, KeyError, TypeError) as exc:
        error(f'ERROR: unexpected price history payload for {chart_path}: {exc}')
        return None

    for point in prices:
        # NOTE(review): x looks like a millisecond timestamp -- confirm.
        point['ts'] = point.pop('x') / 1000
        point['price'] = point.pop('y')
        point.pop('name', None)
    return {'price_history': prices}

Итоговая функция

In [13]:
def process_game(url):
    """Download one game page and merge everything the extractors find.

    ``url`` is the site-relative path of the game. Returns an empty dict when
    the page could not be downloaded; otherwise a dict holding the absolute
    ``url``, the ``name`` and whatever each extractor returned.
    """
    page_url = host + url
    html = get_page_content(page_url)
    if html is None:
        return {}

    soup = BeautifulSoup(html, 'html.parser')
    record = {'url': page_url, 'name': get_name(soup, url)}

    extractors = (
        get_img,
        get_counts,
        get_other_info,
        get_dlcs_and_packs,
        get_price_history,
    )
    for extract in extractors:
        extracted = extract(soup)
        # Extractors may return None or an empty dict when nothing was found.
        if extracted:
            record.update(extracted)

    return record

# Парсинг данных

In [14]:
# Shared work queue (Queue comes from multiprocessing.dummy): it is filled
# with every collected game URL here and drained by the worker function below.
queue = Queue()
for elem in urls:
    queue.put(elem)

def process_page_wrapper(i):
    """Worker: drain the shared queue and write one JSON line per game.

    Args:
        i: worker index, used to name the output file
           ``part_<i:05d>.jsonl.gz``.

    Every URL taken from the global ``queue`` is processed with
    ``process_game`` and appended to this worker's gzip file; the global
    progress bar ``pbar`` is advanced under the global ``lock``.
    """
    # Imported locally: the module-level name `queue` is the shared Queue
    # instance, not the standard-library module of the same name.
    from queue import Empty

    with gzip.open('part_{:05d}.jsonl.gz'.format(i), mode='wb') as f_json:
        f_json = codecs.getwriter('utf8')(f_json)

        while True:
            # Checking empty() and then calling a blocking get() is a race:
            # another worker may take the last item in between and this one
            # would wait forever. A non-blocking get makes the step atomic.
            try:
                url = queue.get_nowait()
            except Empty:
                break

            record = process_game(url)
            record_str = json.dumps(record, ensure_ascii=False)
            print(record_str, file=f_json)

            with lock:
                pbar.update(1)


# Number of worker threads; also the number of output part files, because
# each worker receives its own index.
N_WORKERS = 4

with Pool(processes=N_WORKERS) as pool, tqdm(total=queue.qsize()) as pbar:
    # Workers update the bar under tqdm's own lock.
    lock = pbar.get_lock()
    # One task per worker; the count comes from the constant above rather than
    # from the private attribute pool._processes.
    pool.map(process_page_wrapper, range(N_WORKERS))

100%|████████████████████████████████████████████████████████████████████████████████| 300/300 [02:20<00:00,  2.13it/s]


In [15]:
# Sanity check: pick one stored record from the first part file and
# pretty-print it.
with gzip.open('part_00000.jsonl.gz', mode='r') as f_json:
    first = f_json.readlines()[72]

answer = json.loads(first)
print(json.dumps(answer, indent=4))

{
    "url": "https://gg.deals/game/out-of-the-park-baseball-18/",
    "name": "Out of the Park Baseball 18",
    "image": "https://img.gg.deals/4b/52/1f2ebb555efa518e5d33132c569955ed3878_307xt176.jpg",
    "market_url": "https://store.steampowered.com/app/465650/",
    "wishlist_count": "101",
    "alert_count": "3",
    "owners_count": "1899",
    "release": "24 Mar 2017",
    "developer": "Out of the Park Developments",
    "Metascore": "86",
    "Userscore": "6.8",
    "review_label": "Very Positive",
    "review_positive_pctg": "91%",
    "review_count": "483",
    "genres": [
        "Indie",
        "Simulation",
        "Sports",
        "Strategy"
    ],
    "tags": [
        "Sports",
        "Baseball",
        "Simulation",
        "Strategy",
        "Indie",
        "Management"
    ],
    "features": [
        "Single-player",
        "Steam Achievements",
        "Steam Trading Cards",
        "Steam Workshop"
    ],
    "platforms": [
        "Windows",
        "Mac",
