In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
#create Spark session
spark = SparkSession.builder.appName('Stackoverflow_Project').getOrCreate()

#change configuration settings on Spark 
conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '5g'), 
                                        ('spark.app.name', 'Spark Updated Conf'), 
                                        ('spark.executor.cores', '4'), 
                                        ('spark.cores.max', '4'), 
                                        ('spark.driver.memory','8g')])

In [1]:
spark = (
    SparkSession.builder
    .appName("Stackoverflow_Project")
    .config("spark.yarn.maxAppAttempts", "4") 
    .config("spark.yarn.am.attemptFailuresValidityInterval", "1h")  
    .config("spark.task.maxFailures", "4")  
    .config("spark.executor.instances", "5")  
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "2")
    .config("spark.dynamicAllocation.maxExecutors", "10")
    .getOrCreate()
)

#change configuration settings on Spark 
conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '5g'), 
                                        ('spark.app.name', 'Spark Updated Conf'), 
                                        ('spark.executor.cores', '4'), 
                                        ('spark.cores.max', '4'), 
                                        ('spark.driver.memory','10g')])

In [3]:
#Read the cleaned and pre-processed data from the GCS bucket
df = spark.read \
    .option("quote", "\"")  \
    .option("escape", "\"") \
    .option("ignoreLeadingWhiteSpace",True) \
    .parquet("gs://msca-bdp-student-gcs/Group6/extracted_StackOverflow.parquet",inferSchema=True, header=True )

                                                                                

In [4]:
#Data is at answers level, aggregate to get at post level
df_2 = df.select('post_body_text','post_tags') \
         .groupBy('post_body_text','post_tags').count()

df_2 = df_2.withColumnRenamed('count', 'Count of Answers')
df_2.show(5)



+--------------------+--------------------+----------------+
|      post_body_text|           post_tags|Count of Answers|
+--------------------+--------------------+----------------+
| i only want to p...|javascript|angularjs|               6|
| i have a table t...|           php|mysql|               7|
| please check the...|     html|css|colors|               5|
| not sure if i am...|          python|gis|               7|
| just learning jq...|javascript|jquery...|               5|
+--------------------+--------------------+----------------+
only showing top 5 rows



                                                                                

In [5]:
df_2 = df_2.repartition(40)

In [6]:
df_2 = df_2.filter(df_2["post_body_text"].isNotNull())
df_2 = df_2.dropDuplicates(["post_body_text"])
df_2.count()

                                                                                

367336

In [7]:
from pyspark.sql.functions import split, explode, col, lit, array_contains

#Split tags into an array
df_2 = df_2.withColumn("tags_array", split(col("post_tags"), "\|"))

#Explode tags and count frequencies
exploded_df = df_2.select('post_body_text','post_tags',explode(col("tags_array")).alias("tag"))
tag_counts = exploded_df.groupBy("tag").count().orderBy(col("count").desc())

In [8]:
exploded_df.show(5)



+--------------------+--------------------+-------------------+
|      post_body_text|           post_tags|                tag|
+--------------------+--------------------+-------------------+
| a beginner quest...|build-process|dev...|      build-process|
| a beginner quest...|build-process|dev...|development-process|
| a few years ago ...|                perl|               perl|
| a friend told me...|c++|performance|c...|                c++|
| a friend told me...|c++|performance|c...|        performance|
+--------------------+--------------------+-------------------+
only showing top 5 rows



                                                                                

In [9]:
#Select top 50 tags
top_50_tags = tag_counts.limit(50).select("tag").rdd.flatMap(lambda x: x).collect()

                                                                                

In [10]:
#Filter data for top 50 tags
exploded_df_filtered = exploded_df.filter(col('tag').isin(top_50_tags))
exploded_df_filtered.count()

                                                                                

502980

In [11]:
#Pivot data
exploded_df_filtered = exploded_df_filtered.groupBy("post_body_text","post_tags").pivot("tag").count()
exploded_df_filtered.printSchema()



