## Importing libraries

In [3]:
import requests
import os
import json

from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, StructType, ArrayType, TimestampType

%load_ext sparksql_magic

## Getting data from API

In [3]:
start_date = '2000-01-01'
end_date = '2024-08-03'

def get_endpoint(start_date, end_date):
    url = f'https://db.ygoprodeck.com/api/v7/cardinfo.php?&startdate={start_date}&enddate={end_date}'
    resp = requests.get(url)
    
    if resp.status_code == 200:
        return resp.json()

In [4]:
data = get_endpoint(start_date, end_date)
output_directory = '../data/input/'
data_ingestion = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
now = datetime.now().strftime('%Y%m%d_%H%M%S.%f')

def save_data(data, output_directory, data_ingestion, now):
    cards_data = data.get("data", [])
    for card in cards_data:
        card['date_ingestion'] = data_ingestion

    filename = os.path.join(output_directory, f'{now}.json')
    with open(filename, "w") as open_file:
        json.dump(cards_data, open_file, indent=4)

save_data(data, output_directory, data_ingestion, now)

## Building the spark session

In [5]:
spark = SparkSession.builder \
    .appName("yugioh-api") \
    .master("spark://spark-master:7077") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/05 18:27:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Extracting and analyzing

In [14]:
schema = StructType([
    StructField("archetype", StringType(), True),
    StructField("atk", IntegerType(), True),
    StructField("attribute", StringType(), True),
    StructField("banlist_info", StructType([
        StructField("ban_goat", StringType(), True),
        StructField("ban_ocg", StringType(), True),
        StructField("ban_tcg", StringType(), True)
    ]), True),
    StructField("card_images", ArrayType(StructType([
        StructField("id", IntegerType(), True),
        StructField("image_url", StringType(), True),
        StructField("image_url_cropped", StringType(), True),
        StructField("image_url_small", StringType(), True)
    ])), True),
    StructField("card_prices", ArrayType(StructType([
        StructField("amazon_price", StringType(), True),
        StructField("cardmarket_price", StringType(), True),
        StructField("coolstuffinc_price", StringType(), True),
        StructField("ebay_price", StringType(), True),
        StructField("tcgplayer_price", StringType(), True)
    ])), True),
    StructField("card_sets", ArrayType(StructType([
        StructField("set_code", StringType(), True),
        StructField("set_name", StringType(), True),
        StructField("set_price", StringType(), True),
        StructField("set_rarity", StringType(), True),
        StructField("set_rarity_code", StringType(), True)
    ])), True),
    StructField("date_ingestion", TimestampType(), True),  
    StructField("def", IntegerType(), True),
    StructField("desc", StringType(), True),
    StructField("frameType", StringType(), True),
    StructField("id", IntegerType(), True),
    StructField("level", IntegerType(), True),
    StructField("linkmarkers", ArrayType(StringType()), True),
    StructField("linkval", IntegerType(), True),
    StructField("monster_desc", StringType(), True),
    StructField("name", StringType(), True),
    StructField("pend_desc", StringType(), True),
    StructField("race", StringType(), True),
    StructField("scale", IntegerType(), True),
    StructField("type", StringType(), True),
    StructField("ygoprodeck_url", StringType(), True)
])

In [15]:
file_path_json = "/opt/spark/yugioh-api-with-spark/data/input"

df = spark.read \
    .schema(schema) \
    .option('multiline', 'true') \
    .json(file_path_json)

In [16]:
df.show(1)

[Stage 4:>                                                          (0 + 1) / 1]

