#### DiC Assignment 2

Group 6
Members:
 Theresa Mayer
 Theresa Bruckner
 Jan Tölken
 Can Kenan Kandil 
 Thomas Klar


In [1]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [2]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, IDF, ChiSqSelector, IndexToString, StringIndexer, CountVectorizer, Normalizer
from pyspark.sql import SparkSession

In [3]:
from pyspark import SparkContext, SparkFiles
import json
import re
from heapq import nlargest


In [4]:
path = "hdfs:///user/dic25_shared/amazon-reviews/full/reviews_devset.json"


In [5]:
spark = SparkSession.builder.appName("Assignment2").getOrCreate()

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/05/13 11:37:37 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/05/13 11:37:37 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/05/13 11:37:37 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
25/05/13 11:37:37 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
25/05/13 11:37:37 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.
25/05/13 11:37:37 WARN Utils: Service 'SparkUI' could not bind on port 4045. Attempting port 4046.
25/05/13 11:37:37 WARN Utils: Service 'SparkUI' could not bind on port 4046. Attempting port 4047.
25/05/13 11:37:37 WARN Utils: Service 'SparkUI' could not bind on port 4047. Attempting port 4048.
25/05/13 11:37:37 WARN Utils: Service 'SparkUI' could not bind on port 4048. Attempting port 4049.
25/05/13 11:37:37 WARN Utils: Service 'SparkUI' could not bind on port 4049. Attempting port 4050.
25/05/13 1

## Part 1

In [6]:
categories_json = "categories.json"
stopwords_txt   = "stopwords.txt"
output_path     = "chi_square_rdd.txt"

In [7]:
sc = spark.sparkContext

In [8]:
docs = sc.textFile(path)

In [9]:
df   = spark.read.json(path).select("category","reviewText")

                                                                                

In [10]:
docs = df.rdd.map(lambda row: json.dumps({
    "category":   row.category,
    "reviewText": row.reviewText
}))

In [11]:
sc.addFile(categories_json)
sc.addFile(stopwords_txt)

# Read both side files through context managers so the file handles are
# closed deterministically instead of being left to the garbage collector.
with open(SparkFiles.get(categories_json), encoding='utf-8') as categories_file:
    categories_map = json.load(categories_file)
with open(SparkFiles.get(stopwords_txt), encoding='utf-8') as stopwords_file:
    stopwords = set(line.strip() for line in stopwords_file)

### Regex similar to Assignment 1


In [12]:
split_on = r"[ \t\d(){}\[\].!?;:,+=\"~#@&*%€$§\\'\n\r/-]+"


In [13]:
splitter = re.compile(split_on)

#### implementing broadcasts for efficiency


In [14]:
bc_categories = sc.broadcast(categories_map)
bc_stopwords  = sc.broadcast(stopwords)
bc_splitter   = sc.broadcast(splitter)

### Helper functions

In [15]:
def extract_terms(line, categories_map, stopwords, splitter):
    """
    Turn one JSON review line into ((token, category), 1) pairs.

    Imitates mapper_count from Assignment 1: the category is mapped through
    `categories_map` ('X' when it is missing from the map), the review text is
    lower-cased, split with `splitter`, de-duplicated per document and
    stripped of stopwords. Tokens shorter than two characters are dropped.

    line:           JSON string with optional 'category' and 'reviewText' keys
    categories_map: dict used to translate the raw category value
    stopwords:      collection of tokens to discard
    splitter:       compiled regular expression whose matches act as delimiters

    Returns a list of ((token, category), 1) tuples, one per distinct token.
    """
    doc = json.loads(line)
    cat = categories_map.get(doc.get('category', ''), 'X')
    # A JSON null reviewText comes back as None (the .get default only applies
    # to a missing key), so coerce it to an empty string before lower-casing.
    text = (doc.get('reviewText') or '').lower()
    # A set keeps each token once per document, so counts are document counts.
    tokens = set(splitter.split(text)).difference(stopwords)
    return [((token, cat), 1) for token in tokens if len(token) > 1]

