# PySpark-RuneScape
Solução simples para aprendizado com PySpark, onde faço a ingestão de dados da API de monstros do jogo RuneScape, realizo algumas operações simples (filtragem e agregação) e converto os dados para o formato Parquet.

# Como utilizar

O script deverá ser rodado em ambiente Ubuntu 22.04.

1. Realizar a instalação de Python versão 3.10.12
2. Realização da instalação de Java JDK versão 11.0.22
3. Instalar as bibliotecas do requirements.txt
4. Executar o script "main.py" na pasta raíz do repositório.

O script gerará arquivos de input, printará as filtragens e agregações no dataframe, e criará os outputs parquet em pastas dentro do próprio repositório.

# Primeiros passos

Declaração de variáveis de ambiente que serão utilizadas no processo:

* JSON_DATA_PATH: Caminho onde os arquivos que serão consumidos da API serão armazenados (source);
* PARQUET_DATA_PATH: Caminho onde os arquivos serão armazenados após o processo de transformação (target);
* RUNESCAPE_BASE_URL: Link da API onde serão capturados os arquivos que serão ingeridos em formato JSON, transformados e armazenados em Parquet;

In [8]:
JSON_DATA_PATH = "./Data/Json"
PARQUET_DATA_PATH = "./Data/Parquet"
RUNESCAPE_BASE_URL = "https://secure.runescape.com/m=itemdb_rs/bestiary/beastData.json?beastid="

Importação as libs necessárias para execução do processo, principalmente as funções e dependências do pyspark;

In [11]:
import os
import requests
from faker import Faker
from json import dump
import os
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

Declaração da classe personalizada do Spark para setar as variáveis necessárias para o processo;
Criação de pastas necessárias para as transações caso não existam;
Criação da sessão do Spark;
Declaração de funções para manipualação dos dados:

- read_source_data: função para leitura do arquivo JSON;
- mfilter_data: funçao para filtragem do DataFrame onde só são considerados arquivos onde o campo member = True;
- group_by_count: Agrupamento para contabilização de membros e não membros;
- order_by_release: Função orgenação do Data Frame pela data "date_released";
- convert_to_parquet: função para salvar os dados em formato parquet no diretório padrão.

In [12]:
class PySparkRuneScape():
    def __init__(self):
        # Instancia caminhos de dados
        self.source_data_path = JSON_DATA_PATH
        self.destination_path = PARQUET_DATA_PATH
    
        # Verifica se PARQUET_DATA_PATH existe e cria, caso não exista
        if not os.path.isdir(PARQUET_DATA_PATH):
            os.makedirs(PARQUET_DATA_PATH)

        # Inicia sessão Spark
        self.spark = SparkSession.builder \
            .appName("RuneScapeStream") \
            .getOrCreate()

        self.source_data = self.read_source_data()
    
    def read_source_data(self):
        return self.spark.read.json(self.source_data_path)
    
    def filter_data(self, df, column: str, value: str):
        return df.where(col(column) == value)

    def group_by_count(self, df, column: str):
        return df.groupBy(column).count()

    def order_by_release(self, df):
        window_spec = Window.orderBy("date_released")
        return df.withColumn("row_number", row_number().over(window_spec))

    def convert_to_parquet(self, df):
        df.write.parquet(self.destination_path, mode="overwrite")
    

Declaração de função para limpeza, caso necessário, dos diretório onde são armazenados os arquivos de entrada no formato JSON;

In [13]:
def erase_input_files():
    for file in os.listdir(JSON_DATA_PATH):
        os.remove(os.path.join(JSON_DATA_PATH, file))

Declaração de função para realização das requisições na API através de ids sequenciais com increment de 1;
Salvamento os arquivos JSON nomeando-os pelo id de membro;

In [None]:
def get_runescape_data(num_of_beasts):  
    # Instanciamento de classe fabricadora de dados
    fake = Faker()
    
    # Cria diretórios se houver necessidade
    if not os.path.exists(JSON_DATA_PATH):
        os.makedirs(JSON_DATA_PATH)
    
    for id in range(1, num_of_beasts + 1):
        
        beast_data = requests.get(RUNESCAPE_BASE_URL + str(id))
        if beast_data.status_code == 200:
            if beast_data.text != '':
                
                beast_data = beast_data.json()

                # Gera dados fake de data para utilizar função window
                beast_data['date_released'] = fake.date()
                
                with open(f"{JSON_DATA_PATH}/{id}.json", 'w') as file:
                    dump(beast_data, fp=file)
            
            else:
                print(f"Beast data for ID {id} returned empty. Status code: {beast_data.status_code}")
        else:
            print(f"Failed to fetch data for beast with ID {id}. Status code: {beast_data.status_code}")

* Alimentação do argumento de quantidade de função get_runescape_data para definição do número de membros que serão capturados;
* Atribui a instancia da classe de pyspark a uma variável;
* Cria uma Data Frame e filtra apenas onde o campo "member" é igual a "true";
* Exibe os resultados do Data Frame;
* Cria uma novo Data Frame, usando o anterior, ordernado pela data "released_date";
* Exibe os resultados do Data Frame;
* Cria uma novo Data Frame,usando o anterior, agrupando pelo campo "members" e realizando uma contagem;
* Converte os dados para Parquet e salva no diretório já especificado em passos anteriores;
* Apaga os arquivos no formato JSON utilizados para ingestão inicial;

In [None]:
# Pega dados fonte
get_runescape_data(10)

# Instancia classe de pyspark
rs = PySparkRuneScape()

# Filtra monstros para "membros"
members_monsters = rs.filter_data(rs.source_data, "members", "true")
members_monsters.show()

# Ordena monstros para "membros" por released_date
members_monsters_ordered = rs.order_by_release(members_monsters)
members_monsters_ordered.show()

# Conta quantos monstros são para "membros" e quantos não
members_monsters_count = rs.group_by_count(rs.source_data, "members")
members_monsters_count.show()

# Converte dados em JSON para Parquet
rs.convert_to_parquet(rs.source_data)

# Apaga arquivos fonte
erase_input_files()