<a href="https://colab.research.google.com/github/jzhangfob/igdb-games-data-pipeline/blob/feature%2Fadd-notebook/notebooks/Twitch-Data-ETL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Initialization

- Import necessary packages (install google-cloud-storage first if it is not already installed)
- Mount Google Drive
- Define export path variable to write raw CSV files to
- Define API endpoints
- Create global header variable to store authentication information

Use of this notebook requires downloading a Service Account Key from GCP:

1. Create a service account and download a service account key to your local machine
2. Upload the service account JSON file to a Google Drive directory
3. Set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to the file path from step 2 above

In [None]:
pip install google-cloud-storage

In [None]:
# Import packages
import requests
import csv
import time
import pandas as pd
import numpy as np
import os
import json

from google.cloud import storage
from io import StringIO

In [None]:
# Mount Google Drive at /content/drive so that files stored in Drive can be
# reached through regular file paths under that directory
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Set the Google application credentials path after uploading the service account key to Google Drive.
# The path lives under the Drive mount point (/content/drive), so Drive must be
# mounted before anything tries to read the key file.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/content/drive/MyDrive/Twitch Data Pipeline/igdb-pipeline-a3bbac471b4c.json"

In [None]:
# Test that you can access GCS buckets.
# The client is created without explicit credentials — presumably it picks up
# the service account key via GOOGLE_APPLICATION_CREDENTIALS; confirm against
# the Google Cloud authentication docs.
client = storage.Client()
# list_buckets() is wrapped in list() so the bucket objects are printed below
buckets = list(client.list_buckets())
print(buckets)  # Verifies that you can access your storage buckets

In [None]:
# If exporting to Google Drive, define the directory.
# This is a directory on the mounted Drive (/content/drive/MyDrive/...) meant
# to hold the raw CSV files.
EXPORT_PATH = '/content/drive/MyDrive/Twitch Data Pipeline/Raw'

In [None]:
# Endpoint dictionary: maps each data type name to the IGDB v4 URL it is
# pulled from. This is the single place where the URLs are written out.
end_point_dict = {
    'games': 'https://api.igdb.com/v4/games',
    'platforms': 'https://api.igdb.com/v4/platforms',
    'game_modes': 'https://api.igdb.com/v4/game_modes',
    'game_engines': 'https://api.igdb.com/v4/game_engines',
    'genres': 'https://api.igdb.com/v4/genres',
    'external_games': 'https://api.igdb.com/v4/external_games',
}

# Individual endpoint variables, looked up from the dictionary above so the
# two can never drift apart
end_point_games = end_point_dict['games']
end_point_platforms = end_point_dict['platforms']
end_point_game_modes = end_point_dict['game_modes']
end_point_game_engines = end_point_dict['game_engines']
end_point_genres = end_point_dict['genres']
end_point_external_games = end_point_dict['external_games']

In [None]:
# Pass in headers to api call.
# The credentials are taken from the IGDB_CLIENT_ID and IGDB_ACCESS_TOKEN
# environment variables when those are set and non-empty, so they can be
# supplied without editing the notebook.
# NOTE(review): the literal fallbacks below are credentials committed to
# source control. They are kept only so the notebook keeps running unchanged;
# they should be rotated and removed once the environment variables are in use.
HEADERS = {
    'Client-ID': os.environ.get('IGDB_CLIENT_ID') or "yzlyxaef51zs7qmklracxzbzuusrcf",
    'Authorization': "Bearer " + (
        os.environ.get('IGDB_ACCESS_TOKEN') or "3itkqiiepb0ml35r2bw1pajtcgncib"
    ),
}

## Functions

1. make_api_call
  - Retrieves the data from a specified endpoint
2. upload_dataframe_to_gcs
  - Uploads the DataFrame returned by make_api_call to a GCS bucket as a CSV file
3. write_csv_from_api (Optional)
  - Writes the data to a CSV file in a given directory, for testing purposes

