In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("spark-proj111") \
    .master('yarn')\
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2023-05-05 16:28:13,393 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [2]:
from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import *
from pyspark.sql.functions import col
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import udf, col
from pyspark.ml.feature import StopWordsRemover, HashingTF, IDF
from pyspark.ml.clustering import KMeans, BisectingKMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import PCA
from pyspark.sql.functions import sum, col
from delta.tables import *
import time

In [3]:
# Creating the Data Schema
schema = StructType([StructField("_c0", IntegerType(), True),
                            StructField("_c1", StringType(), True),
                            StructField("_c2", IntegerType(), True),
                            StructField("_c3", IntegerType(), True),
                            ])


In [4]:
# creating our  DataFrame by reading our CSV file on HDFS
df = spark.read \
    .schema(schema)\
    .option("delimiter", "\t") \
    .csv("hdfs://namenode:9000/project/test18/part*") \
    .select( "_c0", "_c1", "_c3")\
    .distinct()

                                                                                

In [None]:
df=df.repartition(100)

In [5]:
#Rename the name of columns to be the same as dataset fields
df = df.withColumnRenamed('_c0', 'id')\
    .withColumnRenamed('_c1', 'title')\
    .withColumnRenamed('_c2', 'year')\
    .withColumnRenamed('_c3', 'n_citation')
df.select("title").show(30, truncate=False)

[Stage 3:>                                                          (0 + 1) / 1]

+-----------------------------------------------------------------------------------------------------------------------+
|title                                                                                                                  |
+-----------------------------------------------------------------------------------------------------------------------+
|Ontology-Mediated Queries: Combined Complexity and Succinctness of Rewritings via Circuit Complexity                   |
|Inducing strong convergence into the asymptotic behaviour of proximal splitting algorithms in Hilbert spaces           |
|Self-Supervised Deep Reinforcement Learning with Generalized Computation Graphs for Robot Navigation                   |
|Context-dependent upper-confidence bounds for directed exploration                                                     |
|Unsupervised object-level video summarization with online motion auto-encoder                                          |
|Evaluation of the finit

                                                                                

In [None]:
# Remove "." from the end of titles
# inefficient due to shuffle
new_rows = []

# iterate on DataFrame rows
for row in df.rdd.collect():
    # Get the value of the "title" column for each row
    old_value = row["title"]
    
    # Cleaning titles
    # Removing the period at the end of titles
    new_value = old_value.rstrip(".")
    
    # Append modified value to the new rows
    new_row = row.asDict()
    new_row["_title_cleaned"] = new_value
    new_rows.append(new_row)

# Form a new DataFrame from the list of new rows
new_df = spark.createDataFrame(new_rows)

new_df.show(30, truncate=False)
df=new_df


2023-04-27 07:02:47,358 ERROR netty.Inbox: An error happened while processing message in the inbox for CoarseGrainedScheduler
java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Arrays.java:3236)
	at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
	at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
	at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
	at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
	at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
	at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
	at org.apache.spark.serializer.JavaSerialize

KeyboardInterrupt: 

In [6]:
#Efficient approach in spark to Remove "." from the end of titles

df = df.withColumn("_title_cleaned", regexp_replace("title", "\.$", ""))
df.select("title","_title_cleaned").show(30, truncate=False)

[Stage 6:>                                                          (0 + 1) / 1]

+----------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------+
|title                                                                                                                 |_title_cleaned                                                                                                        |
+----------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------+
|Matrix Profile XVI: Efficient and Effective Labeling of Massive Time Series Archives                                  |Matrix Profile XVI: Efficient and Effective Labeling of Massive Time Series Archives                                  |
|Enhancing CS1 with Mobile Apps.        

                                                                                

In [7]:
import pyspark.sql.functions as F
df.groupBy(F.spark_partition_id()).count().show()

[Stage 9:>                                                          (0 + 0) / 1]

+--------------------+------+
|SPARK_PARTITION_ID()| count|
+--------------------+------+
|                   1|806368|
|                   2|806505|
|                   5|808522|
|                   0|807334|
|                   3|807989|
|                   4|808296|
|                   6| 49067|
+--------------------+------+



                                                                                

[Stage 9:>                                                          (0 + 0) / 1]

In [8]:
df=df.withColumn('salt', F.rand())
df=df.repartition(8,'salt')

In [9]:
df.groupBy(F.spark_partition_id()).count().show()

[Stage 9:>                                                          (0 + 0) / 1]

+--------------------+------+
|SPARK_PARTITION_ID()| count|
+--------------------+------+
|                   1|611873|
|                   3|610666|
|                   4|612004|
|                   6|612426|
|                   0|611718|
|                   2|610717|
|                   5|613059|
|                   7|611618|
+--------------------+------+



                                                                                

