        Kate Gallagher
        MSDS 692: Practicum I
        Spring 2024|8w1

# ETL Pipeline for Video Game Reviews

The purpose of this script is to scrape video game reviews for any game from steampowered.com, clean the data, apply sentiment analysis, and store/export the data for further use. 

In the first section I define the custom functions I'll need to pull and process the data. 

In the second section I demonstrate the entire pipeline end-to-end.

In the third section I demonstrate some additional queries on the data beyond the functions shown on the dashboard.

## Section 1: Defining Functions

In [None]:
#import packages
import requests
from bs4 import BeautifulSoup
import json
import os
import pprint
import string
from string import punctuation
import nltk
import nltk.corpus
from nltk.corpus import stopwords
from nltk.tokenize import RegexpTokenizer
from nltk.stem import WordNetLemmatizer
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from datetime import datetime
import csv
from collections import Counter
import re
import pandas as pd
import numpy as np
#set stopword dictionary to english
nltk.download('stopwords')

### Defining Functions for Data Extraction

The <b>get_steam_reviews</b> function will pull the html data from the review page for a particular steam game, represented by the appID. This function is basic and provides no parameters yet, thus I'm initially unable to filter the reviews.

The <b>get_n_reviews</b> function uses the get_steam_reviews function to pull data based on the filtering parameters I provide, and which loops until it pulls the number of reviews specified.  

The parameters are set to filter for English language only, Steam purchase only (meaning this reviewer is guaranteed to have bought the game), and set the date range for the entire last year. The API caps requests at 1 year. 

Steam allows other users to vote on whether a review was helpful or not. This script will pull the n most helpful reviews from the last year.

These functions were adapted from this page: https://andrew-muller.medium.com/scraping-steam-user-reviews-9a43f9e38c92

In [None]:
def get_steam_reviews(appid, params={'json':1}):
        #provide neutral URL text
        url = 'https://store.steampowered.com/appreviews/'
        #request data from page specified by app ID
        response = requests.get(url=url+appid, params=params, headers={'User-Agent': 'Mozilla/5.0'})
        #return html data from specific URL
        return response.json()

In [None]:
def get_n_reviews(appid, n=1000):
    #initiate empty list to store reviews in
    reviews = []
    #define parameters for filtering
    params = {
            'json' : 1,
            'filter' : 'all',
            'language' : 'english',
            'day_range' : 90000000000000000,
            'review_type' : 'all',
            'purchase_type' : 'steam'
            }
    #set initial cursor
    cursor='*'
    #start count of reviews retrieved
    total_reviews_retrieved = 0
    
    #loop until total reviews retrieved reaches n
    while total_reviews_retrieved < n:
        #set the number of reviews to retrieve in each iteration
        params['num_per_page'] = min(100, n - total_reviews_retrieved)
        #set the cursor parameter
        params['cursor'] = cursor.encode()
        #call get_steam_reviews function with specified appID and parameters
        response = get_steam_reviews(appid, params)
        #retrieve reviews
        current_reviews = response['reviews']
        #update cursor
        cursor = response['cursor']
        #append batch of reviews to list of all reviews
        reviews.extend(current_reviews)
        #update total count of reviews retrieved
        total_reviews_retrieved += len(current_reviews)
        
        #break retrieval loop if less than 1000 items are retrieved
        if len(current_reviews) < 100 or cursor == '': 
            break

    #return populated list of reviews
    return reviews

### Defining Functions for Data Transformation

Once the raw data is extracted, it needs to be cleaned and processed for further use. 

The <b>process_reviews</b> function works on the text of each review:
* Removes characters and punctuation, tokenizes, and lemmatizes the review text 
* Saves two versions of this data: the cleaned but pre-lemmatized review, for sentiment analysis [*original_review*], and the fully processed and lemmatized review for keyword analysis[*preprocessed_review*]. 
* Adds a field to each review with the steam identifier number of the game

The <b>groom_review_dictionary</b> function processes the rest of the review data. It does the following tasks:
* drops unnecessary fields
* converts fields to the appropriate data types
* flattens author sub-fields into main dictionary
* converts minutes to hours in playtime fields

These functions were developed with the help of ChatGPT for structuring the loops and the boolean checks. 

