# Download Steam Banners

Code inspired by https://github.com/woctezuma/download-steam-banners

In [1]:
# List the contents of the current working directory
!ls

gdrive	sample_data


In [2]:
# Install requirements
# aiofiles and aiohttp are imported below for the asynchronous download of the banners.

!pip install aiofiles aiohttp

Collecting aiofiles
  Downloading https://files.pythonhosted.org/packages/cf/f2/a67a23bc0bb61d88f82aa7fb84a2fb5f278becfbdc038c5cbb36c31feaf1/aiofiles-0.4.0-py3-none-any.whl
Collecting aiohttp
[?25l  Downloading https://files.pythonhosted.org/packages/0d/5c/f87987f4dc8b2cfcf37c83a814ea4b2aff4d285cbffc0ab08b2b4fa3f584/aiohttp-3.5.4-cp36-cp36m-manylinux1_x86_64.whl (1.2MB)
[K    100% |████████████████████████████████| 1.2MB 15.8MB/s 
[?25hCollecting async-timeout<4.0,>=3.0 (from aiohttp)
  Downloading https://files.pythonhosted.org/packages/e1/1e/5a4441be21b0726c4464f3f23c8b19628372f606755a9d2e46c187e65ec4/async_timeout-3.0.1-py3-none-any.whl
Collecting idna-ssl>=1.0; python_version < "3.7" (from aiohttp)
  Downloading https://files.pythonhosted.org/packages/46/03/07c4894aae38b0de52b52586b24bf189bb83e4ddabfe2e2c8f2419eec6f4/idna-ssl-1.1.0.tar.gz
Collecting multidict<5.0,>=4.0 (from aiohttp)
[?25l  Downloading https://files.pythonhosted.org/packages/71/cc/ceb5b8c76e7a23212b9e0353053cc3

In [0]:
# Import requirements

import asyncio
import json
from pathlib import Path

import aiofiles
import aiohttp

In [4]:
# Mount Google Drive
# The mount point is reused below to build the path to the working directory.

from google.colab import drive

mount_folder = '/content/gdrive'
drive.mount(mount_folder)

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [5]:
# Go into the working directory

import os

# Working directory, located inside the mounted Google Drive
app_folder = mount_folder + '/My Drive/download-steam-banners/'
# Create the folder if it is missing (no error if it already exists)
Path(app_folder).mkdir(exist_ok=True)

# The relative paths used afterwards ('app_ids.txt', 'data/', 'features/') are resolved from this folder
os.chdir(app_folder)

# List the contents of the working directory
!ls

app_ids.txt  features  images_steam  saved_model_steam
data	     images    saved_model


In [0]:
# Input names

def get_app_ids_file_name():
    """Return the name of the text file which stores the appIDs."""
    app_ids_file_name = 'app_ids.txt'
    return app_ids_file_name

def get_app_ids():
    """Load the appIDs listed in the input text file.

    The file is expected to contain one integer appID per line. Blank lines
    (e.g. an empty line at the end of the file) are skipped, because int('')
    would otherwise raise a ValueError.

    Returns:
        A list of appIDs as integers, in the order in which they appear in
        the file.

    Raises:
        ValueError: if a non-blank line cannot be parsed as an integer.
    """
    with open(get_app_ids_file_name()) as f:
        # Iterate over the file object directly, rather than loading every line with readlines().
        app_ids = [int(line.strip()) for line in f if line.strip()]
    return app_ids
  
def get_sorted_app_ids():
    """Return the appIDs read from the input file, in ascending numerical order."""
    sorted_app_ids = list(get_app_ids())
    sorted_app_ids.sort(key=int)
    return sorted_app_ids
  
# Output names

def get_banner_folder():
    """Return the name of the folder which stores the banners.

    The folder is created in the current working directory if it is missing.
    """
    folder_name = 'data/'
    folder_path = Path(folder_name)
    folder_path.mkdir(exist_ok=True)
    return folder_name

def get_file_extension():
    """Return the file extension (with the leading dot) of the banner images."""
    file_extension = '.jpg'
    return file_extension
  
def get_banner_file_name(app_id):
    """Return the relative file name of the banner of the given appID.

    The name is the banner folder, followed by the appID and the file extension.
    """
    name_parts = [get_banner_folder(), str(app_id), get_file_extension()]
    return ''.join(name_parts)

# Data source

def get_banner_url(app_id):
    """Return the URL from which the header image of the given appID is downloaded."""
    url_template = 'https://steamcdn-a.akamaihd.net/steam/apps/{}/header.jpg'
    return url_template.format(str(app_id))

In [0]:
# Function to download Steam banners

async def main():
    """Download the banner of every appID which does not have a banner on disk yet.

    The appIDs are processed one after the other, in ascending order, with a
    single HTTP session. A banner is saved only if the server answers with
    HTTP status 200; otherwise, a message is printed and the appID is skipped.

    The response body is read before the output file is created, so that a
    failed read does not leave an empty file behind: such a file would make
    the appID look already downloaded and be skipped during later runs.
    """
    async with aiohttp.ClientSession() as session:

        for app_id in get_sorted_app_ids():
            banner_file_name = Path(get_banner_file_name(app_id))

            # Banners saved during a previous run are not downloaded again.
            if banner_file_name.exists():
                continue

            banner_url = get_banner_url(app_id)

            # Reference: https://stackoverflow.com/a/51745925
            async with session.get(banner_url) as resp:
                if resp.status != 200:
                    print('Banner for appID {} could not be downloaded.'.format(app_id))
                    continue

                content = await resp.read()

            # The context manager closes the file even if the write fails.
            async with aiofiles.open(banner_file_name, mode='wb') as f:
                await f.write(content)

            print('Banner downloaded to {} for appID {}.'.format(banner_file_name, app_id))

In [8]:
# Download Steam banners
# Caveat: there are more than 30,000 banners to download, this will take some time!
# Banners which already exist on disk are skipped by main(), so this cell can be run again to resume.

loop = asyncio.get_event_loop()
# Block until the main() coroutine has completed
loop.run_until_complete(main())

Banner for appID 22360 could not be downloaded.
Banner for appID 22362 could not be downloaded.
Banner for appID 366842 could not be downloaded.
Banner for appID 382940 could not be downloaded.
Banner for appID 652930 could not be downloaded.


In [0]:
import glob

def get_app_ids_with_steam_banners():
    """List the appIDs for which a banner file exists in the banner folder.

    Returns:
        A list of appIDs as strings (file names without the extension),
        sorted in ascending numerical order.
    """
    image_filenames = Path(get_banner_folder()).glob('*' + get_file_extension())

    # Path.stem removes the file extension only, whereas str.strip('.jpg') would
    # remove any leading or trailing character among '.', 'j', 'p' and 'g'.
    app_ids = [banner.stem for banner in image_filenames]

    # There is an issue with duplicates, but only when running on Google Drive:
    # copies are named e.g. '10 (1).jpg'. Keeping purely numerical names discards
    # them, whatever the number between parentheses, and guarantees that the
    # conversion to int performed for sorting cannot fail.
    app_ids = [app_id for app_id in app_ids if app_id.isdecimal()]

    app_ids = sorted(app_ids, key=int)

    return app_ids

In [10]:
# Check the number of banners saved to disk
# The two counts can differ, because the banners which could not be downloaded are not saved.

app_ids = get_app_ids()
print('#appIDs = {}'.format(len(app_ids)))

app_ids_with_steam_banners = get_app_ids_with_steam_banners()
print('#banners = {}'.format(len(app_ids_with_steam_banners)))

#appIDs = 31723
#banners = 31718


In [11]:
import pathlib
from time import time

import cv2 as cv
import numpy as np

from keras.applications.mobilenet import MobileNet
from keras.applications.mobilenet import decode_predictions
from keras.applications.mobilenet import preprocess_input

from keras.preprocessing.image import img_to_array
from keras.preprocessing.image import load_img


Using TensorFlow backend.


In [0]:
def load_keras_model():
    """Build the MobileNet model, without its top layers, which is applied to the banners.

    Returns:
        A tuple (model, target_model_size), where target_model_size is the
        (128, 128) pair used as the first two dimensions of the input shape
        of the model.
    """
    # Reference: https://github.com/keras-team/keras-applications/blob/master/keras_applications/mobilenet.py
    target_model_size = (128, 128)
    alpha_value = 0.25
    num_channels = 3

    # Image data format: channels last
    input_shape = (*target_model_size, num_channels)

    model = MobileNet(include_top=False, pooling=None, alpha=alpha_value, input_shape=input_shape)
    return model, target_model_size


def label_image(image, model):
    """Run the model on a single image and return the prediction.

    Args:
        image: an image which can be converted with img_to_array.
        model: an object with a predict() method, called on a batch which
            contains only this image.

    Returns:
        The value returned by model.predict() for the pre-processed batch.
    """
    # Reference: https://github.com/glouppe/blackbelt/

    # pixel values as a numpy array
    pixels = img_to_array(image)

    # add a leading axis, so that the image becomes a batch of size one
    batch = np.expand_dims(pixels, axis=0)

    # apply the pre-processing imported from the MobileNet module
    batch = preprocess_input(batch)

    # output of the model for this batch
    return model.predict(batch)

In [13]:
import tensorflow as tf
# Display the name of the GPU device reported by TensorFlow
tf.test.gpu_device_name()

'/device:GPU:0'

In [0]:
# Output folder
features_folder_name = 'features/'
# Reference of the following line: https://stackoverflow.com/a/14364249
Path(features_folder_name).mkdir(exist_ok=True)    

# Load the model
model, target_model_size = load_keras_model()

app_ids = get_sorted_app_ids()
num_games = len(app_ids)

Y_hat = np.zeros((num_games, np.product(model.output_shape[1:])))

start = time()    

for (counter, app_id) in enumerate(app_ids):
    banner_file_name = Path(get_banner_file_name(app_id))

    if not banner_file_name.exists():
      continue       
      
    image = load_img(banner_file_name, target_size=target_model_size)
    yhat = label_image(image, model)  # runtime: 1 second
    Y_hat[counter, :] = yhat.flatten()
    
    if counter % 1000 == 0:
      print('#appIDs processed = {}'.format(counter+1))
      print('Elapsed time: {:.2f} s'.format(time() - start))
      start = time()         
    
np.save(features_folder_name + 'label_database.npy', Y_hat)    