In [0]:
#importação de bibliotecas necessárias
import requests
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, element_at, size, lit, try_element_at

In [0]:
def get_pokemon(limit=20, offset=0, timeout=10):
    """Fetch one page of Pokémon from PokeAPI and summarize each entry.

    The listing endpoint is queried first; then the detail URL of every
    listed Pokémon is requested to collect its attributes.

    Args:
        limit: Page size sent as the ``limit`` query parameter.
        offset: Starting position sent as the ``offset`` query parameter.
        timeout: Timeout in seconds applied to every HTTP request, so a
            stalled connection cannot hang the notebook forever.

    Returns:
        A list of dicts with the keys ``id``, ``name``, ``base_experience``,
        ``height``, ``weight`` and ``types`` (a list of type names).
        An empty list is returned when the listing request does not
        answer with HTTP 200.

    Raises:
        requests.HTTPError: If a detail request answers with an error status.
    """
    url = f"https://pokeapi.co/api/v2/pokemon?limit={limit}&offset={offset}"

    # A single Session reuses the underlying connection for the 1 + N requests
    # made below and is closed on exit from the ``with`` block.
    with requests.Session() as session:
        response = session.get(url, timeout=timeout)
        if response.status_code != 200:
            return []

        pokemons = []
        for entry in response.json()["results"]:
            # Fetch the Pokémon detail record.
            detail_response = session.get(entry["url"], timeout=timeout)
            # Fail with a clear HTTP error instead of an obscure JSON/KeyError
            # when the detail endpoint does not return a valid payload.
            detail_response.raise_for_status()
            poke_detail = detail_response.json()

            pokemons.append({
                "id": poke_detail["id"],
                "name": poke_detail["name"],
                "base_experience": poke_detail.get("base_experience"),
                "height": poke_detail.get("height"),
                "weight": poke_detail.get("weight"),
                "types": [t["type"]["name"] for t in poke_detail["types"]],
            })
        return pokemons



In [0]:
def pipeline_pokes(spark, limit=20, offset=0):
    """Load a page of Pokémon into a Spark DataFrame with flattened types.

    Args:
        spark: Object whose ``createDataFrame`` method is used to build the
            DataFrame (presumably the active SparkSession).
        limit: Page size forwarded to ``get_pokemon``.
        offset: Starting position forwarded to ``get_pokemon``.

    Returns:
        A DataFrame with the columns ``id``, ``name``, ``base_experience``,
        ``height``, ``weight``, ``type1`` and ``type2``. ``type1``/``type2``
        hold the first and second entries of the fetched ``types`` list.
    """
    dados = get_pokemon(limit=limit, offset=offset)

    # An explicit schema is supplied so column types do not depend on
    # inference from whatever rows happened to be fetched.
    schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("base_experience", IntegerType(), True),
        StructField("height", IntegerType(), True),
        StructField("weight", IntegerType(), True),
        StructField("types", ArrayType(StringType()), True),
    ])

    df = spark.createDataFrame(dados, schema=schema)

    # try_element_at is 1-based and, per the Spark docs, yields NULL rather
    # than failing when the index is past the end of the array, so a
    # single-type Pokémon gets a NULL type2.
    df = (
        df
        .withColumn("type1", try_element_at(col("types"), 1))
        .withColumn("type2", try_element_at(col("types"), 2))
        .drop("types")
    )

    return df
 