In [None]:
import psycopg2
import pandas as pd
# PostgreSQL connection settings
DB_NAME = "pokemon_db"
DB_USER = "postgres"  
DB_PASSWORD = "kego"  
DB_HOST = "localhost"
DB_PORT = "5432"  

def connect_db():
    """Connects to PostgreSQL and returns a connection object."""
    return psycopg2.connect(
        dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST, port=DB_PORT
    )

In [16]:
from airflow import DAG
from airflow.operators.python import PythonOperator
import datetime
from datetime import datetime

# Correct the current time usage
current_time = datetime.now()

# Extract data from the API
def extract_pokemon():
    url = "https://pokeapi.co/api/v2/pokemon?limit=100"
    response = requests.get(url)
    return response.json()["results"]

# Transform the data into the required format
def transform_pokemon(pokemon_list):
    transformed_data = []
    for pokemon in pokemon_list:
        transformed_data.append((pokemon["name"], pokemon["url"]))
    return transformed_data

# Load the transformed data into PostgreSQL database
def load_pokemon_to_db(**context):
    conn = psycopg2.connect("dbname=pokemon_db user=your_username password=your_password")
    cursor = conn.cursor()
    # Pull data from the previous task (transform task)
    transformed_data = context['task_instance'].xcom_pull(task_ids='transform')
    for name, url in transformed_data:
        cursor.execute("INSERT INTO pokemon (name, api_url) VALUES (%s, %s) ON CONFLICT (name) DO NOTHING", (name, url))
    conn.commit()
    cursor.close()
    conn.close()

# Define the DAG with correct datetime format
dag = DAG(
    "pokemon_etl", 
    schedule_interval="@daily", 
    start_date=datetime(2024, 3, 1), 
    catchup=False
)

# Define the PythonOperator tasks
extract = PythonOperator(task_id="extract", python_callable=extract_pokemon, dag=dag)
transform = PythonOperator(task_id="transform", python_callable=transform_pokemon, provide_context=True, dag=dag)
load = PythonOperator(task_id="load", python_callable=load_pokemon_to_db, provide_context=True, dag=dag)

# Set the task dependencies (ETL flow)
extract >> transform >> load

<Task(PythonOperator): load>

In [10]:
airflow scheduler &
airflow webserver --port 8080

SyntaxError: invalid syntax (171200285.py, line 1)

In [17]:
def connect_db():
    return psycopg2.connect(
        dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST, port=DB_PORT
    )

# Export Pokémon data to CSV
def export_pokemon_to_csv():
    conn = connect_db()
    query = """
        SELECT 
            p.id, p.name, p.height, p.weight, p.base_experience,
            STRING_AGG(DISTINCT t.name, ', ') AS types,
            STRING_AGG(DISTINCT a.name, ', ') AS abilities,
            STRING_AGG(DISTINCT m.name, ', ') AS moves
        FROM pokemon p
        LEFT JOIN pokemon_types pt ON p.id = pt.pokemon_id
        LEFT JOIN types t ON pt.type_id = t.id
        LEFT JOIN pokemon_abilities pa ON p.id = pa.pokemon_id
        LEFT JOIN abilities a ON pa.ability_id = a.id
        LEFT JOIN pokemon_moves pm ON p.id = pm.pokemon_id
        LEFT JOIN moves m ON pm.move_id = m.id
        GROUP BY p.id
    """
    
    df = pd.read_sql(query, conn)  # Load data into Pandas DataFrame
    conn.close()
    
    # Save to CSV
    df.to_csv("pokemon_data.csv", index=False)
    print("CSV file created: pokemon_data.csv")

# Run export function
export_pokemon_to_csv()

  df = pd.read_sql(query, conn)  # Load data into Pandas DataFrame


CSV file created: pokemon_data.csv


In [15]:
import networkx as nx

query = "SELECT pokemon_id, evolves_to FROM evolutions"
df = pd.read_sql(query, conn)

G = nx.DiGraph()
for _, row in df.iterrows():
    G.add_edge(row['pokemon_id'], row['evolves_to'])

plt.figure(figsize=(12, 8))
nx.draw(G, with_labels=True, node_color="lightblue", edge_color="gray", node_size=2000, font_size=10)
plt.title("Pokémon Evolution Chain")
plt.show()

NameError: name 'conn' is not defined

In [13]:
def connect_db():
    return psycopg2.connect(
        dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST, port=DB_PORT
    )

# Fetch Pokémon list
def fetch_pokemon_list(limit=100):
    url = f"https://pokeapi.co/api/v2/pokemon?limit={limit}"
    response = requests.get(url)
    if response.status_code == 200:
        return [p['name'] for p in response.json()['results']]
    return []

# Fetch Pokémon details
def fetch_pokemon_data(pokemon_name):
    url = f"https://pokeapi.co/api/v2/pokemon/{pokemon_name.lower()}"
    response = requests.get(url)
    if response.status_code == 200:
        return response.json()
    return None

