<a href="https://colab.research.google.com/github/sudoice/Unit-4-Citation-Search-Using-Apache-Spark/blob/main/citation_search.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, StopWordsRemover, VectorAssembler
from pyspark.ml.classification import LogisticRegression, NaiveBayes, LinearSVC, GBTClassifier
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, MultilayerPerceptronClassifier
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, when, split, udf, abs
from pyspark.sql.types import IntegerType, DoubleType, ArrayType, StringType
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
import pyspark.sql.functions as F
import numpy as np

In [None]:
# Configuration parameters
# Header-less CSV of paper metadata; read by get_info_dataframe().
NODE_INFO_FILENAME = "node_information.csv"
# Space-separated "srcId dstId label" lines; read by get_training_dataframe().
TRAINING_SET_FILENAME = "training_set.txt"
# Space-separated "srcId dstId" lines; read by get_testing_dataframe().
TESTING_SET_FILENAME = "testing_set.txt"
# Tab-separated "srcId dstId" edge list used to label the test pairs.
GROUND_TRUTH_FILENAME = "Cit-HepTh.txt"
# Fraction of the node-information rows kept by the sample() call in main().
INFO_DATAFRAME_PORTION = 0.1
# Passed as maxIter to LogisticRegression in main().
LOGISTIC_REGRESSION_ITERATIONS = 100

In [None]:
# Define helper functions
def get_publication_year_difference(year_from, year_to):
    """Return the absolute gap, in years, between two publication years.

    Args:
        year_from: Publication year of the first paper; anything ``int()``
            accepts (e.g. ``"2001"`` or ``2001``), or ``None``.
        year_to: Publication year of the second paper, same accepted forms.

    Returns:
        int: ``|year_from - year_to|``, or ``0`` when either year is missing
        or cannot be converted to an integer.
    """
    if year_from is None or year_to is None:
        return 0
    try:
        difference = int(year_from) - int(year_to)
    except (ValueError, TypeError):
        return 0
    # Do not call ``abs`` here: this notebook imports ``abs`` from
    # pyspark.sql.functions, which shadows the builtin and operates on Columns,
    # so calling it on a plain int would not yield the numeric difference.
    return -difference if difference < 0 else difference

def is_published_on_same_journal(journal_a, journal_b):
    """Flag whether two papers were published in the same journal.

    Args:
        journal_a: Journal name of the first paper, or ``None``/``""`` when
            the journal is unknown.
        journal_b: Journal name of the second paper, same conventions.

    Returns:
        int: ``1`` when both journals are known and equal, otherwise ``0``.
    """
    # A missing journal (None, or the "" that pre_process fills in for nulls)
    # carries no information, so two unknowns must not count as a match.
    if not journal_a or not journal_b:
        return 0
    return 1 if journal_a == journal_b else 0

def count_common_words(text_a, text_b):
    """Count the distinct tokens that two token sequences have in common.

    String tokens are stripped of surrounding whitespace before comparison and
    empty tokens are ignored, so placeholder entries (e.g. the single ``""``
    that splitting an empty authors string is expected to produce) and
    padding left by splitting on ``","`` do not register as shared words.

    Args:
        text_a: Iterable of tokens for the first paper, or ``None``.
        text_b: Iterable of tokens for the second paper, or ``None``.

    Returns:
        int: Number of distinct non-empty tokens present in both inputs;
        ``0`` when either input is ``None``.
    """
    if text_a is None or text_b is None:
        return 0

    def _meaningful_tokens(tokens):
        # Normalise string tokens and discard the ones that carry no content.
        cleaned = set()
        for token in tokens:
            if token is None:
                continue
            if isinstance(token, str):
                token = token.strip()
                if not token:
                    continue
            cleaned.add(token)
        return cleaned

    return len(_meaningful_tokens(text_a) & _meaningful_tokens(text_b))

