In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
#create Spark session
spark = SparkSession.builder.appName('Stackoverflow_Project').getOrCreate()

#change configuration settings on Spark 
conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '5g'), 
                                        ('spark.app.name', 'Spark Updated Conf'), 
                                        ('spark.executor.cores', '4'), 
                                        ('spark.cores.max', '4'), 
                                        ('spark.driver.memory','8g')])

In [3]:
spark = (
    SparkSession.builder
    .appName("Stackoverflow_Project")
    .config("spark.yarn.maxAppAttempts", "4")  # Allow Spark job to retry
    .config("spark.yarn.am.attemptFailuresValidityInterval", "1h")  # Time window for retries
    .config("spark.task.maxFailures", "4")  # Retry failed tasks
    .config("spark.executor.instances", "5")  # Adjust to ensure redundancy
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "2")
    .config("spark.dynamicAllocation.maxExecutors", "10")
    .getOrCreate()
)

#change configuration settings on Spark 
conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '5g'), 
                                        ('spark.app.name', 'Spark Updated Conf'), 
                                        ('spark.executor.cores', '4'), 
                                        ('spark.cores.max', '4'), 
                                        ('spark.driver.memory','10g')])

In [4]:
#Read the cleaned and pre-processed data from the GCS bucket
df = spark.read \
    .option("quote", "\"")  \
    .option("escape", "\"") \
    .option("ignoreLeadingWhiteSpace",True) \
    .parquet("gs://msca-bdp-student-gcs/Group6/extracted_StackOverflow.parquet",inferSchema=True, header=True )

                                                                                

In [5]:
#Data is at answers level, aggregate to get at post level
df_2 = df.select('post_body_text','post_tags') \
         .groupBy('post_body_text','post_tags').count()

df_2 = df_2.withColumnRenamed('count', 'Count of Answers')
df_2.show(5)

                                                                                

+--------------------+--------------------+----------------+
|      post_body_text|           post_tags|Count of Answers|
+--------------------+--------------------+----------------+
| whats the availa...|php|linux|ms-word...|              10|
| what is the best...|      c#|collections|              15|
| i d like to run ...|java|algorithm|jv...|              10|
| i m looking for ...|      python|math|3d|               9|
| i have some c co...|java|c|pointers|b...|               6|
+--------------------+--------------------+----------------+
only showing top 5 rows



In [6]:
df_2 = df_2.repartition(40)

In [7]:
df_2 = df_2.filter(df_2["post_body_text"].isNotNull())
df_2 = df_2.dropDuplicates(["post_body_text"])
df_2.count()

                                                                                

367336

In [8]:
from pyspark.sql.functions import split, explode, col, lit, array_contains

#Split tags into an array
df_2 = df_2.withColumn("tags_array", split(col("post_tags"), "\|"))

#Explode tags and count frequencies
exploded_df = df_2.select('post_body_text','post_tags',explode(col("tags_array")).alias("tag"))
tag_counts = exploded_df.groupBy("tag").count().orderBy(col("count").desc())

In [9]:
exploded_df.show(5)



+--------------------+--------------------+-------------------+
|      post_body_text|           post_tags|                tag|
+--------------------+--------------------+-------------------+
| a beginner quest...|build-process|dev...|      build-process|
| a beginner quest...|build-process|dev...|development-process|
| a few years ago ...|                perl|               perl|
| a friend told me...|c++|performance|c...|                c++|
| a friend told me...|c++|performance|c...|        performance|
+--------------------+--------------------+-------------------+
only showing top 5 rows



                                                                                

In [10]:
#Select top 50 tags
top_50_tags = tag_counts.limit(50).select("tag").rdd.flatMap(lambda x: x).collect()

                                                                                

In [11]:
#Filter data for top 50 tags
exploded_df_filtered = exploded_df.filter(col('tag').isin(top_50_tags))
exploded_df_filtered.count()

                                                                                

502985

In [12]:
#Pivot data
exploded_df_filtered = exploded_df_filtered.groupBy("post_body_text","post_tags").pivot("tag").count()
exploded_df_filtered.printSchema()