root
 |-- post_body_text: string (nullable = true)
 |-- post_tags: string (nullable = true)
 |-- .net: long (nullable = true)
 |-- ajax: long (nullable = true)
 |-- algorithm: long (nullable = true)
 |-- android: long (nullable = true)
 |-- arrays: long (nullable = true)
 |-- asp.net: long (nullable = true)
 |-- asp.net-mvc: long (nullable = true)
 |-- bash: long (nullable = true)
 |-- c: long (nullable = true)
 |-- c#: long (nullable = true)
 |-- c++: long (nullable = true)
 |-- class: long (nullable = true)
 |-- css: long (nullable = true)
 |-- database: long (nullable = true)
 |-- eclipse: long (nullable = true)
 |-- function: long (nullable = true)
 |-- html: long (nullable = true)
 |-- ios: long (nullable = true)
 |-- iphone: long (nullable = true)
 |-- java: long (nullable = true)
 |-- javascript: long (nullable = true)
 |-- jquery: long (nullable = true)
 |-- json: long (nullable = true)
 |-- language-agnostic: long (nullable = true)
 |-- linq: long (nullable = true)
 |-- linux:

                                                                                

In [12]:
#The dot character triggers an error when used in column names. So rename these columns.
exploded_df_filtered = exploded_df_filtered.withColumnRenamed('.net', 'dot_net')
exploded_df_filtered = exploded_df_filtered.withColumnRenamed('asp.net-mvc', 'asp_dot_net-mvc')
exploded_df_filtered = exploded_df_filtered.withColumnRenamed('asp.net', 'asp_dot_net')
exploded_df_filtered = exploded_df_filtered.withColumnRenamed('vb.net', 'vb_dot_net')

In [13]:
#Also rename in tags list
top_50_tags.remove('.net')
top_50_tags.remove('asp.net-mvc')
top_50_tags.remove('asp.net')
top_50_tags.remove('vb.net')
top_50_tags = top_50_tags + ['asp_dot_net','asp_dot_net-mvc','vb_dot_net','dot_net']
top_50_tags

['java',
 'c#',
 'javascript',
 'php',
 'c++',
 'jquery',
 'html',
 'python',
 'css',
 'android',
 'c',
 'sql',
 'mysql',
 'arrays',
 'string',
 'sql-server',
 'iphone',
 'ios',
 'regex',
 'objective-c',
 'algorithm',
 'ruby',
 'performance',
 'database',
 'linux',
 'ruby-on-rails',
 'windows',
 'list',
 'multithreading',
 'oop',
 'bash',
 'eclipse',
 'ajax',
 'perl',
 'json',
 'pointers',
 'visual-studio',
 'xml',
 'winforms',
 'linq',
 'function',
 'class',
 'tsql',
 'wpf',
 'xcode',
 'language-agnostic',
 'asp_dot_net',
 'asp_dot_net-mvc',
 'vb_dot_net',
 'dot_net']

In [14]:
#Since there are lot of null values because of pivoting the df, fill null values with 0s
exploded_df_filtered = exploded_df_filtered.na.fill(value = 0)
exploded_df_filtered.show(5)

24/12/01 19:32:34 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+--------------------+--------------------+-------+----+---------+-------+------+-----------+---------------+----+---+---+---+-----+---+--------+-------+--------+----+---+------+----+----------+------+----+-----------------+----+-----+----+--------------+-----+-----------+---+-----------+----+---+--------+------+-----+----+-------------+---+----------+------+----+----------+-------------+-------+--------+---+-----+---+
|      post_body_text|           post_tags|dot_net|ajax|algorithm|android|arrays|asp_dot_net|asp_dot_net-mvc|bash|  c| c#|c++|class|css|database|eclipse|function|html|ios|iphone|java|javascript|jquery|json|language-agnostic|linq|linux|list|multithreading|mysql|objective-c|oop|performance|perl|php|pointers|python|regex|ruby|ruby-on-rails|sql|sql-server|string|tsql|vb_dot_net|visual-studio|windows|winforms|wpf|xcode|xml|
+--------------------+--------------------+-------+----+---------+-------+------+-----------+---------------+----+---+---+---+-----+---+--------+-------+-

                                                                                

