In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, struct, udf, lit, concat_ws,pandas_udf
from pyspark.sql.types import ArrayType, StringType, FloatType, StructType, StructField
import pandas as pd
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans
from scipy import stats
import matplotlib.pyplot as plt
import seaborn as sns
import uuid
import os

import datasets

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Set Java Home
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@17"
os.environ["SPARK_HOME"] = "/Users/tankwin08/Documents/spark"
os.environ["PYSPARK_PYTHON"] = 'python3'

# Initialize Spark Session
# Initialize Spark session with optimized configurations
spark = SparkSession.builder \
    .appName("SEC10KAnalysis") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "4g") \
    .config("spark.default.parallelism", "200") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.task.maxDirectMemory", "2g") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/08 17:48:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Function to chunk text
def chunk_text(text, max_length=500):
    if not text or not isinstance(text, str):
        return []
    words = text.split()
    chunks = []
    current_chunk = []
    current_length = 0
    for word in words:
        current_length += len(word) + 1
        if current_length > max_length:
            chunks.append(" ".join(current_chunk))
            current_chunk = [word]
            current_length = len(word) + 1
        else:
            current_chunk.append(word)
    if current_chunk:
        chunks.append(" ".join(current_chunk))
    return chunks

chunk_text_udf = udf(chunk_text, ArrayType(StringType()))

# Overview

## load data

In [4]:
year_2020 = datasets.load_dataset("eloukas/edgar-corpus",
                                "year_2020", 
                                split="train")

pandas_df = year_2020.to_pandas()
pandas_df = pandas_df.fillna(pd.NA).replace([pd.NA], [None])

df = spark.createDataFrame(pandas_df)


In [5]:
## chose top 10 as example for following demo
data = df.limit(10)

In [6]:
data.show()

