# Preprocess Data

Take all the new JSON files and preprocess them:
- break apart logs to get pokemon used, moves, items, tera
- label winner
- rating change

At the end, the dataframe will be written to parquet files.

We also track processed battle ids, and filter them out early.

### Checking Versions

First, let's start the Spark session and check which versions of Python and Spark we're using:

In [1]:
# Start the run timer, build the Spark session, then print the Python and Spark versions
import sys
import time
from pyspark.sql import SparkSession

start_time = time.time()  # read again by the final save cell to report the total run time
print(f"Script started at: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))}")

# SparkSession settings: memory sizes, long network/heartbeat timeouts, LAST_WIN so
# map-building functions keep the last value for a repeated key, and the G1 garbage collector
SPARK_CONF = {
    "spark.driver.memory": "4g",
    "spark.executor.memory": "4g",
    "spark.memory.offHeap.enabled": "true",
    "spark.memory.offHeap.size": "4g",
    "spark.driver.maxResultSize": "2g",
    "spark.network.timeout": "3600s",
    "spark.executor.heartbeatInterval": "600s",
    "spark.sql.mapKeyDedupPolicy": "LAST_WIN",
    "spark.driver.extraJavaOptions": "-XX:+UseG1GC",
}

session_builder = SparkSession.builder.appName("Showdown-Replay-Exploration")
for conf_key, conf_value in SPARK_CONF.items():
    session_builder = session_builder.config(conf_key, conf_value)
spark = session_builder.getOrCreate()

print(f"Python version: {sys.version}")
print(f"Spark version: {spark.version}")

Script started at: 2025-03-24 03:13:03
Python version: 3.11.6 | packaged by conda-forge | (main, Oct  3 2023, 10:40:35) [GCC 12.3.0]
Spark version: 3.5.0


In [3]:
# Consolidate the raw per-replay JSON files into one parquet file per day directory.
# (Can also be saved as consolidate.py and run once before running the rest of the notebook.)
import os
import json
import pandas as pd
import time

# Define paths
input_dir = "../../data/replays/gen9vgc2025regg/raw"
output_dir = "../../data/replays/gen9vgc2025regg/consolidated"

# Create output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)

# Dedicated timer: reusing `start_time` here would overwrite the notebook-wide timer
# from the first cell that the final save cell reports against.
consolidation_start = time.time()
print(f"Starting consolidation at: {time.strftime('%Y-%m-%d %H:%M:%S')}")

# Process data by day; sorted so the processing order (and the log output) is deterministic
day_dirs = sorted(d for d in os.listdir(input_dir) if os.path.isdir(os.path.join(input_dir, d)))
print(f"Found {len(day_dirs)} day directories to process")

total_files = 0      # files actually loaded and written
failed_files = []    # files that could not be read/parsed

for day_dir in day_dirs:
    day_path = os.path.join(input_dir, day_dir)
    print(f"Processing {day_dir}...")

    # Get all JSON files in the directory
    json_files = sorted(f for f in os.listdir(day_path) if f.endswith('.json'))
    if not json_files:
        continue

    print(f"  Found {len(json_files)} JSON files")

    # Gather all JSON data, skipping (but reporting) files that cannot be read or parsed
    all_data = []
    for file_name in json_files:
        file_path = os.path.join(day_path, file_name)
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                all_data.append(json.load(f))
        except (OSError, UnicodeDecodeError, json.JSONDecodeError) as exc:
            failed_files.append(file_path)
            print(f"  Skipping {file_name}: {exc}")

    if all_data:
        # Convert to pandas DataFrame
        df = pd.DataFrame(all_data)

        # Save as parquet file
        output_file = os.path.join(output_dir, f"{day_dir}.parquet")
        df.to_parquet(output_file)
        print(f"  Saved {len(all_data)} records to {output_file}")

        # Count only what was actually loaded, not every file that was found
        total_files += len(all_data)

print(f"Consolidation complete! Processed {total_files} files")
if failed_files:
    print(f"Skipped {len(failed_files)} unreadable files")
print(f"Total time: {time.time() - consolidation_start:.2f} seconds")

