In [6]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import Normalizer, ChiSqSelector
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import Tokenizer, CountVectorizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import from_json
from pyspark.sql.functions import split
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType

In [8]:
# Start (or reuse) the Spark session for this notebook
spark = (
    SparkSession.builder
    .appName("SVMClassifier")
    .getOrCreate()
)

# Location of the review dataset (JSON input)
file_path = "reviews_devset.json"

# Load the reviews into a DataFrame
df = spark.read.json(file_path)

# Keep only the columns needed for classification: the category and the
# raw review text, then preview the first rows.
df_sel = df.select("category", "reviewText")
df_sel.show()


+--------------------+--------------------+
|            category|          reviewText|
+--------------------+--------------------+
|Patio_Lawn_and_Garde|This was a gift f...|
|Patio_Lawn_and_Garde|This is a very ni...|
|Patio_Lawn_and_Garde|The metal base wi...|
|Patio_Lawn_and_Garde|For the most part...|
|Patio_Lawn_and_Garde|This hose is supp...|
|Patio_Lawn_and_Garde|This tool works v...|
|Patio_Lawn_and_Garde|This product is a...|
|Patio_Lawn_and_Garde|I was excited to ...|
|Patio_Lawn_and_Garde|I purchased the L...|
|Patio_Lawn_and_Garde|Never used a manu...|
|Patio_Lawn_and_Garde|Good price. Good ...|
|Patio_Lawn_and_Garde|I have owned the ...|
|Patio_Lawn_and_Garde|I had "won" a sim...|
|Patio_Lawn_and_Garde|The birds ate all...|
|Patio_Lawn_and_Garde|Bought last summe...|
|Patio_Lawn_and_Garde|I knew I had a mo...|
|Patio_Lawn_and_Garde|I was a little wo...|
|Patio_Lawn_and_Garde|I have used this ...|
|Patio_Lawn_and_Garde|I actually do not...|
|Patio_Lawn_and_Garde|Just what 

In [10]:
# Split each review into tokens wherever one or more delimiter characters
# (whitespace, digits, brackets, punctuation, currency/special symbols) occur.
tokenized_df = df_sel.withColumn(
    "tokenized_words",
    split(col("reviewText"), r"[ \t\d(){}\[\].!?,;:+=\-_\"'`~#@&*%€$§\\/]+"),
)
tokenized_df.show()