+---------+----+---------+------------+--------------------+--------------------+--------------------+-------------------+----+--------------------+---------+--------+-----+-----------+-------+------------+--------------------+---------+----------+-----+----------+--------------------+
|archetype| atk|attribute|banlist_info|         card_images|         card_prices|           card_sets|     date_ingestion| def|                desc|frameType|      id|level|linkmarkers|linkval|monster_desc|                name|pend_desc|      race|scale|      type|      ygoprodeck_url|
+---------+----+---------+------------+--------------------+--------------------+--------------------+-------------------+----+--------------------+---------+--------+-----+-----------+-------+------------+--------------------+---------+----------+-----+----------+--------------------+
|    Alien|NULL|     NULL|        NULL|[{34541863, https...|[{24.45, 0.09, 0....|[{FOTB-EN043, For...|2024-08-05 18:25:50|NULL|During each 

                                                                                

## Transforming data

In [17]:
df.createOrReplaceTempView('yugioh')

In [18]:
%%sparksql

SELECT 
    A.id id_card, 
    A.name name_card, 
    A.type type_card, 
    A.frameType frame_type_card,
    A.race race_card,
    IFNULL(A.archetype, 'N/I') archetype_card,
    IFNULL(A.scale, 'N/I') scale_card,
    A.desc description_card,
    IFNULL(A.atk, 0) atk_card, 
    IFNULL(A.def, 0) def_card, 
    IFNULL(A.level, 0) level_card,
    card_set.set_name card_set_name_card,
    card_set.set_code card_set_code_card,
    card_set.set_rarity card_set_rarity_card,
    card_set.set_rarity_code card_set_rarity_code_card,
    card_set.set_price card_set_price_card,
    card_price.amazon_price amazon_price_card,
    card_price.cardmarket_price cardmarket_price_card,
    card_price.coolstuffinc_price coolstuffinc_price_card,
    card_price.ebay_price ebay_price_card,
    card_price.tcgplayer_price tcgplayer_price_card,
    card_image.image_url image_url_card,
    card_image.image_url_cropped image_url_cropped_card,
    card_image.image_url_small image_url_small_card,
    IFNULL(A.banlist_info.ban_goat, 'N/I') ban_goat_card,
    IFNULL(A.banlist_info.ban_ocg, 'N/I') ban_ocg_card,
    IFNULL(A.banlist_info.ban_tcg, 'N/I') ban_tcg_card,
    A.date_ingestion,
    A.ygoprodeck_url
FROM yugioh A 
LATERAL VIEW explode(card_sets) card_set AS card_set
LATERAL VIEW explode(card_prices) card_price AS card_price
LATERAL VIEW explode(card_images) card_image as card_image
WHERE 1=1 
LIMIT 1

24/08/05 18:34:55 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28
id_card,name_card,type_card,frame_type_card,race_card,archetype_card,scale_card,description_card,atk_card,def_card,level_card,card_set_name_card,card_set_code_card,card_set_rarity_card,card_set_rarity_code_card,card_set_price_card,amazon_price_card,cardmarket_price_card,coolstuffinc_price_card,ebay_price_card,tcgplayer_price_card,image_url_card,image_url_cropped_card,image_url_small_card,ban_goat_card,ban_ocg_card,ban_tcg_card,date_ingestion,ygoprodeck_url
34541863,"""A"" Cell Breeding Device",Spell Card,spell,Continuous,Alien,N/I,"During each of your Standby Phases, put 1 A-Counter on 1 face-up monster your opponent controls.",0,0,0,Force of the Breaker,FOTB-EN043,Common,(C),0,24.45,0.09,0.25,0.99,0.19,https://images.ygoprodeck.com/images/cards/34541863.jpg,https://images.ygoprodeck.com/images/cards_cropped/34541863.jpg,https://images.ygoprodeck.com/images/cards_small/34541863.jpg,N/I,N/I,N/I,2024-08-05 18:25:50,https://ygoprodeck.com/card/a-cell-breeding-device-9766


In [19]:
result_df = spark.sql("""
SELECT 
    A.id id_card, 
    A.name name_card, 
    A.type type_card, 
    A.frameType frame_type_card,
    A.race race_card,
    IFNULL(A.archetype, 'N/I') archetype_card,
    IFNULL(A.scale, 'N/I') scale_card,
    A.desc description_card,
    IFNULL(A.atk, 0) atk_card, 
    IFNULL(A.def, 0) def_card, 
    IFNULL(A.level, 0) level_card,
    card_set.set_name card_set_name_card,
    card_set.set_code card_set_code_card,
    card_set.set_rarity card_set_rarity_card,
    card_set.set_rarity_code card_set_rarity_code_card,
    card_set.set_price card_set_price_card,
    card_price.amazon_price amazon_price_card,
    card_price.cardmarket_price cardmarket_price_card,
    card_price.coolstuffinc_price coolstuffinc_price_card,
    card_price.ebay_price ebay_price_card,
    card_price.tcgplayer_price tcgplayer_price_card,
    card_image.image_url image_url_card,
    card_image.image_url_cropped image_url_cropped_card,
    card_image.image_url_small image_url_small_card,
    IFNULL(A.banlist_info.ban_goat, 'N/I') ban_goat_card,
    IFNULL(A.banlist_info.ban_ocg, 'N/I') ban_ocg_card,
    IFNULL(A.banlist_info.ban_tcg, 'N/I') ban_tcg_card,
    A.date_ingestion,
    A.ygoprodeck_url
FROM yugioh A 
LATERAL VIEW explode(card_sets) card_set AS card_set
LATERAL VIEW explode(card_prices) card_price AS card_price
LATERAL VIEW explode(card_images) card_image as card_image
WHERE 1=1 
""")

## Exporting data

In [59]:
output_path = '/opt/spark/yugioh-api-with-spark/data/output/yugioh_tcg/'
result_df.write.partitionBy('type_card').mode('append').parquet(output_path)

                                                                                

## Reading data

In [60]:
file_path_parquet = '/opt/spark/yugioh-api-with-spark/data/output/yugioh_tcg'

df_parquet = spark.read \
    .parquet(file_path_parquet)

In [61]:
df_parquet.createOrReplaceTempView('yugiohtcg')

In [63]:
%%sparksql

SELECT COUNT(*) FROM yugiohtcg

0
count(1)
39445


In [64]:
spark.stop()