In [None]:
# Wrap the plain-Python helpers as Spark UDFs, declaring each return type explicitly
count_common_words_udf = udf(count_common_words, returnType=IntegerType())
is_same_journal_udf = udf(is_published_on_same_journal, returnType=IntegerType())
publication_year_difference_udf = udf(get_publication_year_difference, returnType=IntegerType())
# Maps a label equal to 1 onto 1.0 and every other value onto 0.0
to_double_udf = udf(lambda label: 1.0 if label == 1 else 0.0, returnType=DoubleType())

In [None]:
def get_info_dataframe(filename):
    """Load the header-less node-information CSV and name its six columns.

    Args:
        filename: Path of the CSV file to read with the module-level ``spark`` session.

    Returns:
        DataFrame with columns ``srcId``, ``year``, ``title``, ``authors``,
        ``journal`` and ``abstract``.
    """
    reader = spark.read.option("header", "false")
    unnamed_frame = reader.csv(filename)
    return unnamed_frame.toDF("srcId", "year", "title", "authors", "journal", "abstract")

In [None]:
# Function to preprocess the data
def pre_process(dataframe):
    """Tokenise the text columns of the node-information DataFrame.

    Missing ``abstract``, ``title``, ``authors`` and ``journal`` values are
    replaced by empty strings, ``authors`` is split on commas into
    ``authors_tokens_raw``, and ``abstract``/``title`` are each tokenised and
    stripped of stop words.

    Args:
        dataframe: DataFrame holding at least the four text columns above.

    Returns:
        The input DataFrame extended with ``authors_tokens_raw``,
        ``abstract_tokens_raw``, ``abstract_tokens_clean``,
        ``title_tokens_raw`` and ``title_tokens_clean``.
    """
    # Replace nulls with "" before any text processing
    text_columns = ("abstract", "title", "authors", "journal")
    filled = dataframe.na.fill({name: "" for name in text_columns})
    with_authors = filled.withColumn("authors_tokens_raw", split(col("authors"), ","))

    # For each free-text field: tokenise, then remove stop words
    stages = []
    for field in ("abstract", "title"):
        raw_column = f"{field}_tokens_raw"
        stages.append(Tokenizer(inputCol=field, outputCol=raw_column))
        stages.append(StopWordsRemover(inputCol=raw_column, outputCol=f"{field}_tokens_clean"))

    fitted_pipeline = Pipeline(stages=stages).fit(with_authors)
    return fitted_pipeline.transform(with_authors)

In [None]:
# Function to load training data
def get_training_dataframe(filename):
    """Load the labelled training pairs.

    Each line is split on single spaces; the first two fields are kept as
    string ids and the third is converted to an integer label.

    Args:
        filename: Path of the text file to read with the module-level ``spark`` session.

    Returns:
        DataFrame with columns ``srcId``, ``dstId`` and ``label``.
    """
    def _to_labelled_pair(row):
        parts = row.value.split(" ")
        return parts[0], parts[1], int(parts[2])

    lines = spark.read.text(filename).rdd
    return lines.map(_to_labelled_pair).toDF(["srcId", "dstId", "label"])

In [None]:
# Function to load testing data
def get_testing_dataframe(filename):
    """Load the unlabelled test pairs.

    Each line is split on single spaces and the first two fields are kept as
    string ids.

    Args:
        filename: Path of the text file to read with the module-level ``spark`` session.

    Returns:
        DataFrame with columns ``srcId`` and ``dstId``.
    """
    def _to_pair(row):
        parts = row.value.split(" ")
        return parts[0], parts[1]

    lines = spark.read.text(filename).rdd
    return lines.map(_to_pair).toDF(["srcId", "dstId"])

