In [1]:
import requests
import json

from pyspark.sql.types import IntegerType, StringType, ArrayType, ArrayType, StructType, StructField
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, monotonically_increasing_id, explode, row_number

In [2]:
# Obtain the Spark session used by the rest of the notebook; getOrCreate()
# reuses an already-running session instead of starting a second one.
session_builder = SparkSession.builder.appName("Pokemon Go ELT")
spark = session_builder.getOrCreate()

In [3]:
# Download the Pokemon type catalogue from the public PoGo API.
# A timeout is set because `requests` otherwise waits indefinitely on an
# unresponsive server, which would freeze the notebook kernel.
response = requests.get(
    'https://pogoapi.net/api/v1/pokemon_types.json',
    timeout=30,
)

# Keep the friendly message for the "server down" case ...
if response.status_code == 500:
    print('O servidor não está acessível, erro 500.')

# ... but stop on ANY HTTP error (4xx/5xx) instead of falling through and
# trying to parse an error page as JSON. Raises requests.HTTPError.
response.raise_for_status()

# Parsed payload; the preview printed below shows a list of records with the
# keys form, pokemon_id, pokemon_name and type.
pokemon_types = response.json()

print(json.dumps(pokemon_types, indent=4))

[
    {
        "form": "Fall_2019",
        "pokemon_id": 1,
        "pokemon_name": "Bulbasaur",
        "type": [
            "Grass",
            "Poison"
        ]
    },
    {
        "form": "Normal",
        "pokemon_id": 1,
        "pokemon_name": "Bulbasaur",
        "type": [
            "Grass",
            "Poison"
        ]
    },
    {
        "form": "Purified",
        "pokemon_id": 1,
        "pokemon_name": "Bulbasaur",
        "type": [
            "Grass",
            "Poison"
        ]
    },
    {
        "form": "Shadow",
        "pokemon_id": 1,
        "pokemon_name": "Bulbasaur",
        "type": [
            "Grass",
            "Poison"
        ]
    },
    {
        "form": "Normal",
        "pokemon_id": 2,
        "pokemon_name": "Ivysaur",
        "type": [
            "Grass",
            "Poison"
        ]
    },
    {
        "form": "Purified",
        "pokemon_id": 2,
        "pokemon_name": "Ivysaur",
        "type": [
            "Grass",
       

In [4]:
# Schema mirroring the keys each API record actually carries
# (form, pokemon_id, pokemon_name, type). The payload has no "id" key, so a
# field with that name could never hold data; "form" is declared instead so
# that attribute is no longer silently discarded when the DataFrame is built.
pokemon_types_schema = StructType([
    StructField('form', StringType(), True),
    StructField('pokemon_id', IntegerType(), True),
    StructField('pokemon_name', StringType(), True),
    StructField('type', ArrayType(StringType()), True)
])

pokemon_types_dataframe = spark.createDataFrame(pokemon_types, pokemon_types_schema)

# The API repeats each Pokemon once per form; keep a single row per distinct
# (name, type list) combination, then flatten the type array so every output
# row carries exactly one type.
result = pokemon_types_dataframe \
    .dropDuplicates(['pokemon_name', 'type']) \
    .select(col('pokemon_id'),
            col('pokemon_name'),
            explode('type').alias('type'))

result.show(10)

+----------+------------+--------+
|pokemon_id|pokemon_name|    type|
+----------+------------+--------+
|        53|     Persian|  Normal|
|       226|     Mantine|   Water|
|       226|     Mantine|  Flying|
|       366|    Clamperl|   Water|
|         7|    Squirtle|   Water|
|       602|      Tynamo|Electric|
|       283|     Surskit|     Bug|
|       283|     Surskit|   Water|
|       569|    Garbodor|  Poison|
|        84|       Doduo|  Normal|
+----------+------------+--------+
only showing top 10 rows



In [5]:
# Print the column names, types and nullability of `result` to confirm that
# `type` is a plain string column after the explode step.
result.printSchema()

root
 |-- pokemon_id: integer (nullable = true)
 |-- pokemon_name: string (nullable = true)
 |-- type: string (nullable = true)



In [6]:
# Collect the Spark result into a pandas DataFrame and write it out as a
# single CSV file, leaving out the pandas index column.
pokemon_types_pandas = result.toPandas()
pokemon_types_pandas.to_csv('pokemon_types.csv', index=False)

In [7]:
# Read the exported CSV back into Spark as a sanity check, treating the first
# line as the header. No schema is supplied, so Spark's CSV reader loads every
# column as a string.
csv_reader = spark.read.options(header=True)
proof = csv_reader.csv('pokemon_types.csv')

In [8]:
# Preview the first five rows of the re-loaded CSV.
proof.show(n=5)

+----------+------------+------+
|pokemon_id|pokemon_name|  type|
+----------+------------+------+
|        53|     Persian|Normal|
|       226|     Mantine| Water|
|       226|     Mantine|Flying|
|       366|    Clamperl| Water|
|         7|    Squirtle| Water|
+----------+------------+------+
only showing top 5 rows

