In [None]:
import os
from pathlib import Path
import pandas as pd
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from pyspark.sql.functions import col, lit, lower, concat, regexp_replace, pandas_udf
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.streaming import StreamingContext
import findspark
import pyspark
import json

# ---- PATH SETUP ----
cwd = Path.cwd()
spark_home = cwd / "spark-3.5.5-bin-hadoop3"
hadoop_home = cwd / "winutils"

spark_home_str = str(spark_home.resolve())
hadoop_home_str = str(hadoop_home.resolve())

print(f"I am using the following SPARK_HOME: {spark_home_str}")
if os.name == 'nt':
    os.environ["HADOOP_HOME"] = hadoop_home_str
    print(f"Windows detected: set HADOOP_HOME to: {hadoop_home_str}")
    hadoop_bin = hadoop_home / "bin"
    os.environ["PATH"] = f"{hadoop_bin};{os.environ['PATH']}"
    print(f"  Also added Hadoop bin directory to PATH: {hadoop_bin}")

# ---- INIT SPARK ----
findspark.init(spark_home_str)

spark = pyspark.sql.SparkSession.builder \
    .appName("DistilBERTStreamingPredictor") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

sc = spark.sparkContext
ssc = StreamingContext(sc, 10)

# ---- Enable Arrow for pandas_udf ----
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# ---- SCHEMA ----
schema = StructType([
    StructField("aid", StringType(), True),
    StructField("title", StringType(), True),
    StructField("summary", StringType(), True),
    StructField("main_category", StringType(), True),
    StructField("categories", StringType(), True),
    StructField("published", StringType(), True)
])

# ---- BROADCAST MODEL PATH ----
model_path = "results"
model_path_broadcast = sc.broadcast(model_path)

# ---- DEVICE SETUP ----
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ---- MODEL CACHING FOR WORKERS ----
model_cache = None
tokenizer_cache = None
id2label_cache = None

@pandas_udf(StringType())
def predict_udf_batch(texts: pd.Series) -> pd.Series:
    global model_cache, tokenizer_cache, id2label_cache

    if model_cache is None:
        try:
            tokenizer_cache = AutoTokenizer.from_pretrained(model_path_broadcast.value)
            model_cache = AutoModelForSequenceClassification.from_pretrained(model_path_broadcast.value)
            model_cache.eval()
            model_cache.to(device)

            # Load id2label from config.json or fallback to JSON
            id2label_cache = model_cache.config.id2label
            if not id2label_cache:
                id2label_path = os.path.join(model_path_broadcast.value, "id2label.json")
                print(f">>> model.config.id2label was empty. Attempting to load from {id2label_path}")
                if os.path.exists(id2label_path):
                    with open(id2label_path, "r") as f:
                        id2label_cache = json.load(f)
                else:
                    print(">>> id2label.json not found. id2label will remain empty.")
                    id2label_cache = {}

            # Convert keys to int once for consistency
            id2label_cache = {int(k): v for k, v in id2label_cache.items()}

        except Exception as e:
            print(f"[Worker Init Error] {e}")
            return pd.Series(["ERROR_LOADING_MODEL"] * len(texts))

    try:
        inputs = tokenizer_cache(texts.tolist(), return_tensors="pt", truncation=True, padding=True)
        inputs = {k: v.to(device) for k, v in inputs.items()}

        with torch.no_grad():
            logits = model_cache(**inputs).logits

        preds = torch.argmax(logits, dim=1).tolist()
        labels = [id2label_cache.get(p, "UNKNOWN") for p in preds]

        return pd.Series(labels)

    except Exception as e:
        print(f"[Inference Error] {e}")
        return pd.Series(["ERROR"] * len(texts))

# ---- STREAMING LOGIC ----
socket_host = "seppe.net"
socket_port = 7778

def process_rdd(time, rdd):
    print(f"-------- {str(time)} --------")
    if rdd.isEmpty():
        print("RDD is empty, skipping.")
        return

    try:
        df = spark.read.json(rdd, schema=schema)

        processed_df = df.withColumn(
            'combined',
            lower(concat(lit(" [TITLE] "), col('title'), lit(" [SUMMARY] "), col('summary')))
        ).withColumn("combined", regexp_replace(col("combined"), "[^a-zA-Z0-9\\s:]", "")) \
         .withColumn("combined", regexp_replace(col("combined"), "\\s+", " "))

        df_withpreds = processed_df.withColumn("predicted_category", predict_udf_batch(col("combined")))

        df_withpreds.select("title", "main_category", "predicted_category").show(truncate=False)

    except Exception as e:
        print(f"Error processing RDD: {e}")