root
 |-- post_body_text: string (nullable = true)
 |-- post_tags: string (nullable = true)
 |-- .net: long (nullable = true)
 |-- ajax: long (nullable = true)
 |-- algorithm: long (nullable = true)
 |-- android: long (nullable = true)
 |-- arrays: long (nullable = true)
 |-- asp.net: long (nullable = true)
 |-- asp.net-mvc: long (nullable = true)
 |-- bash: long (nullable = true)
 |-- c: long (nullable = true)
 |-- c#: long (nullable = true)
 |-- c++: long (nullable = true)
 |-- class: long (nullable = true)
 |-- css: long (nullable = true)
 |-- database: long (nullable = true)
 |-- eclipse: long (nullable = true)
 |-- function: long (nullable = true)
 |-- html: long (nullable = true)
 |-- ios: long (nullable = true)
 |-- iphone: long (nullable = true)
 |-- java: long (nullable = true)
 |-- javascript: long (nullable = true)
 |-- jquery: long (nullable = true)
 |-- json: long (nullable = true)
 |-- language-agnostic: long (nullable = true)
 |-- linq: long (nullable = true)
 |-- linux:

                                                                                

In [13]:
#The dot character triggers an error when used in column names. So rename these columns.
exploded_df_filtered = exploded_df_filtered.withColumnRenamed('.net', 'dot_net')
exploded_df_filtered = exploded_df_filtered.withColumnRenamed('asp.net-mvc', 'asp_dot_net-mvc')
exploded_df_filtered = exploded_df_filtered.withColumnRenamed('asp.net', 'asp_dot_net')
exploded_df_filtered = exploded_df_filtered.withColumnRenamed('vb.net', 'vb_dot_net')

In [14]:
#Also rename in tags list
top_50_tags.remove('.net')
top_50_tags.remove('asp.net-mvc')
top_50_tags.remove('asp.net')
top_50_tags.remove('vb.net')
top_50_tags = top_50_tags + ['asp_dot_net','asp_dot_net-mvc','vb_dot_net','dot_net']
top_50_tags

['java',
 'c#',
 'javascript',
 'php',
 'c++',
 'jquery',
 'html',
 'python',
 'css',
 'android',
 'c',
 'sql',
 'mysql',
 'arrays',
 'string',
 'sql-server',
 'iphone',
 'ios',
 'regex',
 'objective-c',
 'algorithm',
 'ruby',
 'performance',
 'database',
 'linux',
 'ruby-on-rails',
 'windows',
 'list',
 'multithreading',
 'oop',
 'bash',
 'eclipse',
 'ajax',
 'perl',
 'json',
 'pointers',
 'visual-studio',
 'xml',
 'winforms',
 'linq',
 'function',
 'class',
 'tsql',
 'wpf',
 'xcode',
 'language-agnostic',
 'asp_dot_net',
 'asp_dot_net-mvc',
 'vb_dot_net',
 'dot_net']

In [15]:
#Since there are lot of null values because of pivoting the df, fill null values with 0s
exploded_df_filtered = exploded_df_filtered.na.fill(value = 0)
exploded_df_filtered.show(5)

24/12/02 01:26:40 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+--------------------+--------------------+-------+----+---------+-------+------+-----------+---------------+----+---+---+---+-----+---+--------+-------+--------+----+---+------+----+----------+------+----+-----------------+----+-----+----+--------------+-----+-----------+---+-----------+----+---+--------+------+-----+----+-------------+---+----------+------+----+----------+-------------+-------+--------+---+-----+---+
|      post_body_text|           post_tags|dot_net|ajax|algorithm|android|arrays|asp_dot_net|asp_dot_net-mvc|bash|  c| c#|c++|class|css|database|eclipse|function|html|ios|iphone|java|javascript|jquery|json|language-agnostic|linq|linux|list|multithreading|mysql|objective-c|oop|performance|perl|php|pointers|python|regex|ruby|ruby-on-rails|sql|sql-server|string|tsql|vb_dot_net|visual-studio|windows|winforms|wpf|xcode|xml|
+--------------------+--------------------+-------+----+---------+-------+------+-----------+---------------+----+---+---+---+-----+---+--------+-------+-

                                                                                

### Create pipeline for feature engineering/data transformation

In [16]:
#Spark ML imports
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#Tokenize the data into words
tokenizer = Tokenizer(inputCol="post_body_text", outputCol="Words")

#Remove stop words
remove_stopwords = StopWordsRemover(inputCol="Words", outputCol="Filtered_Words")

#HashingTF
hashing_tf = HashingTF(inputCol="Filtered_Words", outputCol="Hashing_TF_Features")

#IDF
idf = IDF(inputCol="Hashing_TF_Features", outputCol="Hashing_TFIDF_Features")

#Creating a pipeline to transform the data and prepare it for the model
pipeline = Pipeline(stages=[tokenizer, remove_stopwords, hashing_tf, idf])