In [16]:
def compute_chi(item):
    """
    Chi-square score of one term against every category
    (counterpart of reducer_calc_chisq).

    item: (term, counts) pair, where counts iterates over (category, count) pairs.

    Reads the globals total_docs and bc_cat_counts. Returns a list of
    (category, (term, chi)) tuples; categories whose denominator is not
    positive are left out.
    """
    term, counts = item
    per_category = dict(counts)
    docs_with_term = sum(per_category.values())
    N = total_docs
    scored = []
    for cat, docs_in_cat in bc_cat_counts.value.items():
        # Cells of the 2x2 contingency table for (term, cat)
        A = per_category.get(cat, 0)
        B = docs_with_term - A
        C = docs_in_cat - A
        D = N - docs_in_cat - B
        denom = (A + B)*(A + C)*(B + D)*(C + D)
        if denom <= 0:
            continue
        scored.append((cat, (term, float(N)*(A*D - B*C)**2/denom)))
    return scored

In [17]:
docs = sc.textFile(path)
total_docs = docs.count()

                                                                                

#### implementing category counts - similar to the CategoryCounter job of MapReduce


In [18]:
# Documents per mapped category ('X' for categories missing from the map),
# collected to the driver and broadcast for use inside compute_chi.
cat_counts = (
    docs
    .map(lambda line: (bc_categories.value.get(json.loads(line).get('category', ''), 'X'), 1))
    .reduceByKey(lambda left, right: left + right)
    .collectAsMap()
)
bc_cat_counts = sc.broadcast(cat_counts)

                                                                                

#### implementing term-category-counts - similar to mapper_count (by using the function extract terms)


In [19]:
# ((token, category), count) pairs: extract_terms emits each token once per
# document, the reduce step sums those ones per (token, category) key.
term_cat_counts = docs.flatMap(
    lambda line: extract_terms(line, bc_categories.value, bc_stopwords.value, bc_splitter.value)
).reduceByKey(lambda left, right: left + right)

#### grouping by term


In [20]:
term_counts = (
    term_cat_counts
    .map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))
    .groupByKey()
)

#### calculating the chi values

In [21]:
chi_terms = term_counts.flatMap(compute_chi)

### Top75

In [22]:
top_terms = (
    chi_terms
    .groupByKey()
    .mapValues(lambda vals: nlargest(75, vals, key=lambda x: x[1]))
    .collect()
)

                                                                                

#### Formatting the output as in Assignment1
 

In [23]:
with open(output_path, 'w', encoding='utf-8') as out_file:
    # One line per category (sorted by mapped code): "<name> term:chi term:chi ..."
    for code, scored_terms in sorted(top_terms, key=lambda entry: entry[0]):
        # Translate the mapped code back to the first category name that maps
        # to it; fall back to the code itself when no name matches.
        label = next((name for name, mapped in categories_map.items() if mapped == code), code)
        rendered = " ".join(f"{token}:{score:.2f}" for token, score in scored_terms)
        out_file.write(label + " " + rendered + "\n")
    # Last line: all selected terms across categories, de-duplicated and sorted.
    vocabulary = set()
    for _, scored_terms in top_terms:
        vocabulary.update(token for token, _ in scored_terms)
    out_file.write(" ".join(sorted(vocabulary)))

sc.stop()


## Part 2

In [25]:
spark = SparkSession.builder.appName("Assignment2").getOrCreate()


25/05/13 11:40:07 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/05/13 11:40:07 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/05/13 11:40:07 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
25/05/13 11:40:07 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
25/05/13 11:40:07 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.
25/05/13 11:40:07 WARN Utils: Service 'SparkUI' could not bind on port 4045. Attempting port 4046.
25/05/13 11:40:07 WARN Utils: Service 'SparkUI' could not bind on port 4046. Attempting port 4047.
25/05/13 11:40:07 WARN Utils: Service 'SparkUI' could not bind on port 4047. Attempting port 4048.
25/05/13 11:40:07 WARN Utils: Service 'SparkUI' could not bind on port 4048. Attempting port 4049.
25/05/13 11:40:07 WARN Utils: Service 'SparkUI' could not bind on port 4049. Attempting port 4050.
25/05/13 1

In [26]:
input_file = spark.read.format("json").load(path).select("category", "reviewText")

                                                                                