+--------------------+--------------------+--------------------+
|            category|          reviewText|     tokenized_words|
+--------------------+--------------------+--------------------+
|Patio_Lawn_and_Garde|This was a gift f...|[This, was, a, gi...|
|Patio_Lawn_and_Garde|This is a very ni...|[This, is, a, ver...|
|Patio_Lawn_and_Garde|The metal base wi...|[The, metal, base...|
|Patio_Lawn_and_Garde|For the most part...|[For, the, most, ...|
|Patio_Lawn_and_Garde|This hose is supp...|[This, hose, is, ...|
|Patio_Lawn_and_Garde|This tool works v...|[This, tool, work...|
|Patio_Lawn_and_Garde|This product is a...|[This, product, i...|
|Patio_Lawn_and_Garde|I was excited to ...|[I, was, excited,...|
|Patio_Lawn_and_Garde|I purchased the L...|[I, purchased, th...|
|Patio_Lawn_and_Garde|Never used a manu...|[Never, used, a, ...|
|Patio_Lawn_and_Garde|Good price. Good ...|[Good, price, Goo...|
|Patio_Lawn_and_Garde|I have owned the ...|[I, have, owned, ...|
|Patio_Lawn_and_Garde|I h

In [11]:
# Learn a vocabulary over the tokenized reviews ...
cv = CountVectorizer(inputCol="tokenized_words", outputCol="token_counts")
cv_model = cv.fit(tokenized_df)

# ... and add a `token_counts` vector column with the per-token counts
# of every review.
count_vectorized_df = cv_model.transform(tokenized_df)

# Preview the result
count_vectorized_df.show()

+--------------------+--------------------+--------------------+--------------------+
|            category|          reviewText|     tokenized_words|        token_counts|
+--------------------+--------------------+--------------------+--------------------+
|Patio_Lawn_and_Garde|This was a gift f...|[This, was, a, gi...|(123970,[0,1,3,4,...|
|Patio_Lawn_and_Garde|This is a very ni...|[This, is, a, ver...|(123970,[0,1,3,4,...|
|Patio_Lawn_and_Garde|The metal base wi...|[The, metal, base...|(123970,[0,1,3,4,...|
|Patio_Lawn_and_Garde|For the most part...|[For, the, most, ...|(123970,[0,1,2,3,...|
|Patio_Lawn_and_Garde|This hose is supp...|[This, hose, is, ...|(123970,[0,1,2,3,...|
|Patio_Lawn_and_Garde|This tool works v...|[This, tool, work...|(123970,[0,1,2,3,...|
|Patio_Lawn_and_Garde|This product is a...|[This, product, i...|(123970,[0,1,3,4,...|
|Patio_Lawn_and_Garde|I was excited to ...|[I, was, excited,...|(123970,[0,1,2,3,...|
|Patio_Lawn_and_Garde|I purchased the L...|[I, purchas

<bound method _CountVectorizerParams.getMaxDF of CountVectorizerModel: uid=CountVectorizer_91c85590022d, vocabularySize=123970>

In [None]:
# Train a linear SVM text classifier on the token-count vectors, tune it with
# a grid search, and report the F1 score on a held-out test set.
#
# Pipeline: Normalizer -> ChiSqSelector (top 2000 features) -> OneVsRest(LinearSVC)

# Split ratios and seed for reproducibility
train_ratio = 0.7
val_ratio = 0.2
test_ratio = 0.1
seed = 42

# Set the feature column and label column
feature_column = 'token_counts'
label_column = 'label'

# The classifier, selector and evaluator all need a numeric label, but
# `category` is a string column. Index it once on the full dataset (before
# splitting) so every split shares the same category -> index mapping and no
# split can contain a label the indexer has not seen.
label_indexer = StringIndexer(inputCol='category', outputCol=label_column)
labeled_df = label_indexer.fit(count_vectorized_df).transform(count_vectorized_df)

# Split the data into training, validation, and test sets
train_data, val_data, test_data = labeled_df.randomSplit(
    [train_ratio, val_ratio, test_ratio], seed=seed
)

# TrainValidationSplit performs its own random train/validation split, so it
# is given the training AND validation rows together; the ratio below keeps
# the overall 70/20 proportion between the two. Previously `val_data` was
# never used and 20% of the data was silently dropped.
tuning_data = train_data.union(val_data)
tuning_train_ratio = train_ratio / (train_ratio + val_ratio)

# Create the Normalizer for vector length normalization
normalizer = Normalizer(inputCol=feature_column, outputCol='normalized_features')

# Create the ChiSqSelector for feature selection. It must read the
# normalizer's output and write to a NEW column: writing to the existing
# `category` column fails, and the SVM expects `selected_features`.
selector = ChiSqSelector(
    numTopFeatures=2000,
    featuresCol='normalized_features',
    outputCol='selected_features',
    labelCol=label_column,
)

# Create the SVM classifier. LinearSVC only supports binary labels, so it is
# wrapped in OneVsRest (one binary SVM per class) to handle any number of
# categories.
svm = LinearSVC(featuresCol='selected_features', labelCol=label_column)
ovr = OneVsRest(classifier=svm, featuresCol='selected_features', labelCol=label_column)

# Create the pipeline
pipeline = Pipeline(stages=[normalizer, selector, ovr])

# Define the parameter grid for grid search (applied to the base SVM that
# OneVsRest copies for each class)
param_grid = ParamGridBuilder() \
    .addGrid(svm.regParam, [0.1, 0.01, 0.001]) \
    .addGrid(svm.standardization, [True, False]) \
    .addGrid(svm.maxIter, [10, 100]) \
    .build()

# Create the evaluator
evaluator = MulticlassClassificationEvaluator(labelCol=label_column, metricName='f1')

# Create the TrainValidationSplit for parameter optimization
tvs = TrainValidationSplit(estimator=pipeline,
                           estimatorParamMaps=param_grid,
                           evaluator=evaluator,
                           trainRatio=tuning_train_ratio,
                           seed=seed)

# Train the model and select the best model using TrainValidationSplit
model = tvs.fit(tuning_data)

# Evaluate the model on the test set (never seen during tuning)
predictions = model.transform(test_data)
f1_score = evaluator.evaluate(predictions)

# Print the F1 score on the test set
print("F1 Score on the test set: {:.4f}".format(f1_score))