# ---- STREAM SETUP ----
lines = ssc.socketTextStream(socket_host, socket_port)
lines.foreachRDD(process_rdd)

ssc.start()
ssc.awaitTermination()


I am using the following SPARK_HOME: /home/nika/Desktop/spark/spark-3.5.5-bin-hadoop3


25/05/30 18:23:05 WARN Utils: Your hostname, nika-hp resolves to a loopback address: 127.0.1.1; using 192.168.10.236 instead (on interface wlp6s0)
25/05/30 18:23:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/30 18:23:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark Streaming Context started.
RDD is empty, skipping.


25/05/30 18:23:10 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
25/05/30 18:23:10 WARN BlockManager: Block input-0-1748622190000 replicated to only 0 peer(s) instead of 1 peers
25/05/30 18:23:10 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
25/05/30 18:23:10 WARN BlockManager: Block input-0-1748622190200 replicated to only 0 peer(s) instead of 1 peers
25/05/30 18:23:10 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
25/05/30 18:23:10 WARN BlockManager: Block input-0-1748622190400 replicated to only 0 peer(s) instead of 1 peers
25/05/30 18:23:10 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
25/05/30 18:23:10 WARN BlockManager: Block input-0-1748622190600 replicated to only 0 peer(s) instead of 1 peers
25/05/30 18:23:11 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
25/05/30 18:23:11 WARN BlockManager: Block input-0-1748622190800 replicated to



25/05/30 18:23:20 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
25/05/30 18:23:20 WARN BlockManager: Block input-0-1748622200200 replicated to only 0 peer(s) instead of 1 peers
25/05/30 18:23:20 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
25/05/30 18:23:20 WARN BlockManager: Block input-0-1748622200400 replicated to only 0 peer(s) instead of 1 peers
25/05/30 18:23:20 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
25/05/30 18:23:20 WARN BlockManager: Block input-0-1748622200600 replicated to only 0 peer(s) instead of 1 peers
25/05/30 18:23:21 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
25/05/30 18:23:21 WARN BlockManager: Block input-0-1748622200800 replicated to only 0 peer(s) instead of 1 peers
25/05/30 18:23:21 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
25/05/30 18:23:21 WARN BlockManager: Block input-0-1748622201000 replicated to

+---------------------------------------------------------------------------------------------------------------------------------+---------------+------------------+
|title                                                                                                                            |main_category  |predicted_category|
+---------------------------------------------------------------------------------------------------------------------------------+---------------+------------------+
|An Empirical Study on Strong-Weak Model Collaboration for Repo-level\n  Code Generation                                          |cs.AI          |cs                |
|Exposing Go's Hidden Bugs: A Novel Concolic Framework                                                                            |cs.SE          |cs                |
|THiNK: Can Large Language Models Think-aloud?                                                                                    |cs.CL          |cs                