### Create pipeline for feature engineering/data transformation

In [15]:
#Spark ML imports
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#Tokenize the data into words
tokenizer = Tokenizer(inputCol="post_body_text", outputCol="Words")

#Remove stop words
remove_stopwords = StopWordsRemover(inputCol="Words", outputCol="Filtered_Words")

#HashingTF
hashing_tf = HashingTF(inputCol="Filtered_Words", outputCol="Hashing_TF_Features")

#IDF
idf = IDF(inputCol="Hashing_TF_Features", outputCol="Hashing_TFIDF_Features")

#Creating a pipeline to transform the data and prepare it for the model
pipeline = Pipeline(stages=[tokenizer, remove_stopwords, hashing_tf, idf])

In [16]:
#Fit and transform the data using the pipeline
pipeline_final = pipeline.fit(exploded_df_filtered)
model_df = pipeline_final.transform(exploded_df_filtered)
model_df.printSchema()



root
 |-- post_body_text: string (nullable = true)
 |-- post_tags: string (nullable = true)
 |-- dot_net: long (nullable = true)
 |-- ajax: long (nullable = true)
 |-- algorithm: long (nullable = true)
 |-- android: long (nullable = true)
 |-- arrays: long (nullable = true)
 |-- asp_dot_net: long (nullable = true)
 |-- asp_dot_net-mvc: long (nullable = true)
 |-- bash: long (nullable = true)
 |-- c: long (nullable = true)
 |-- c#: long (nullable = true)
 |-- c++: long (nullable = true)
 |-- class: long (nullable = true)
 |-- css: long (nullable = true)
 |-- database: long (nullable = true)
 |-- eclipse: long (nullable = true)
 |-- function: long (nullable = true)
 |-- html: long (nullable = true)
 |-- ios: long (nullable = true)
 |-- iphone: long (nullable = true)
 |-- java: long (nullable = true)
 |-- javascript: long (nullable = true)
 |-- jquery: long (nullable = true)
 |-- json: long (nullable = true)
 |-- language-agnostic: long (nullable = true)
 |-- linq: long (nullable = true)


                                                                                

In [17]:
#Remove duplicates
model_df = model_df.dropDuplicates(["post_body_text"])
model_df = model_df.filter(model_df["post_body_text"].isNotNull())
model_df = model_df.filter(model_df["post_tags"].isNotNull())
model_df.count()

                                                                                

318648

In [18]:
# Split the data into train (70%), test (20%), and validation (10%) sets
train_df, test_df, val_df = model_df.randomSplit([0.7, 0.2, 0.1], seed=11)

### Load the models from GCS for inference

In [19]:
loaded_models = []

for i in range(0,50):
    tag = top_50_tags[i]

    # Path to save the model in GCS
    gcs_model_path = f'gs://msca-bdp-student-gcs/Group6/Tag_classification_models/Log_Reg_{tag}'

    # Save the trained logistic regression model
    loaded_model = LogisticRegressionModel.load(gcs_model_path)
    
    loaded_models.append({
        "tag": tag,
        "Log_reg_htfidf_model": loaded_model})

24/12/01 19:33:22 WARN org.apache.hadoop.util.concurrent.ExecutorHelper: Thread (Thread[GetFileInfo #1,5,main]) interrupted: 
java.lang.InterruptedException
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:510)
	at com.google.common.util.concurrent.FluentFuture$TrustedFuture.get(FluentFuture.java:88)
	at org.apache.hadoop.util.concurrent.ExecutorHelper.logThrowableFromAfterExecute(ExecutorHelper.java:48)
	at org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor.afterExecute(HadoopThreadPoolExecutor.java:90)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1157)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