In [None]:
def process_reviews(review_list):
    
    #create translation table for removing punctuation
    replace_chars = string.punctuation
    translation_table = str.maketrans('', '', replace_chars)

    #instantiate the tokenizer and lemmatizer, set stopwords to English
    tokenizer = RegexpTokenizer(r'[a-zA-Z0-9]+')
    lemmatizer = WordNetLemmatizer()
    stop_words = set(stopwords.words('english'))
    
    #loop through each review object 
    for review_obj in review_list:

        #extract content from 'review' html tag 
        html_content = review_obj.get('review', '')

        try:
            #parse html for review tag
            soup = BeautifulSoup(html_content, 'html.parser')
            #pull text from review tag & strip data
            text_data = soup.get_text()
            #Lowercase text
            cleaned_review = text_data.lower()
            #remove punctuation using translation table
            cleaned_review = cleaned_review.translate(translation_table)
            #remove digits
            cleaned_review = ''.join(char for char in cleaned_review if not char.isdigit())
            #tokenize the cleaned review
            tokenized_review = tokenizer.tokenize(cleaned_review)
            #filter out stopwords and lemmatize each token in the review
            lemmatized_review = [lemmatizer.lemmatize(word) for word in tokenized_review if word.lower() not in stop_words]
                                   
            #replace the review field in the processed object copy with the cleaned and lemmatized review data
            review_obj['original_review'] = cleaned_review
            review_obj['preprocessed_review'] = ' '.join(lemmatized_review)
            #add the game_appid field
            review_obj['game_appid'] = str(appid)
            
        #exception handling for errors
        except Exception as e:
            print(f"Error: {e}")
   
    #return list of processed reviews
    return review_list

In [None]:
def groom_review_dictionary(review_list):
    #loop through each review in the processed list
    for review in review_list:
        
        #create list of author sub-fields to drop
        author_subfields_to_drop = ['steamid', 'playtime_last_two_weeks', 'last_played']

        #flatten nested author fields and drop specified subfields
        author_data = review.get('author', {})
        for sub_key, sub_value in author_data.items():
            if sub_key not in author_subfields_to_drop:
                review[f'author_{sub_key}'] = sub_value

        #drop other specified fields
        fields_to_drop = ['recommendationid', 'language', 'timestamp_updated', 'voted_up', 'steam_purchase', 'hidden_in_steam_china',
                          'steam_china_location', 'review', 'timestamp_dev_responded', 'author', 'author_steamid', 'author_playtime_last_two_weeks', 'author_last_played']
        for key in fields_to_drop:
            review.pop(key, None)

        #convert timestamp_created field to datetime
        timestamp_created = review.get('timestamp_created')
        if timestamp_created:
            review['timestamp_created'] = datetime.fromtimestamp(timestamp_created)

        #convert weighted_vote_score field to float
        weighted_vote_score = review.get('weighted_vote_score')
        if weighted_vote_score:
            review['weighted_vote_score'] = float(weighted_vote_score)

        #convert developer_response field to boolean
        developer_response = review.get('developer_response')
        review['developer_response'] = developer_response is not None

        #convert author playtime data from minutes to hours
        author_playtime_forever = review.get('author_playtime_forever')
        if author_playtime_forever is not None:
            review['author_playtime_forever'] = round(author_playtime_forever / 60)
        
        author_playtime_at_review = review.get('author_playtime_at_review')
        if author_playtime_at_review is not None:
            review['author_playtime_at_review'] = round(author_playtime_at_review / 60)
        
    #return the list of groomed review dictionaries
    return review_list

## Section 2: End-to-end Demonstration

### Extraction

In order to use the functions I defined, I need the Steam identifier number for that game, or the "game app id". These can be found in the URL of the game's steampowered.com webpage - see the screenshot below for an example.

![image.png](attachment:image.png)

In [None]:
#define game app id variables
stardew_appid = '413150'
cyberpunk_appid = '1091500'
starfield_appid = '1716740'

In [None]:
#set variable
appid = stardew_appid
#extract reviews
raw_reviews = get_n_reviews(appid, n=1000)

Steam's API throttles the call rate on their end. It took me about 10-15 minutes to pull 1000 reviews. Just let the function run and do not move on to the next step until it is complete.

### Transformation

Now we clean and transform the scraped data. The sentiment analysis is applied after combining all review datasets together, for consistency.

In [None]:
#process and clean the reviews
processed_reviews = process_reviews(raw_reviews)

In [None]:
#groom the review dictionaries and save in a unique variable
groomed_stardew_reviews = groom_review_dictionary(processed_reviews)

In [None]:
#combine all datasets into a single list
combined_reviews_3000 = []
combined_reviews_3000.extend(groomed_stardew_reviews)
combined_reviews_3000.extend(groomed_cyberpunk_reviews)
combined_reviews_3000.extend(groomed_starfield_reviews)