In [17]:
#Fit and transform the data using the pipeline
pipeline_final = pipeline.fit(exploded_df_filtered)
model_df = pipeline_final.transform(exploded_df_filtered)
model_df.printSchema()

                                                                                

root
 |-- post_body_text: string (nullable = true)
 |-- post_tags: string (nullable = true)
 |-- dot_net: long (nullable = true)
 |-- ajax: long (nullable = true)
 |-- algorithm: long (nullable = true)
 |-- android: long (nullable = true)
 |-- arrays: long (nullable = true)
 |-- asp_dot_net: long (nullable = true)
 |-- asp_dot_net-mvc: long (nullable = true)
 |-- bash: long (nullable = true)
 |-- c: long (nullable = true)
 |-- c#: long (nullable = true)
 |-- c++: long (nullable = true)
 |-- class: long (nullable = true)
 |-- css: long (nullable = true)
 |-- database: long (nullable = true)
 |-- eclipse: long (nullable = true)
 |-- function: long (nullable = true)
 |-- html: long (nullable = true)
 |-- ios: long (nullable = true)
 |-- iphone: long (nullable = true)
 |-- java: long (nullable = true)
 |-- javascript: long (nullable = true)
 |-- jquery: long (nullable = true)
 |-- json: long (nullable = true)
 |-- language-agnostic: long (nullable = true)
 |-- linq: long (nullable = true)


In [18]:
#Remove duplicates
model_df = model_df.dropDuplicates(["post_body_text"])
model_df = model_df.filter(model_df["post_body_text"].isNotNull())
model_df = model_df.filter(model_df["post_tags"].isNotNull())
model_df.count()

                                                                                

318649

In [19]:
# Split the data into train (70%), test (20%), and validation (10%) sets
train_df, test_df, val_df = model_df.randomSplit([0.7, 0.2, 0.1], seed=11)

### Load the models from GCS for inference

In [20]:
loaded_models = []

for i in range(0,50):
    tag = top_50_tags[i]

    # Path to save the model in GCS
    gcs_model_path = f'gs://msca-bdp-student-gcs/Group6/Tag_classification_models/Log_Reg_{tag}'

    # Save the trained logistic regression model
    loaded_model = LogisticRegressionModel.load(gcs_model_path)
    
    loaded_models.append({
        "tag": tag,
        "Log_reg_htfidf_model": loaded_model})

24/12/02 01:28:14 WARN org.apache.hadoop.util.concurrent.ExecutorHelper: Thread (Thread[GetFileInfo #1,5,main]) interrupted: 
java.lang.InterruptedException
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:510)
	at com.google.common.util.concurrent.FluentFuture$TrustedFuture.get(FluentFuture.java:88)
	at org.apache.hadoop.util.concurrent.ExecutorHelper.logThrowableFromAfterExecute(ExecutorHelper.java:48)
	at org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor.afterExecute(HadoopThreadPoolExecutor.java:90)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1157)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)


In [21]:
loaded_models[0]

{'tag': 'java',
 'Log_reg_htfidf_model': LogisticRegressionModel: uid=LogisticRegression_1480097e889f, numClasses=2, numFeatures=262144}

### Inference - Validation Data

In [22]:
#Remove duplicates
val_df = val_df.dropDuplicates(["post_body_text"])
val_df = val_df.filter(val_df["post_body_text"].isNotNull())
val_df = val_df.filter(val_df["post_tags"].isNotNull())
val_df.count()

24/12/02 01:28:32 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.2 MiB
                                                                                

31965

In [23]:
results_validation_data = val_df.select("post_body_text","post_tags")
results_validation_data.show(5)

24/12/02 01:29:04 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.2 MiB


+--------------------+--------------------+
|      post_body_text|           post_tags|
+--------------------+--------------------+
| all thanks for t...|objective-c|array...|
| am newbie in lar...|         php|laravel|
| am using it in w...|jquery|wordpress|...|
| are there any go...| java|python|parsing|
| as the question ...|jquery|drop-down-...|
+--------------------+--------------------+
only showing top 5 rows



                                                                                

In [24]:
results_validation_data.count()

24/12/02 01:29:14 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.2 MiB
                                                                                

31965

In [25]:
predictions_tags = []

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