[Stage 9:>                                                          (0 + 0) / 1]

In [7]:
# Second phase of cleaning for the titles
# Deleting all Characters including : [!,?/:"().;$#%&*()@^=+-]
characters_to_remove = '[!,?/:"().;$#%&*()@^=+-]'

df = df.withColumn("title", regexp_replace("_title_cleaned", characters_to_remove, "")) \
       .withColumn("title", regexp_replace("title", "\d+", "")) \
       .withColumn("title", regexp_replace("title", ":", ""))

df.select("title").show(truncate=False)




+-------------------------------------------------------------------------------------------------------------------+
|title                                                                                                              |
+-------------------------------------------------------------------------------------------------------------------+
|OntologyMediated Queries Combined Complexity and Succinctness of Rewritings via Circuit Complexity                 |
|Inducing strong convergence into the asymptotic behaviour of proximal splitting algorithms in Hilbert spaces       |
|SelfSupervised Deep Reinforcement Learning with Generalized Computation Graphs for Robot Navigation                |
|Contextdependent upperconfidence bounds for directed exploration                                                   |
|Unsupervised objectlevel video summarization with online motion autoencoder                                        |
|Evaluation of the finite element lattice Boltzmann meth

                                                                                

In [11]:
df.groupBy(F.spark_partition_id()).count().show()

[Stage 9:>                                                          (0 + 0) / 1]

+--------------------+------+
|SPARK_PARTITION_ID()| count|
+--------------------+------+
|                   0|611718|
|                   2|610717|
|                   4|612004|
|                   6|612426|
|                   1|611873|
|                   3|610666|
|                   5|613059|
|                   7|611618|
+--------------------+------+



                                                                                

[Stage 9:>                                                          (0 + 0) / 1]

In [11]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- n_citation: integer (nullable = true)
 |-- _title_cleaned: string (nullable = true)



In [None]:
# from pyspark.sql.functions import udf
# from pyspark.sql.types import ArrayType, StringType

# # Define a very slow tokenizer function
# def very_slow_tokenizer(text):
#     tokens = []
#     for word in text.split():
#         for letter in word:
#             for i in range(10):
#                 tokens.append(letter)
#     return tokens

# # Define a UDF using the very slow tokenizer function
# slow_tokenizer_udf = udf(very_slow_tokenizer, ArrayType(StringType()))

# # Apply the UDF to the DataFrame column
# df = df.withColumn("tokens", slow_tokenizer_udf(df["_c1"]))
# df.show(truncate=False)

In [8]:
# Tokenize the titles

# Define a user-defined function (UDF) to tokenize
tokenizer = udf(lambda x: x.lower().split(), ArrayType(StringType()))

# Apply the tokenizer UDF to the "title" column of the DataFrame to create a new column called "token_list"
df = df.withColumn("token_list", tokenizer(col("title").cast(StringType()))) # add this line to cast tokens to StringType
df.select("title", "token_list").show(truncate=False)


[Stage 12:>                                                         (0 + 1) / 1]

+-------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------+
|title                                                                                                              |token_list                                                                                                                       |
+-------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------+
|OntologyMediated Queries Combined Complexity and Succinctness of Rewritings via Circuit Complexity                 |[ontologymediated, queries, combined, complexity, and, succinctness, of, rewritings, via, circuit, complexity]                   |
|Inducin

                                                                                

In [9]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- n_citation: integer (nullable = true)
 |-- _title_cleaned: string (nullable = true)
 |-- token_list: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [10]:
# Define the stop words that should be removed in titles
stop_words = ["the", "a", "an", "and", "or", "in", "on", "at",\
              "what","why","how","when","whatever","have","has",\
                  "to", "of", "by", "for", "from", "do", "how", "about", "do", "does"]

# Remove stop words from token_list and finish cleaning
remover = StopWordsRemover(inputCol="token_list", outputCol="filtered_token", stopWords=stop_words)
df = remover.transform(df)
df.select("filtered_token").show(truncate=False)

[Stage 15:>                                                         (0 + 1) / 1]

+-------------------------------------------------------------------------------------------------------------+
|filtered_token                                                                                               |
+-------------------------------------------------------------------------------------------------------------+
|[matrix, profile, xvi, efficient, effective, labeling, massive, time, series, archives]                      |
|[enhancing, cs, with, mobile, apps]                                                                          |
|[mobile, crowdsourcing, pervasive, computing, smart, cities]                                                 |
|[readability, computer, documentation]                                                                       |
|[space, as, interface, creating, interactive, street, art]                                                   |
|[comparison, several, difference, schemes, d, d, test, problems, euler, equations]                     

                                                                                