# Fetch evolution data
def fetch_evolution_chain(pokemon_id):
    url = f"https://pokeapi.co/api/v2/pokemon-species/{pokemon_id}"
    response = requests.get(url)
    if response.status_code == 200:
        species_data = response.json()
        if "evolution_chain" in species_data:
            chain_url = species_data["evolution_chain"]["url"]
            evolution_response = requests.get(chain_url)
            if evolution_response.status_code == 200:
                return evolution_response.json()
    return None

# Insert Pokémon data efficiently
def insert_pokemon(data):
    conn = connect_db()
    cursor = conn.cursor()

    # Insert Pokémon
    cursor.execute("""
        INSERT INTO pokemon (id, name, height, weight, base_experience)
        VALUES (%s, %s, %s, %s, %s)
        ON CONFLICT (id) DO NOTHING
    """, (data['id'], data['name'], data['height'], data['weight'], data['base_experience']))

    # Insert Types
    for type_info in data['types']:
        type_name = type_info['type']['name']
        cursor.execute("INSERT INTO types (name) VALUES (%s) ON CONFLICT (name) DO NOTHING", (type_name,))
        cursor.execute("""
            INSERT INTO pokemon_types (pokemon_id, type_id)
            SELECT %s, id FROM types WHERE name = %s
            ON CONFLICT DO NOTHING
        """, (data['id'], type_name))

    # Insert Abilities
    for ability_info in data['abilities']:
        ability_name = ability_info['ability']['name']
        cursor.execute("INSERT INTO abilities (name) VALUES (%s) ON CONFLICT (name) DO NOTHING", (ability_name,))
        cursor.execute("""
            INSERT INTO pokemon_abilities (pokemon_id, ability_id)
            SELECT %s, id FROM abilities WHERE name = %s
            ON CONFLICT DO NOTHING
        """, (data['id'], ability_name))

    # Insert Moves
    for move_info in data['moves']:
        move_name = move_info['move']['name']
        cursor.execute("INSERT INTO moves (name) VALUES (%s) ON CONFLICT (name) DO NOTHING", (move_name,))
        cursor.execute("""
            INSERT INTO pokemon_moves (pokemon_id, move_id)
            SELECT %s, id FROM moves WHERE name = %s
            ON CONFLICT DO NOTHING
        """, (data['id'], move_name))

    # Insert Stats
    for stat_info in data['stats']:
        stat_name = stat_info['stat']['name']
        base_stat = stat_info['base_stat']
        cursor.execute("INSERT INTO stats (name) VALUES (%s) ON CONFLICT (name) DO NOTHING", (stat_name,))
        cursor.execute("""
            INSERT INTO pokemon_stats (pokemon_id, stat_id, base_stat)
            SELECT %s, id, %s FROM stats WHERE name = %s
            ON CONFLICT DO NOTHING
        """, (data['id'], base_stat, stat_name))

    # Insert Evolution Data
    evolution_data = fetch_evolution_chain(data['id'])
    if evolution_data:
        chain = evolution_data["chain"]
        evolves_to = chain["evolves_to"]
        if evolves_to:
            for evolution in evolves_to:
                evolved_name = evolution["species"]["name"]
                cursor.execute("""
                    INSERT INTO evolutions (pokemon_id, evolves_to)
                    VALUES (%s, %s)
                    ON CONFLICT DO NOTHING
                """, (data['id'], evolved_name))

    conn.commit()
    cursor.close()
    conn.close()

# Bulk fetch and insert
pokemon_names = fetch_pokemon_list(limit=2000)
for name in pokemon_names:
    data = fetch_pokemon_data(name)
    if data:
        insert_pokemon(data)
        print(f"Inserted {name} into PostgreSQL database.")

Inserted bulbasaur into PostgreSQL database.
Inserted ivysaur into PostgreSQL database.
Inserted venusaur into PostgreSQL database.
Inserted charmander into PostgreSQL database.
Inserted charmeleon into PostgreSQL database.
Inserted charizard into PostgreSQL database.
Inserted squirtle into PostgreSQL database.
Inserted wartortle into PostgreSQL database.
Inserted blastoise into PostgreSQL database.
Inserted caterpie into PostgreSQL database.
Inserted metapod into PostgreSQL database.
Inserted butterfree into PostgreSQL database.
Inserted weedle into PostgreSQL database.
Inserted kakuna into PostgreSQL database.
Inserted beedrill into PostgreSQL database.
Inserted pidgey into PostgreSQL database.
Inserted pidgeotto into PostgreSQL database.
Inserted pidgeot into PostgreSQL database.
Inserted rattata into PostgreSQL database.
Inserted raticate into PostgreSQL database.
Inserted spearow into PostgreSQL database.
Inserted fearow into PostgreSQL database.
Inserted ekans into PostgreSQL dat

In [11]:
def get_pokemon():
    """Fetch all Pokémon from the database."""
    conn = connect_db()
    cursor = conn.cursor()
    
    cursor.execute("SELECT * FROM pokemon")
    results = cursor.fetchall()
    
    for row in results:
        print(row)
    
    cursor.close()
    conn.close()

get_pokemon()

(25, 'pikachu', 4, 60, 112)
(4, 'charmander', 6, 85, 62)
(1, 'bulbasaur', 7, 69, 64)
(7, 'squirtle', 5, 90, 63)