for i in range(0,50):
    
    #Get the tag and model for the tag
    tag = loaded_models[i]['tag']
    Log_reg_model = loaded_models[i]['Log_reg_htfidf_model']
    
    #Predict using model
    predictions = Log_reg_model.transform(val_df)
    
    #Evaluate and calculate metrics 
    eval1 = MulticlassClassificationEvaluator(labelCol = tag, predictionCol = "prediction")
    accuracy = eval1.evaluate(predictions, {eval1.metricName: "accuracy"})
    f1_score = eval1.evaluate(predictions, {eval1.metricName: "f1"})
    
    #Save the predictions
    predictions_column = predictions.select("post_body_text",col("prediction").alias(tag))
    predictions_column = predictions_column.withColumnRenamed("post_body_text",f"post_body_text_{i}")
    
    predictions_tags.append({
        "tag":tag,
        "predictions_column":predictions_column,
        "count_rows":predictions_column.count(),
        "accuracy":accuracy,
        "f1_score":f1_score
    })

24/12/02 01:34:45 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/02 01:35:40 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/02 01:36:09 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.2 MiB
24/12/02 01:36:34 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/02 01:37:04 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/02 01:37:29 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.2 MiB
24/12/02 01:37:52 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/02 01:38:27 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/02 01:38:53 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary w

In [None]:
predictions_tags

[{'tag': 'java',
  'predictions_column': DataFrame[post_body_text_0: string, java: double],
  'count_rows': 31965,
  'accuracy': 0.879600238851001,
  'f1_score': 0.8679389401111208},
 {'tag': 'c#',
  'predictions_column': DataFrame[post_body_text_1: string, c#: double],
  'count_rows': 31855,
  'accuracy': 0.8687890882805871,
  'f1_score': 0.8515350977618873},
 {'tag': 'javascript',
  'predictions_column': DataFrame[post_body_text_2: string, javascript: double],
  'count_rows': 31855,
  'accuracy': 0.8974826361607844,
  'f1_score': 0.8836549453457717},
 {'tag': 'php',
  'predictions_column': DataFrame[post_body_text_3: string, php: double],
  'count_rows': 31855,
  'accuracy': 0.924070523900814,
  'f1_score': 0.916029075649772},
 {'tag': 'c++',
  'predictions_column': DataFrame[post_body_text_4: string, c++: double],
  'count_rows': 32032,
  'accuracy': 0.9201420534900531,
  'f1_score': 0.9065470455473275},
 {'tag': 'jquery',
  'predictions_column': DataFrame[post_body_text_5: string, 

In [29]:
predictions_tags[0]["tag"]

'java'

In [30]:
from pyspark.sql import Row

#Convert results to PySpark DataFrame
metrics_data = sc.parallelize([
    Row(
        tag=predictions_tag["tag"],
        accuracy=predictions_tag["accuracy"],
        f1_score=predictions_tag["f1_score"]
    )
    for predictions_tag in predictions_tags
])

metrics_df = spark.createDataFrame(metrics_data)
metrics_df.show(50,truncate=False)

                                                                                

+-----------------+------------------+------------------+
|tag              |accuracy          |f1_score          |
+-----------------+------------------+------------------+
|java             |0.879600238851001 |0.8679389401111208|
|c#               |0.8687890882805871|0.8515350977618873|
|javascript       |0.8974826361607844|0.8836549453457717|
|php              |0.924070523900814 |0.916029075649772 |
|c++              |0.9201420534900531|0.9065470455473275|
|jquery           |0.9335302806499262|0.925943814016528 |
|html             |0.9335931361764983|0.922778410290219 |
|python           |0.9469185078097991|0.9412964652387685|
|css              |0.9626952449794148|0.9595620259131539|
|android          |0.9662151544674565|0.9628272178577086|
|c                |0.9492127345296836|0.9400854502573984|
|sql              |0.9585782079889374|0.9536426938740109|
|mysql            |0.9636380778779974|0.9566253716525295|
|arrays           |0.9668122819698922|0.9603931832177421|
|string       

In [44]:
evaluation_metrics_df = metrics_df.toPandas()
evaluation_metrics_df.head()

Unnamed: 0,tag,accuracy,f1_score
0,java,0.8796,0.867939
1,c#,0.868789,0.851535
2,javascript,0.897483,0.883655
3,php,0.924071,0.916029
4,c++,0.920142,0.906547


In [46]:
evaluation_metrics_df.to_csv('evaluation_metrics_df.csv',index=False)

In [47]:
!gsutil -m cp 'evaluation_metrics_df.csv' 'gs://msca-bdp-student-gcs/Group6/Tag_classification_inference/'

Copying file://evaluation_metrics_df.csv [Content-Type=text/csv]...
/ [1/1 files][  2.2 KiB/  2.2 KiB] 100% Done                                    
Operation completed over 1 objects/2.2 KiB.                                      
