# Import libraries

In [1]:
import re

from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, split, col, udf
from pyspark.sql.types import IntegerType, StringType
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.linalg import Vectors
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, IDF, ChiSqSelector, StringIndexer, CountVectorizer

# Define paths

In [2]:
# Inputs: the two review dumps on HDFS and the local stop-word list.
dev_file_path = "hdfs:///user/dic25_shared/amazon-reviews/full/reviews_devset.json"
full_file_path = "hdfs:///user/dic25_shared/amazon-reviews/full/reviewscombined.json"
stop_words_path = "stopwords.txt"

# Output: the selected terms are written to this local file.
output_file_path = "output_assignment2.txt"

# Create spark session

In [3]:
# Reuse an already-running session if there is one, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName("Assignment_2_session")
    .getOrCreate()
)

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/04/30 17:30:17 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/04/30 17:30:17 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/04/30 17:30:17 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
25/04/30 17:30:17 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
25/04/30 17:30:17 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.
25/04/30 17:30:19 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


# Load file

In [4]:
# Set to True to read the complete review dump instead of the development
# subset; an explicit flag replaces commenting lines of code in and out.
use_full_dataset = False

input_path = full_file_path if use_full_dataset else dev_file_path
df_full = spark.read.json(input_path)

                                                                                

# Select columns category and reviewText

In [5]:
# Keep only the review body and its category; the other columns are not used below.
df = df_full.select(col("reviewText"), col("category"))

# Create custom casefolder

In [6]:
# Define udf
@udf(returnType=StringType())
def casefold_text(text):
    """Return ``text.casefold()``, or None when ``text`` is None."""
    return None if text is None else text.casefold()

# Create a custom casefolder
class CasefoldTransformer(Transformer, HasInputCol, HasOutputCol,
                          DefaultParamsReadable, DefaultParamsWritable):
    """Pipeline stage that adds a casefolded copy of a string column.

    The column names are held as the ``inputCol`` / ``outputCol`` Params
    supplied by ``HasInputCol`` / ``HasOutputCol`` rather than as plain
    instance attributes. The DefaultParams reader/writer mixins persist
    Params only, so plain attributes would be dropped on save and a loaded
    instance would fall back to the constructor defaults.

    Parameters
    ----------
    inputCol : str
        Name of the column to read (default ``"text"``).
    outputCol : str
        Name of the column to create (default ``"casefolded_text"``).
    """

    def __init__(self, inputCol="text", outputCol="casefolded_text"):
        super(CasefoldTransformer, self).__init__()
        # Register the defaults first so an argument-less construction (as
        # done when loading from disk) still has both Params defined.
        self._setDefault(inputCol="text", outputCol="casefolded_text")
        self._set(inputCol=inputCol, outputCol=outputCol)

    def _transform(self, df):
        """Return ``df`` with the output column set to the casefolded input column."""
        source_column = df[self.getInputCol()]
        return df.withColumn(self.getOutputCol(), casefold_text(source_column))

# Load stopwords

In [7]:
# One stop word per line; splitlines() already returns a list.
with open(stop_words_path, "r") as handle:
    stop_words_text = handle.read()
stop_words = stop_words_text.splitlines()

# Define steps

## Casefolding -> Tokenization -> Stopwords removal -> TF-IDF calculation -> Chi square selection (using 2000 top terms overall)

In [8]:
# Casefold the raw review text into a new column.
casefolder = CasefoldTransformer(inputCol="reviewText", outputCol="reviewText_casefolded")

# Split on runs of delimiter characters (gaps=True: the pattern matches the
# separators, not the tokens). The pattern is a raw string so that every
# backslash reaches the regex engine unchanged: sequences such as `\d` and
# `\(` are not valid Python string escapes, and in a non-raw literal the
# trailing `\\/` collapses to the regex `\/`, which matches only `/` and
# leaves the backslash out of the delimiter set.
# toLowercase=False because the input column is already casefolded.
tokenizer = RegexTokenizer(
    inputCol="reviewText_casefolded",
    outputCol="tokens",
    pattern=r"[ \t\d\(\)\[\]\{\}\.\!\?,;:+=\-_'\"`~#@&\*\%€\$§\\/]+",
    gaps=True,
    toLowercase=False,
)

# Drop the tokens listed in the stop-word file.
remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens", stopWords=stop_words)

# Term counts per review, then IDF weighting on top of them.
tf = CountVectorizer(inputCol="filtered_tokens", outputCol="rawFeatures")

idf = IDF(inputCol="rawFeatures", outputCol="features")

# Numeric label column required by the selector.
indexer = StringIndexer(inputCol="category", outputCol="category_index")

# Keep the 2000 top features with respect to the category label.
selector = ChiSqSelector(
    numTopFeatures=2000,
    featuresCol="features",
    outputCol="selectedFeatures",
    labelCol="category_index",
)

# Create a pipeline

In [9]:
# The stage order is significant: the term-extraction cell addresses the
# fitted stages by position (3 = CountVectorizer, 6 = ChiSqSelector).
pipeline = Pipeline(
    stages=[
        casefolder,
        tokenizer,
        remover,
        tf,
        idf,
        indexer,
        selector,
    ]
)

# Apply pipeline on the dataset

In [10]:
# Fit the pipeline on the two-column frame; the fitted result is kept in
# `model` and its stages are inspected in the next cell.
model = pipeline.fit(df)

                                                                                

25/04/30 17:31:27 WARN DAGScheduler: Broadcasting large task binary with size 1066.4 KiB


                                                                                

25/04/30 17:31:41 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB


                                                                                

25/04/30 17:31:41 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB


                                                                                

25/04/30 17:31:50 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB


                                                                                

# Extract vocabulary, selected features and save selected terms

In [11]:
# Stage positions follow the order passed to Pipeline(stages=[...]):
# index 3 is the fitted CountVectorizer, index 6 the fitted ChiSqSelector.
vocabulary = model.stages[3].vocabulary

selected_features = model.stages[6].selectedFeatures

# Translate the selected feature indices back into vocabulary terms.
selected_terms = sorted(vocabulary[i] for i in selected_features)

# Explicit UTF-8 so that terms containing non-ASCII characters are written
# the same way regardless of the driver's locale settings.
with open(output_file_path, "w", encoding="utf-8") as f:
    f.write(" ".join(selected_terms))