This notebook takes the data from the local recommendation-engine pipeline at the point between the cleaning step and the ML preprocessing step, reshapes each table for BigQuery, and uploads it.

In [1]:
import time
import pickle
import pandas as pd
from google.cloud import bigquery
from google.api_core.exceptions import GoogleAPICallError, ServiceUnavailable

### Key Function

In [2]:
def get_thee_to_google(project="steam-recommendation-engine", file_path=None,
                       table_id=None, schema=None,
                       format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON, skip=None,
                       clustering_fields=None, max_attempts=5) :
    """
    Uploads a local file to a Google BigQuery table, replacing the table's contents.  
      
    'project' is the BigQuery project name.  
    'file_path' is the location of the file to be uploaded (newline-delimited JSON
        by default; for CSV also pass 'format' and 'skip').  
    'table_id' is the fully qualified id of the table to be created or overwritten.  
    'schema' is the schema of the table (a list of bigquery.SchemaField).  
    'format' indicates the file format within 'bigquery.SourceFormat'.  
    'skip' sets the number of leading rows to skip (only specify if uploading csv).  
    'clustering_fields' accepts a list of fields and can be left blank.  
    'max_attempts' is how many times the load is submitted before giving up.  
      
    Progress and failures are printed, not raised. A load that BigQuery rejects
    as invalid (BadRequest, e.g. rows that don't match the schema) is not
    retried, because resubmitting the same file would fail the same way.
    """
    from google.api_core.exceptions import BadRequest

    client = bigquery.Client(project=project)

    params = {
        'source_format': format,
        # Truncate so re-running the notebook replaces the table's rows instead of appending.
        'write_disposition': bigquery.WriteDisposition.WRITE_TRUNCATE,
        'schema': schema,
        'clustering_fields': clustering_fields,
    }
    if skip :
        params['skip_leading_rows'] = skip
    job_config = bigquery.LoadJobConfig(**params)

    for attempt in range(1, max_attempts + 1) :
        try :
            # Submit a fresh load job on every attempt. Once a job has failed,
            # job.result() only re-raises the stored error, so re-polling the
            # same job object can never succeed.
            with open(file_path, 'rb') as source_file:
                job = client.load_table_from_file(source_file, table_id, job_config=job_config)
            job.result()
        except BadRequest as e:
            print(f"Fail {attempt}: {e}")
            print("BigQuery rejected the load as invalid; not retrying.")
            return
        except (GoogleAPICallError, ServiceUnavailable) as e:
            print(f"Fail {attempt}: {e}")
            if attempt < max_attempts :
                # Quadratic backoff (1s, 4s, 9s, ...); no wait after the final attempt.
                time.sleep(attempt**2)
        else :
            print(f'{job.output_rows} rows successfully loaded as {table_id}.')
            return

    print(f"All {max_attempts} attempts failed.")

### Games Table

In [3]:
# Choose the columns we'll need for Google.
new_column_order = ['app_id', 'title', 'developer', 'publisher', 'description', 'release_date',
                    'price', 'number_of_reviews', 'positive_review_percent', 'tags', 'tag_list',
                    'interface_languages', 'full_audio_languages', 'subtitles_languages',
                    'game_page_link', 'date_scraped']

# Load and create. reindex() returns a new frame, so games_df itself is not modified.
games_df = pd.read_pickle('../data/interim/1 - Games DF - Wrangled.pkl')
google_df = games_df.reindex(columns=new_column_order)

# Prep the data specifically for BigQuery, column by column (no row-by-row iterrows writes).

# Release dates still held as strings are loaded as NULL.
# NOTE(review): presumably these are unparseable placeholders -- confirm upstream.
google_df['release_date'] = google_df['release_date'].map(
    lambda date: None if isinstance(date, str) else date)

# Blank out multi-element language lists whose first entry is None.
# NOTE(review): this is the presumed intent of an earlier `len(langs[0]) == None`
# check, which could never be true (len() never returns None) -- confirm.
google_df['interface_languages'] = google_df['interface_languages'].map(
    lambda langs: [] if isinstance(langs, list) and len(langs) > 1 and langs[0] is None else langs)

google_df['price'] = google_df['price'].fillna(0.0)

# BigQuery DATE values in JSON must be 'YYYY-MM-DD' strings.
google_df['release_date'] = pd.to_datetime(google_df['release_date']).dt.strftime('%Y-%m-%d')
google_df['number_of_reviews'] = google_df['number_of_reviews'].fillna(0)