In [11]:
# Convert tokens to TF-IDF representation
hashingTF = HashingTF(inputCol="filtered_token", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="idf_features")
tfidf = idf.fit(hashingTF.transform(df)).transform(hashingTF.transform(df))

# pca = PCA(k=2, inputCol="idf_features")
# pca.setOutputCol("pca_features")

# model = pca.fit(tfidf)


                                                                                

In [None]:
# Evaluate Kmeans accuracy to select best k for Kmeans
from pyspark.ml.evaluation import ClusteringEvaluator
for x in range(10,1000,10):
    kmeans = KMeans(featuresCol="idf_features", k=x)
    model = kmeans.fit(tfidf)
    clustered = model.transform(tfidf)
    evaluator = ClusteringEvaluator(predictionCol="prediction", featuresCol="idf_features")
    silhouette = evaluator.evaluate(clustered)
    print("Silhouette =" + str(silhouette))

In [12]:
# Cluster the rows using kmeans
kmeans = KMeans(featuresCol="idf_features", k=20)
model = kmeans.fit(tfidf)
clustered = model.transform(tfidf)

# Show the clustered dataframe
clustered.select("title","prediction").show(truncate=False)

2023-05-05 16:37:19,749 WARN netlib.InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
2023-05-05 16:37:19,760 WARN netlib.InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
2023-05-05 16:40:27,700 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 1797.8 KiB
2023-05-05 16:42:00,972 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 1783.9 KiB
2023-05-05 16:42:20,689 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 1793.0 KiB
[Stage 125:>                                                        (0 + 1) / 1]

+-------------------------------------------------------------------------------------------------------------+----------+
|title                                                                                                        |prediction|
+-------------------------------------------------------------------------------------------------------------+----------+
|Matrix Profile XVI Efficient and Effective Labeling of Massive Time Series Archives                          |0         |
|Enhancing CS with Mobile Apps                                                                                |0         |
|Mobile Crowdsourcing and Pervasive Computing for Smart Cities                                                |0         |
|Readability and computer documentation                                                                       |0         |
|Space as interface creating interactive street art                                                           |10        |
|Comparison of S

                                                                                

In [None]:
# Evaluate Bisecting K-means accuracy to select best k for Bisecting K-means
for x in range(10,1000,10):
    bkmeans = BisectingKMeans(featuresCol="idf_features", k=x)
    model = bkmeans.fit(tfidf)
    clustered = model.transform(tfidf)
    evaluator = ClusteringEvaluator(predictionCol="prediction", featuresCol="idf_features")
    silhouette = evaluator.evaluate(clustered)
    print("Silhouette =" + str(silhouette))

In [None]:
# Cluster the rows using BisectingKMeans

# Define the Bisecting K-means model
bkmeans = BisectingKMeans(featuresCol="idf_features", k=20, maxIter=20)

# Fit the model on the TF-IDF DataFrame
model = bkmeans.fit(tfidf)

# Get the cluster assignments for each document
clustered = model.transform(tfidf)

# Show the clustered DataFrame
clustered.show(truncate=False)


In [18]:
#inefficien version of calculating the percentage of being influentioal
# Group by the cluster and sum the number of citations in each cluster
citations = clustered.groupBy("prediction").agg(sum("n_citation").alias("total_citations"))

# Collect all data to the driver node
CitationList = citations.collect()

# Calculate the total number of citations
total_citations = 0
for row in CitationList:
    total_citations += row["total_citations"]

# Calculate the percentage of citations in each cluster
citations = spark.createDataFrame(CitationList)
citations = citations.withColumn("percentage_citations", col("total_citations")/total_citations*100)

# Sort the result by the percentage of citations in descending order
citations = citations.sort(col("percentage_citations").desc())

# Show the result
citations.show(truncate=False)

2023-05-05 15:38:35,922 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 1800.7 KiB
2023-05-05 15:40:10,068 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 1786.6 KiB
[Stage 125:>                                                        (0 + 2) / 2]

+----------+---------------+--------------------+
|prediction|total_citations|percentage_citations|
+----------+---------------+--------------------+
|7         |76568707       |93.12562671886849   |
|13        |953117         |1.1592153170538426  |
|0         |769492         |0.9358839604690669  |
|1         |739683         |0.8996291781222425  |
|6         |486606         |0.5918277908906274  |
|17        |446128         |0.5425969854347333  |
|4         |425741         |0.5178015797617922  |
|11        |394807         |0.4801785317857897  |
|3         |298491         |0.36303553415028644 |
|18        |239454         |0.2912326026393516  |
|12        |222755         |0.2709226757578857  |
|2         |171856         |0.20901747374939822 |
|15        |160194         |0.19483372817830683 |
|14        |109541         |0.13322772025406637 |
|5         |109478         |0.13315109737883238 |
|8         |42403          |0.05157205997693262 |
|10        |35200          |0.04281151124184676 |


                                                                                