25/06/08 17:50:10 WARN TaskSetManager: Stage 0 contains a task of very large size (7985 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+----------------+-------+----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|        filename|    cik|year|           section_1|          section_1A|          section_1B|           section_2|           section_3|           section_4|           section_5|           section_6|           section_7|          section_7A|           section_8|           section_9|          section_9A|          section_9B|          section_10|          section_11|          section_12|          section_13|          section_14|          section_15|
+----------------+-------+----+--------------------+--------------------+--------------------+--

In [7]:
## all of text data we should use is in data
sections = data.columns[3:]

In [8]:
# Convert sections to chunks
## here repartition to reduce task size is very important, as it runs locally
chunked_data = data.select(
    "cik", "year",
    *[chunk_text_udf(col(section)).alias(f"{section}_chunks") for section in sections]
).repartition(1000)  # Increase partitions to reduce task size

In [13]:
chunked_data.show(2)

25/06/08 18:08:17 WARN TaskSetManager: Stage 16 contains a task of very large size (7985 KiB). The maximum recommended task size is 1000 KiB.
25/06/08 18:08:23 WARN PythonRunner: Detected deadlock while completing task 66.0 in stage 16 (TID 393): Attempting to kill Python Worker
25/06/08 18:08:25 WARN PythonRunner: Detected deadlock while completing task 133.0 in stage 16 (TID 460): Attempting to kill Python Worker

+-------+----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|    cik|year|    section_1_chunks|   section_1A_chunks|   section_1B_chunks|    section_2_chunks|    section_3_chunks|    section_4_chunks|    section_5_chunks|    section_6_chunks|    section_7_chunks|   section_7A_chunks|    section_8_chunks|    section_9_chunks|   section_9A_chunks|   section_9B_chunks|   section_10_chunks|   section_11_chunks|   section_12_chunks|   section_13_chunks|   section_14_chunks|   section_15_chunks|
+-------+----+--------------------+--------------------+--------------------+--------------------+--------------------+-----------

25/06/08 18:08:26 WARN PythonRunner: Detected deadlock while completing task 199.0 in stage 16 (TID 526): Attempting to kill Python Worker
                                                                                

In [9]:
# Explode each section's chunks individually and union
dfs = []
for section in sections:
    section_df = chunked_data.select(
        "cik",
        "year",
        lit(section).alias("section"),
        explode(col(f"{section}_chunks")).alias("chunk_text")
    ).filter(col("chunk_text").isNotNull())
    dfs.append(section_df)

# Union all section DataFrames
final_chunks = dfs[0]
for df in dfs[1:]:
    final_chunks = final_chunks.union(df)

In [11]:
section_df.show(2)

25/06/08 18:04:47 WARN TaskSetManager: Stage 1 contains a task of very large size (7985 KiB). The maximum recommended task size is 1000 KiB.
25/06/08 18:04:53 WARN PythonRunner: Detected deadlock while completing task 66.0 in stage 1 (TID 67): Attempting to kill Python Worker
25/06/08 18:04:54 WARN PythonRunner: Detected deadlock while completing task 133.0 in stage 1 (TID 134): Attempting to kill Python Worker

+------+----+----------+--------------------+
|   cik|year|   section|          chunk_text|
+------+----+----------+--------------------+
|723531|2020|section_15|Item 15. Exhibits...|
|723531|2020|section_15|Non-Qualified Sto...|
+------+----+----------+--------------------+
only showing top 2 rows



25/06/08 18:04:55 WARN PythonRunner: Detected deadlock while completing task 199.0 in stage 1 (TID 200): Attempting to kill Python Worker
                                                                                

In [15]:
final_chunks.show(2)

25/06/08 18:09:26 WARN TaskSetManager: Stage 31 contains a task of very large size (7985 KiB). The maximum recommended task size is 1000 KiB.
25/06/08 18:09:29 WARN TaskSetManager: Stage 32 contains a task of very large size (7985 KiB). The maximum recommended task size is 1000 KiB.
25/06/08 18:09:31 WARN PythonRunner: Detected deadlock while completing task 66.0 in stage 31 (TID 719): Attempting to kill Python Worker
25/06/08 18:09:32 WARN TaskSetManager: Stage 33 contains a task of very large size (7985 KiB). The maximum recommended task size is 1000 KiB.
25/06/08 18:09:33 WARN PythonRunner: Detected deadlock while completing task 199.0 in stage 31 (TID 852): Attempting to kill Python Worker
25/06/08 18:09:34 WARN PythonRunner: Detected deadlock while completing task 66.0 in stage 32 (TID 919): Attempting to kill Python Worker
25/06/08 18:09:34 WARN TaskSetManager: Stage 34 contains a task of very large size (7985 KiB). The maximum recommended task size is 1000 KiB.
25/06/08 18:09:35

+-------+----+---------+--------------------+
|    cik|year|  section|          chunk_text|
+-------+----+---------+--------------------+
|1559720|2020|section_1|Item 1. Business ...|
|1559720|2020|section_1|stakeholders and ...|
+-------+----+---------+--------------------+
only showing top 2 rows



In [16]:
# Convert to Pandas
chunks_df = final_chunks.select("cik", "year", "section", "chunk_text").toPandas()

25/06/08 18:10:30 WARN TaskSetManager: Stage 255 contains a task of very large size (7985 KiB). The maximum recommended task size is 1000 KiB.
25/06/08 18:10:34 WARN TaskSetManager: Stage 256 contains a task of very large size (7985 KiB). The maximum recommended task size is 1000 KiB.
25/06/08 18:10:35 WARN PythonRunner: Detected deadlock while completing task 66.0 in stage 255 (TID 4864): Attempting to kill Python Worker
25/06/08 18:10:37 WARN PythonRunner: Detected deadlock while completing task 133.0 in stage 255 (TID 4931): Attempting to kill Python Worker
25/06/08 18:10:37 WARN TaskSetManager: Stage 257 contains a task of very large size (7985 KiB). The maximum recommended task size is 1000 KiB.
25/06/08 18:10:38 WARN PythonRunner: Detected deadlock while completing task 199.0 in stage 255 (TID 4997): Attempting to kill Python Worker
25/06/08 18:10:39 WARN PythonRunner: Detected deadlock while completing task 66.0 in stage 256 (TID 5064): Attempting to kill Python Worker
25/06/08 

In [None]:
# Define Pandas UDF for embeddings with dimensionality reduction,
# as local mac has memory issue, we need to used a lightweight model and small embedding dimension

# model = SentenceTransformer('all-distilroberta-v1')  # Lightweight model
# pca_reducer = PCA(n_components=128, random_state=42)  # Reduce to 128 dimensions
# @pandas_udf(ArrayType(FloatType()))
# def get_embedding_pandas(texts: pd.Series) -> pd.Series:
#     # Process embeddings in smaller batches
#     batch_size = 16
#     embeddings = []
#     for i in range(0, len(texts), batch_size):
#         batch = texts[i:i + batch_size]
#         batch_embeddings = model.encode(batch.tolist(), batch_size=batch_size, show_progress_bar=False)
#         # Reduce dimensionality to 128
#         batch_embeddings = pca_reducer.fit_transform(batch_embeddings) if i == 0 else pca_reducer.transform(batch_embeddings)
#         embeddings.extend(batch_embeddings)
#     return pd.Series(embeddings)


In [17]:
# Initialize model and PCA
model = SentenceTransformer('paraphrase-MiniLM-L3-v2')  # Smallest model
pca_reducer = PCA(n_components=64, random_state=42)  # Reduce to 64 dimensions

# Compute embeddings in batches
batch_size = 64
embeddings = []
for i in range(0, len(chunks_df), batch_size):
    batch = chunks_df['chunk_text'][i:i + batch_size]
    batch_embeddings = model.encode(batch.tolist(), batch_size=batch_size, show_progress_bar=False)
    batch_embeddings = pca_reducer.fit_transform(batch_embeddings) if i == 0 else pca_reducer.transform(batch_embeddings)
    embeddings.extend(batch_embeddings)

# Add embeddings to DataFrame
chunks_df['embedding'] = embeddings

# Save to CSV
chunks_df.to_csv("results/chunk_embeddings.csv", index=False)

print("Embeddings saved to chunk_embeddings.csv")
spark.stop()

  X_transformed = X @ self.components_.T
  X_transformed = X @ self.components_.T
  X_transformed = X @ self.components_.T
  X_transformed -= xp.reshape(self.mean_, (1, -1)) @ self.components_.T
  X_transformed -= xp.reshape(self.mean_, (1, -1)) @ self.components_.T
  X_transformed -= xp.reshape(self.mean_, (1, -1)) @ self.components_.T
  X_transformed = X @ self.components_.T
  X_transformed = X @ self.components_.T
  X_transformed = X @ self.components_.T
  X_transformed -= xp.reshape(self.mean_, (1, -1)) @ self.components_.T
  X_transformed -= xp.reshape(self.mean_, (1, -1)) @ self.components_.T
  X_transformed -= xp.reshape(self.mean_, (1, -1)) @ self.components_.T
  X_transformed = X @ self.components_.T
  X_transformed = X @ self.components_.T
  X_transformed = X @ self.components_.T
  X_transformed -= xp.reshape(self.mean_, (1, -1)) @ self.components_.T
  X_transformed -= xp.reshape(self.mean_, (1, -1)) @ self.components_.T
  X_transformed -= xp.reshape(self.mean_, (1, -1)) @ se

Embeddings saved to chunk_embeddings.csv


# Analysis for these embedding,
we also can used pyspark, but my memory is not enough to run pyspark.

In [18]:
# Standardize embeddings
scaler = StandardScaler()
scaled_embeddings = scaler.fit_transform(embeddings)

# Apply PCA for dimensionality reduction
pca = PCA(n_components=2)
pca_embeddings = pca.fit_transform(scaled_embeddings)

  C = X.T @ X
  C = X.T @ X
  C = X.T @ X
  X_transformed = X @ self.components_.T
  X_transformed = X @ self.components_.T
  X_transformed = X @ self.components_.T


In [19]:
chunks_df['pca_x'] = pca_embeddings[:, 0]
chunks_df['pca_y'] = pca_embeddings[:, 1]

In [20]:
# Perform KMeans clustering, here just randomly pick up 5, if we need to optmize,
# then we need to use eblow or silhouette method.
kmeans = KMeans(n_clusters=5, random_state=42)
chunks_df['cluster'] = kmeans.fit_predict(pca_embeddings)

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
  ret = a @ b
  ret = a @ b
  ret = a @ b
  current_pot = closest_dist_sq @ sample_weight
  current_pot = closest_dist_sq @ sample_weight
  current_pot = closest_dist_sq @ sample_weight


In [21]:
# Identify outliers using Z-score, this also use other mothods
z_scores = np.abs(stats.zscore(pca_embeddings))
chunks_df['outlier'] = (z_scores > 3).any(axis=1).astype(int)

In [23]:
chunks_df.head()

Unnamed: 0,cik,year,section,chunk_text,embedding,pca_x,pca_y,cluster,outlier
0,1559720,2020,section_1,Item 1. Business Overview Airbnb is a communit...,"[1.2164693, 0.69509125, -1.0215118, 0.39924327...",-0.116572,0.240739,4,0
1,1559720,2020,section_1,stakeholders and is designed with all of them ...,"[0.22025602, 0.41346008, -0.26433048, -0.69899...",3.417047,-1.685867,2,0
2,1559720,2020,section_1,"artists, our hosts span more than 220 countrie...","[1.0187154, -0.855402, -0.3409041, -0.03173965...",1.211673,0.538441,2,0
3,1559720,2020,section_1,they visit and the people who live there. Our ...,"[1.1903803, -0.45058036, -0.6615783, 0.1293522...",1.66439,1.002963,2,0
4,1559720,2020,section_1,directly on Airbnb through our website or mobi...,"[1.3744688, 0.18420734, -0.9135285, -0.1172117...",0.922817,-0.116744,4,0


In [24]:
# Save processed data for later use
chunks_df.to_csv("results/processed_sec_data.csv", index=False)

## plot

There is a lot of room we can make this more beautiful and informative. but time not allows

In [25]:
# Generate plots
output_dir = "plots"
os.makedirs(output_dir, exist_ok=True)

# Plot 1: Embeddings colored by cluster
plt.figure(figsize=(10, 8))
sns.scatterplot(data=chunks_df, x='pca_x', y='pca_y', hue='cluster', palette='viridis')
plt.title('Document Chunks in 2D Space (Colored by Cluster)')
plt.savefig(os.path.join(output_dir, 'clusters.jpg'))
plt.close()

# Plot 2: Embeddings colored by outlier flag
plt.figure(figsize=(10, 8))
sns.scatterplot(data=chunks_df, x='pca_x', y='pca_y', hue='outlier', palette='coolwarm')
plt.title('Document Chunks in 2D Space (Colored by Outlier Flag)')
plt.savefig(os.path.join(output_dir, 'outliers.jpg'))
plt.close()

# Plot 3: Embeddings colored by section
plt.figure(figsize=(10, 8))
sns.scatterplot(data=chunks_df, x='pca_x', y='pca_y', hue='section', palette='tab20')
plt.title('Document Chunks in 2D Space (Colored by Section)')
plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.tight_layout()
plt.savefig(os.path.join(output_dir, 'sections.jpg'))
plt.close()



In [26]:
## to optimize my memoery usage
spark.stop()