In [6]:
from datetime import datetime, timedelta
import os
import io
import requests
import boto3
import pandas as pd

In [10]:
# retrieve the data from the API
url = "https://pokeapi.co/api/v2/generation?limit=10000"
response = requests.get(url, timeout=1000)
if response.status_code == 200:
    data = response.json()["results"]
    generations = []
    for generation in data: 
        generation_url = generation["url"]
        response = requests.get(generation_url, timeout=1000)
        if response.status_code == 200:
            pokemon_data = response.json()["pokemon_species"]
            pokemon_names = [pokemon["name"] for pokemon in pokemon_data]
            generations.append(
                {
                    "generation_name": generation["name"],
                    "pokemon_name": pokemon_names,
                }
            )

In [11]:
# create a DataFrame from the data
df = pd.DataFrame(generations)

# explode the "pokemon_names" column and select relevant columns
df = df.explode("pokemon_name")

df.head()

Unnamed: 0,generation_name,pokemon_name
0,generation-i,bulbasaur
0,generation-i,charmander
0,generation-i,squirtle
0,generation-i,caterpie
0,generation-i,weedle


In [12]:
# Retrieve the data from the API
response = requests.get("https://pokeapi.co/api/v2/move?limit=10000", timeout=1000)
if response.status_code == 200:
    data = response.json()["results"]

    # Create a dictionary to store the data
    moves_dict = {}

    # Loop through the data and add it to the dictionary
    for move in data:
        move_id = int(move["url"].split("/")[-2])
        move_response = requests.get(move["url"], timeout=1000)
        move_data = move_response.json()
        move_name = move_data["name"]
        move_accuracy = move_data["accuracy"]

        # Loop through the list of pokemon that can learn this move
        for pokemon in move_data["learned_by_pokemon"]:
            pokemon_name = pokemon["name"]
            if pokemon_name not in moves_dict:
                moves_dict[pokemon_name] = []
            moves_dict[pokemon_name].append((move_id, move_name, move_accuracy))

    # Convert the dictionary to a Pandas dataframe
    data = []
    for pokemon_name, moves in moves_dict.items():
        for move in moves:
            data.append((pokemon_name, move[0], move[1], move[2]))
    df = pd.DataFrame(
        data,
        columns=["learned_by_pokemon", "move_id", "move_name", "move_accuracy"],
    )

In [13]:
df.head()

Unnamed: 0,learned_by_pokemon,move_id,move_name,move_accuracy
0,clefairy,1,pound,100.0
1,clefairy,3,double-slap,85.0
2,clefairy,5,mega-punch,85.0
3,clefairy,7,fire-punch,100.0
4,clefairy,8,ice-punch,100.0


In [14]:
# retrieve the data from the API
url = "https://pokeapi.co/api/v2/pokemon-habitat?limit=3000"
response = requests.get(url, timeout=1000)
if response.status_code == 200:
    data = response.json()["results"]
    habitat_list = []
    for habitat in data:
        habitat_url = habitat["url"]
        response = requests.get(habitat_url, timeout=1000)
        if response.status_code == 200:
            habitat_data = response.json()
            for pokemon in habitat_data["pokemon_species"]:
                pokemon_dict = {
                    "habitat": habitat_data["name"],
                    "pokemon_name": pokemon["name"],
                }
                habitat_list.append(pokemon_dict)

In [15]:
# create a DataFrame from the data
df = pd.DataFrame(habitat_list)

df.head()

Unnamed: 0,habitat,pokemon_name
0,cave,zubat
1,cave,diglett
2,cave,gastly
3,cave,onix
4,cave,misdreavus


In [19]:
# retrieve the data from the API
url = "https://pokeapi.co/api/v2/pokemon?limit=10000"
response = requests.get(url, timeout=1000)
if response.status_code == 200:
    data = response.json()["results"]
    pokemon_list = []
    for pokemon in data:
        pokemon_url = pokemon["url"]
        response = requests.get(pokemon_url, timeout=1000)
        if response.status_code == 200:
            pokemon_data = response.json()
            base_stats = [bs["stat"]["name"] for bs in pokemon_data["stats"]]
            pokemon_dict = {
                "pokemon_name": pokemon_data["name"],
                "base_stat": base_stats,
            }
            pokemon_list.append(pokemon_dict)

In [20]:
# create a DataFrame from the data
df = pd.DataFrame(pokemon_list)

# explode the "base_stats" column and select relevant columns
df = df.explode("base_stat")

df.head()

Unnamed: 0,pokemon_name,base_stat
0,bulbasaur,hp
0,bulbasaur,attack
0,bulbasaur,defense
0,bulbasaur,special-attack
0,bulbasaur,special-defense


In [1]:
import configparser
import pathlib
import sys
import psycopg2
from psycopg2 import sql

In [3]:
# Parse our configuration file
script_path = "/workspaces/data-engineer-zoomcamp-project/airflow/scripts"
parser = configparser.ConfigParser()
parser.read(f"{script_path}/configuration.conf")

# Store our configuration variables
USERNAME = parser.get("aws_config", "redshift_username")
PASSWORD = parser.get("aws_config", "redshift_password")
HOST = parser.get("aws_config", "redshift_hostname")
PORT = parser.get("aws_config", "redshift_port")
AWS_ACCESS_KEY_ID = parser.get("aws_config", "aws_access_key_id")
AWS_SECRET_ACCESS_KEY = parser.get("aws_config", "aws_secret_access_key")
DATABASE = parser.get("aws_config", "redshift_database")
SCHEMA_NAME = parser.get("aws_config", "redshift_schema")
BUCKET_NAME = parser.get("aws_config", "bucket_name")