# Further simplification: store a whole-number percentage (the * 100 implies the
# source column is a fraction). Round before casting: float products such as
# 0.29 * 100 == 28.999999999999996 would otherwise be truncated down by astype(int).
google_df['positive_review_percent'] = (
    (google_df['positive_review_percent'] * 100).round().fillna(0).astype(int)
)

# Save locally before uploading.
google_df.to_parquet('../data/interim/games_df_for_google.parquet', index=False)
google_df.to_json('../data/interim/games_df_for_google.jsonl', orient='records', lines=True)

In [4]:
# For the "games" table, establish the schema and upload to BigQuery.
file_path = '../data/interim/games_df_for_google.jsonl'
table_id = 'steam-recommendation-engine.steam_data.games'

# One (column name, SQL type name, mode) entry per column of the table.
schema = [
    bigquery.SchemaField(name, getattr(bigquery.enums.SqlTypeNames, sql_type), mode=mode)
    for name, sql_type, mode in [
        ("app_id", "INTEGER", "REQUIRED"),
        ("title", "STRING", "NULLABLE"),
        ("developer", "STRING", "REPEATED"),
        ("publisher", "STRING", "REPEATED"),
        ("description", "STRING", "NULLABLE"),
        ("release_date", "DATE", "NULLABLE"),
        ("price", "FLOAT", "NULLABLE"),
        ("number_of_reviews", "FLOAT", "NULLABLE"),
        ("positive_review_percent", "FLOAT", "NULLABLE"),
        ("tags", "STRING", "REPEATED"),
        ("tag_list", "STRING", "REPEATED"),
        ("interface_languages", "STRING", "REPEATED"),
        ("full_audio_languages", "STRING", "REPEATED"),
        ("subtitles_languages", "STRING", "REPEATED"),
        ("game_page_link", "STRING", "NULLABLE"),
        ("date_scraped", "DATE", "NULLABLE"),
    ]
]

clustering_fields = ['release_date']

get_thee_to_google(
    file_path=file_path,
    table_id=table_id,
    schema=schema,
    clustering_fields=clustering_fields,
)

100892 rows successfully loaded as steam-recommendation-engine.steam_data.games.


### Users Table

In [5]:
# Load the set of all user ids. Opened read-only: this cell never writes the file.
# (pickle.load can execute arbitrary code -- only load pickles this project produced.)
with open('../data/raw/all_users', 'rb') as users_file :
    all_users = set(pickle.load(users_file))

# Tableify it
raw_user_ids = pd.Series(list(all_users), name='user_id')

# Change id from str to int64 to save storage space.
# Find unparseable ids first, then parse only the good ones. A single NaN from
# errors='coerce' would turn the whole column into float64, which cannot hold
# integers above 2**53 exactly, and would write a null into the REQUIRED
# user_id column of the upload schema.
unparseable = pd.to_numeric(raw_user_ids, errors='coerce').isna()
if unparseable.any() :
    print(f"Dropping {unparseable.sum()} user ids that are not numeric.")
all_users_table = pd.to_numeric(raw_user_ids[~unparseable]).to_frame()

# Save to disk
all_users_table.to_json('../data/interim/all_users_table_for_google.jsonl', orient='records', lines=True)

In [6]:
# For the "users" table, establish the schema and upload to BigQuery.
# The table has a single required integer column and no clustering.
file_path = '../data/interim/all_users_table_for_google.jsonl'
table_id = 'steam-recommendation-engine.steam_data.users'
schema = [bigquery.SchemaField("user_id", bigquery.enums.SqlTypeNames.INTEGER, mode="REQUIRED")]

get_thee_to_google(schema=schema, table_id=table_id, file_path=file_path)

5069512 rows successfully loaded as steam-recommendation-engine.steam_data.users.


### Reviews Table

In [18]:
# Rename for consistency and readability with the other tables.
review_table = (
    pd.read_parquet('../data/interim/cleaned_reviews.parquet')
    .rename(columns={'user': 'user_id',
                     'total_playtime': 'hours_played_total',
                     'review_playtime': 'hours_played_at_review'})
)

# Dates need finessing for Google to accept: BigQuery DATE values in JSON must be
# 'YYYY-MM-DD' strings. NaT comes out of strftime as NaN, which to_json writes as null.
for date_column in ['review_date', 'edit_date', 'date_scraped'] :
    review_table[date_column] = pd.to_datetime(review_table[date_column]).dt.strftime('%Y-%m-%d')

# Save locally
review_table.to_json('../data/interim/review_table_for_google.jsonl', orient='records', lines=True)