In [None]:
def get_ground_truth_dataframe(filename):
    """Load the tab-separated citation edge list used as ground truth.

    Blank lines and lines starting with ``#`` are skipped: they contain no
    edge, and a line without a tab would otherwise raise ``IndexError`` when
    the second field is read. (Edge-list downloads often begin with such
    ``#`` header lines; presumably this applies to the raw Cit-HepTh file,
    to be confirmed against the copy in use.)

    Args:
        filename: Path of the text file to read with the module-level ``spark`` session.

    Returns:
        DataFrame with string columns ``srcId`` and ``dstId``, one row per edge line.
    """
    def _is_edge_line(row):
        stripped = row.value.strip()
        return bool(stripped) and not stripped.startswith("#")

    def _to_edge(row):
        fields = row.value.split("\t")
        return fields[0], fields[1]

    return spark.read.text(filename).rdd \
        .filter(_is_edge_line) \
        .map(_to_edge) \
        .toDF(["srcId", "dstId"])

In [None]:
# Function to add labels to test data
def add_labels_to_test_dataframe(testing_dataframe, ground_truth_dataframe):
    """Label each test pair by whether it appears in the ground-truth edge list.

    Args:
        testing_dataframe: DataFrame with ``srcId`` and ``dstId`` columns.
        ground_truth_dataframe: DataFrame of known edges with the same two columns.

    Returns:
        The test pairs with an added integer ``label`` column: ``1`` when the
        left join found a ground-truth row with the same ``srcId`` and
        ``dstId``, ``0`` otherwise.
    """
    candidates = testing_dataframe.alias("a")
    known_edges = ground_truth_dataframe.alias("b")
    same_edge = (col("a.srcId") == col("b.srcId")) & (col("a.dstId") == col("b.dstId"))

    # Left join keeps every test pair; unmatched pairs get nulls on the "b" side
    matched = candidates.join(known_edges, same_edge, "left")
    labelled = matched.withColumn(
        "label",
        when(col("b.srcId").isNotNull(), 1).otherwise(0)
    )

    # Only the test-side id columns are kept
    return labelled.drop(col("b.srcId")).drop(col("b.dstId"))

In [None]:
# Function to join dataframes
def join_dataframes(training_dataframe, info_dataframe):
    """Attach paper metadata to both ends of every (srcId, dstId) pair.

    Two inner joins against ``info_dataframe`` are performed, first on the
    pair's ``srcId`` and then on its ``dstId``, so pairs whose source or
    target paper is absent from ``info_dataframe`` are dropped.

    Args:
        training_dataframe: DataFrame with ``srcId``, ``dstId`` and ``label``.
        info_dataframe: Pre-processed paper DataFrame keyed by ``srcId``.

    Returns:
        DataFrame with ``id_from``, ``id_to``, ``label`` and, for each side,
        ``year_*``, ``title_*``, ``authors_*``, ``journal_*`` and
        ``abstract_*`` (suffix ``from`` for the source, ``to`` for the target).
    """
    # Source side: pick the metadata of the paper referenced by srcId
    source_match = col("a.srcId") == col("b.srcId")
    with_source = training_dataframe.alias("a").join(
        info_dataframe.alias("b"), source_match
    ).select(
        col("a.srcId").alias("id_from"),
        col("a.dstId").alias("id_to"),
        col("a.label"),
        col("b.year").alias("year_from"),
        col("b.title_tokens_clean").alias("title_from"),
        col("b.authors_tokens_raw").alias("authors_from"),
        col("b.journal").alias("journal_from"),
        col("b.abstract_tokens_clean").alias("abstract_from"),
    )

    # Target side: same lookup, this time keyed by the pair's id_to
    carried_over = [
        "id_from", "id_to", "label",
        "year_from", "title_from", "authors_from", "journal_from", "abstract_from",
    ]
    target_match = col("a.id_to") == col("b.srcId")
    return with_source.alias("a").join(
        info_dataframe.alias("b"), target_match
    ).select(
        *carried_over,
        col("b.year").alias("year_to"),
        col("b.title_tokens_clean").alias("title_to"),
        col("b.authors_tokens_raw").alias("authors_to"),
        col("b.journal").alias("journal_to"),
        col("b.abstract_tokens_clean").alias("abstract_to"),
    )