In [None]:
#initialize sentiment intensity analyzer
sia = SentimentIntensityAnalyzer()

#apply Vader sentiment intensity analyzer to the original_review field
for review in combined_reviews_3000:
    review['vader_sentiment'] = sia.polarity_scores(review['original_review'])['compound']

#add fields for sentiment score and categorization of score
for review in combined_reviews_3000:
    score = review['vader_sentiment']
    if score > 0.5:
        review['sentiment_category'] = 'Positive'
    elif score < -0.05:
        review['sentiment_category'] = 'Negative'
    else:
        review['sentiment_category'] = 'Neutral'

### Loading

In this section I export the transformed dataset to CSV for usage in Tableau. I also write the data to MongoDB for storage purposes.

It is possible to connect Tableau directly to the data in MongoDB and I have done so, but the connection is slow and unreliable. It is far more efficient to use a CSV for the dashboard.

#### Export to CSV

This cell writes the entire processed and analyzed dataset to a CSV. This file supports most of the Tableau dashboard.

In [None]:
#specify the field names for the CSV file
field_names = ['original_review', 'preprocessed_review', 'vader_sentiment', 'sentiment_category', 'timestamp_created', 'votes_up', 'votes_funny', 'weighted_vote_score', 'comment_count', 'received_for_free', 'written_during_early_access', 'game_appid', 'author_num_games_owned', 'author_num_reviews', 'author_playtime_forever', 'author_playtime_at_review', 'developer_response']

#specify the file path for the CSV file
csv_file_path = 'combined_reviews_3000.csv'

#write the combined data to the CSV file
with open(csv_file_path, 'w', newline='', encoding='utf-8') as csv_file:
    writer = csv.DictWriter(csv_file, fieldnames=field_names)
    writer.writeheader()
    for review in combined_reviews_3000:
        writer.writerow(review)

The following cell creates a keywords dataframe to support the *customized keywords count* section of the dashboard. The rest of the dashboard is supported by the first csv.

In [None]:
#initialize lists to store data
words = []
sentiments = []
game_appids = []

#extract data from combined_reviews
for review in combined_reviews_3000:
    preprocessed_review = review['preprocessed_review']
    sentiment_category = review['sentiment_category']
    game_appid = review['game_appid']
    
    #split preprocessed review into words
    review_words = preprocessed_review.split()
    
    #add words, sentiment category, and game_appid to respective lists
    words.extend(review_words)
    sentiments.extend([sentiment_category] * len(review_words))
    game_appids.extend([game_appid] * len(review_words))

#create new df with extracted data
data = {'word': words, 'sentiment_category': sentiments, 'game_appid': game_appids}
word_freq_df_updated = pd.DataFrame(data)

In [None]:
#write word frequency df to CSV
def write_dataframe_to_csv(df, filename):
    df.to_csv(filename, index=False)

write_dataframe_to_csv(word_freq_df_updated, 'review_word_freq_updated.csv')

#### Save to MongoDB for future usage

In [None]:
#import packages
import pymongo
from pymongo import MongoClient

In [None]:
#link to Mongo Atlas cluster
connection_string = "mongodb+srv://kgallagher:FkYE7R4T1KMpItH0@cluster0.qvuy9wc.mongodb.net/?retryWrites=true&w=majority"

#connect to host for loading
client = MongoClient(connection_string)

#select database
db = client['VideoGameReviews']

In [None]:
#create collection within database
collection = db["videogame_reviews"]

#insert processed data into MongoDB
for groomed_review in combined_reviews_3000:
    collection.insert_one(groomed_review)

## Section 3: Example Queries

This section demonstrates some deeper dive queries on information that a developer might want after looking at the dashboard, or in addition to doing so.

#### Pull 10 most recent reviews containing a specific keyword

In [None]:
#set game app id variable
game_appid = cyberpunk_appid

#list comprehension to search for specific term
filtered_reviews = [review for review in combined_reviews_3000 if 'crash' in review['original_review'].lower() and review['game_appid'] == game_appid]

#sort the filtered reviews by 'timestamp_created' in descending order, select the top 10, and extract the 'original_review' field
most_recent_10_reviews = [review['original_review'] for review in sorted(filtered_reviews, key=lambda x: x['timestamp_created'], reverse=True)[:10]]

most_recent_10_reviews

#### Find average playtime for highest weighted reviews

In [None]:
#set game app id variable
game_appid = cyberpunk_appid

#define the threshold for a high weighted vote score
weighted_vote_score_threshold = 0.9