In [27]:
input_file.show(n=5)

+--------------------+--------------------+
|            category|          reviewText|
+--------------------+--------------------+
|Patio_Lawn_and_Garde|This was a gift f...|
|Patio_Lawn_and_Garde|This is a very ni...|
|Patio_Lawn_and_Garde|The metal base wi...|
|Patio_Lawn_and_Garde|For the most part...|
|Patio_Lawn_and_Garde|This hose is supp...|
+--------------------+--------------------+
only showing top 5 rows



### Label Encoding

In [28]:
indexer = StringIndexer(inputCol="category", outputCol="label")
indexModel = indexer.fit(input_file)
input_file_1 = indexModel.transform(input_file)

                                                                                

In [29]:
reindexer = IndexToString(inputCol=indexer.getOutputCol(), outputCol="category_reindexed")
reindexer.transform(input_file_1).show(n=5)

+--------------------+--------------------+-----+--------------------+
|            category|          reviewText|label|  category_reindexed|
+--------------------+--------------------+-----+--------------------+
|Patio_Lawn_and_Garde|This was a gift f...| 18.0|Patio_Lawn_and_Garde|
|Patio_Lawn_and_Garde|This is a very ni...| 18.0|Patio_Lawn_and_Garde|
|Patio_Lawn_and_Garde|The metal base wi...| 18.0|Patio_Lawn_and_Garde|
|Patio_Lawn_and_Garde|For the most part...| 18.0|Patio_Lawn_and_Garde|
|Patio_Lawn_and_Garde|This hose is supp...| 18.0|Patio_Lawn_and_Garde|
+--------------------+--------------------+-----+--------------------+
only showing top 5 rows



### Tokenization

In [30]:
# The delimiter class now also contains the literal '+', so Part 2 splits on the
# same characters as the Part 1 `split_on` pattern (which already splits on '+').
tokenizer = RegexTokenizer(inputCol='reviewText', outputCol='tokens', pattern=r"[ \t\d(){}\[\].!?;:,+\-=\"~#@&*%€$§\\'\n\r\/]+", minTokenLength=2, toLowercase=True)

In [31]:
input_2 = tokenizer.transform(input_file_1)
input_2.show(n=5)