In [None]:
def get_final_dataframe(joined_dataframe):
    """Turn joined paper pairs into a labelled feature-vector DataFrame.

    Args:
        joined_dataframe: Output of ``join_dataframes`` (``*_from``/``*_to``
            columns plus ``label``).

    Returns:
        DataFrame with a double ``label``, the five numeric feature columns
        and their assembled ``features`` vector; rows containing nulls are
        dropped.
    """
    feature_names = [
        "common_title_words",
        "common_authors",
        "common_abstract_words",
        "publication_year_difference",
        "is_same_journal",
    ]

    # Token-overlap features: (output column, source-side column, target-side column)
    overlap_features = (
        ("common_title_words", "title_from", "title_to"),
        ("common_authors", "authors_from", "authors_to"),
        ("common_abstract_words", "abstract_from", "abstract_to"),
    )

    enriched = joined_dataframe
    for feature, from_column, to_column in overlap_features:
        enriched = enriched.withColumn(
            feature, count_common_words_udf(col(from_column), col(to_column))
        )
    enriched = enriched.withColumn(
        "publication_year_difference",
        publication_year_difference_udf("year_from", "year_to")
    )
    enriched = enriched.withColumn(
        "is_same_journal",
        is_same_journal_udf(col("journal_from"), col("journal_to"))
    )
    enriched = enriched.withColumn("label", to_double_udf(col("label")))

    feature_df = enriched.select("label", *feature_names)

    # Pack the numeric columns into the single vector column the classifiers consume
    assembler = VectorAssembler(inputCols=feature_names, outputCol="features")
    return assembler.transform(feature_df).na.drop()

In [None]:
def calculate_metrics_svm(predictions):
    """Print per-class precision/recall/F1, the confusion matrix and accuracy.

    Used for models evaluated on their hard ``prediction`` column only.

    Args:
        predictions: DataFrame with ``prediction`` and ``label`` columns
            holding 0.0/1.0 values.

    Returns:
        float: Overall accuracy, or ``0.0`` when no row has a 0/1 prediction
        and label (e.g. an empty DataFrame).
    """
    # A single grouped count yields every confusion-matrix cell in one Spark
    # job instead of four separate filter-and-count passes over the data.
    cell_counts = {
        (row["prediction"], row["label"]): row["count"]
        for row in predictions.groupBy("prediction", "label").count().collect()
    }
    tp = cell_counts.get((1.0, 1.0), 0)
    fp = cell_counts.get((1.0, 0.0), 0)
    tn = cell_counts.get((0.0, 0.0), 0)
    fn = cell_counts.get((0.0, 1.0), 0)

    # Calculate metrics for positive class (class 1)
    precision1 = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    recall1 = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    f1_score1 = 2 * precision1 * recall1 / (precision1 + recall1) if (precision1 + recall1) > 0 else 0.0

    print("\nMetrics for positive class:")
    print(f"Precision: {precision1}")
    print(f"Recall: {recall1}")
    print(f"F1-Score: {f1_score1}")

    # Calculate metrics for negative class (class 0)
    precision0 = tn / (tn + fn) if (tn + fn) > 0 else 0.0
    recall0 = tn / (tn + fp) if (tn + fp) > 0 else 0.0
    f1_score0 = 2 * precision0 * recall0 / (precision0 + recall0) if (precision0 + recall0) > 0 else 0.0

    print("\nMetrics for negative class:")
    print(f"Precision: {precision0}")
    print(f"Recall: {recall0}")
    print(f"F1-Score: {f1_score0}")

    # Print confusion matrix
    print("\nConfusion Matrix:")
    print(f"TP: {tp}, FP: {fp}")
    print(f"FN: {fn}, TN: {tn}")

    # Overall accuracy, guarded like the other ratios so empty input does not
    # raise ZeroDivisionError
    total = tp + tn + fp + fn
    accuracy = (tp + tn) / total if total > 0 else 0.0
    print(f"\nOverall Accuracy: {accuracy}")

    return accuracy