In [19]:
# For the "reviews" table, establish the schema and upload to BigQuery.
file_path = '../data/interim/review_table_for_google.jsonl'
table_id = 'steam-recommendation-engine.steam_data.reviews'

# One (column name, SQL type name, mode) entry per column of the table.
schema = [
    bigquery.SchemaField(name, getattr(bigquery.enums.SqlTypeNames, sql_type), mode=mode)
    for name, sql_type, mode in [
        ("user_id", "INTEGER", "REQUIRED"),
        ("app_id", "INTEGER", "REQUIRED"),
        ("positive", "BOOL", "REQUIRED"),
        ("hours_played_total", "FLOAT", "NULLABLE"),
        ("hours_played_at_review", "FLOAT", "NULLABLE"),
        ("text", "STRING", "NULLABLE"),
        ("helpful_count", "INTEGER", "NULLABLE"),
        ("review_date", "DATE", "NULLABLE"),
        ("edit_date", "DATE", "NULLABLE"),
        ("date_scraped", "DATE", "NULLABLE"),
    ]
]

# Clustering column order matters: data is organized by app_id first, then review_date.
clustering_fields = ['app_id', 'review_date']

get_thee_to_google(
    file_path=file_path,
    table_id=table_id,
    schema=schema,
    clustering_fields=clustering_fields,
)

6747619 rows successfully loaded as steam-recommendation-engine.steam_data.reviews.


### Relationships Table

In [9]:
# Load, then rewrite friend_since as 'YYYY-MM-DD' strings, the form BigQuery
# accepts for DATE values in JSON.
relationships_table = (
    pd.read_parquet('../data/interim/all_relationships_cleaned.parquet')
    .assign(friend_since=lambda frame: pd.to_datetime(frame['friend_since']).dt.strftime('%Y-%m-%d'))
)

# Save locally as newline-delimited JSON.
relationships_table.to_json(
    '../data/interim/relationships_table_for_google.jsonl',
    orient='records',
    lines=True,
)

In [10]:
# For the "relationships" table, establish the schema and upload to BigQuery.
# Each row's "users" field is a repeated (array) integer column.
file_path = '../data/interim/relationships_table_for_google.jsonl'
table_id = 'steam-recommendation-engine.steam_data.relationships'
schema = [
    bigquery.SchemaField(name, getattr(bigquery.enums.SqlTypeNames, sql_type), mode=mode)
    for name, sql_type, mode in [
        ("users", "INTEGER", "REPEATED"),
        ("friend_since", "DATE", "NULLABLE"),
    ]
]

get_thee_to_google(schema=schema, table_id=table_id, file_path=file_path)

3072233 rows successfully loaded as steam-recommendation-engine.steam_data.relationships.


### Recently Played Table

In [11]:
# Load and rename for consistency and readability.
recently_played_table = pd.read_pickle('../data/interim/recently_played_cleaned.pkl').rename(
    columns={'user': 'user_id',
             'playtime_2w': 'hours_played_last_2_weeks',
             'playtime_f': 'hours_played_total'})

# The CSV is uploaded with its header row skipped (skip=1), so BigQuery assigns
# columns by POSITION, not by name. Pin the column order to the upload schema
# (user_id, app_id, hours in the last 2 weeks, total hours) so values cannot
# land in the wrong field if the pickle's column order ever changes.
# NOTE(review): the BigQuery column is named 'hours_played_in_last_2_weeks'; the
# local name 'hours_played_last_2_weeks' only affects this CSV's header.
recently_played_table = recently_played_table[
    ['user_id', 'app_id', 'hours_played_last_2_weeks', 'hours_played_total']
]

# Save locally. Doing this one as CSV because jsonl gave odd "row too long" errors.
recently_played_table.to_csv('../data/interim/recently_played_table_for_google.csv', index=False)

In [12]:
# For the "recently_played" table, establish the schema and upload to BigQuery.
# This file is a CSV: BigQuery matches its columns by position, so the field
# order below must follow the CSV's column order, and skip=1 drops the header row.
file_path = '../data/interim/recently_played_table_for_google.csv'
table_id = 'steam-recommendation-engine.steam_data.recently_played'
schema = [
    bigquery.SchemaField(name, getattr(bigquery.enums.SqlTypeNames, sql_type), mode=mode)
    for name, sql_type, mode in [
        ("user_id", "INTEGER", "REQUIRED"),
        ("app_id", "INTEGER", "REQUIRED"),
        ("hours_played_in_last_2_weeks", "INTEGER", "NULLABLE"),
        ("hours_played_total", "INTEGER", "NULLABLE"),
    ]
]

clustering_fields = ['user_id']