+--------------------+--------------------+-----+--------------------+
|            category|          reviewText|label|              tokens|
+--------------------+--------------------+-----+--------------------+
|Patio_Lawn_and_Garde|This was a gift f...| 18.0|[this, was, gift,...|
|Patio_Lawn_and_Garde|This is a very ni...| 18.0|[this, is, very, ...|
|Patio_Lawn_and_Garde|The metal base wi...| 18.0|[the, metal, base...|
|Patio_Lawn_and_Garde|For the most part...| 18.0|[for, the, most, ...|
|Patio_Lawn_and_Garde|This hose is supp...| 18.0|[this, hose, is, ...|
+--------------------+--------------------+-----+--------------------+
only showing top 5 rows



### Stopword Removal

In [32]:
stopword_file = "stopwords.txt"
with open(stopword_file, 'r', encoding='utf-8') as f:
    # One stopword per line; only surrounding whitespace is stripped here
    # (no case conversion is applied to the entries).
    stopwords = [line.strip() for line in f]

In [33]:
stopword_remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), 
                                    outputCol="tokens_nostop",
                                    stopWords=stopwords)

In [34]:
input_3 = stopword_remover.transform(input_2)
input_3.select("tokens", "tokens_nostop").show(n=5)

+--------------------+--------------------+
|              tokens|       tokens_nostop|
+--------------------+--------------------+
|[this, was, gift,...|[gift, husband, m...|
|[this, is, very, ...|[nice, spreader, ...|
|[the, metal, base...|[metal, base, hos...|
|[for, the, most, ...|[part, works, pre...|
|[this, hose, is, ...|[hose, supposed, ...|
+--------------------+--------------------+
only showing top 5 rows



### TF-IDF Calculation

In [35]:
tf = CountVectorizer(inputCol=stopword_remover.getOutputCol(), 
                      outputCol="tf_output", 
                      vocabSize=40_000)

In [36]:
idf = IDF(inputCol=tf.getOutputCol(), 
          outputCol="tfidf_output",
          minDocFreq=4)

In [37]:
tfmodel = tf.fit(input_3)
input_4 = tfmodel.transform(input_3)
input_4.select("tokens_nostop", "tf_output").show(n=5)

                                                                                

+--------------------+--------------------+
|       tokens_nostop|           tf_output|
+--------------------+--------------------+
|[gift, husband, m...|(40000,[2,3,7,8,3...|
|[nice, spreader, ...|(40000,[0,1,3,21,...|
|[metal, base, hos...|(40000,[4,10,29,1...|
|[part, works, pre...|(40000,[1,3,4,9,1...|
|[hose, supposed, ...|(40000,[12,32,42,...|
+--------------------+--------------------+
only showing top 5 rows



In [38]:
idfModel = idf.fit(input_4)
input_5 = idfModel.transform(input_4)
input_5.select("tf_output", "tfidf_output").show(n=5)

                                                                                

25/05/13 11:42:10 WARN DAGScheduler: Broadcasting large task binary with size 1082.1 KiB
+--------------------+--------------------+
|           tf_output|        tfidf_output|
+--------------------+--------------------+
|(40000,[2,3,7,8,3...|(40000,[2,3,7,8,3...|
|(40000,[0,1,3,21,...|(40000,[0,1,3,21,...|
|(40000,[4,10,29,1...|(40000,[4,10,29,1...|
|(40000,[1,3,4,9,1...|(40000,[1,3,4,9,1...|
|(40000,[12,32,42,...|(40000,[12,32,42,...|
+--------------------+--------------------+
only showing top 5 rows



### Selection of top 2000 features

In [39]:
chisq = ChiSqSelector(featuresCol=idf.getOutputCol(),
                      labelCol="label",
                      outputCol="features",
                      numTopFeatures=2000)

In [40]:
chisqModel = chisq.fit(input_5)
input_6 = chisqModel.transform(input_5)
input_6.select("features").show(n=5)

25/05/13 11:42:11 WARN DAGScheduler: Broadcasting large task binary with size 1093.4 KiB
25/05/13 11:42:11 WARN DAGScheduler: Broadcasting large task binary with size 1095.5 KiB


                                                                                

25/05/13 11:42:25 WARN DAGScheduler: Broadcasting large task binary with size 1097.5 KiB


                                                                                

25/05/13 11:42:44 WARN DAGScheduler: Broadcasting large task binary with size 1087.3 KiB
+--------------------+
|            features|
+--------------------+
|(2000,[2,3,7,8,35...|
|(2000,[0,1,3,21,3...|
|(2000,[4,10,174,3...|
|(2000,[1,3,4,9,10...|
|(2000,[12,29,101,...|
+--------------------+
only showing top 5 rows



### Pipeline Creation

In [41]:
def get_pipeline(n_features=2000):
    """
    Build the (unfitted) preprocessing pipeline.

    Stages, in order: indexer, tokenizer, stopword_remover, tf, idf and a
    chi-square selector keeping `n_features` features.

    The selector is a configured copy of the shared `chisq` estimator, so a
    later call with a different `n_features` does not silently reconfigure
    pipelines that were built earlier.
    """
    selector = chisq.copy({chisq.numTopFeatures: n_features})
    return Pipeline(stages=[
        indexer,
        tokenizer,
        stopword_remover,
        tf,
        idf,
        selector,
    ])

In [42]:
pipeline = get_pipeline(n_features=2000)
preprocessing_pipeline = pipeline.fit(input_file)
preprocessing_pipeline.transform(input_file).select("label", "features").show(n=5)

                                                                                

25/05/13 11:43:12 WARN DAGScheduler: Broadcasting large task binary with size 1093.4 KiB
25/05/13 11:43:12 WARN DAGScheduler: Broadcasting large task binary with size 1095.5 KiB


                                                                                

25/05/13 11:43:25 WARN DAGScheduler: Broadcasting large task binary with size 1097.5 KiB


                                                                                

25/05/13 11:43:44 WARN DAGScheduler: Broadcasting large task binary with size 1093.3 KiB
+-----+--------------------+
|label|            features|
+-----+--------------------+
| 18.0|(2000,[2,3,7,8,35...|
| 18.0|(2000,[0,1,3,21,3...|
| 18.0|(2000,[4,10,174,3...|
| 18.0|(2000,[1,3,4,9,10...|
| 18.0|(2000,[12,29,101,...|
+-----+--------------------+
only showing top 5 rows



### Export most important tokens to file

In [43]:
def get_top_terms_from_pipeline(pipeline):
    """
    Write the vocabulary terms kept by the feature selector to output_ds.txt.

    pipeline: fitted pipeline model whose stage 3 exposes `vocabulary` and
              whose stage 5 exposes `selectedFeatures` (the stage layout
              produced by get_pipeline).

    The terms are written sorted and space-separated on a single line.
    Returns the number of selected features.
    """
    selected = pipeline.stages[5].selectedFeatures
    vocab = pipeline.stages[3].vocabulary
    top_words = " ".join(sorted(vocab[i] for i in selected))

    # Explicit UTF-8 so non-ASCII tokens are written the same way regardless
    # of the platform's default locale encoding.
    with open("output_ds.txt", "w", encoding="utf-8") as f:
        f.write(top_words)

    return len(selected)

In [44]:
get_top_terms_from_pipeline(preprocessing_pipeline)

2000

# Part 3

In this part we train an SVM classifier that predicts the category of a review from its review text.

First we import the necessary libraries from pyspark,...

In [45]:
from pyspark.ml.classification import LinearSVC, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

import time 
from datetime import datetime

#to ignore the warning messages coming from the parameter grid search when training tvs
spark.sparkContext.setLogLevel("ERROR")

Now we add the needed steps for our pipeline, starting with a normalizer with L2 norm by using p=2

In [46]:
#Normalizer using features from the ChiSquare step and output should be the normalized features
normalizer = Normalizer(inputCol="features", outputCol="norm_features", p =2)


Next we create the classifier and add One-vs-Rest since SVM is only binary and we have a multi-class-problem. For further explanation on why OVR is used see the report. 

In [47]:
#SVM uses the normalized features and the created label from the indexer
svm = LinearSVC(labelCol="label", featuresCol="norm_features")
ovr = OneVsRest(classifier=svm, labelCol="label", featuresCol="norm_features")

Now we create the get_full_pipeline function so that we can switch between different numbers of selected features.

In [48]:
#Pipeline with additional normalization and SVM with OVR 
def get_full_pipeline(n_features=2000):
    """
    Build the (unfitted) preprocessing + classification pipeline.

    Stages, in order: indexer, tokenizer, stopword_remover, tf, idf, a
    chi-square selector keeping `n_features` features, normalizer and ovr.

    The selector is a configured copy of the shared `chisq` estimator, so a
    later call with a different `n_features` (e.g. 500 after 2000) does not
    change the selector inside a pipeline returned earlier.
    """
    selector = chisq.copy({chisq.numTopFeatures: n_features})
    return Pipeline(stages=[
        indexer,
        tokenizer,
        stopword_remover,
        tf,
        idf,
        selector,
        normalizer,
        ovr,
    ])

Now we create the parameter grid to compare the input parameters for SVM to get the best performing model based on F1-Score. 

In [49]:
paramGrid = (ParamGridBuilder()
    .addGrid(svm.regParam, [0.01, 0.1, 1.0])
    .addGrid(svm.maxIter, [10, 100])
    .addGrid(svm.standardization, [True, False])
    .build()
)

Now we split the input data into training and test sets. We also had to downsample the input file, since training the classifier in combination with the parameter grid would otherwise take too long. We decided to use 30% of the input data, since this gives a good trade-off between computing time and sample size.

Here we don't split the data into training and validation sets, since this is done automatically by TrainValidationSplit.

In [50]:
# Splitting dataset 
devset_sample, devset_rest = input_file.randomSplit([0.3,0.7], seed = 1234)
train_data, test_data = devset_sample.randomSplit([0.8, 0.2], seed=1234)

In [51]:
# Count the training rows once and reuse the value: every count() call
# launches its own Spark job over the sampled data.
n_train_rows = train_data.count()
print(f"used data for modeling has {devset_sample.count()} rows")
print(f"training data has {n_train_rows} rows")
print(f" so {n_train_rows * 0.2} rows will be used for validation")

                                                                                

used data for modeling has 23665 rows




training data has 18905 rows




 so 3781.0 rows will be used for validation


                                                                                

Lastly we need an evaluator with F1-metric.

In [52]:
# Evaluator
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")

## Model with 2000 features 

Now we train the model using the top 2000 features. We start with getting the full pipeline with n_features = 2000.

In [53]:
#Full pipeline with 2000 features
full_pipeline = get_full_pipeline(n_features=2000)


Now we configure the TrainValidationSplit for the computation later on. We use 80% of the training data for fitting and the remaining 20% for validation when selecting the parameters.

In [54]:
# TrainValidationSplit Setup
# 80% of the data passed to fit() is used for training and 20% for validation.
# The seed fixes that internal split so the selected parameters are
# reproducible (same seed as the randomSplit calls).
tvs = TrainValidationSplit(
    estimator=full_pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    trainRatio=0.8,
    parallelism=50,
    seed=1234
)


In [55]:
# Training with 2000 features 
start_time = time.time()
start_time_readable = datetime.fromtimestamp(start_time).strftime('%Y-%m-%d %H:%M:%S')
print(f"starting at {start_time_readable}") 
tvs_model = tvs.fit(train_data)
fit_time = time.time() - start_time
print(f'fit_time={fit_time}')

starting at 2025-05-13 11:43:49


[Stage 191:>  (0 + 0) / 2][Stage 192:=> (1 + 1) / 2][Stage 194:>  (0 + 1) / 2]  

KeyboardInterrupt: 

In [None]:
# Measured fit times from earlier runs:
#fit_time=3726.6424901485443 with parallelism 20
#fit_time=2063.811951637268 with parallelism 50
#fit_time=2226.436852455139 with parallelism 100
#fit_time=3263.985917568207 with parallelism 50 and 30% of the devset

In [None]:
# Evaluation
test_predictions = tvs_model.transform(test_data)

In [None]:
best_model = tvs_model.bestModel

In [None]:
ovr_model = best_model.stages[-1] 
best_svm_model = ovr_model.getClassifier()

# Show Parameters
print("Best model:")
print(f"  regParam:        {best_svm_model.getRegParam()}")
print(f"  maxIter:         {best_svm_model.getMaxIter()}")
print(f"  standardization: {best_svm_model.getStandardization()}")

In [None]:
test_f1 = evaluator.evaluate(test_predictions)

print(f"Test f1: {test_f1:.4f}")

Validation f1: 0.5712
Test f1: 0.5680

## Model with 500 features

Now we do the same with only 500 features

In [None]:
full_pipeline_500 = get_full_pipeline(n_features=500)

In [None]:
# TrainValidationSplit Setup
# Same setup as for the 2000-feature model. The seed fixes the internal 80/20
# train/validation split so the run is reproducible (same seed as the
# randomSplit calls).
tvs_500 = TrainValidationSplit(
    estimator=full_pipeline_500,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    trainRatio=0.8,
    parallelism=50,
    seed=1234
)


In [None]:
# Training with 500 features 
start_time = time.time()
start_time_readable = datetime.fromtimestamp(start_time).strftime('%Y-%m-%d %H:%M:%S')
print(f"starting at {start_time_readable}") 
tvs_model_500 = tvs_500.fit(train_data)
fit_time = time.time() - start_time
print(f'fit_time={fit_time}')

In [None]:
# Evaluation
test_predictions_500 = tvs_model_500.transform(test_data)

In [None]:
best_model_500 = tvs_model_500.bestModel

In [None]:
ovr_model_500 = best_model_500.stages[-1] 
best_svm_model_500 = ovr_model_500.getClassifier()

# Show best parameters
print("Best model:")
print(f"  regParam:        {best_svm_model_500.getRegParam()}")
print(f"  maxIter:         {best_svm_model_500.getMaxIter()}")
print(f"  standardization: {best_svm_model_500.getStandardization()}")

In [None]:
test_f1_500 = evaluator.evaluate(test_predictions_500)

print(f"Test f1: {test_f1_500:.4f}")

## Final turn with full model 

After finding the best parameters, we create the final SVM with these parameters and build a new pipeline for the full dataset (so no parameter optimization has to be done on the full set).

In [None]:
path = "hdfs:///user/dic25_shared/amazon-reviews/full/reviewscombined.json"
final_file = spark.read.format("json").load(path).select("category", "reviewText")

In [None]:
train_data, test_data = final_file.randomSplit([0.8, 0.2], seed=1234)

In [None]:
best_regParam = best_svm_model.getRegParam()
best_maxIter = best_svm_model.getMaxIter() 
best_standardization = best_svm_model.getStandardization()
best_n_features  = 2000

svm_final = LinearSVC(labelCol="label", featuresCol="norm_features", regParam = best_regParam, maxIter = best_maxIter, standardization = best_standardization)
ovr_final = OneVsRest(classifier=svm_final, labelCol="label", featuresCol="norm_features")

In [None]:
def get_final_pipeline(n_features=best_n_features):
    """
    Build the (unfitted) pipeline used for the final run on the full dataset.

    Stages, in order: indexer, tokenizer, stopword_remover, tf, idf, a
    chi-square selector keeping `n_features` features, normalizer and
    ovr_final.

    The last stage is `ovr_final`, which wraps `svm_final` and therefore
    carries the selected regParam / maxIter / standardization values; using
    the plain `ovr` here would train with the untuned default SVM.
    The selector is a configured copy of the shared `chisq` estimator so the
    global object is not mutated.
    """
    selector = chisq.copy({chisq.numTopFeatures: n_features})
    return Pipeline(stages=[
        indexer,
        tokenizer,
        stopword_remover,
        tf,
        idf,
        selector,
        normalizer,
        ovr_final,
    ])

In [None]:
#Final Pipeline for full set 
final_pipeline = get_final_pipeline(n_features=best_n_features)

final_model = final_pipeline.fit(train_data)

In [None]:
final_predictions = final_model.transform(test_data)
final_f1 = evaluator.evaluate(final_predictions)

print(f"Test f1: {final_f1:.4f}")

### Additional part only for Report 

This part is only used to compare different parameter settings; in the finished code it should be commented out.

In [None]:
# For the report: evaluate every grid combination on its own to see how the settings perform
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")

# `val_data` and `results` were used below without ever being defined.
# Hold out 20% of the training data for validation (the same ratio that
# TrainValidationSplit uses) and start with an empty result list.
grid_train_data, val_data = train_data.randomSplit([0.8, 0.2], seed=1234)
results = []

# Loop over all parameter combinations
for params in paramGrid:
    # Apply the parameters to a copy of the SVM so the shared `svm` object stays unchanged
    current_svm = svm.copy(params)

    # Rebuild the pipeline with the current SVM; the label/features columns
    # match the `ovr` stage that is being replaced
    current_ovr = OneVsRest(classifier=current_svm, labelCol="label", featuresCol="norm_features")
    current_pipeline = Pipeline(stages=full_pipeline.getStages()[:-1] + [current_ovr])

    # Train the model
    model = current_pipeline.fit(grid_train_data)

    # Predict on the validation data
    predictions = model.transform(val_data)

    # Compute the F1 score
    f1 = evaluator.evaluate(predictions)

    # Store parameters and score
    results.append((params, f1))

    # Print the combination and its score
    print("Param-Kombi:")
    for p, v in params.items():
        print(f"  {p.name}: {v}")
    print(f"  → F1-Score: {f1:.4f}\n")

In [None]:
# Pick the (params, f1) entry with the highest F1 score and print it
best_params, best_f1 = max(results, key=lambda entry: entry[1])
print(f"Best combination → F1: {best_f1:.4f}")
for param, value in best_params.items():
    print(f"{param.name}: {value}")

[Stage 194:=> (1 + 1) / 2][Stage 196:>  (0 + 1) / 2][Stage 198:>  (0 + 0) / 2]  