In [None]:
def calculate_metrics_mlp(predictions):
    """Return the accuracy of hard predictions without printing anything.

    Args:
        predictions: DataFrame with ``prediction`` and ``label`` columns
            holding 0.0/1.0 values.

    Returns:
        float: ``(tp + tn) / (tp + tn + fp + fn)``, or ``0.0`` when no row has
        a 0/1 prediction and label (e.g. an empty DataFrame).
    """
    # One grouped count replaces four filter-and-count jobs over the same data.
    cell_counts = {
        (row["prediction"], row["label"]): row["count"]
        for row in predictions.groupBy("prediction", "label").count().collect()
    }
    correct = cell_counts.get((1.0, 1.0), 0) + cell_counts.get((0.0, 0.0), 0)
    wrong = cell_counts.get((1.0, 0.0), 0) + cell_counts.get((0.0, 1.0), 0)

    # Guard the ratio so empty input does not raise ZeroDivisionError
    total = correct + wrong
    return correct / total if total > 0 else 0.0

In [None]:
from pyspark.sql.functions import col, when, udf
from pyspark.sql.types import DoubleType
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegressionModel
import numpy as np

# Function to calculate metrics for probability-based models using ROC Curve
def calculate_metrics(predictions):
    """Report ROC AUC and the metrics at the F1-optimal probability threshold.

    The class-1 probability is swept over thresholds 0.0, 0.1, ..., 0.9; the
    first threshold with the highest positive-class F1 is selected, and
    precision/recall, the confusion matrix and accuracy are printed for it.

    Args:
        predictions: DataFrame with a ``probability`` vector column (index 1
            is read as the class-1 probability) and a 0.0/1.0 ``label`` column.

    Returns:
        None. All results are printed.
    """
    # Extract probability of class 1
    get_prob1 = udf(lambda v: float(v[1]), DoubleType())
    # Cache the scored rows: the AUC evaluation and every threshold below
    # reuse them, and each would otherwise recompute the whole lineage.
    scored = predictions.withColumn("prob1", get_prob1(col("probability"))).cache()

    def _confusion_counts(threshold):
        """Return (tp, fp, tn, fn) when rows with prob1 >= threshold are called positive."""
        thresholded = scored.withColumn(
            "prediction_optimal", when(col("prob1") >= threshold, 1.0).otherwise(0.0)
        )
        # A single grouped count gives all four cells in one Spark job
        cells = {
            (row["prediction_optimal"], row["label"]): row["count"]
            for row in thresholded.groupBy("prediction_optimal", "label").count().collect()
        }
        return (
            cells.get((1.0, 1.0), 0),
            cells.get((1.0, 0.0), 0),
            cells.get((0.0, 0.0), 0),
            cells.get((0.0, 1.0), 0),
        )

    try:
        # Use built-in evaluator to get ROC AUC
        evaluator = BinaryClassificationEvaluator(rawPredictionCol="probability", labelCol="label", metricName="areaUnderROC")
        roc_auc = evaluator.evaluate(scored)
        print(f"Area under ROC curve: {roc_auc}")

        # Coarse sweep: thresholds 0.0 to 0.9 in 0.1 steps (1.0 is excluded)
        thresholds = np.arange(0.0, 1.0, 0.1)

        # One (threshold, f1, (tp, fp, tn, fn)) entry per threshold; the counts
        # are kept so the winning threshold does not need to be re-evaluated.
        sweep = []
        for threshold in thresholds:
            threshold = float(threshold)
            tp, fp, tn, fn = _confusion_counts(threshold)

            precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
            recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
            f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0

            sweep.append((threshold, f1, (tp, fp, tn, fn)))
    finally:
        scored.unpersist()

    # max() returns the first entry on ties, i.e. the lowest such threshold
    best_threshold, best_f1, (tp, fp, tn, fn) = max(sweep, key=lambda entry: entry[1])
    print(f"\nOptimal threshold based on best F1-Score: {best_threshold}")
    print(f"Best F1-Score: {best_f1}")

    # Final metrics using optimal threshold
    precision1 = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    recall1 = tp / (tp + fn) if (tp + fn) > 0 else 0.0

    print("\nMetrics for positive class at optimal threshold:")
    print(f"Precision: {precision1}")
    print(f"Recall: {recall1}")

    precision0 = tn / (tn + fn) if (tn + fn) > 0 else 0.0
    recall0 = tn / (tn + fp) if (tn + fp) > 0 else 0.0
    f1_score0 = 2 * precision0 * recall0 / (precision0 + recall0) if (precision0 + recall0) > 0 else 0.0

    print("\nMetrics for negative class at optimal threshold:")
    print(f"Precision: {precision0}")
    print(f"Recall: {recall0}")
    print(f"F1-Score: {f1_score0}")

    print("\nConfusion Matrix at optimal threshold:")
    print(f"TP: {tp}, FP: {fp}")
    print(f"FN: {fn}, TN: {tn}")

    # Guarded like the other ratios so empty input does not raise ZeroDivisionError
    total = tp + tn + fp + fn
    accuracy = (tp + tn) / total if total > 0 else 0.0
    print(f"\nOverall Accuracy at optimal threshold: {accuracy}")

