In [8]:
import requests
import pandas as pd
from pyspark.sql import SparkSession

# Initialize (or reuse) the Spark session for this notebook
spark = SparkSession.builder.appName("AnimeDataIngestion").getOrCreate()

# Jikan REST endpoint that lists anime
url = "https://api.jikan.moe/v4/anime"

# Fetch data.
# - A timeout is mandatory: `requests` waits forever by default, which would
#   hang the whole notebook run if the API stalls.
# - raise_for_status() turns HTTP error responses (e.g. 429 when rate-limited,
#   5xx on outages) into an immediate, descriptive exception instead of a
#   confusing KeyError / JSON error further down.
response = requests.get(url, timeout=30)
response.raise_for_status()
data = response.json()

# Extract anime list.
# NOTE(review): only a single request is made; if this endpoint is paginated
# the result covers one page only — confirm whether more pages are needed.
anime_list = data["data"]
if not anime_list:
    # Spark cannot infer a schema from an empty dataset; fail with a clear message.
    raise ValueError(f"No anime records returned from {url}")

# Convert to Pandas DataFrame (one row per anime record)
pdf = pd.DataFrame(anime_list)

# Convert to Spark DataFrame (schema is inferred from the pandas data)
df = spark.createDataFrame(pdf)

# Show first few rows as a quick sanity check
df.show(5)

StatementMeta(, 6a641e71-3444-49a6-abf9-8953416b20dc, 10, Finished, Available, Finished)

+------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+---------------------------+--------------------+-----+--------+--------+---------------+------+--------------------+-------------+--------------------+-----+---------+----+----------+-------+---------+--------------------+--------------------+------+------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------+--------------------+--------------------+
|mal_id|                 url|              images|             trailer|approved|              titles|               title|       title_english|             title_japanese|      title_synonyms| type|  source|episodes|         status|airing|               aired|     duration|              rating|score|scored_by|rank|popularity|members|favorites|            synopsis|          background|season|  year|           broadcast|           pr

In [13]:
from pyspark.sql.functions import col, when, size, lit

# 1. Extract studio and genre names safely
def _first_name_or_unknown(array_column):
    """Build a Column with the "name" of the first element of `array_column`.

    Falls back to the literal "Unknown" when the array column is null or has
    no elements.
    """
    entries = col(array_column)
    has_entries = (entries.isNotNull()) & (size(entries) > 0)
    return when(has_entries, entries[0]["name"]).otherwise("Unknown")


# Derived column name -> source array column, applied in this order.
df_extracted = df
for derived_name, source_array in (("studio", "studios"), ("genre", "genres")):
    df_extracted = df_extracted.withColumn(
        derived_name, _first_name_or_unknown(source_array)
    )

# 2. Select and rename columns
# `mal_id` is exposed as `anime_id`; every other column keeps its name.
passthrough_columns = [
    "title",
    "title_english",
    "type",
    "source",
    "episodes",
    "status",
    "duration",
    "rating",
    "score",
    "scored_by",
    "rank",
    "popularity",
    "members",
    "favorites",
    "season",
    "year",
    "studio",
    "genre",
]
df_selected = df_extracted.select(
    col("mal_id").alias("anime_id"), *passthrough_columns
)

# 3. Fill nulls
# Per-column replacement values for missing entries.
missing_value_defaults = {
    "title_english": "Unknown",
    "score": 0.0,
    "episodes": 0,
    "studio": "Unknown",
    "genre": "Unknown",
}
df_clean = df_selected.na.fill(missing_value_defaults)

# 4. Cast types
# (column, target Spark type) pairs, applied one after another.
column_casts = [
    ("score", "float"),
    ("episodes", "int"),
    ("members", "int"),
    ("favorites", "int"),
    ("rank", "int"),
    ("popularity", "int"),
]
df_final = df_clean
for cast_column, spark_type in column_casts:
    df_final = df_final.withColumn(cast_column, col(cast_column).cast(spark_type))

# Show result
print("✅ Final DataFrame schema:")
df_final.printSchema()

print("\n✅ Sample data:")
df_final.show(10, truncate=False)

# Count rows
total_records = df_final.count()
print(f"\n✅ Total records: {total_records}")

StatementMeta(, 6a641e71-3444-49a6-abf9-8953416b20dc, 15, Finished, Available, Finished)

✅ Final DataFrame schema:
root
 |-- anime_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- title_english: string (nullable = false)
 |-- type: string (nullable = true)
 |-- source: string (nullable = true)
 |-- episodes: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- score: float (nullable = false)
 |-- scored_by: long (nullable = true)
 |-- rank: integer (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- members: integer (nullable = true)
 |-- favorites: integer (nullable = true)
 |-- season: string (nullable = true)
 |-- year: double (nullable = true)
 |-- studio: string (nullable = false)
 |-- genre: string (nullable = false)


✅ Sample data:
+--------+-------------------------------+-----------------------+-----+--------+--------+---------------+-------------+------------------------------+-----+---------+----+----------+-------+---------+------+---

In [14]:
# Save df_final as the Delta table "anime_cleaned" in the Lakehouse.
# "overwrite" mode replaces whatever the table currently contains.
delta_writer = df_final.write.format("delta").mode("overwrite")
delta_writer.saveAsTable("anime_cleaned")

print("✅ Data saved as Delta table: anime_cleaned")

StatementMeta(, 6a641e71-3444-49a6-abf9-8953416b20dc, 16, Finished, Available, Finished)

✅ Data saved as Delta table: anime_cleaned


In [15]:
# List the tables reported by SHOW TABLES and display them
tables = spark.sql("SHOW TABLES")
tables.show()

# Gather the reported table names once, then test for ours
existing_table_names = {row.tableName for row in tables.collect()}
table_missing = "anime_cleaned" not in existing_table_names

if table_missing:
    print("❌ Table not found. Let's create it...")

    # Fall back to writing the table from the in-memory DataFrame
    df_final.write.mode("overwrite").format("delta").saveAsTable("anime_cleaned")
    print("✅ Table created successfully!")
else:
    print("✅ Table 'anime_cleaned' exists!")

    # Preview a handful of rows straight from the saved table
    spark.sql("SELECT * FROM anime_cleaned LIMIT 5").show()

StatementMeta(, 6a641e71-3444-49a6-abf9-8953416b20dc, 17, Finished, Available, Finished)

+---------------+-------------+-----------+
|      namespace|    tableName|isTemporary|
+---------------+-------------+-----------+
|anime_lakehouse|anime_cleaned|      false|
+---------------+-------------+-----------+

✅ Table 'anime_cleaned' exists!
+--------+-------------+-------------+----+-----------+--------+---------------+-------------+--------------------+-----+---------+----+----------+-------+---------+------+------+--------------+-----------+
|anime_id|        title|title_english|type|     source|episodes|         status|     duration|              rating|score|scored_by|rank|popularity|members|favorites|season|  year|        studio|      genre|
+--------+-------------+-------------+----+-----------+--------+---------------+-------------+--------------------+-----+---------+----+----------+-------+---------+------+------+--------------+-----------+
|      25|    Sunabouzu|  Desert Punk|  TV|      Manga|      24|Finished Airing|24 min per ep|R - 17+ (violence...| 7.36|    5