Starting consolidation at: 2025-03-24 00:54:20
Found 103 day directories to process
Processing 2024-12-01...
  Found 250 JSON files
  Saved 250 records to ../../data/replays/gen9vgc2025regg/consolidated/2024-12-01.parquet
Processing 2024-12-02...
  Found 689 JSON files
  Saved 689 records to ../../data/replays/gen9vgc2025regg/consolidated/2024-12-02.parquet
Processing 2024-12-03...
  Found 526 JSON files
  Saved 526 records to ../../data/replays/gen9vgc2025regg/consolidated/2024-12-03.parquet
Processing 2024-12-04...
  Found 531 JSON files
  Saved 531 records to ../../data/replays/gen9vgc2025regg/consolidated/2024-12-04.parquet
Processing 2024-12-05...
  Found 405 JSON files
  Saved 405 records to ../../data/replays/gen9vgc2025regg/consolidated/2024-12-05.parquet
Processing 2024-12-06...
  Found 419 JSON files
  Saved 419 records to ../../data/replays/gen9vgc2025regg/consolidated/2024-12-06.parquet
Processing 2024-12-07...
  Found 352 JSON files
  Saved 352 records to ../../data/replay

### Step 2: Load in consolidated parquet and filter out already processed rows

We will load only the consolidated Regulation G replays (built from the raw JSON files above).

In [2]:
# Use spark to read files then partition after to speed up processing
import glob
import os

# Path to consolidated parquet files
consolidated_path = "../../data/replays/gen9vgc2025regg/consolidated"

# Read all parquet files from the consolidated directory
logs_df = spark.read.parquet(consolidated_path)

# One sub-directory per previous run, written by the save step at the end of this notebook
processed_ids_path = "../../data/replays/gen9vgc2025regg/pre_processed/replay_ids/*"

# Skip the filtering step only when no previous run has recorded ids. Checking explicitly
# (instead of a bare `except:`) means real read errors surface instead of silently
# reprocessing every replay.
if glob.glob(processed_ids_path):
    processed_ids = spark.read.parquet(processed_ids_path)
    logs_df = logs_df.join(processed_ids, "id", "left_anti")
else:
    # First run: create the directory and continue with all replays
    os.makedirs(os.path.dirname(processed_ids_path.replace("*", "")), exist_ok=True)
    print("No previously processed IDs found. Processing all replays.")

# Repartition
logs_df = logs_df.repartition(8)

No previously processed IDs found. Processing all replays.


### Step 3: Extract Relevant Information

Player Information

In [3]:
from pyspark.sql.functions import regexp_extract, transform, expr, regexp_replace, col, explode, split, lit
from pyspark.sql.functions import collect_list, struct, map_from_entries, collect_set, explode_outer
from pyspark.sql.types import StructType, StructField, StringType, ArrayType


# Extract player names and ratings
def extract_player_info(df):
    """Add player names and their ladder ratings before/after the battle, parsed from the log.

    Adds string columns player1, player2, player1_rating_before, player2_rating_before,
    player1_rating_after and player2_rating_after. A rating column is "" when the log has
    no rating line for that player.

    Args:
        df: replay DataFrame with a `players` array column and a `log` string column.

    Returns:
        The DataFrame with the columns above added.
    """
    from pyspark.sql.functions import col, concat, expr, lit

    # Use the DataFrame that was passed in (not the global logs_df)
    df = df.withColumn("player1", col("players").getItem(0))
    df = df.withColumn("player2", col("players").getItem(1))

    # Rating lines look like "<name>'s rating: 1234 → 1250". Showdown's raw-HTML form may
    # write the arrow as "&rarr; <strong>1250", so both spellings are accepted.
    # Each player's own name is spliced into the pattern per row; \Q...\E quotes it so regex
    # metacharacters in usernames are matched literally.
    rating_suffix = r"\E's rating: (\d+)(?:\s*(?:→|&rarr;)\s*(?:<strong>)?(\d+))?"

    for player in ("player1", "player2"):
        pattern_col = f"_{player}_rating_pattern"
        df = df.withColumn(pattern_col, concat(lit(r"\Q"), col(player), lit(rating_suffix)))
        # Spark's SQL regexp_extract accepts a per-row (column) pattern
        df = df.withColumn(f"{player}_rating_before", expr(f"regexp_extract(log, `{pattern_col}`, 1)"))
        df = df.withColumn(f"{player}_rating_after", expr(f"regexp_extract(log, `{pattern_col}`, 2)"))
        df = df.drop(pattern_col)

    return df