get_thee_to_google(
    file_path=file_path,
    table_id=table_id,
    schema=schema,
    clustering_fields=clustering_fields,
    format=bigquery.SourceFormat.CSV,
    skip=1,
)

4181501 rows successfully loaded as steam-recommendation-engine.steam_data.recently_played.


### Potential Company Duplicates (WIP)

In [13]:
# These potential duplicates were identified using fuzzywuzzy with a
# threshold of 90. Verifying duplicates, selecting canonical forms,
# and performing replacement will take more time than I have at the
# moment, so I'll just add this file to the dataset for later.

pot_dups = set()
bad_dups = set()

# Read with an explicit encoding: the names include non-ASCII characters such as
# en dashes, which a platform-default decoder (e.g. cp1252 on Windows) would mangle.
# NOTE(review): assumes the fuzzywuzzy step wrote this file as UTF-8 -- confirm.
with open('../data/interim/potential_company_duplicates.txt', 'r', encoding='utf-8') as file :
    for line in file :
        # Lines should read "<name> - <name>". A name that itself contains " - "
        # splits into extra parts; those lines are set aside for manual review.
        tup = tuple(line.rstrip('\n').split(' - '))
        if len(tup) == 2 :
            pot_dups.add(tup)
        else :
            bad_dups.add(tup)

print(f"Successfully parsed potential duplicates: {len(pot_dups)}")
print(f"Unsuccessfully parsed potential duplicates: {len(bad_dups)}")

Successfully parsed potential duplicates: 3416
Unsuccessfully parsed potential duplicates: 12


In [14]:
# Lookie lookie: show the lines that did not split into exactly two names.
bad_dups

{('BYU Animation', ' Class of 2017', 'BYU Animation', ' Class of 2016'),
 ('BYU Animation', ' Class of 2018', 'BYU Animation', ' Class of 2016'),
 ('BYU Animation', ' Class of 2018', 'BYU Animation', ' Class of 2017'),
 ('BYU Animation', 'Class of 2023', 'BYU Animation', ' Class of 2016'),
 ('BYU Animation', 'Class of 2023', 'BYU Animation', ' Class of 2017'),
 ('BYU Animation', 'Class of 2023', 'BYU Animation', ' Class of 2018'),
 ('Chris Bell',
  'Creative Design Studios',
  'Chris Bell – Creative Design Studios'),
 ('Dovetail Games', 'Trains', 'Dovetail Games ', 'Trains'),
 ('Dovetail Games', 'Trains', 'Dovetail Games – Trains'),
 ('Dovetail Games ', 'Trains', 'Dovetail Games – Trains'),
 ('Dreams of Heaven', 'Games', 'Dreams of Heaven Games'),
 ('Ubisoft', 'San Francisco', 'Ubisoft San Francisco')}

In [15]:
# The six "BYU Animation" entries pair different graduating classes, so they are
# not real potential duplicates. The other six were split on a ' - ' inside a
# company name; re-pair them by hand. Careful: some names use an en dash (–)
# or a doubled space before the hyphen.
fixeds = [
    ('Chris Bell - Creative Design Studios', 'Chris Bell – Creative Design Studios'),
    ('Dovetail Games - Trains', 'Dovetail Games  - Trains'),
    ('Dovetail Games - Trains', 'Dovetail Games – Trains'),
    ('Dovetail Games  - Trains', 'Dovetail Games – Trains'),
    ('Dreams of Heaven - Games', 'Dreams of Heaven Games'),
    ('Ubisoft - San Francisco', 'Ubisoft San Francisco'),
]

pot_dups.update(fixeds)

print(len(pot_dups))

3422


In [16]:
# The count is correct. Now let's save locally.
# Sort first: iteration order of a set of strings changes between Python sessions
# (string hash randomization), so an unsorted dump would reorder rows on every run.
potential_duplicates = pd.DataFrame(sorted(pot_dups), columns=['op_1', 'op_2'])

potential_duplicates.to_json('../data/interim/potential_duplicates_for_google.jsonl', orient='records', lines=True)

In [17]:
# And a one, and a two, and a you-know-what-to-do...
# Both columns hold a company name and are required.
file_path = '../data/interim/potential_duplicates_for_google.jsonl'
table_id = 'steam-recommendation-engine.steam_data.potential_duplicates'
schema = [
    bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.STRING, mode="REQUIRED")
    for name in ("op_1", "op_2")
]

get_thee_to_google(schema=schema, table_id=table_id, file_path=file_path)

3422 rows successfully loaded as steam-recommendation-engine.steam_data.potential_duplicates.
