<a href="https://colab.research.google.com/github/prasadvaze-markel/data-engineer-learning-path/blob/published/pandastopyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Dataset: https://www.kaggle.com/datasets/hernan4444/anime-recommendation-database-2020?select=animelist.csv


## 🐌 Hadoop MapReduce
```
// === Hadoop MapReduce (Java): Word Count ===
// Verbose and boilerplate-heavy

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```

## ⚡ Apache Spark
```
// === Apache Spark (Java): Word Count ===
// Concise and expressive

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.SparkConf;
import java.util.Arrays;

public class SimpleWordCount {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("WordCount").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<String> lines = sc.textFile(args[0]);
    JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
    JavaPairRDD<String, Integer> wordCounts = words
        .mapToPair(word -> new scala.Tuple2<>(word, 1))
        .reduceByKey(Integer::sum);

    wordCounts.saveAsTextFile(args[1]);
    sc.close();
  }
}
```

| Feature | Hadoop MapReduce | Spark |
|---|---|---|
| Lines of code | ~55 | ~20 |
| Classes needed | Mapper, Reducer, Driver | Single main class |
| Data between stages | Written to disk | Kept in memory where possible |
| Complexity | High (verbose, boilerplate-heavy) | Low (functional, expressive) |
| API paradigm | Rigid, procedural | Functional, declarative |


# Setting up Spark

## By default, PySpark isn’t included in the Colab environment

In [None]:
!pip install pyspark



## Creating a Spark session, which is the entry point

With `.master("local[*]")`, the SparkSession runs Spark in local mode inside the Colab VM, using every available core as a worker thread. No separate cluster is needed.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PandasToPySpark") \
    .master("local[*]") \
    .getOrCreate()

print(f"Spark version: {spark.version}")

Spark version: 3.5.5


In [None]:
# header=True: treats the first row as headers
# inferSchema=True: automatically infer data types
#user_ratings_df = spark.read.csv("/content/drive/MyDrive/PySpark Workshop/Scrapping MyAnimelist/animelist.csv", header=True, inferSchema=True)
user_ratings_df = spark.read.csv("/animelist.csv", header=True, inferSchema=True)

user_ratings_df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- anime_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- watching_status: integer (nullable = true)
 |-- watched_episodes: integer (nullable = true)



In [None]:
user_ratings_df.show(5)

+-------+--------+------+---------------+----------------+
|user_id|anime_id|rating|watching_status|watched_episodes|
+-------+--------+------+---------------+----------------+
|      0|      67|     9|              1|               1|
|      0|    6702|     7|              1|               4|
|      0|     242|    10|              1|               4|
|      0|    4898|     0|              1|               1|
|      0|      21|    10|              1|               0|
+-------+--------+------+---------------+----------------+
only showing top 5 rows



You can explicitly define a schema too.

```
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

anime_schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("anime_id", IntegerType(), True),
    StructField("rating", IntegerType(), True),
    StructField("watching_status", IntegerType(), True),
    StructField("watched_episodes", IntegerType(), True)
])

anime_df_explicit = spark.read.format("csv") \
    .option("header", "true") \
    .schema(anime_schema) \
    .load("/content/drive/MyDrive/animelist.csv")
```

In [None]:
#animes_df = spark.read.csv("/content/drive/MyDrive/PySpark Workshop/Scrapping MyAnimelist/anime.csv", header=True, inferSchema=True)
animes_df = spark.read.csv("/anime.csv", header=True, inferSchema=True)

animes_df.printSchema()

root
 |-- MAL_ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Score: string (nullable = true)
 |-- Genres: string (nullable = true)
 |-- English name: string (nullable = true)
 |-- Japanese name: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Episodes: string (nullable = true)
 |-- Aired: string (nullable = true)
 |-- Premiered: string (nullable = true)
 |-- Producers: string (nullable = true)
 |-- Licensors: string (nullable = true)
 |-- Studios: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Duration: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Ranked: string (nullable = true)
 |-- Popularity: string (nullable = true)
 |-- Members: double (nullable = true)
 |-- Favorites: integer (nullable = true)
 |-- Watching: integer (nullable = true)
 |-- Completed: integer (nullable = true)
 |-- On-Hold: integer (nullable = true)
 |-- Dropped: integer (nullable = true)
 |-- Plan to Watch: integer (nullable = t

In [None]:
animes_df.show(5)

+------+--------------------+-----+--------------------+--------------------+---------------------------+-----+--------+--------------------+-----------+--------------------+--------------------+--------------+--------+---------------+--------------------+------+----------+---------+---------+--------+---------+-------+-------+-------------+--------+--------+--------+-------+-------+-------+-------+-------+-------+-------+
|MAL_ID|                Name|Score|              Genres|        English name|              Japanese name| Type|Episodes|               Aired|  Premiered|           Producers|           Licensors|       Studios|  Source|       Duration|              Rating|Ranked|Popularity|  Members|Favorites|Watching|Completed|On-Hold|Dropped|Plan to Watch|Score-10| Score-9| Score-8|Score-7|Score-6|Score-5|Score-4|Score-3|Score-2|Score-1|
+------+--------------------+-----+--------------------+--------------------+---------------------------+-----+--------+--------------------+-----

# PySpark vs Pandas: Schemas
## Why Schemas Matter 🩻

- Spark doesn't need to scan the entire dataset repeatedly to understand types; it already has that metadata.

- Since each column has a known type, Spark can optimize execution, including:
  - Memory layout
  - Serialization (sending data between nodes)
  - Generating JVM bytecode to process columns efficiently

- Schema-awareness allows Spark to scale to big data without losing performance or reliability.

<br>

- Pandas does track data types:

```
import pandas as pd

df = pd.read_csv("animelist.csv")
df.dtypes

user_id           int64
anime_id          int64
rating            int64
watching_status   int64
watched_episodes  int64
dtype: object

```

- But this isn't enforced. A column that holds mixed types falls back to the `object` data type.

<br>

- Schemas in Spark are enforced.

- Pandas uses NumPy arrays under the hood, which expect consistent types. If a `rating` column contains a string such as `"unknown"`, pandas converts the whole column to `object`, a catch-all type.

`df["rating"].mean()  # results in an error`

- You'd need to fix the column manually:

`df["rating"] = pd.to_numeric(df["rating"], errors="coerce")`
- The `"unknown"` becomes `NaN`.
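The fix above can be tried end to end on a tiny in-memory file. This is a minimal sketch: the three-row CSV is made up for illustration, and the exact non-numeric dtype pandas picks for the mixed column may depend on the pandas version.

```python
import io

import pandas as pd

# Hypothetical three-row file; "unknown" is the one bad value.
csv_text = "user_id,rating\n1,9\n2,unknown\n3,7\n"
df = pd.read_csv(io.StringIO(csv_text))

# One stray string is enough to stop pandas from treating the column as numeric.
dtype_before = df["rating"].dtype

# errors="coerce" turns every unparseable value into NaN instead of raising.
df["rating"] = pd.to_numeric(df["rating"], errors="coerce")
dtype_after = df["rating"].dtype          # a float dtype, since NaN needs a float column
missing_count = int(df["rating"].isna().sum())
mean_rating = df["rating"].mean()         # NaN is skipped by default
```

Note that the coerced value is silently dropped from the mean, so it is worth checking `missing_count` before trusting the result.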

### Correcting a column's datatype

In [None]:
from pyspark.sql.functions import col

score_cols = [f"Score-{i}" for i in range(1, 10)]

animes_df = animes_df.select([
    col(score_col).cast("int").alias(score_col) if score_col in score_cols else col(score_col)
    for score_col in animes_df.columns
])

animes_df.printSchema()

root
 |-- MAL_ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Score: string (nullable = true)
 |-- Genres: string (nullable = true)
 |-- English name: string (nullable = true)
 |-- Japanese name: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Episodes: string (nullable = true)
 |-- Aired: string (nullable = true)
 |-- Premiered: string (nullable = true)
 |-- Producers: string (nullable = true)
 |-- Licensors: string (nullable = true)
 |-- Studios: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Duration: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Ranked: string (nullable = true)
 |-- Popularity: string (nullable = true)
 |-- Members: double (nullable = true)
 |-- Favorites: integer (nullable = true)
 |-- Watching: integer (nullable = true)
 |-- Completed: integer (nullable = true)
 |-- On-Hold: integer (nullable = true)
 |-- Dropped: integer (nullable = true)
 |-- Plan to Watch: integer (nullable = t

In [None]:
animes_df = animes_df.withColumn("Score-10", col("Score-10").cast("int")) \
                     .withColumn("Popularity", col("Popularity").cast("int")) \
                     .withColumn("Ranked", col("Ranked").cast("int"))

animes_df.printSchema()

root
 |-- MAL_ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Score: string (nullable = true)
 |-- Genres: string (nullable = true)
 |-- English name: string (nullable = true)
 |-- Japanese name: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Episodes: string (nullable = true)
 |-- Aired: string (nullable = true)
 |-- Premiered: string (nullable = true)
 |-- Producers: string (nullable = true)
 |-- Licensors: string (nullable = true)
 |-- Studios: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Duration: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Ranked: integer (nullable = true)
 |-- Popularity: integer (nullable = true)
 |-- Members: double (nullable = true)
 |-- Favorites: integer (nullable = true)
 |-- Watching: integer (nullable = true)
 |-- Completed: integer (nullable = true)
 |-- On-Hold: integer (nullable = true)
 |-- Dropped: integer (nullable = true)
 |-- Plan to Watch: integer (nullable =

# Operations in Spark
- Transformations: tell Spark how to manipulate data but don't do anything until an action is called.
- Actions: trigger execution. Spark reads the data, runs all the queued transformations, and produces a result.
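The split between the two can be mimicked in a few lines of plain Python. This is a toy model, not Spark's implementation: `LazyRows` and `high_rating` are hypothetical names, and the three rows are made up.

```python
class LazyRows:
    """Toy stand-in for a DataFrame: records steps, runs them only on an action."""

    def __init__(self, rows, plan=()):
        self._rows = rows
        self._plan = plan  # queued transformations; nothing has been executed

    def filter(self, predicate):
        # Transformation: return a NEW object with a longer plan; touch no data.
        return LazyRows(self._rows, self._plan + (predicate,))

    def count(self):
        # Action: only now walk the data and apply every queued predicate.
        return sum(1 for row in self._rows if all(p(row) for p in self._plan))


evaluated = []  # records which ratings were actually inspected


def high_rating(row):
    evaluated.append(row["rating"])
    return row["rating"] > 8


ratings = LazyRows([{"rating": 9}, {"rating": 7}, {"rating": 10}])
filtered = ratings.filter(high_rating)   # builds the plan only
calls_before_action = len(evaluated)     # still 0: nothing has run
high_count = filtered.count()            # the action triggers the work
```

The predicate is not called until `count()` runs, which is the same behaviour the `filter` and `count` cells show on the real DataFrame.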


In [None]:
# @title
from IPython.display import HTML, display

sentence = '''
<span style="font-size:20px">
<span style="color:#00acc1; font-weight:bold">Transformations</span> build the plan while <span style="color:#42a5f5; font-weight:bold">Actions</span> execute the plan.
</span>
'''

display(HTML(sentence))

In [None]:
user_ratings_df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- anime_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- watching_status: integer (nullable = true)
 |-- watched_episodes: integer (nullable = true)



In [None]:
filtered_ratings = user_ratings_df.filter(user_ratings_df["rating"] > 8) # this is a lazy eval

In [None]:
filtered_ratings.count() # action

5242071

In [None]:
filtered_ratings.explain() # print the physical plan

== Physical Plan ==
*(1) Filter (isnotnull(rating#19) AND (rating#19 > 8))
+- FileScan csv [user_id#17,anime_id#18,rating#19,watching_status#20,watched_episodes#21] Batched: false, DataFilters: [isnotnull(rating#19), (rating#19 > 8)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/animelist.csv], PartitionFilters: [], PushedFilters: [IsNotNull(rating), GreaterThan(rating,8)], ReadSchema: struct<user_id:int,anime_id:int,rating:int,watching_status:int,watched_episodes:int>




# PySpark vs Pandas: Parallelism

`pandas.read_csv("animelist.csv")`

- Your entire file is read by a single CPU core.

- The whole dataset must fit into memory (RAM).

- All operations like `user_ratings_df[user_ratings_df["rating"] > 8]` happen in one thread.

⚠️ So if your CSV is 10GB and your RAM is 8GB → 💥 you might experience crashes.

`spark.read.csv("animelist.csv")`

- Spark splits the file into chunks (partitions).

- Each chunk is sent to a different executor (worker process).

- All of them read and process data in parallel.


<br>

For example, Spark can split a 10 GB file into 20 partitions of about 512 MB each. With 4 executors of 5 cores each there are 20 task slots, and each partition is processed by one task, so all 20 partitions can be processed in parallel.
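The arithmetic behind that example can be written out as a short sketch. `task_waves` is a hypothetical helper, and it assumes one running task per core, which is Spark's default.

```python
import math


def task_waves(num_partitions, executors, cores_per_executor):
    """Return (parallel task slots, scheduling waves needed)."""
    slots = executors * cores_per_executor      # one running task per core
    waves = math.ceil(num_partitions / slots)   # rounds needed to run every partition
    return slots, waves


file_size_gb = 10
num_partitions = 20
slots, waves = task_waves(num_partitions, executors=4, cores_per_executor=5)
partition_size_mb = file_size_gb * 1024 / num_partitions

# Doubling the partitions without adding cores means two waves of smaller tasks.
_, waves_with_40 = task_waves(40, executors=4, cores_per_executor=5)
```

With these numbers there are 20 slots, one wave, and 512 MB per partition; 40 partitions on the same cluster would need two waves.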



# Basic Data Cleaning 🧼

### Count the number of nulls or empty values

In [None]:
from pyspark.sql.functions import isnan, when, count, trim

filtered_ratings.select([
    count(when(isnan(column) | col(column).isNull() | (trim(col(column)) == ""), column)).alias(column)
    for column in filtered_ratings.columns
]).show()

+-------+--------+------+---------------+----------------+
|user_id|anime_id|rating|watching_status|watched_episodes|
+-------+--------+------+---------------+----------------+
|      0|       0|     0|              0|               0|
+-------+--------+------+---------------+----------------+



In [None]:
animes_df.select([
    count(when(isnan(column) | col(column).isNull() | (trim(col(column)) == ""), column)).alias(column)
    for column in animes_df.columns
]).show()

+------+----+-----+------+------------+-------------+----+--------+-----+---------+---------+---------+-------+------+--------+------+------+----------+-------+---------+--------+---------+-------+-------+-------------+--------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|MAL_ID|Name|Score|Genres|English name|Japanese name|Type|Episodes|Aired|Premiered|Producers|Licensors|Studios|Source|Duration|Rating|Ranked|Popularity|Members|Favorites|Watching|Completed|On-Hold|Dropped|Plan to Watch|Score-10|Score-9|Score-8|Score-7|Score-6|Score-5|Score-4|Score-3|Score-2|Score-1|
+------+----+-----+------+------------+-------------+----+--------+-----+---------+---------+---------+-------+------+--------+------+------+----------+-------+---------+--------+---------+-------+-------+-------------+--------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|     0|   0|    0|     0|           0|            0|   0|       0|    0|        0|        0|    

# PySpark vs Pandas: Mutability

- You cannot change a Spark DataFrame in place.
- Every transformation returns a new DataFrame.

<br>

Why?

- Spark needs to keep track of the DAG.
- Spark needs to recompute or recover partitions in case of failure.
- Immutability lets Spark distribute transformations across machines safely.

Upsides ✅
- Fault tolerance
- Optimization
- Parallelism

In [None]:
filtered_ratings_v2 = filtered_ratings.fillna({'rating': 0.0})

You can change a pandas DataFrame in place.

```
df["rating"] = df["rating"].fillna(0)  
df.dropna(inplace=True)   
```           
This is great for exploratory data analysis on small-to-medium data.

Downsides 🚫
- Harder to track changes
- Not fault-tolerant
- Can lead to bugs in larger codebases if not careful

### Creating new columns

In [None]:
from pyspark.sql.functions import split, explode

animes_df \
  .withColumn("genre_array", split(col("Genres"), ",")) \
  .select(col("MAL_ID"), col("Name"), col("Genres"), col("genre_array")) \
  .show(5, False) # we don't want to truncate our view of the text


# explode will flatten the array
animes_df \
  .withColumn("genre_array", split(col("Genres"), ",")) \
  .withColumn("genre", explode(col("genre_array"))) \
  .select(col("MAL_ID"), col("Name"), col("Genres"), col("genre_array"), col("genre")) \
  .show(30, False)

+------+-------------------------------+---------------------------------------------------+----------------------------------------------------------+
|MAL_ID|Name                           |Genres                                             |genre_array                                               |
+------+-------------------------------+---------------------------------------------------+----------------------------------------------------------+
|1     |Cowboy Bebop                   |Action, Adventure, Comedy, Drama, Sci-Fi, Space    |[Action,  Adventure,  Comedy,  Drama,  Sci-Fi,  Space]    |
|5     |Cowboy Bebop: Tengoku no Tobira|Action, Drama, Mystery, Sci-Fi, Space              |[Action,  Drama,  Mystery,  Sci-Fi,  Space]               |
|6     |Trigun                         |Action, Sci-Fi, Adventure, Comedy, Drama, Shounen  |[Action,  Sci-Fi,  Adventure,  Comedy,  Drama,  Shounen]  |
|7     |Witch Hunter Robin             |Action, Mystery, Police, Supernatural, Drama, Ma

In [None]:
animes_df \
.filter(col("MAL_ID") == 120).show()

+------+-------------+-----+--------------------+-------------+------------------+----+--------+--------------------+-----------+--------------------+----------+-----------+------+---------------+--------------------+------+----------+--------+---------+--------+---------+-------+-------+-------------+--------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|MAL_ID|         Name|Score|              Genres| English name|     Japanese name|Type|Episodes|               Aired|  Premiered|           Producers| Licensors|    Studios|Source|       Duration|              Rating|Ranked|Popularity| Members|Favorites|Watching|Completed|On-Hold|Dropped|Plan to Watch|Score-10|Score-9|Score-8|Score-7|Score-6|Score-5|Score-4|Score-3|Score-2|Score-1|
+------+-------------+-----+--------------------+-------------+------------------+----+--------+--------------------+-----------+--------------------+----------+-----------+------+---------------+--------------------+------+------

## String Cleaning

### Trimming
| Function | Description |
|---|---|
| `trim(columnName)` | Removes both leading and trailing whitespace |
| `ltrim(columnName)` | Removes leading whitespace |
| `rtrim(columnName)` | Removes trailing whitespace |

<br>

### Casing
| Function | Description |
|---|---|
| `lower(columnName)` | Converts strings to lowercase |
| `upper(columnName)` | Converts strings to uppercase |

<br>

### Regex and Substrings
| Function | Description |
|---|---|
| `regexp_replace(col, "old", "new")` | Replacing substrings |
| `regexp_replace(col, "[^a-zA-Z0-9 ]", "")` | Removing punctuation |
| `substring(col, start, length)` | Extracting substrings |

<br>

## Data Handling
### Missing or Corrupt Data
| Function | Description |
|---|---|
| `fillna()` | Replaces nulls and NaNs with a given value |
| `dropna()` | Drops any row with missing data |
| `replace()` or `.na.replace()` | Changes "N/A", "unknown", etc. to something else |

<br>

### Dedupe and Standardize
| Function | Description |
|---|---|
| `round(col, 2)` | Rounds floats to 2 decimal places |
| `dropDuplicates(["col1", "col2"])` | Drops rows that duplicate the listed columns |
| `replace()` + `lower()` + `trim()` | Standardizes values. Fixes things like "Male", "male ", " MALE" |
| `orderBy("col")` | Sorts rows |
| `concat_ws(' ', col1, col2)` | Combines columns |
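To see what the trim, lower, and regex steps from the tables above do to individual values, here is the same recipe in plain Python. This illustrates the per-value effect only and is not Spark code; `standardize` is a hypothetical helper and the sample values are made up.

```python
import re


def standardize(value):
    """Mirror trim -> lower -> regexp_replace on a single string value."""
    if value is None:                 # Spark's string functions return null for null input
        return None
    cleaned = value.strip(" ")        # like trim(): drop leading and trailing spaces
    cleaned = cleaned.lower()         # like lower()
    return re.sub(r"[^a-zA-Z0-9 ]", "", cleaned)  # like regexp_replace(col, "[^a-zA-Z0-9 ]", "")


raw_values = ["Male", "male ", " MALE", "M.a.l.e!", None]
standardized = [standardize(v) for v in raw_values]
distinct_values = sorted({v for v in standardized if v is not None})
```

Four differently written inputs collapse to a single value, which is what makes a later `dropDuplicates` or `groupBy` on that column meaningful.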

# Shuffling 🔄

A shuffle is the process of moving data between executors to satisfy a transformation that needs to group or combine rows from different partitions.

<br>

```filtered_ratings_v2.groupBy("user_id").count()```

Spark has to move all rows with the same `user_id` to the same partition so it can count them.

<br>

Examples:


| Function | Description |
|---|---|
| `groupBy()` | All rows with the same key need to go to the same partition |
| `join()` | Matching keys must be co-located across datasets |
| `distinct()` | Needs to group identical records together |
| `repartition()` | Explicitly reshuffles data into new partitions |
| `orderBy()` | Sorts across all data, requiring a full shuffle |

<br>

Shuffle is a side effect of a wide transformation. Shuffles are performed during execution of an action.

Wide transformations require data to be shuffled across the network, because each output partition depends on multiple input partitions.



## What happens during a shuffle 🕺?
- Spark sends rows across the cluster to new partitions based on a key (e.g., all rows with `MAL_ID = 123` go to the same partition).
- Each executor reads the shuffled data for the partitions assigned to it.
- Each executor then applies the transformation locally, and Spark combines the results from all partitions.

Shuffles are expensive: they involve disk I/O, network transfer, and serialization 💸.

This can cause performance bottlenecks and can also lead to OOM errors if data is skewed or poorly partitioned.
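The routing step can be sketched in plain Python. This is a toy model under stated assumptions: the rows are made up, `shuffle_by_key` is a hypothetical helper, and `user_id % num_partitions` stands in for Spark's hash partitioner.

```python
from collections import Counter, defaultdict

NUM_PARTITIONS = 3  # hypothetical value chosen for the example

# Rows as (user_id, anime_id), initially scattered over two input partitions.
input_partitions = [
    [(1, 67), (2, 242), (1, 21)],
    [(2, 6702), (3, 4898), (1, 30)],
]


def shuffle_by_key(partitions, num_partitions):
    """Route every row to the output partition chosen by its key."""
    output = defaultdict(list)
    for partition in partitions:
        for user_id, anime_id in partition:
            target = user_id % num_partitions  # stand-in for a hash partitioner
            output[target].append((user_id, anime_id))
    return dict(output)


shuffled = shuffle_by_key(input_partitions, NUM_PARTITIONS)

# After the shuffle each key lives in exactly one partition,
# so a count computed locally is already the global count.
counts = {}
for rows in shuffled.values():
    counts.update(Counter(user_id for user_id, _ in rows))
```

Every row crosses from an input partition to an output partition here; on a real cluster that movement is the network and disk cost described above.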

## Broadcast Joins

A broadcast join is a special type of join in Spark where a small DataFrame is *broadcast* (copied in full) to all executors, so that each executor can join it locally with its partitions of the larger table and the larger table never has to be shuffled.

It's a great performance boost for joins with small lookup tables.

<br>

**When to Use It?**
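- The smaller dataset fits comfortably in each executor's memory (Spark broadcasts automatically below `spark.sql.autoBroadcastJoinThreshold`, 10 MB by default; up to roughly 100 MB is a common manual limit)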
- You're doing a join and one side is tiny (e.g., a lookup table)
- You're okay with duplicating the small dataset across all nodes
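The idea can be sketched without Spark. In this toy model the rating rows are made up, `join_partition` is a hypothetical helper, and a plain dictionary plays the role of the broadcast table.

```python
# Small lookup table: cheap to copy to every executor.
anime_names = {1: "Cowboy Bebop", 6: "Trigun", 21: "One Piece"}

# Large table, already split into partitions of (user_id, anime_id) rows.
rating_partitions = [
    [(0, 21), (0, 6)],
    [(1, 1), (1, 999)],   # 999 has no match in the lookup table
]


def join_partition(rows, lookup):
    """Inner-join one partition against its local copy of the lookup table."""
    return [
        (user_id, anime_id, lookup[anime_id])
        for user_id, anime_id in rows
        if anime_id in lookup
    ]


# Each partition gets its own copy of the lookup and is joined independently;
# no rows of the large table move between partitions.
joined = [join_partition(rows, dict(anime_names)) for rows in rating_partitions]
```

The cost is one copy of the small table per executor, which is why the approach only pays off when that table is tiny.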

In [None]:
from pyspark.sql.functions import broadcast

# filter on watching_status first, then keep only the columns the join needs
users_who_completed_a_series = user_ratings_df.filter(col("watching_status") == 2) \
                                              .select("user_id", "anime_id") \
                                              .join(broadcast(animes_df), user_ratings_df.anime_id == animes_df.MAL_ID, how="inner")

users_who_completed_a_series.show(10)

+-------+--------+------+--------------------+-----+--------------------+--------------------+--------------------------------------+-----+--------+--------------------+-----------+--------------------+--------------------+----------------+------------+---------------+--------------------+------+----------+---------+---------+--------+---------+-------+-------+-------------+--------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|user_id|anime_id|MAL_ID|                Name|Score|              Genres|        English name|                         Japanese name| Type|Episodes|               Aired|  Premiered|           Producers|           Licensors|         Studios|      Source|       Duration|              Rating|Ranked|Popularity|  Members|Favorites|Watching|Completed|On-Hold|Dropped|Plan to Watch|Score-10|Score-9|Score-8|Score-7|Score-6|Score-5|Score-4|Score-3|Score-2|Score-1|
+-------+--------+------+--------------------+-----+--------------------+---------

In [None]:
from pyspark.sql.functions import countDistinct, avg

users_who_completed_a_series.groupBy("user_id").agg(
                              count("*").alias("Total_Entries"),
                              countDistinct("anime_id").alias("Unique_Anime_Watched")
                          ).show(10)


+-------+-------------+--------------------+
|user_id|Total_Entries|Unique_Anime_Watched|
+-------+-------------+--------------------+
|    148|           92|                  92|
|    496|          156|                 156|
|    833|           77|                  77|
|   1342|          107|                 107|
|   2122|           64|                  64|
|   2142|          325|                 325|
|   2866|          486|                 486|
|   3749|          100|                 100|
|   6357|          318|                 318|
|   6466|          675|                 675|
+-------+-------------+--------------------+
only showing top 10 rows



In [None]:
animes_df.select(["MAL_ID", "Name", "Favorites"]) \
         .orderBy(col("Favorites").desc()) \
         .show(10, False)

+------+------------------------------------+---------+
|MAL_ID|Name                                |Favorites|
+------+------------------------------------+---------+
|5114  |Fullmetal Alchemist: Brotherhood    |183914   |
|9253  |Steins;Gate                         |148452   |
|11061 |Hunter x Hunter (2011)              |147274   |
|1535  |Death Note                          |145201   |
|16498 |Shingeki no Kyojin                  |129844   |
|21    |One Piece                           |126645   |
|31630 |"Gyakuten Saiban: Sono ""Shinjitsu""|107475   |
|1575  |Code Geass: Hangyaku no Lelouch     |90487    |
|1735  |Naruto: Shippuuden                  |84651    |
|30    |Neon Genesis Evangelion             |71308    |
+------+------------------------------------+---------+
only showing top 10 rows



# Spark SQL 💻

Spark comes with a full SQL engine.

<br>

We'll recreate this logic using Spark SQL 👇🏾
```
from pyspark.sql.functions import col, isnan, when, count

animes_df.select([
    count(when(isnan(c) | col(c).isNull(), c)).alias(c)
    for c in animes_df.columns
]).show()
```

In [None]:
# register DataFrame as a temporary view
animes_df.createOrReplaceTempView("anime")

query = """SELECT
    SUM(CASE WHEN Rating IS NULL OR isnan(Rating) THEN 1 ELSE 0 END) AS rating,
    SUM(CASE WHEN Name IS NULL OR isnan(Name) THEN 1 ELSE 0 END) AS title,
    SUM(CASE WHEN MAL_ID IS NULL OR isnan(MAL_ID) THEN 1 ELSE 0 END) AS anime_id,
    SUM(CASE WHEN Members IS NULL OR isnan(Members) THEN 1 ELSE 0 END) AS members
FROM anime
"""

spark.sql(query).show()

+------+-----+--------+-------+
|rating|title|anime_id|members|
+------+-----+--------+-------+
|     0|    0|       0|      0|
+------+-----+--------+-------+



# Catalyst Optimizer

Catalyst is Spark's query optimizer. It turns both SQL queries and DataFrame operations into an optimized execution plan.

<br>

When you run something like

```
df.filter(col("rating") > 8).groupBy("genre").avg("rating")
```

Catalyst steps in and takes the query through four phases.

<br>

1. Analysis: Catalyst builds an unresolved logical plan and turns it into a resolved logical plan once all columns are validated.
2. Logical optimization: Catalyst rewrites the logical plan to make it cheaper, for example by pushing filters closer to the data source and pruning unused columns.
3. Physical planning: Spark considers multiple ways to physically run the query
   * should it use a sort merge join or a broadcast join?
   * should it hash aggregate or sort aggregate?
4. Code generation: Spark generates JVM bytecode for the chosen plan, so it runs fast.

<br>

Without Catalyst, you'd have to:
- Manually optimize every query
- Hand-code joins and filters in the right order
- Figure out the best execution strategy based on cluster size and data skew

Catalyst saves you from that.

# ⚙️ Optimization Hacks
## Caching
Spark doesn't automatically cache a DataFrame for you; it recomputes the DataFrame's entire lineage every time an action is triggered.

Use caching:
* When the same DataFrame is used multiple times
* After an expensive transformation (e.g., a join or groupBy)

Otherwise, you'll waste time and compute power redoing the same work.
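The effect is easy to count in a toy model. This sketch is plain Python, not Spark: `Lineage` is a hypothetical class whose `compute` callable stands in for the whole chain of transformations.

```python
class Lineage:
    """Toy model of a DataFrame whose result is rebuilt unless it is cached."""

    def __init__(self, compute):
        self._compute = compute
        self._use_cache = False
        self._stored = None
        self.runs = 0  # how many times the lineage was actually executed

    def cache(self):
        # Like DataFrame.cache(): only marks the data for reuse; nothing runs yet.
        self._use_cache = True
        return self

    def collect(self):
        # Action: reuse the stored result if caching is on and a result exists.
        if self._use_cache and self._stored is not None:
            return self._stored
        self.runs += 1
        result = self._compute()
        if self._use_cache:
            self._stored = result
        return result


def expensive():
    return [rating for rating in range(1, 11) if rating > 8]


uncached = Lineage(expensive)
uncached.collect()
uncached.collect()

cached = Lineage(expensive).cache()
first_result = cached.collect()   # first action does the work and stores it
cached.collect()                  # second action reuses the stored result
```

Two actions cost two full runs without caching and one run with it; the first action after `cache()` still pays the full price.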

## Increasing Partitions
More partitions can help when:
- You have lots of data and many CPU cores to parallelize work
- You're doing CPU-heavy tasks (joins, groupBy, transformations)
- You want to avoid skew (e.g., one giant partition that takes forever)

Rule of Thumb: You want 2–4x more partitions than total cores to keep everything busy.

If you have more partitions than cores, Spark can:
- Keep cores busy all the time, even if some tasks finish earlier than others
- Recover faster from slow tasks or stragglers (not all tasks are equal!)
- Support better fault tolerance (if a task fails, only that small piece needs to rerun)
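A small scheduling simulation shows why finer partitions keep cores busy. This is a sketch with made-up task durations; `makespan` is a hypothetical helper that hands each task to whichever core frees up first, which only approximates a real scheduler.

```python
import heapq


def makespan(task_durations, num_cores):
    """Finish time when each task goes to whichever core frees up first."""
    cores = [0] * num_cores               # time at which each core becomes free
    heapq.heapify(cores)
    for duration in task_durations:
        free_at = heapq.heappop(cores)    # earliest available core
        heapq.heappush(cores, free_at + duration)
    return max(cores)


NUM_CORES = 4

# 36 units of work in 4 partitions, one of them three times larger than the rest.
coarse = [6, 6, 6, 18]
# The same 36 units split into 12 partitions (3x the core count).
fine = [2] * 9 + [6] * 3

coarse_time = makespan(coarse, NUM_CORES)
fine_time = makespan(fine, NUM_CORES)
```

With four coarse partitions, three cores sit idle while the large one finishes at time 18; with twelve partitions the same work finishes at time 10.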

# ⛓ Code Patterns

### 1. Chain Transformations
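Chaining keeps a pipeline readable and avoids a trail of intermediate variables. Spark builds the same plan either way, so the gain is in clarity rather than speed.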

✅
```
from pyspark.sql.functions import col, lower, trim

df_cleaned = (
    df.filter(col("rating").isNotNull())
      .withColumn("genre", trim(lower(col("genre"))))
      .dropDuplicates(["id"])
)
```
❌
```
df_filtered = df.filter(col("rating").isNotNull())
df_lowered = df_filtered.withColumn("genre", lower(col("genre")))
df_trimmed = df_lowered.withColumn("genre", trim(col("genre")))
df_cleaned = df_trimmed.dropDuplicates(["id"])
```

### 2. Use Built-in Spark Functions
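Built-in functions run inside the JVM and are visible to Catalyst. A Python UDF is a black box to the optimizer and has to serialize every row between the JVM and Python.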

✅
```
from pyspark.sql.functions import regexp_replace

df = df.withColumn("clean_title", regexp_replace("title", "[^a-zA-Z0-9 ]", ""))

```
❌
```
import re
from pyspark.sql.functions import udf

def remove_special(s):
    return re.sub("[^a-zA-Z0-9 ]", "", s)

remove_special_udf = udf(remove_special)
df = df.withColumn("clean_title", remove_special_udf("title"))

```

### 3. Select Only Columns Needed Early
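This saves memory and I/O whenever something materializes the DataFrame between the read and the `select`, such as a `cache()` or an action. Without such a step, Spark reads lazily and Catalyst prunes unused Parquet columns in both versions, so selecting early mainly keeps the intent explicit.

✅
```
df = spark.read.parquet("data.parquet").select("id", "genre", "rating")

```
❌
```
df = spark.read.parquet("data.parquet")  # Keeps every column in the plan
df = df.select("id", "genre", "rating")  # Pruned late; anything run on df before this sees all columns
```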

### 4. Use Vectorized pandas UDFs
Pandas UDFs are vectorized: data moves between the JVM and Python in batches, so they perform better than row-at-a-time Python UDFs.

✅
```
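from pyspark.sql.functions import avg, pandas_udf, stddev
import pandas as pd

# Compute the statistics once over the whole column. Inside the UDF,
# s.mean() would only see the current batch, not the full dataset.
stats = df.select(avg("rating").alias("mu"), stddev("rating").alias("sigma")).first()
mu, sigma = stats["mu"], stats["sigma"]

@pandas_udf("double")
def normalize(s: pd.Series) -> pd.Series:
    return (s - mu) / sigma

df = df.withColumn("rating_norm", normalize("rating"))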
```
❌
```
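from pyspark.sql.functions import udf

# mean and std are assumed to be precomputed Python floats
def normalize(x):
    return (x - mean) / std

norm_udf = udf(normalize, "double")  # called once per row
df = df.withColumn("rating_norm", norm_udf("rating"))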
```

### 5. Handling Data Skew with Salting
Salting appends a random suffix to a hot key so its rows are spread over several partitions instead of landing in one.

✅
```
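from pyspark.sql.functions import col, concat, floor, lit, rand
# random suffix 0-9, cast to string so it can be concatenated with the genre
df_salted = df.withColumn("salted_genre", concat(col("genre"), lit("_"), floor(rand() * 10).cast("string")))
df_repartitioned = df_salted.repartition("salted_genre")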
```
❌
```
df.repartition("genre")  # skew still remains
```
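The effect of salting on partition sizes can be simulated in plain Python. This is a toy model: the genre counts are made up, and `partition_of` is a deliberately simple, deterministic stand-in for Spark's hash partitioner.

```python
import random
from collections import Counter

NUM_PARTITIONS = 4
SALT_BUCKETS = 10

# Hypothetical skewed data: one genre dominates.
genres = ["Comedy"] * 900 + ["Drama"] * 50 + ["Horror"] * 50


def partition_of(key):
    # Toy partitioner: byte sum of the key modulo the partition count.
    return sum(key.encode("utf-8")) % NUM_PARTITIONS


rng = random.Random(42)  # fixed seed so the run is repeatable

plain_sizes = Counter(partition_of(g) for g in genres)
salted_sizes = Counter(
    partition_of(f"{g}_{rng.randrange(SALT_BUCKETS)}") for g in genres
)

largest_plain = max(plain_sizes.values())
largest_salted = max(salted_sizes.values())
```

Without salt, all 900 "Comedy" rows land in a single partition; with salt they are spread across partitions. If the salted key is later used for an aggregation, the partial results still have to be combined per original genre in a second step.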

# Back to 🐼s

There are a few reasons why you'd want to go back to Pandas.

- Post-cleaning
  - Use Spark to clean or filter down a huge dataset
- Conduct analysis on a sample
- Visualization
  - Spark DataFrames can't be passed directly to matplotlib, seaborn, or plotly
  - Also, many visual tools in Python are looking for a Pandas DataFrame
- Modeling using modern ML or Deep Learning libraries
  - Spark can be used to prep features
  - You'd then switch to Pandas to train a model

<br>

💡 Best practice: Always filter or sample before converting to pandas

In [None]:
users_pandas_df = users_who_completed_a_series.limit(100).toPandas()
users_pandas_df.head()

Unnamed: 0,user_id,anime_id,MAL_ID,Name,Score,Genres,English name,Japanese name,Type,Episodes,...,Score-10,Score-9,Score-8,Score-7,Score-6,Score-5,Score-4,Score-3,Score-2,Score-1
0,0,68,68,Black Cat (TV),7.38,"Sci-Fi, Adventure, Comedy, Super Power, Shounen",Black Cat,ブラックキャット,TV,23,...,8654,15253,29392,34019,16081,7534,2521,863,395,234
1,0,1689,1689,Byousoku 5 Centimeter,7.73,"Drama, Romance, Slice of Life",5 Centimeters Per Second,秒速５センチメートル,Movie,3,...,65007,80388,107226,86657,43552,21319,10108,3821,2081,1835
2,0,2913,2913,Daisougen no Chiisana Tenshi: Bush Baby,7.01,"Adventure, Drama","Bush Baby, Little Angel of the Great Plains",大草原の小さな天使　ブッシュベイビー,TV,40,...,57,54,91,153,93,40,17,4,5,6
3,0,1250,1250,Erementar Gerad,7.3,"Adventure, Comedy, Super Power, Magic, Romance...",Elemental Gelade,エレメンタルジェレイド,TV,26,...,3279,4969,9868,11831,6331,3267,1125,386,158,103
4,0,356,356,Fate/stay night,7.34,"Action, Supernatural, Magic, Romance, Fantasy",Fate/stay night,Fate/stay night,TV,24,...,35378,57284,101538,107818,55337,26103,13081,5032,2350,1693


# FAQs 🤔
1. Why is Spark slower than pandas on small datasets?
> Spark has startup overhead because it launches a JVM, sets up executors, and builds a DAG. For tiny datasets, that overhead outweighs the benefits of distributed computing. Pandas is faster for small data because it's lightweight and runs natively in Python. Spark was built for large datasets.

2. What is a partition, and how big is it?
> A partition is a chunk of your DataFrame that's processed by one task on an executor. Spark decides partitioning based on file size, source format, and your config. By default, file-based sources are split into partitions of up to 128 MB (`spark.sql.files.maxPartitionBytes`), and shuffles use `spark.sql.shuffle.partitions` (200 by default). You can manually repartition if needed: `df = df.repartition(10)  # Creates 10 partitions`

3. How does Spark handle memory? What if my data is too big?
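> Spark caches data in memory only when you tell it to using `.cache()` or `.persist()`. If memory isn't enough, Spark spills to disk, making it slower but stable. Unlike pandas, Spark can process data that is larger than RAM, although skewed partitions or pulling large results to the driver (for example with `toPandas()`) can still cause out-of-memory errors.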

4. What if one executor crashes?
> Spark tracks a DAG of all transformations. If a task fails, it recomputes just that partition from upstream steps — not the whole dataset.