df_step_1 = extract_player_info(logs_df)

### Open Team Sheets

Players can choose to allow open team sheets, which give a preview of the entire team: items, moves, and Tera types. We extract these team sheets when available and then add them back to the original DataFrame. We can also use open team sheets as a reference to check the rest of our extraction work for accuracy and extraction issues.

In [4]:
def _team_sheet_pokemon_map(df, details_col, map_alias):
    """Turn one player's raw open-team-sheet text into a per-battle Pokémon map.

    Args:
        df: DataFrame with an `id` column and `details_col`, the text captured after
            "|showteam|pN|" (Pokémon entries separated by "]"; null when absent).
        details_col: name of that text column.
        map_alias: name for the resulting map column.

    Returns:
        DataFrame (id, map_alias) where map_alias maps each Pokémon name to
        struct(item, ability, moves, tera_type). Battles with no parsed Pokémon are absent.
    """
    from pyspark.sql.functions import col, collect_list, explode, map_from_entries, regexp_extract, split, struct

    # One row per "]"-separated Pokémon entry
    entries = (
        df.filter(col(details_col).isNotNull())
          .select("id", explode(split(col(details_col), r"\]")).alias("entry"))
    )

    # Positional fields of each entry; moves are kept as the raw comma-separated string
    pokemon = (
        entries
        .withColumn("pokemon", regexp_extract(col("entry"), r"^(.*?)\|\|", 1))
        .withColumn("item", regexp_extract(col("entry"), r"\|\|(.*?)\|", 1))
        .withColumn("ability", regexp_extract(col("entry"), r"\|([^|]+)\|[^|]+,[^|]+,", 1))
        .withColumn("moves", regexp_extract(col("entry"), r"\|([^|]+,[^|]+,[^|]+,[^|]+)\|", 1))
        .withColumn("tera_type", regexp_extract(col("entry"), r",,,,,([^,\]]+)", 1))
        # Drop fragments that did not yield a Pokémon name
        .filter(col("pokemon").isNotNull() & (col("pokemon") != ""))
    )

    return pokemon.groupBy("id").agg(
        map_from_entries(
            collect_list(
                struct(col("pokemon"), struct(col("item"), col("ability"), col("moves"), col("tera_type")))
            )
        ).alias(map_alias)
    )


def analyze_team_sheets(df):
    """Extract open team sheets into per-player Pokémon maps.

    Adds:
      - team_sheets_accepted: True when the log contains a "|showteam|p" line.
      - p1_pokemon_data_open_team / p2_pokemon_data_open_team: map of Pokémon name ->
        struct(item, ability, moves, tera_type); null for battles without a parsed team sheet.

    Args:
        df: replay DataFrame with `id` and `log` columns.

    Returns:
        The DataFrame with the columns above (the raw team-sheet text is not kept).
    """
    from pyspark.sql.functions import col, lit, regexp_extract, when

    # Check if team sheets were accepted (presence of showteam indicates acceptance)
    has_showteam = col("log").contains("|showteam|p")

    df_with_team_info = (
        df.withColumn("team_sheets_accepted", when(has_showteam, lit(True)).otherwise(lit(False)))
        # Raw team text per player. The "$" alternative also ends the capture when the
        # showteam line is the very last line of the log.
        .withColumn(
            "p1_team_details",
            when(has_showteam,
                 regexp_extract(col("log"), r"\|showteam\|p1\|(.*?)(?=\n|\|showteam\|p2\||$)", 1))
            .otherwise(lit(None)),
        )
        .withColumn(
            "p2_team_details",
            when(col("log").contains("|showteam|p2|"),
                 regexp_extract(col("log"), r"\|showteam\|p2\|(.*?)(?=\n|\|j\||\|inactive\||$)", 1))
            .otherwise(lit(None)),
        )
    )

    # Same parsing for both players, joined back one after the other
    for details_col, map_alias in (
        ("p1_team_details", "p1_pokemon_data_open_team"),
        ("p2_team_details", "p2_pokemon_data_open_team"),
    ):
        pokemon_map = _team_sheet_pokemon_map(df_with_team_info, details_col, map_alias)
        df_with_team_info = df_with_team_info.join(pokemon_map, "id", "left")

    # Drop the temporary raw-text columns before returning
    return df_with_team_info.drop("p1_team_details", "p2_team_details")