In [20]:
#Efficient version of calulatating percentage of being influential

# Group by the cluster and sum the number of citations in each cluster
citations = clustered.groupBy("prediction").agg(sum("n_citation").alias("total_citations"))
# Calculate the total number of citations
total_citations = citations.agg(sum("total_citations")).collect()[0][0]
# Calculate the percentage of citations in each cluster
citations = citations.withColumn("percentage_citations", col("total_citations")/total_citations*100)

# Show the result
citations.show(truncate=False)


2023-05-05 15:48:22,632 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 1833.3 KiB
2023-05-05 15:48:33,102 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 1836.9 KiB
2023-05-05 15:48:33,491 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 1833.4 KiB

+----------+---------------+--------------------+
|prediction|total_citations|percentage_citations|
+----------+---------------+--------------------+
|12        |222755         |0.2709226757578857  |
|1         |739683         |0.8996291781222425  |
|13        |953117         |1.1592153170538426  |
|6         |486606         |0.5918277908906274  |
|16        |21464          |0.026105291968607926|
|3         |298491         |0.36303553415028644 |
|5         |109478         |0.13315109737883238 |
|19        |6077           |0.007391066869792692|
|15        |160194         |0.19483372817830683 |
|17        |446128         |0.5425969854347333  |
|9         |19683          |0.023939175448104255|
|4         |425741         |0.5178015797617922  |
|8         |42403          |0.05157205997693262 |
|7         |76568707       |93.12562671886849   |
|10        |35200          |0.04281151124184676 |
|11        |394807         |0.4801785317857897  |
|14        |109541         |0.13322772025406637 |


2023-05-05 15:48:42,855 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 1834.3 KiB
                                                                                

In [None]:
# Efficient version of calulatating percentage of being influential
# Apply Caching technique
# Group by the cluster and sum the number of citations in each cluster
citations = clustered.groupBy("prediction").agg(sum("n_citation").alias("total_citations"))
clustered.cache()
# Calculate the total number of citations
total_citations = citations.agg(sum("total_citations")).collect()[0][0]
clustered.cache()
# Calculate the percentage of citations in each cluster
citations = citations.withColumn("percentage_citations", col("total_citations")/total_citations*100)

# Show the result
citations.show(truncate=False)


In [21]:
# Benchmarking to figure out functions that are not optimized by calculating Execution time
# Time of groupBy() operation
start_time = time.time()
citations_groupBy = clustered.groupBy("prediction")
end_time = time.time()
time_of_groupBy = end_time - start_time

# Time of agg operation
start_time = time.time()
citations_aggregated = citations_groupBy.agg(sum("n_citation").alias("total_citations"))
end_time = time.time()
time_of_agg = end_time - start_time

# Calculate the total number of citations
total_citations = citations_aggregated.agg(sum("total_citations")).collect()[0][0]

# Calculate to find the percentage of citations for each cluster
Percentage_of_citation = citations_aggregated.withColumn("percentage_citations", col("total_citations")/total_citations*100)

print("Time of groupBy function is:", time_of_groupBy)

print("Time of agg function is:", time_of_agg)

# Show the result
Percentage_of_citation.show(truncate=False)


2023-05-05 15:51:08,758 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 1833.3 KiB
2023-05-05 15:51:18,958 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 1836.9 KiB
                                                                                

Time of groupBy function is: 0.02834033966064453
Time of agg function is: 0.027842283248901367


2023-05-05 15:51:19,266 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 1833.4 KiB
2023-05-05 15:51:28,338 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 1834.3 KiB


+----------+---------------+--------------------+
|prediction|total_citations|percentage_citations|
+----------+---------------+--------------------+
|12        |222755         |0.2709226757578857  |
|1         |739683         |0.8996291781222425  |
|13        |953117         |1.1592153170538426  |
|6         |486606         |0.5918277908906274  |
|16        |21464          |0.026105291968607926|
|3         |298491         |0.36303553415028644 |
|5         |109478         |0.13315109737883238 |
|19        |6077           |0.007391066869792692|
|15        |160194         |0.19483372817830683 |
|17        |446128         |0.5425969854347333  |
|9         |19683          |0.023939175448104255|
|4         |425741         |0.5178015797617922  |
|8         |42403          |0.05157205997693262 |
|7         |76568707       |93.12562671886849   |
|10        |35200          |0.04281151124184676 |
|11        |394807         |0.4801785317857897  |
|14        |109541         |0.13322772025406637 |


                                                                                

In [None]:
# Save Delta table in an output directory on HDFS
citations.write.mode("overwrite").option("delimiter", "\t").csv("hdfs://namenode:9000/project/output")


In [None]:
#END