25/05/30 18:23:47 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
25/05/30 18:23:47 WARN BlockManager: Block input-0-1748622227000 replicated to only 0 peer(s) instead of 1 peers
25/05/30 18:23:47 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
25/05/30 18:23:47 WARN BlockManager: Block input-0-1748622227200 replicated to only 0 peer(s) instead of 1 peers
25/05/30 18:23:47 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
25/05/30 18:23:47 WARN BlockManager: Block input-0-1748622227400 replicated to only 0 peer(s) instead of 1 peers
>>> Loading model and tokenizer on worker
>>> Loaded id2label_cache: {0: 'cs', 1: 'econ', 2: 'eess', 3: 'math', 4: 'physics', 5: 'q-bio', 6: 'q-fin', 7: 'stat'}
>>> Raw input batch:
[' title gpumc: a stateless model checker for gpu weak memory concurrency summary gpu computing is embracing weak memory concurrency for performance improvement however compared to cpus modern gpus

+-----------------------------------------------------------------------------------------------------------------------------------------+------------------+------------------+
|title                                                                                                                                    |main_category     |predicted_category|
+-----------------------------------------------------------------------------------------------------------------------------------------+------------------+------------------+
|GPUMC: A Stateless Model Checker for GPU Weak Memory Concurrency                                                                         |cs.LO             |cs                |
|Estimation of multivariate traces of states given partial classical\n  information                                                       |quant-ph          |physics           |
|How to Improve the Robustness of Closed-Source Models on NLI                                                 

>>> Loading model and tokenizer on worker                           (0 + 1) / 1]
25/05/30 18:23:51 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
25/05/30 18:23:51 WARN BlockManager: Block input-0-1748622231000 replicated to only 0 peer(s) instead of 1 peers
>>> Loaded id2label_cache: {0: 'cs', 1: 'econ', 2: 'eess', 3: 'math', 4: 'physics', 5: 'q-bio', 6: 'q-fin', 7: 'stat'}
>>> Raw input batch:
[' title multimodal federated learning with missing modalities through feature imputation network summary multimodal federated learning holds immense potential for collaboratively training models from multiple sources without sharing raw data addressing both data scarcity and privacy concerns two key challenges in healthcare a major challenge in training multimodal federated models in healthcare is the presence of missing modalities due to multiple reasons including variations in clinical practice cost and accessibility constraints retrospective data collection priv

+------------------------------------------------------------------------------------------------------------------------------------------------+-------------+------------------+
|title                                                                                                                                           |main_category|predicted_category|
+------------------------------------------------------------------------------------------------------------------------------------------------+-------------+------------------+
|Multimodal Federated Learning With Missing Modalities through Feature\n  Imputation Network                                                     |cs.LG        |cs                |
|Resonances in Lifetimes of AdS Oscillon                                                                                                         |hep-th       |physics           |
|Derivations for the MPS overlap formulas of rational spin chains                                   

>>> Loading model and tokenizer on worker
>>> Loaded id2label_cache: {0: 'cs', 1: 'econ', 2: 'eess', 3: 'math', 4: 'physics', 5: 'q-bio', 6: 'q-fin', 7: 'stat'}
>>> Raw input batch:
[' title testtime learning for large language models summary while large language models llms have exhibited remarkable emergent capabilities through extensive pretraining they still face critical limitations in generalizing to specialized domains and handling diverse linguistic variations known as distribution shifts in this paper we propose a testtime learning ttl paradigm for llms namely tlm which dynamically adapts llms to target domains using only unlabeled test data during testing specifically we first provide empirical evidence and theoretical insights to reveal that more accurate predictions from llms can be achieved by minimizing the input perplexity of the unlabeled test data based on this insight we formulate the testtime learning process of llms as input perplexity minimization enabling selfsupe

Py4JError: An error occurred while calling o32.awaitTermination

Error processing RDD: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/home/nika/Desktop/spark/spark-3.5.5-bin-hadoop3/python/lib/pyspark.zip/pyspark/worker.py", line 1094, in main
    split_index = read_int(infile)
                  ^^^^^^^^^^^^^^^^
  File "/home/nika/Desktop/spark/spark-3.5.5-bin-hadoop3/python/lib/pyspark.zip/pyspark/serializers.py", line 594, in read_int
    length = stream.read(4)
             ^^^^^^^^^^^^^^
KeyboardInterrupt



25/05/30 18:23:57 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
25/05/30 18:23:57 WARN BlockManager: Block input-0-1748622237000 replicated to only 0 peer(s) instead of 1 peers
25/05/30 18:23:57 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
25/05/30 18:23:57 WARN BlockManager: Block input-0-1748622237200 replicated to only 0 peer(s) instead of 1 peers
25/05/30 18:23:57 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
25/05/30 18:23:57 WARN BlockManager: Block input-0-1748622237400 replicated to only 0 peer(s) instead of 1 peers
25/05/30 18:23:57 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
25/05/30 18:23:57 WARN BlockManager: Block input-0-1748622237600 replicated to only 0 peer(s) instead of 1 peers
25/05/30 18:23:58 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
25/05/30 18:23:58 WARN BlockManager: Block input-0-1748622237800 replicated to

+---------------------------------------------------------------------------------------------------------------------+-------------+------------------+
|title                                                                                                                |main_category|predicted_category|
+---------------------------------------------------------------------------------------------------------------------+-------------+------------------+
|Enhancing Transformation from Natural Language to Signal Temporal Logic\n  Using LLMs with Diverse External Knowledge|cs.CL        |cs                |
|An Optimisation Framework for Unsupervised Environment Design                                                        |cs.LG        |cs                |
|BacktrackAgent: Enhancing GUI Agent with Error Detection and\n  Backtracking Mechanism                               |cs.CL        |cs                |
|Positive Mass in Scalar-Torsion Holography                                       

In [2]:
ssc.stop(stopSparkContext=True, stopGraceFully=True)

25/05/30 18:23:59 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
25/05/30 18:23:59 WARN BlockManager: Block input-0-1748622239400 replicated to only 0 peer(s) instead of 1 peers
25/05/30 18:23:59 WARN SocketReceiver: Error receiving data
java.net.SocketException: Socket closed
	at java.base/java.net.SocketInputStream.socketRead0(Native Method)
	at java.base/java.net.SocketInputStream.socketRead(SocketInputStream.java:115)
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:168)
	at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
	at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:295)
	at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:337)
	at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:179)
	at java.base/java.io.InputStreamReader.read(InputStreamReader.java:181)
	at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
	at java.base/java.io.BufferedReader.readLin