In [None]:
# Function to generate MLP layer configurations
def generate_layer_configs(input_size, output_size=2, max_hidden_layers=3, max_neurons=16, step=4):
    """Generate a small set of distinct MLP layer layouts.

    For every hidden-layer count from 1 to ``max_hidden_layers``, up to five
    candidates are built by sliding a window over the neuron-size options
    ``step, 2*step, ..., <= max_neurons`` (wrapping around at the end).
    Candidates that repeat an earlier layout for the same layer count are
    skipped, so no identical network is returned twice.

    Args:
        input_size: Number of input features (first entry of each layout).
        output_size: Number of output classes (last entry of each layout).
        max_hidden_layers: Largest number of hidden layers to generate.
        max_neurons: Upper bound (inclusive) on a hidden layer's size.
        step: Increment between successive hidden-layer size options.

    Returns:
        list[list[int]]: Layouts of the form
        ``[input_size, hidden_1, ..., hidden_k, output_size]``; empty when no
        size option exists (``max_neurons < step``).
    """
    # The size options do not depend on the layer count, so build them once.
    neuron_options = list(range(step, max_neurons + 1, step))
    option_count = len(neuron_options)

    configs = []
    for num_hidden_layers in range(1, max_hidden_layers + 1):
        # Generate a limited number of configurations for each layer count
        candidate_count = min(5, option_count ** num_hidden_layers)
        seen = set()
        for offset in range(candidate_count):
            hidden_layers = tuple(
                neuron_options[(offset + depth) % option_count]
                for depth in range(num_hidden_layers)
            )
            # The wrap-around makes later offsets repeat earlier windows
            if hidden_layers in seen:
                continue
            seen.add(hidden_layers)
            configs.append([input_size, *hidden_layers, output_size])

    return configs

In [None]:
# Main function
def _tune_max_depth_and_predict(estimator, train_dataframe, test_dataframe):
    """Cross-validate ``maxDepth`` for a tree-based estimator and score the test set."""
    # Rank with the model's continuous rawPrediction scores; feeding the hard
    # 0/1 ``prediction`` column to the evaluator reduces the ROC curve to a
    # single operating point and weakens the model selection.
    evaluator = BinaryClassificationEvaluator(
        labelCol="label",
        rawPredictionCol="rawPrediction"
    )

    param_grid = ParamGridBuilder() \
        .addGrid(estimator.maxDepth, [5, 10, 15, 20]) \
        .build()

    cross_validator = CrossValidator(
        estimator=estimator,
        estimatorParamMaps=param_grid,
        evaluator=evaluator,
        numFolds=5
    )

    print("Running Cross-Validation for optimal maxDepth...\n")
    cv_model = cross_validator.fit(train_dataframe)

    best_model = cv_model.bestModel
    predictions = best_model.transform(test_dataframe)

    best_max_depth = best_model.getMaxDepth()
    print(f"Best maxDepth: {best_max_depth}")

    return predictions