In [None]:
# Function to make API calls to various endpoints
def make_api_call(end_point, limit, offset, fields, header, timeout=30):
  """
  Retrieves every record from an API endpoint by paging through it.

  Requests are sent with an increasing offset until the API returns an empty
  batch. If a request fails (network error or timeout, non-200 status, or a
  response that is not a JSON list), the error is printed, paging stops, and
  whatever has been collected so far is returned.

  Parameters:
  ----------
  end_point : str
      The URL of the API endpoint to send the request to.
  limit : int
      The maximum number of records to request in a single API call. Also
      used as the step by which the offset advances between calls.
  offset : int
      The starting position in the dataset from which records will be retrieved.
  fields : str
      A comma-separated string specifying the fields to include in the response.
  header : dict
      The headers for the API request, typically containing authentication details
      (e.g., Client ID and authorization token).
  timeout : float, optional
      Seconds to wait for the server on each request before giving up
      (default 30). A timeout is handled like any other network error.

  Returns:
  -------
  pandas.DataFrame
      A DataFrame containing all the retrieved data from the specified API
      endpoint, or an empty DataFrame if nothing was retrieved.
  """

  # Start logging message
  print(f"Beginning API call for endpoint: {end_point}\n--------------")

  # Batches are collected in a list and concatenated once at the end;
  # concatenating inside the loop would re-copy all earlier rows on every page
  batches = []

  # Continue the loop until the API returns an empty batch or a call fails
  while True:

    try:
      params = {'fields': fields, 'limit': limit, 'offset': offset}

      # Make the API call and validate response status. The timeout keeps a
      # stalled connection from hanging the notebook indefinitely.
      r = requests.get(end_point, headers=header, params=params, timeout=timeout)
      if r.status_code != 200:
        raise Exception(f"API call failed with status code {r.status_code}: {r.text}")

      # Print confirmation
      print(f"Getting the results for {r.url}")

      # Parse JSON response and check its structure
      results = r.json()
      if not isinstance(results, list):
        raise ValueError(f"Unexpected response format for {r.url}. Expected a list of records.")

      results_len = len(results)
      print(f"Received {results_len} records from {r.url}")

      # An empty batch means there is nothing left to fetch
      if results_len == 0:
        break

      batches.append(pd.DataFrame(results))

      # Increment offset for the next batch
      offset += limit

      # Maximum of 4 api calls per second
      time.sleep(.25)

    # Stop the loop on network failure (includes timeouts)
    except requests.exceptions.RequestException as e:
      print(f"Network-related error occurred: {e}")
      break
    # Stop the loop on unexpected errors
    except Exception as e:
      print(f"An error occurred: {e}")
      break

  # Combine all batches; with no batches, return an empty DataFrame
  all_df = pd.concat(batches, ignore_index=True) if batches else pd.DataFrame()

  # Print confirmation message
  print(f'Finished retrieving data from {end_point}')
  print(f'Total records retrieved: {all_df.shape[0]}')
  # End logging message
  print(f"Finished API call for endpoint: {end_point}\n--------------")

  return all_df


In [None]:
def write_csv_from_api(api_data, path, data_type):
  """
  Writes a DataFrame to the file <path>/<data_type>.csv.

  Parameters:
  ----------
  api_data : pandas.DataFrame
      The data to write. The index is not included in the file.
  path : str
      The directory to write the CSV file into. It is created, along with
      any missing parent directories, if it does not exist.
  data_type : str
      The name used for the file, without the .csv extension.

  Returns:
  -------
  None
  """
  # Create the directory if it does not exist; exist_ok=True makes a separate
  # existence check unnecessary
  os.makedirs(path, exist_ok=True)

  # Write into the directory that was passed in (not the global EXPORT_PATH),
  # so the file lands in the directory created above
  final_path = os.path.join(path, f'{data_type}.csv')
  api_data.to_csv(final_path, index=False)

  # Print message
  print(f"Wrote {data_type} data to {final_path}")

In [None]:
def upload_dataframe_to_gcs(df, bucket_name, destination_blob_name):
    """
    Uploads a pandas DataFrame to Google Cloud Storage as a CSV file.

    The DataFrame is serialized to CSV text in memory (without its index)
    and sent to the target object with content type 'text/csv'.

    Parameters:
    ----------
    df (pandas.DataFrame): The DataFrame to upload.
    bucket_name (str): The name of the GCS bucket.
    destination_blob_name (str): The destination path within the bucket.

    Returns:
    --------
    None
    """
    # Serialize first, so no GCS client is created if the conversion fails
    text_buffer = StringIO()
    df.to_csv(text_buffer, index=False)
    csv_text = text_buffer.getvalue()

    # Resolve the destination object inside the bucket
    target_blob = storage.Client().bucket(bucket_name).blob(destination_blob_name)

    # Send the CSV text and report where it went
    target_blob.upload_from_string(csv_text, content_type='text/csv')
    print(f"Data uploaded to {bucket_name}/{destination_blob_name}")

## Main function

Loops through the endpoint dictionary, retrieves the data for each endpoint, and uploads it to the GCS bucket

In [None]:
# Store dataframes from API calls separately
all_api_df = []

# Loop through the end point dict to make API calls
for data_type, end_point in end_point_dict.items():
  # Retrieve data
  data = make_api_call(
      end_point=end_point,
      limit=500,
      offset=0,
      fields="*",
      header=HEADERS
  )

  # Keep every result, including empty ones, so each endpoint can be inspected later
  all_api_df.append(data)

  # make_api_call reports failures by printing and returning what it has
  # collected, so a pull that failed on the first request comes back empty.
  # Skip the upload in that case rather than overwrite the bucket object
  # with an empty file.
  if data.empty:
    print(f"No data retrieved for {data_type}; skipping upload")
    continue

  # Write data to GCS bucket, using the data type name as the object name
  upload_dataframe_to_gcs(
      df=data,
      bucket_name="igdb_raw_data_bucket",
      destination_blob_name=data_type
  )


In [None]:
# Show a summary (columns, non-null counts, dtypes) of each retrieved dataframe.
# DataFrame.info() prints the summary itself and returns None, so it is called
# directly; wrapping it in print() would add a stray "None" line after each one.
for df in all_api_df:
  df.info()