#list comprehension to pull reviews for the specified game and with a weighted vote score above the threshold
high_score_reviews = [review for review in combined_reviews_3000 if review['game_appid'] == game_appid and review['weighted_vote_score'] > weighted_vote_score_threshold]

#calculate the average playtime at review for high score reviews
if high_score_reviews:  # Check if the list is not empty to avoid division by zero
    average_playtime = sum(review['author_playtime_at_review'] for review in high_score_reviews) / len(high_score_reviews)
    average_playtime_hours = average_playtime
    print(f"Average playtime for highest weighted reviews: {round(average_playtime_hours, 2)} hours")
else:
    print(f"None")

#### Find most common words in Negative and Positive reviews

In [None]:
#set game app id variable
game_appid = cyberpunk_appid

#list comprehension to pull reviews for the specified game and with a Negative sentiment category
negative_reviews = [review for review in combined_reviews_3000 if review['game_appid'] == game_appid and review['sentiment_category'] == "Negative"]

#apply word counter to determine most common words
negative_review_texts = [review['preprocessed_review'].lower() for review in negative_reviews]
word_counts = Counter(re.findall(r'\b\w+\b', ' '.join(negative_review_texts)))
most_common_words = word_counts.most_common(10)

print("Most common words in negative reviews:", most_common_words)

In [None]:
#set game app id variable
game_appid = cyberpunk_appid

#list comprehension to pull reviews for the specified game and with a Positive sentiment category
positive_reviews = [review for review in combined_reviews_3000 if review['game_appid'] == game_appid and review['sentiment_category'] == "Positive"]

#apply word counter to determine most common words
positive_review_texts = [review['preprocessed_review'].lower() for review in positive_reviews]
word_counts = Counter(re.findall(r'\b\w+\b', ' '.join(positive_review_texts)))
most_common_words = word_counts.most_common(10)

print("Most common words in positive reviews:", most_common_words)

#### Determine correlation between sentiment score and playtime

In [None]:
#set game app id variable
game_appid = cyberpunk_appid

#list comprehension to filter reviews for the specified game
filtered_reviews = [review for review in combined_reviews_3000 if review['game_appid'] == game_appid]

#extract playtimes and sentiment scores from the filtered reviews
playtimes = [review['author_playtime_forever'] for review in filtered_reviews]
sentiment_scores = [review['vader_sentiment'] for review in filtered_reviews]

#calculate the correlation between playtime and weighted vote score
correlation = np.corrcoef(playtimes, sentiment_scores)[0, 1]
print(f"Correlation between playtime and sentiment score for game {game_appid}: {correlation}")

#### Count highly engaged reviews by Negative and Positive

In [None]:
#set game app id variable
game_appid = cyberpunk_appid

#list comprehension to filter reviews for the specified game
filtered_reviews = [review for review in combined_reviews_3000 if review['game_appid'] == game_appid]

#define the engagement threshold
engagement_threshold = 250

#identify highly engaged reviews based on the threshold
highly_engaged_reviews = [review for review in filtered_reviews if review['votes_up'] + review['votes_funny'] + review['comment_count'] > engagement_threshold]

#separate highly engaged reviews into positive and negative sentiment categories
positive_reviews = [review for review in highly_engaged_reviews if review['sentiment_category'] == 'Positive']
negative_reviews = [review for review in highly_engaged_reviews if review['sentiment_category'] == 'Negative']

print(f"Positive: {len(positive_reviews)} reviews")
print(f"Negative: {len(negative_reviews)} reviews")

#### See upvotes on posts with and without developer responses

In [None]:
#set game app id variable
game_appid = cyberpunk_appid

#list comprehension to filter reviews for the specified game
filtered_reviews = [review for review in combined_reviews_3000 if review['game_appid'] == game_appid]

#separate reviews into those with and without developer responses
reviews_with_dev_response = [review for review in filtered_reviews if review['developer_response']]
reviews_without_dev_response = [review for review in filtered_reviews if not review['developer_response']]

#calculate the average vote scores for each group
average_votes_with_dev_response = sum(review['votes_up'] for review in reviews_with_dev_response) / len(reviews_with_dev_response) if reviews_with_dev_response else "No reviews with developer responses"
average_votes_without_dev_response = sum(review['votes_up'] for review in reviews_without_dev_response) / len(reviews_without_dev_response) if reviews_without_dev_response else "No reviews without developer responses"

print(f"   Developer Response: {round(average_votes_with_dev_response, 2)}")
print(f"No Developer Response: {round(average_votes_without_dev_response, 2)}")

There are plenty of other queries that can be done on this data, according to the concerns of the specific developer. I hope you enjoyed this!