def _search_mlp_layers(train_dataframe, test_dataframe):
    """Train one MLP per generated layer layout and report the most accurate one."""
    first_row = train_dataframe.select("features").first()
    if first_row is None:
        raise ValueError("Training DataFrame is empty; cannot infer the MLP input size.")
    input_size = len(first_row[0])
    layer_options = generate_layer_configs(input_size)

    best_acc = 0.0
    best_layers = []
    best_prediction_df = None

    # NOTE(review): the layout is chosen by accuracy on the test set, so the
    # metrics printed for the winner are optimistic; a validation split would
    # avoid that.
    for layer in layer_options:
        mlp = MultilayerPerceptronClassifier(
            featuresCol="features",
            labelCol="label",
            predictionCol="prediction",
            layers=layer,
            maxIter=100,
            blockSize=128,
            seed=1234
        )

        mod = mlp.fit(train_dataframe)
        preds = mod.transform(test_dataframe)
        accuracy = calculate_metrics_mlp(preds)

        # Always keep the first candidate so a run where every model scores
        # 0.0 still has predictions to report.
        if best_prediction_df is None or accuracy > best_acc:
            best_acc = accuracy
            best_layers = layer
            best_prediction_df = preds

    if best_prediction_df is None:
        print("No MLP layer configuration was available to evaluate.")
        return

    print(f"Best MLP layers: {best_layers}")
    print("Calculating metrics for best MLP model...\n")
    calculate_metrics_svm(best_prediction_df)


def _run_selected_model(model_number, final_train_dataframe, final_test_dataframe):
    """Train the classifier chosen by ``model_number`` and print its test metrics."""
    if model_number == 0:
        print("Running Logistic Regression classification...\n")
        model = LogisticRegression(
            featuresCol="features",
            labelCol="label",
            predictionCol="prediction",
            probabilityCol="probability",
            maxIter=LOGISTIC_REGRESSION_ITERATIONS
        )

        predictions = model.fit(final_train_dataframe).transform(final_test_dataframe)
        print("Calculating metrics...\n")
        calculate_metrics(predictions)

    elif model_number == 1:
        print("Running Naive Bayes classification...\n")
        model = NaiveBayes(
            featuresCol="features",
            labelCol="label",
            predictionCol="prediction",
            probabilityCol="probability",
            modelType="multinomial",
            smoothing=1.0
        )

        predictions = model.fit(final_train_dataframe).transform(final_test_dataframe)
        print("Calculating metrics...\n")
        calculate_metrics(predictions)

    elif model_number == 2:
        print("Running Linear SVM classification...\n")
        model = LinearSVC(
            featuresCol="features",
            labelCol="label",
            predictionCol="prediction",
            maxIter=10,
            regParam=0.1,
            tol=1e-4
        )

        predictions = model.fit(final_train_dataframe).transform(final_test_dataframe)
        print("Calculating metrics...\n")
        calculate_metrics_svm(predictions)

    elif model_number == 3:
        print("Running Gradient Boosted Trees classification...\n")
        model = GBTClassifier(
            featuresCol="features",
            labelCol="label",
            predictionCol="prediction",
            maxIter=4,
            maxDepth=3,
            stepSize=0.1,
            maxBins=32,
            minInstancesPerNode=1,
            minInfoGain=0.0,
            subsamplingRate=0.8,
            seed=1234
        )

        predictions = model.fit(final_train_dataframe).transform(final_test_dataframe)
        print("Calculating metrics...\n")
        calculate_metrics_svm(predictions)

    elif model_number == 4:
        print("Running Decision Tree classification with dynamic maxDepth tuning...\n")

        model = DecisionTreeClassifier(
            featuresCol="features",
            labelCol="label",
            predictionCol="prediction"
        )

        predictions = _tune_max_depth_and_predict(model, final_train_dataframe, final_test_dataframe)

        print("Calculating metrics...\n")
        calculate_metrics_svm(predictions)

    elif model_number == 5:
        print("Running Random Forest classification with dynamic maxDepth tuning...\n")

        model = RandomForestClassifier(
            featuresCol="features",
            labelCol="label",
            predictionCol="prediction"
        )

        predictions = _tune_max_depth_and_predict(model, final_train_dataframe, final_test_dataframe)

        print("Calculating metrics...\n")
        calculate_metrics_svm(predictions)

    else:
        print("Running Multilayer Perceptron classification ...")
        _search_mlp_layers(final_train_dataframe, final_test_dataframe)