In [4]:
# Our S3 files
pokemon_info = f"s3://{BUCKET_NAME}/data/pokemon_info.csv"
pokemon_generations = f"s3://{BUCKET_NAME}/data/pokemon_generations.csv"
pokemon_moves = f"s3://{BUCKET_NAME}/data/pokemon_moves.csv"
pokemon_habitats = f"s3://{BUCKET_NAME}/data/pokemon_habitats.csv"
pokemon_base_stats = f"s3://{BUCKET_NAME}/data/pokemon_base_stats.csv"

In [25]:
# Create Redshift table if it doesn't exist
sql_create_table = sql.SQL(
    """CREATE TABLE IF NOT EXISTS staging.pokemon_info (
        pokemon_id INT PRIMARY KEY,
        name VARCHAR(256),
        height INT,
        weight INT,
        types VARCHAR(256)
    );
    
    CREATE TABLE IF NOT EXISTS staging.pokemon_generations (
        generation_name VARCHAR(256),
        pokemon_name VARCHAR(256)
    );
    
    CREATE TABLE IF NOT EXISTS staging.pokemon_moves (
        learned_by_pokemon VARCHAR(256),
        move_id INT,
        move_name VARCHAR(256),
        move_accuracy DECIMAL
    );
    
    CREATE TABLE IF NOT EXISTS staging.pokemon_habitats (
        habitat VARCHAR(256),
        pokemon_name VARCHAR(256)
    );
    
    CREATE TABLE IF NOT EXISTS staging.pokemon_base_stats (
        pokemon_id INT,
        pokemon_name VARCHAR(256),
        base_stat VARCHAR(256)
    );"""
)

print(sql_create_table)

SQL('CREATE TABLE IF NOT EXISTS staging.pokemon_info (\n        pokemon_id INT PRIMARY KEY,\n        name VARCHAR(256),\n        height INT,\n        weight INT,\n        types VARCHAR(256)\n    );\n    \n    CREATE TABLE IF NOT EXISTS staging.pokemon_generations (\n        generation_name VARCHAR(256),\n        pokemon_name VARCHAR(256)\n    );\n    \n    CREATE TABLE IF NOT EXISTS staging.pokemon_moves (\n        learned_by_pokemon VARCHAR(256),\n        move_id INT,\n        move_name VARCHAR(256),\n        move_accuracy DECIMAL\n    );\n    \n    CREATE TABLE IF NOT EXISTS staging.pokemon_habitats (\n        habitat VARCHAR(256),\n        pokemon_name VARCHAR(256)\n    );\n    \n    CREATE TABLE IF NOT EXISTS staging.pokemon_base_stats (\n        pokemon_id INT,\n        pokemon_name VARCHAR(256),\n        base_stat VARCHAR(256)\n    );')


In [26]:
# If ID already exists in table, we remove it and add new ID record during load.
sql_copy_pokemon_info = f"COPY {SCHEMA_NAME}.pokemon_info FROM '{pokemon_info}' CREDENTIALS 'aws_access_key_id={AWS_ACCESS_KEY_ID};aws_secret_access_key={AWS_SECRET_ACCESS_KEY}' IGNOREHEADER 1 DELIMITER ',' CSV;"
sql_copy_pokemon_generations = f"COPY {SCHEMA_NAME}.pokemon_generations FROM '{pokemon_generations}' CREDENTIALS 'aws_access_key_id={AWS_ACCESS_KEY_ID};aws_secret_access_key={AWS_SECRET_ACCESS_KEY}' IGNOREHEADER 1 DELIMITER ',' CSV;"
sql_copy_pokemon_moves = f"COPY {SCHEMA_NAME}.pokemon_moves FROM '{pokemon_moves}' CREDENTIALS 'aws_access_key_id={AWS_ACCESS_KEY_ID};aws_secret_access_key={AWS_SECRET_ACCESS_KEY}' IGNOREHEADER 1 DELIMITER ',' CSV;"
sql_copy_pokemon_habitats = f"COPY {SCHEMA_NAME}.pokemon_habitats FROM '{pokemon_habitats}' CREDENTIALS 'aws_access_key_id={AWS_ACCESS_KEY_ID};aws_secret_access_key={AWS_SECRET_ACCESS_KEY}' IGNOREHEADER 1 DELIMITER ',' CSV;"
sql_copy_pokemon_base_stats = f"COPY {SCHEMA_NAME}.pokemon_base_stats FROM '{pokemon_base_stats}' CREDENTIALS 'aws_access_key_id={AWS_ACCESS_KEY_ID};aws_secret_access_key={AWS_SECRET_ACCESS_KEY}' IGNOREHEADER 1 DELIMITER ',' CSV;"

In [27]:
rs_conn = psycopg2.connect(dbname=DATABASE, user=USERNAME, password=PASSWORD, host=HOST, port=PORT)

In [28]:
with rs_conn:

    cur = rs_conn.cursor()
    cur.execute(sql_create_table)
    cur.execute(sql_copy_pokemon_info)
    cur.execute(sql_copy_pokemon_generations)
    cur.execute(sql_copy_pokemon_moves)
    cur.execute(sql_copy_pokemon_habitats)
    cur.execute(sql_copy_pokemon_base_stats)

    # Commit only at the end, so we won't end up
    # with a temp table and deleted main table if something fails
    rs_conn.commit()
    print()