24/12/01 19:33:52 WARN org.apache.hadoop.util.concurrent.ExecutorHelper: Thread (Thread[GetFileInfo #1,5,main]) interrupted: 
java.lang.InterruptedException
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:510)
	at c

In [20]:
loaded_models[0]

{'tag': 'java',
 'Log_reg_htfidf_model': LogisticRegressionModel: uid=LogisticRegression_1480097e889f, numClasses=2, numFeatures=262144}

### Inference - Validation Data

In [21]:
#Remove duplicates
val_df = val_df.dropDuplicates(["post_body_text"])
val_df = val_df.filter(val_df["post_body_text"].isNotNull())
val_df = val_df.filter(val_df["post_tags"].isNotNull())
val_df.count()

24/12/01 19:34:11 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.2 MiB
                                                                                

32032

In [22]:
results_validation_data = val_df.select("post_body_text","post_tags")
results_validation_data.show(5)

24/12/01 19:34:39 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.2 MiB


+--------------------+--------------------+
|      post_body_text|           post_tags|
+--------------------+--------------------+
| all thanks for t...|objective-c|array...|
| am newbie in lar...|         php|laravel|
| am using it in w...|jquery|wordpress|...|
| are there any go...| java|python|parsing|
| as the question ...|jquery|drop-down-...|
+--------------------+--------------------+
only showing top 5 rows



                                                                                

In [23]:
results_validation_data.count()

24/12/01 19:34:47 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.2 MiB
                                                                                

31986

In [24]:
predictions_tags = []

In [25]:
for i in range(0,50):
    tag = loaded_models[i]['tag']
    Log_reg_model = loaded_models[i]['Log_reg_htfidf_model']
    predictions = Log_reg_model.transform(val_df)
    predictions_column = predictions.select("post_body_text",col("prediction").alias(tag))
    predictions_column = predictions_column.withColumnRenamed("post_body_text",f"post_body_text_{i}")
    
    predictions_tags.append({
        "tag":tag,
        "predictions_column":predictions_column,
        "count_rows":predictions_column.count()
    })

24/12/01 19:36:37 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.2 MiB
24/12/01 19:37:09 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.2 MiB
24/12/01 19:37:29 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.2 MiB
24/12/01 19:37:48 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.2 MiB
24/12/01 19:38:06 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.2 MiB
24/12/01 19:38:23 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.2 MiB
24/12/01 19:38:39 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.2 MiB
24/12/01 19:38:55 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.2 MiB
24/12/01 19:39:11 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary w

In [26]:
predictions_tags

[{'tag': 'java',
  'predictions_column': DataFrame[post_body_text_0: string, java: double],
  'count_rows': 31986},
 {'tag': 'c#',
  'predictions_column': DataFrame[post_body_text_1: string, c#: double],
  'count_rows': 31986},
 {'tag': 'javascript',
  'predictions_column': DataFrame[post_body_text_2: string, javascript: double],
  'count_rows': 31986},
 {'tag': 'php',
  'predictions_column': DataFrame[post_body_text_3: string, php: double],
  'count_rows': 31986},
 {'tag': 'c++',
  'predictions_column': DataFrame[post_body_text_4: string, c++: double],
  'count_rows': 31986},
 {'tag': 'jquery',
  'predictions_column': DataFrame[post_body_text_5: string, jquery: double],
  'count_rows': 31986},
 {'tag': 'html',
  'predictions_column': DataFrame[post_body_text_6: string, html: double],
  'count_rows': 31986},
 {'tag': 'python',
  'predictions_column': DataFrame[post_body_text_7: string, python: double],
  'count_rows': 31986},
 {'tag': 'css',
  'predictions_column': DataFrame[post_body_

In [27]:
for i in range(0,50):
    print(predictions_tags[i]['tag'],":",predictions_tags[i]['count_rows'])

java : 31986
c# : 31986
javascript : 31986
php : 31986
c++ : 31986
jquery : 31986
html : 31986
python : 31986
css : 31986
android : 31986
c : 31986
sql : 31986
mysql : 31986
arrays : 31986
string : 31986
sql-server : 31986
iphone : 31986
ios : 31986
regex : 31986
objective-c : 32000
algorithm : 32000
ruby : 32000
performance : 32000
database : 32000
linux : 32000
ruby-on-rails : 32000
windows : 32001
list : 32000
multithreading : 32000
oop : 32000
bash : 32001
eclipse : 32000
ajax : 32000
perl : 32001
json : 32000
pointers : 32000
visual-studio : 32000
xml : 32000
winforms : 32000
linq : 32001
function : 32001
class : 32001
tsql : 32000
wpf : 32039
xcode : 31990
language-agnostic : 31990
asp_dot_net : 31990
asp_dot_net-mvc : 31990
vb_dot_net : 31989
dot_net : 31990


In [28]:
for i in range(0,10):
    #Join predictions back to the results DataFrame
    predictions_column = predictions_tags[i]['predictions_column']
    results_validation_data = results_validation_data.join(predictions_column, 
                                                   results_validation_data["post_body_text"] == predictions_column[f"post_body_text_{i}"], 
                                                   how='inner')  
    results_validation_data = results_validation_data.drop(f"post_body_text_{i}")
    print(f'completed iteration{i}')

completed iteration0
completed iteration1
completed iteration2
completed iteration3
completed iteration4
completed iteration5
completed iteration6
completed iteration7
completed iteration8
completed iteration9


In [29]:
results_validation_data.show(5)

24/12/01 19:50:08 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:50:08 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:50:08 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:50:08 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:50:08 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:50:08 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:50:08 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:50:08 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:50:08 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary w

+--------------------+--------------------+----+---+----------+---+---+------+----+------+---+-------+
|      post_body_text|           post_tags|java| c#|javascript|php|c++|jquery|html|python|css|android|
+--------------------+--------------------+----+---+----------+---+---+------+----+------+---+-------+
| am newbie in lar...|         php|laravel| 0.0|0.0|       0.0|1.0|0.0|   0.0| 0.0|   0.0|0.0|    0.0|
| below is my conn...| c#|entity-framework| 0.0|1.0|       0.0|0.0|0.0|   0.0| 0.0|   0.0|0.0|    0.0|
| i am developing ...|android|google-pl...| 0.0|0.0|       0.0|0.0|0.0|   0.0| 0.0|   0.0|0.0|    1.0|
| i dont quite und...|php|model-view-co...| 0.0|0.0|       0.0|0.0|0.0|   0.0| 0.0|   0.0|0.0|    0.0|
| i followed the h...|android|unit-testing| 0.0|0.0|       0.0|0.0|0.0|   0.0| 0.0|   0.0|0.0|    0.0|
+--------------------+--------------------+----+---+----------+---+---+------+----+------+---+-------+
only showing top 5 rows



                                                                                

In [30]:
for i in range(10,20):
    #Join predictions back to the results DataFrame
    predictions_column = predictions_tags[i]['predictions_column']
    results_validation_data = results_validation_data.join(predictions_column, 
                                                   results_validation_data["post_body_text"] == predictions_column[f"post_body_text_{i}"], 
                                                   how='inner')  
    results_validation_data = results_validation_data.drop(f"post_body_text_{i}")
    print(f'completed iteration{i}')

completed iteration10
completed iteration11
completed iteration12
completed iteration13
completed iteration14
completed iteration15
completed iteration16
completed iteration17
completed iteration18
completed iteration19


In [31]:
results_validation_data.show(5)

24/12/01 19:53:45 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:53:45 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:53:45 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:53:46 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:53:46 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:53:46 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:53:46 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:53:47 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:53:47 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary w

+--------------------+--------------------+----+---+----------+---+---+------+----+------+---+-------+---+---+-----+------+------+----------+------+---+-----+-----------+
|      post_body_text|           post_tags|java| c#|javascript|php|c++|jquery|html|python|css|android|  c|sql|mysql|arrays|string|sql-server|iphone|ios|regex|objective-c|
+--------------------+--------------------+----+---+----------+---+---+------+----+------+---+-------+---+---+-----+------+------+----------+------+---+-----+-----------+
| can anyone expla...|          javascript| 0.0|0.0|       0.0|0.0|0.0|   0.0| 0.0|   0.0|0.0|    0.0|0.0|0.0|  0.0|   0.0|   0.0|       0.0|   0.0|0.0|  0.0|        0.0|
| how can i write ...|                   c| 0.0|0.0|       0.0|0.0|0.0|   0.0| 0.0|   0.0|0.0|    0.0|0.0|0.0|  0.0|   0.0|   0.0|       0.0|   0.0|0.0|  0.0|        0.0|
| i have 2 importa...|     java|networking| 0.0|0.0|       0.0|0.0|0.0|   0.0| 0.0|   0.0|0.0|    0.0|0.0|0.0|  0.0|   0.0|   0.0|       0.0|   0

                                                                                

In [32]:
for i in range(20,30):
    #Join predictions back to the results DataFrame
    predictions_column = predictions_tags[i]['predictions_column']
    results_validation_data = results_validation_data.join(predictions_column, 
                                                   results_validation_data["post_body_text"] == predictions_column[f"post_body_text_{i}"], 
                                                   how='inner')  
    results_validation_data = results_validation_data.drop(f"post_body_text_{i}")
    print(f'completed iteration{i}')

completed iteration20
completed iteration21
completed iteration22
completed iteration23
completed iteration24
completed iteration25
completed iteration26
completed iteration27
completed iteration28
completed iteration29


In [33]:
results_validation_data.show(5)

24/12/01 19:56:15 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:56:15 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:56:15 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:56:15 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:56:15 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:56:16 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:56:16 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:56:16 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:56:16 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary w

+--------------------+--------------------+----+---+----------+---+---+------+----+------+---+-------+---+---+-----+------+------+----------+------+---+-----+-----------+---------+----+-----------+--------+-----+-------------+-------+----+--------------+---+
|      post_body_text|           post_tags|java| c#|javascript|php|c++|jquery|html|python|css|android|  c|sql|mysql|arrays|string|sql-server|iphone|ios|regex|objective-c|algorithm|ruby|performance|database|linux|ruby-on-rails|windows|list|multithreading|oop|
+--------------------+--------------------+----+---+----------+---+---+------+----+------+---+-------+---+---+-----+------+------+----------+------+---+-----+-----------+---------+----+-----------+--------+-----+-------------+-------+----+--------------+---+
| i am trying to s...|          javascript| 0.0|0.0|       0.0|0.0|0.0|   0.0| 0.0|   0.0|0.0|    0.0|0.0|0.0|  0.0|   0.0|   0.0|       0.0|   0.0|0.0|  0.0|        0.0|      0.0| 1.0|        0.0|     0.0|  0.0|          0

                                                                                

In [34]:
for i in range(30,40):
    #Join predictions back to the results DataFrame
    predictions_column = predictions_tags[i]['predictions_column']
    results_validation_data = results_validation_data.join(predictions_column, 
                                                   results_validation_data["post_body_text"] == predictions_column[f"post_body_text_{i}"], 
                                                   how='inner')  
    results_validation_data = results_validation_data.drop(f"post_body_text_{i}")
    print(f'completed iteration{i}')

completed iteration30
completed iteration31
completed iteration32
completed iteration33
completed iteration34
completed iteration35
completed iteration36
completed iteration37
completed iteration38
completed iteration39


In [35]:
results_validation_data.show(5)

24/12/01 19:59:04 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:59:04 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:59:04 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:59:04 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:59:04 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:59:04 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:59:05 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:59:05 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 19:59:05 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary w

+--------------------+--------------------+----+---+----------+---+---+------+----+------+---+-------+---+---+-----+------+------+----------+------+---+-----+-----------+---------+----+-----------+--------+-----+-------------+-------+----+--------------+---+----+-------+----+----+----+--------+-------------+---+--------+----+
|      post_body_text|           post_tags|java| c#|javascript|php|c++|jquery|html|python|css|android|  c|sql|mysql|arrays|string|sql-server|iphone|ios|regex|objective-c|algorithm|ruby|performance|database|linux|ruby-on-rails|windows|list|multithreading|oop|bash|eclipse|ajax|perl|json|pointers|visual-studio|xml|winforms|linq|
+--------------------+--------------------+----+---+----------+---+---+------+----+------+---+-------+---+---+-----+------+------+----------+------+---+-----+-----------+---------+----+-----------+--------+-----+-------------+-------+----+--------------+---+----+-------+----+----+----+--------+-------------+---+--------+----+
| i am trying to

                                                                                

In [36]:
for i in range(40,50):
    #Join predictions back to the results DataFrame
    predictions_column = predictions_tags[i]['predictions_column']
    results_validation_data = results_validation_data.join(predictions_column, 
                                                   results_validation_data["post_body_text"] == predictions_column[f"post_body_text_{i}"], 
                                                   how='inner')  
    results_validation_data = results_validation_data.drop(f"post_body_text_{i}")
    print(f'completed iteration{i}')

completed iteration40
completed iteration41
completed iteration42
completed iteration43
completed iteration44
completed iteration45
completed iteration46
completed iteration47
completed iteration48
completed iteration49


In [37]:
results_validation_data.show(5)

24/12/01 20:01:58 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:01:59 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:01:59 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:02:00 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:02:00 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:02:00 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:02:01 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:02:01 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:02:01 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary w

+--------------------+--------------------+----+---+----------+---+---+------+----+------+---+-------+---+---+-----+------+------+----------+------+---+-----+-----------+---------+----+-----------+--------+-----+-------------+-------+----+--------------+---+----+-------+----+----+----+--------+-------------+---+--------+----+--------+-----+----+---+-----+-----------------+-----------+---------------+----------+-------+
|      post_body_text|           post_tags|java| c#|javascript|php|c++|jquery|html|python|css|android|  c|sql|mysql|arrays|string|sql-server|iphone|ios|regex|objective-c|algorithm|ruby|performance|database|linux|ruby-on-rails|windows|list|multithreading|oop|bash|eclipse|ajax|perl|json|pointers|visual-studio|xml|winforms|linq|function|class|tsql|wpf|xcode|language-agnostic|asp_dot_net|asp_dot_net-mvc|vb_dot_net|dot_net|
+--------------------+--------------------+----+---+----------+---+---+------+----+------+---+-------+---+---+-----+------+------+----------+------+---+--

                                                                                

In [38]:
# Write the DataFrame to GCS in Parquet format
output_path1 = "gs://msca-bdp-student-gcs/Group6/Tag_classification_inference/val_df"
val_df.write.mode("overwrite").parquet(output_path1)

24/12/01 20:04:54 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.5 MiB
                                                                                

In [39]:
# Write the DataFrame to GCS in Parquet format
output_path2 = "gs://msca-bdp-student-gcs/Group6/Tag_classification_inference/results_validation_data"
results_validation_data.write.mode("overwrite").parquet(output_path2)

24/12/01 20:05:25 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:05:25 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:05:25 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:05:26 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:05:26 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:05:26 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:05:26 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:05:26 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:05:26 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary w

In [40]:
#We can also have a look at some of the tags that the model has been able to generate for posts by condensing the data
from pyspark.sql.functions import array, col, when, expr, size

results_validation_data_with_tags = results_validation_data.withColumn("predicted_tags",array(*[when(col(tag) == 1, 
                                                                                          tag).otherwise(None) 
                                                                                     for tag in top_50_tags]))

results_validation_data_with_tags = results_validation_data_with_tags.withColumn("predicted_tags",
                                                                                 expr("filter(predicted_tags, x -> x is not null)"))

results_validation_data_with_tags.filter(size(col("predicted_tags")) > 0).select('post_body_text','post_tags','predicted_tags').show(5)

24/12/01 20:09:08 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:09:08 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:09:08 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:09:09 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:09:09 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:09:09 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:09:09 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:09:09 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:09:10 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary w

+--------------------+--------------------+--------------------+
|      post_body_text|           post_tags|      predicted_tags|
+--------------------+--------------------+--------------------+
| am newbie in lar...|         php|laravel|               [php]|
| how can i close ...|c#|winforms|keybo...|[c#, string, winf...|
| i am completely ...|         c#|xml|linq|         [xml, linq]|
| i have django in...| python|django|pydev|            [python]|
| i have problem w...|python|file|dicti...|            [python]|
+--------------------+--------------------+--------------------+
only showing top 5 rows



                                                                                

In [41]:
# Write the DataFrame to GCS in Parquet format
output_path3 = "gs://msca-bdp-student-gcs/Group6/Tag_classification_inference/results_validation_data_with_tags"
results_validation_data_with_tags.write.mode("overwrite").parquet(output_path3)

24/12/01 20:12:20 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:12:20 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:12:20 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:12:20 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:12:21 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:12:21 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:12:21 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:12:21 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.9 MiB
24/12/01 20:12:21 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary w

In [46]:
results_validation_data_with_tags.printSchema()

root
 |-- post_body_text: string (nullable = true)
 |-- post_tags: string (nullable = true)
 |-- java: double (nullable = false)
 |-- c#: double (nullable = false)
 |-- javascript: double (nullable = false)
 |-- php: double (nullable = false)
 |-- c++: double (nullable = false)
 |-- jquery: double (nullable = false)
 |-- html: double (nullable = false)
 |-- python: double (nullable = false)
 |-- css: double (nullable = false)
 |-- android: double (nullable = false)
 |-- c: double (nullable = false)
 |-- sql: double (nullable = false)
 |-- mysql: double (nullable = false)
 |-- arrays: double (nullable = false)
 |-- string: double (nullable = false)
 |-- sql-server: double (nullable = false)
 |-- iphone: double (nullable = false)
 |-- ios: double (nullable = false)
 |-- regex: double (nullable = false)
 |-- objective-c: double (nullable = false)
 |-- algorithm: double (nullable = false)
 |-- ruby: double (nullable = false)
 |-- performance: double (nullable = false)
 |-- database: double

In [47]:
val_df.groupBy('java').count().show()

24/12/02 00:59:28 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.2 MiB
24/12/02 00:59:46 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.1 MiB
24/12/02 00:59:46 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.1 MiB
24/12/02 00:59:47 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.1 MiB
24/12/02 00:59:47 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.1 MiB
24/12/02 00:59:47 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.1 MiB


+----+-----+
|java|count|
+----+-----+
|   0|27436|
|   1| 4665|
+----+-----+



In [48]:
results_validation_data.groupBy('java').count().show()

24/12/02 01:00:18 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.2 MiB
24/12/02 01:00:19 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.2 MiB
24/12/02 01:00:19 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.2 MiB
24/12/02 01:00:19 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.2 MiB
24/12/02 01:00:19 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.2 MiB
24/12/02 01:00:19 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.2 MiB
24/12/02 01:00:19 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.2 MiB
24/12/02 01:00:19 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.2 MiB
24/12/02 01:00:20 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary w

+----+-----+
|java|count|
+----+-----+
| 0.0|26899|
| 1.0| 2880|
+----+-----+



                                                                                