In [1]:
# Define os imports necessários para a execução do código
from pyspark.sql.types import *
import pyspark.sql.functions as fn
from pyspark.sql import SparkSession
from IPython.core.display import HTML
import os, time, json
from datetime import datetime
from requests import post

# Inject CSS that sets `white-space: pre` on <pre> elements in the rendered notebook
no_wrap_css = HTML("<style>pre { white-space: pre !important; }</style>")
display(no_wrap_css)

# Build the Spark session with the jars and S3A settings needed to reach MinIO.

# Jar paths are joined into a clean comma-separated list so the value handed to
# `spark.jars` carries no embedded newlines or indentation.
minio_jars = ",".join([
    "/home/jovyan/jars/aws-java-sdk-core-1.11.534.jar",
    "/home/jovyan/jars/aws-java-sdk-dynamodb-1.11.534.jar",
    "/home/jovyan/jars/aws-java-sdk-s3-1.11.534.jar",
    "/home/jovyan/jars/hadoop-aws-3.2.2.jar",
])

# Credentials are read from the environment when available; the fallbacks are
# the values this notebook previously hard-coded, so existing setups keep working.
minio_access_key = os.environ.get("MINIO_ACCESS_KEY", "aulafia")
minio_secret_key = os.environ.get("MINIO_SECRET_KEY", "aulafia@123")

spark = (SparkSession.builder
         .config("spark.jars", minio_jars)
         .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
         .config("spark.hadoop.fs.s3a.access.key", minio_access_key)
         .config("spark.hadoop.fs.s3a.secret.key", minio_secret_key)
         .config("spark.hadoop.fs.s3a.path.style.access", True)
         .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
         .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
         .getOrCreate()
        )


In [75]:
# Load the parquet control table stored alongside the IGDB games extractions
control_table_path = 's3a://landing-zone/igdb/games/control_table'
df_control = spark.read.parquet(control_table_path)

In [76]:
# Print the control table (arguments spelled out; these are show()'s defaults)
df_control.show(n=20, truncate=True)

+-------------------+----------+-----------+------------+
|          Exec_date| Exec_time|  Load_type|Loaded_files|
+-------------------+----------+-----------+------------+
|2023-07-09 18:55:50|1688928950|Incremental|           1|
|2023-07-09 18:43:11|1688928191|Incremental|           0|
|2023-07-09 18:32:53|1688927573|    Initial|           3|
+-------------------+----------+-----------+------------+



In [72]:
# Largest Exec_time recorded in the control table, i.e. the latest execution
latest_row = df_control.agg(fn.max("Exec_time").alias("Latest_execution")).first()
last_execution = latest_row["Latest_execution"]
print(last_execution)

1688927573


In [77]:
# Extraction date used when reading from the lake
extraction_date = "2023-07-09-D3"

In [34]:
from minio import Minio

# Configure the MinIO client used to list objects.
# Credentials are read from the environment when set; the fallbacks are the
# values this notebook previously hard-coded, so existing setups keep working.
minio_client = Minio(
    "minio:9000",
    access_key=os.environ.get("MINIO_ACCESS_KEY", "aulafia"),
    secret_key=os.environ.get("MINIO_SECRET_KEY", "aulafia@123"),
    secure=False,
)

In [78]:
# Bucket name
bucket_name = "landing-zone"

# Object prefix of the extraction selected by `extraction_date`
prefixo = "igdb/games/" + extraction_date + "/"

# s3a:// paths of the objects MinIO lists for that prefix
arquivos = [
    f"s3a://{bucket_name}/{obj.object_name}"
    for obj in minio_client.list_objects(bucket_name, prefix=prefixo)
]

# Remains None when nothing was listed
dfIGDB_Games = None

# Read each JSON file and stack it onto the accumulated DataFrame
for arquivo in arquivos:

    df_temp = spark.read.json(arquivo)

    # First file: it becomes the accumulated DataFrame
    if dfIGDB_Games is None:
        dfIGDB_Games = df_temp
    # Later files: union by column name, tolerating columns missing on either side
    else:
        dfIGDB_Games = dfIGDB_Games.unionByName(df_temp, allowMissingColumns=True)

# Make the empty case visible here rather than as an AttributeError on None
# when a later cell calls a DataFrame method on dfIGDB_Games.
if dfIGDB_Games is None:
    print(f"WARNING: no objects found under s3a://{bucket_name}/{prefixo}; dfIGDB_Games is None")

In [79]:
# Display the resulting DataFrame: up to 20 rows, without truncating cell values
dfIGDB_Games.show(n=20, truncate=False)

+--------------------------------------------------------+-----------------+-----------------------+-------------------------------------------------------------------------------------+-------------------------------+------------------------+--------+------------------------------------+----------+------+----------+------------------------------------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------+-------+---------+----------+------------+------------------+------------+---------------------------+-----+------+---------------------------------------------------------------+------------------------------------------------------------------------------------------------------

In [42]:
# Show id and updated_at for up to 20 rows, without truncating values
dfIGDB_Games.select("id", "updated_at").show(n=20, truncate=False)

+---+----------+
|id |updated_at|
+---+----------+
|1  |1688130235|
|2  |1686331580|
|3  |1688130381|
|4  |1688906243|
|5  |1686609526|
|6  |1687280734|
|7  |1688734186|
|8  |1686141912|
|9  |1686660684|
|10 |1686609978|
|11 |1687656923|
|12 |1687698537|
|13 |1688561774|
|14 |1688647657|
|15 |1688850161|
|16 |1688922158|
|17 |1688907411|
|18 |1688543479|
|19 |1688573741|
|20 |1688913853|
+---+----------+
only showing top 20 rows



In [80]:
# Show id and updated_at for up to 20 rows, without truncating values
dfIGDB_Games.select("id", "updated_at").show(n=20, truncate=False)

+------+----------+
|id    |updated_at|
+------+----------+
|426   |1688928791|
|2025  |1688928843|
|3239  |1688928628|
|6880  |1688928805|
|7348  |1688928661|
|9241  |1688928806|
|26194 |1688928858|
|36198 |1688928869|
|43165 |1688928863|
|51528 |1688928816|
|113119|1688928841|
|113175|1688928844|
|124992|1688928860|
|131946|1688928842|
|136512|1688928838|
|143772|1688928809|
|191857|1688928808|
|194205|1688928803|
|202858|1688928860|
+------+----------+