# Main function
def main(model_number=3):
    """Build the citation-pair features, train one classifier and print its metrics.

    Args:
        model_number: Classifier to run.
            0 = Logistic Regression, 1 = Naive Bayes, 2 = Linear SVM,
            3 = Gradient Boosted Trees (default), 4 = Decision Tree with
            cross-validated maxDepth, 5 = Random Forest with cross-validated
            maxDepth; any other value runs the Multilayer Perceptron search.

    Returns:
        None. Progress and metrics are printed.
    """
    print("Retrieving DataFrames...")

    # Load and process data
    info_dataframe = pre_process(
        get_info_dataframe(NODE_INFO_FILENAME).sample(withReplacement=False, fraction=INFO_DATAFRAME_PORTION, seed=12345)
    )
    training_dataframe = get_training_dataframe(TRAINING_SET_FILENAME)
    testing_dataframe = get_testing_dataframe(TESTING_SET_FILENAME)
    ground_truth_dataframe = get_ground_truth_dataframe(GROUND_TRUTH_FILENAME)
    labeled_testing_dataframe = add_labels_to_test_dataframe(testing_dataframe, ground_truth_dataframe)

    print("Joining DataFrames...")
    joined_train_dataframe = join_dataframes(training_dataframe, info_dataframe)
    joined_test_dataframe = join_dataframes(labeled_testing_dataframe, info_dataframe)

    # Cache the feature frames: cross-validation and the MLP search fit and
    # score them many times, and each pass would otherwise redo the joins and
    # Python UDFs.
    final_train_dataframe = get_final_dataframe(joined_train_dataframe).cache()
    final_test_dataframe = get_final_dataframe(joined_test_dataframe).cache()

    try:
        # Run different models based on model_number
        _run_selected_model(model_number, final_train_dataframe, final_test_dataframe)
    finally:
        final_train_dataframe.unpersist()
        final_test_dataframe.unpersist()

# Initialize Spark session (module-level ``spark`` is used by the loader functions)
spark = SparkSession.builder \
    .appName("citation-search") \
    .master("local[*]") \
    .getOrCreate()

try:
    # Set log level to reduce noise
    spark.sparkContext.setLogLevel("ERROR")

    # Run the main function with model_number=0 (Logistic Regression)
    main(0)
finally:
    # Stop the Spark session when done, including when main() raises
    spark.stop()

Retrieving DataFrames...
Joining DataFrames...
Running Logistic Regression classification...

Calculating metrics...

Area under ROC curve: 0.8348714004955994

Optimal threshold based on best F1-Score: 0.4
Best F1-Score: 0.7837837837837838

Metrics for positive class at optimal threshold:
Precision: 0.7107843137254902
Recall: 0.8734939759036144

Metrics for negative class at optimal threshold:
Precision: 0.7961165048543689
Recall: 0.5815602836879432
F1-Score: 0.6721311475409836

Confusion Matrix at optimal threshold:
TP: 145, FP: 59
FN: 21, TN: 82

Overall Accuracy at optimal threshold: 0.739413680781759