# NOTE(review): this starts from logs_df, so the player/rating columns built in the previous
# cell (df_step_1) are not carried into df_final. Pass df_step_1 here to keep them
# (requires extract_player_info to return its DataFrame).
df_step_2 = analyze_team_sheets(logs_df)

Extract the raw log entries for Pokémon (team preview) and moves (items are handled in a later section); we will process these entries later.

In [5]:
# Extract data we will use several times later on
def extract_raw_data(df):
    """Extract raw team-preview and move strings from each battle log for later parsing.

    For each player N in (1, 2) this adds:
      - playerN_pokemon_array: team-preview details with the ", L<level>..." suffix removed.
      - playerN_full_team_raw: team-preview species (details text before the first comma).
      - playerN_full_team: playerN_full_team_raw with "-*" removed (e.g. "Urshifu-*" -> "Urshifu").
      - playerN_moves_raw: every whole "|move|pNa/b: <pokemon>|<move>" match; parsed later
        by extract_pokemon_moves.

    Args:
        df: replay DataFrame with a `log` string column.

    Returns:
        The DataFrame with the columns above added (same column order as before).
    """
    sides = ("1", "2")

    # Team-preview lines look like "|poke|pN|<details>|..."; capture the details field
    for n in sides:
        df = df.withColumn(f"player{n}_pokemon_array",
                           expr(f"regexp_extract_all(log, '\\\\|poke\\\\|p{n}\\\\|([^\\\\|]+)', 1)"))

    # Then clean each array element to keep only the Pokémon name
    for n in sides:
        df = df.withColumn(f"player{n}_pokemon_array",
                           transform(f"player{n}_pokemon_array", lambda x: regexp_replace(x, ", L\\d+.*", "")))

    # Full team preview: the species is the details text up to the first comma. The capture
    # also stops at "|", so a details field without a comma cannot run on into later log lines.
    for n in sides:
        df = df.withColumn(f"player{n}_full_team_raw",
                           expr(f"regexp_extract_all(log, '\\\\|poke\\\\|p{n}\\\\|([^,\\\\|]+)', 1)"))

    # Clean the team arrays to handle any special forms (like Urshifu-*)
    for n in sides:
        df = df.withColumn(f"player{n}_full_team",
                           transform(f"player{n}_full_team_raw", lambda x: regexp_replace(x, "\\-\\*", "")))

    # Extract all moves used by each player's Pokémon (will parse this later)
    for n in sides:
        df = df.withColumn(f"player{n}_moves_raw",
                           expr(f"regexp_extract_all(log, '\\\\|move\\\\|p{n}[ab]: ([^\\\\|]+)\\\\|([^\\\\|]+)', 0)"))

    return df


df_step_3 = extract_raw_data(df_step_2)


Weather, Forfeits, Status Effects, Winner

In [6]:
# Extract weather, status, forfeit, winner
def extract_weather_status_forfeit_winner_changes(df):
    """Add first-occurrence weather / forfeit / status / winner columns parsed from the log.

    Each new column is a string, "" when the log has no matching line:
      - weather: the first weather named on a "|-weather|" line.
      - forfeit: the first "<name> forfeited" message.
      - status_effects: the first effect started on player 1's slot-a Pokémon ("|-start|p1a: ...").
      - winner: the name on the "|win|" line.

    Showdown writes these minor events with a leading "-" (like the "|-item|" and
    "|-terastallize|" patterns used later in this notebook), so "-?" accepts both spellings.
    Captures exclude the newline, so values never end with the line break before the next line.
    """
    df = df.withColumn("weather", regexp_extract("log", r"\|-?weather\|([^\|\n]+)", 1))

    # Extract forfeit information
    df = df.withColumn("forfeit", regexp_extract("log", r"\|-?message\|([^\|\n]+ forfeited)", 1))

    # Extract effects started with "|-start|" (e.g. confusion) on player 1's slot-a Pokémon
    df = df.withColumn("status_effects", regexp_extract("log", r"\|-?start\|p1a: [^\|]+\|([^\|\n]+)", 1))

    df = df.withColumn("winner", regexp_extract("log", r"\|win\|([^\|\n]+)", 1))

    return df

df_step_4 = extract_weather_status_forfeit_winner_changes(df_step_3)



### Extract Pokemon Moves

Previously we extracted a column holding every raw move line used by each player's Pokémon.

Now we will create a new df for the moves.

1. Explode the moves ie create a new row for each move
2. Extract the pokemon for each move, and its moves
3. Group each Pokémon by the battle it took place in (because we separated the data into player 1 and player 2 DataFrames, two players using the same Pokémon won't accidentally be merged together)
4. Convert to a map

In [7]:
from pyspark.sql.functions import regexp_extract, transform, expr, regexp_replace, col, explode, split, lit
from pyspark.sql.functions import collect_list, struct, map_from_entries, collect_set

def extract_pokemon_moves(df):
    """Map each Pokémon to the distinct moves it used, per battle and per player.

    Parses the player1_moves_raw / player2_moves_raw arrays (whole
    "|move|pNa: <pokemon>|<move>" matches from extract_raw_data), adds
    player1_pokemon_moves / player2_pokemon_moves (map: pokemon -> array of distinct moves;
    null when that player has no move lines) and drops the raw arrays.

    NOTE(review): the Pokémon keys are the names shown in "pNa: <name>"; presumably these are
    nicknames when one is set, so verify before joining against team-preview species.
    """
    move_maps = []
    for n in ("1", "2"):
        # Explode the arrays to work with individual move patterns
        moves = df.select("id", explode(f"player{n}_moves_raw").alias("move_pattern"))

        # Split each pattern into its Pokémon and move. The move capture stops at the end of
        # the line, so a move line without a target field never keeps its trailing newline.
        moves = (
            moves
            .withColumn("pokemon", regexp_extract("move_pattern", f"\\|move\\|p{n}[ab]: ([^\\|]+)\\|", 1))
            .withColumn("move", regexp_extract("move_pattern", f"\\|move\\|p{n}[ab]: [^\\|]+\\|([^\\|\\n]+)", 1))
        )

        # collect_set removes duplicates (a Pokémon using the same move more than once per match)
        by_pokemon = moves.groupBy("id", "pokemon").agg(collect_set("move").alias("moves"))

        # Convert to a map structure for easier joining
        move_maps.append(
            by_pokemon.groupBy("id").agg(
                map_from_entries(collect_list(struct("pokemon", "moves"))).alias(f"player{n}_pokemon_moves")
            )
        )

    # Join back to the main dataframe (player 1 first, then player 2)
    for move_map in move_maps:
        df = df.join(move_map, "id", "left")

    # drop moves raw since we don't need it anymore
    return df.drop("player1_moves_raw", "player2_moves_raw")


df_step_5 = extract_pokemon_moves(df_step_4)


#### Cache the DataFrame so we don't have to redo these expensive computations (currently commented out)

In [8]:
# Cache the DataFrame after the expensive move mapping operations
# df_step_5 = df_step_5.cache()

### Process Items

Each Pokémon can hold an item. The item is not revealed unless some game event reveals it.

To recover items, we parse all item log events to match items to a Pokémon. This could be an item knocked off mid-turn, or an end-of-turn item usage like berry consumption.

In [9]:

from pyspark.sql.functions import regexp_extract, transform, expr, regexp_replace, col, explode, split, lit
from pyspark.sql.functions import collect_list, struct, map_from_entries, collect_set, explode_outer

# Process both item and enditem events
def extract_items(df):
    """Map each Pokémon to the items revealed for it, per battle and per player.

    Items come from "|-item|pNa: <pokemon>|<item>" lines (item revealed) and
    "|-enditem|pNa: <pokemon>|<item>" lines (item consumed/lost). Adds
    player1_pokemon_items / player2_pokemon_items: map pokemon -> array of distinct item
    names, null when no item line was seen for that player.

    Args:
        df: replay DataFrame with `id` and `log` columns.

    Returns:
        The DataFrame with the two map columns added.
    """
    item_maps = []
    for n in ("1", "2"):
        events = None
        for event in ("item", "enditem"):
            # Whole "|-<event>|pNa: <pokemon>|<item>" matches, one row each
            # (explode_outer keeps a null row for battles without any)
            parsed = df.select(
                "id",
                explode_outer(
                    expr(f"regexp_extract_all(log, '\\\\|-{event}\\\\|p{n}[ab]: ([^\\\\|]+)\\\\|([^\\\\|]+)', 0)")
                ).alias("pattern"),
            )
            parsed = (
                parsed
                .withColumn("pokemon", regexp_extract("pattern", f"\\|-{event}\\|p{n}[ab]: ([^\\|]+)\\|", 1))
                # Stop at the end of the line so an item name never carries the trailing newline
                .withColumn("item", regexp_extract("pattern", f"\\|-{event}\\|p{n}[ab]: [^\\|]+\\|([^\\|\\n]+)", 1))
                .select("id", "pokemon", "item")
            )
            events = parsed if events is None else events.union(parsed)

        # Drop the null rows from battles without item lines (and any empty capture), then
        # collect the distinct items per Pokémon
        by_pokemon = (
            events.filter(col("pokemon").isNotNull() & col("item").isNotNull() & (col("item") != ""))
                  .groupBy("id", "pokemon")
                  .agg(collect_set("item").alias("items"))
        )

        # Convert to a map structure for easier joining
        item_maps.append(
            by_pokemon.groupBy("id").agg(
                map_from_entries(collect_list(struct("pokemon", "items"))).alias(f"player{n}_pokemon_items")
            )
        )

    # Join back to the main dataframe (player 1 first, then player 2)
    for item_map in item_maps:
        df = df.join(item_map, "id", "left")

    return df


df_step_6 = extract_items(df_step_5)


### Extracting Tera Information

In [10]:
from pyspark.sql.functions import regexp_extract, transform, expr, regexp_replace, col, explode, split, lit
from pyspark.sql.functions import collect_list, struct, map_from_entries, collect_set, explode_outer

# Terastallization: which Pokémon each player terastallized, and into which type
def extract_tera(df):
    """Map each terastallized Pokémon to its Tera type, per battle and per player.

    Scans the log for "|-terastallize|pNa: <pokemon>|<type>" lines and adds
    player1_pokemon_tera / player2_pokemon_tera (map: pokemon -> tera type; null when that
    player never terastallized). The intermediate pattern columns are dropped again.
    """
    sides = ("1", "2")
    pattern_cols = [f"player{side}_tera_patterns" for side in sides]

    # Whole terastallize matches per side, kept as temporary array columns
    for side, pattern_col in zip(sides, pattern_cols):
        df = df.withColumn(
            pattern_col,
            expr(f"regexp_extract_all(log, '\\\\|-terastallize\\\\|p{side}[ab]: ([^\\\\|]+)\\\\|([^\\\\|]+)', 0)"),
        )

    tera_maps = []
    for side, pattern_col in zip(sides, pattern_cols):
        # One row per match: split into Pokémon and type (type capture stops at end of line)
        parsed = (
            df.select("id", explode_outer(pattern_col).alias("tera_pattern"))
              .withColumn("pokemon",
                          regexp_extract("tera_pattern", f"\\|-terastallize\\|p{side}[ab]: ([^\\|]+)\\|", 1))
              .withColumn("tera_type",
                          regexp_extract("tera_pattern", f"\\|-terastallize\\|p{side}[ab]: [^\\|]+\\|([^\\|\\n]+)", 1))
        )
        # explode_outer leaves null rows for battles without terastallization; skip those
        tera_maps.append(
            parsed.where(col("pokemon").isNotNull() & col("tera_type").isNotNull())
                  .groupBy("id")
                  .agg(map_from_entries(collect_list(struct("pokemon", "tera_type"))).alias(f"player{side}_pokemon_tera"))
        )

    # Attach player 1's map, then player 2's
    for tera_map in tera_maps:
        df = df.join(tera_map, "id", "left")

    return df.drop(*pattern_cols)


df_final = extract_tera(df_step_6)

In [11]:
# Convert to Pandas DataFrame (only do this for small result sets!)

# uncomment below to see preview
# pandas_df = logs_df.limit(5).toPandas()

# pandas_df

### Save to parquet

We end this exploration by saving our results to parquet so we can reuse them (the processing functions above are kept self-contained so they can be copied and pasted into later scripts).

In [12]:
# Save this run's output: processed replays (without the raw log) and the ids they came from
from datetime import datetime

# Create a timestamp for the processed data that includes time
process_datetime = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

# Define the output paths with the more specific timestamp
processed_data_path = f"../../data/replays/gen9vgc2025regg/pre_processed/replays/{process_datetime}"
processed_ids_path = f"../../data/replays/gen9vgc2025regg/pre_processed/replay_ids/{process_datetime}"

replay_count = logs_df.count()

if replay_count == 0:
    # Every replay was already processed by an earlier run: don't create empty timestamped folders
    print("No new replays to process; nothing was written.")
else:
    df_final.cache()

    # Drop the log column to reduce storage size
    df_final_no_logs = df_final.drop("log")

    # Save the full processed dataframe as parquet
    df_final_no_logs.write.mode("overwrite").parquet(processed_data_path)
    print(f"Saved processed data to {processed_data_path}")

    # Record the ids only after the data write succeeded; the load step skips recorded ids,
    # so a run that fails before this point is simply retried next time
    id_df = logs_df.select("id")
    id_df.write.mode("overwrite").parquet(processed_ids_path)
    print(f"Saved processed IDs to {processed_ids_path}")

    # Release the cached rows (including the large log column) once the writes are done
    df_final.unpersist()

print(f"\nProcessed {replay_count} replays in this run")

end_time = time.time()
execution_time = end_time - start_time
print(f"Script completed at: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))}")
print(f"Total execution time: {execution_time:.2f} seconds ({execution_time/60:.2f} minutes)")

Saved processed data to ../../data/replays/gen9vgc2025regg/pre_processed/replays/2025-03-24_03-13-20
Saved processed IDs to ../../data/replays/gen9vgc2025regg/pre_processed/replay_ids/2025-03-24_03-13-20

Processed 235522 replays in this run
Script completed at: 2025-03-24 03:15:44
Total execution time: 160.99 seconds (2.68 minutes)


In [13]:
# WORKAROUND FOR PY4J ERROR: Use pandas to save data instead of Spark
# This approach collects data to the driver node and writes it using pandas
# Note: This will only work if your dataset fits in driver memory


# try:
#     # Create output directories if they don't exist
#     os.makedirs(os.path.dirname(processed_data_path), exist_ok=True)
#     os.makedirs(os.path.dirname(processed_ids_path), exist_ok=True)
    
#     # Convert to pandas dataframe and save using pandas
#     print("Converting to pandas dataframe and saving using pandas...")
    
#     # Limit the records to avoid memory issues (adjust based on your data size)
#     pandas_df = df_final_no_logs.limit(50000).toPandas()
#     pandas_df.to_parquet(f"{processed_data_path}_pandas.parquet")
    
#     # Also save IDs
#     id_pd_df = logs_df.select("id").limit(50000).toPandas()
#     id_pd_df.to_parquet(f"{processed_ids_path}_pandas.parquet")
    
#     print(f"Successfully saved data using pandas to {processed_data_path}_pandas.parquet")
#     print(f"Successfully saved IDs using pandas to {processed_ids_path}_pandas.parquet")
    
# except Exception as e:
#     print(f"Error with pandas approach: {str(e)}")

Converting to pandas dataframe and saving using pandas...
Error with pandas approach: [Errno 111] Connection refused
