In [1]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import IntegerType, ArrayType, StringType, DoubleType, MapType
import pyspark.sql.functions as f
from pyspark.sql.window import Window

import math

In [2]:
# Build (or reuse) the Spark session used by every cell below.
# The builder chain is wrapped in parentheses rather than joined with trailing
# backslashes: a blank line after a "\" ends the statement, which left
# `.getOrCreate()` dangling on a line of its own.
spark = (
    SparkSession.builder
    .appName("TFIDF")
    .config("spark.executor.instances", "2")
    .config("spark.executor.memory", "25G")
    # Key corrected from the misspelled "spark.exceturor.memovyOverhead";
    # under the misspelled name the 5G off-heap overhead was never applied.
    .config("spark.executor.memoryOverhead", "5G")
    .config("spark.executor.cores", "20")
    # NOTE(review): "spark.executor.cores.max" is not a documented Spark
    # property (the documented cap is "spark.cores.max", standalone/Mesos only).
    # Kept unchanged so scheduling behaviour is not altered -- confirm intent.
    .config("spark.executor.cores.max", "20")
    .config("spark.driver.memory", "30G")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/18 12:50:33 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).


In [3]:
# Reference: https://dzone.com/articles/calculating-tf-idf-with-apache-spark

In [4]:
%%time 
# Load the input Parquet dataset into `df`; later cells read its
# `categories` and `result` columns.
df = (
    spark.read
    .format("parquet")
    .load('/user/jpietraszek/arxiv-lemmatized_preprocessed.parquet')
)

                                                                                

CPU times: user 7.67 ms, sys: 694 µs, total: 8.36 ms
Wall time: 4.72 s


In [5]:
%%time 
# --- TF-IDF per category -----------------------------------------------------
# Each distinct value of `categories` is treated as one "document"; `result`
# is the array of tokens attached to an input row.

# One row per (category, token) occurrence. Exploding every row's token array
# directly produces the same rows as first collecting all arrays of a category
# into one list and exploding that, but never builds a whole category's tokens
# inside a single row. The result gets its own name so that `df` keeps
# pointing at the raw input and this cell can be re-run safely.
tokens_df = df.select("categories", f.explode("result").alias("token"))

# Term frequency: how often each token occurs within a category.
tokensWithTf = tokens_df.groupBy("categories", "token").agg(f.count("token").alias("tf"))

# Document frequency: in how many distinct categories each token occurs.
tokensWithDf = tokens_df.groupBy("token").agg(f.countDistinct("categories").alias("df"))

# Expose both frames to Spark SQL; the IDF step below queries them by name.
tokensWithDf.createOrReplaceTempView("tokensWithDf")
tokensWithTf.createOrReplaceTempView("tokensWithTf")

# Number of documents (= distinct categories). `collect()` triggers a Spark job
# here; everything else in this cell stays lazy.
docCount = spark.sql("SELECT COUNT(DISTINCT categories) FROM tokensWithTf").collect()[0][0]

# Smoothed inverse document frequency: ln((N + 1) / (1 + df)).
tokensWithIdf = spark.sql(f"SELECT *, LOG(({docCount} + 1.0) / (1.0 + df)) as idf FROM tokensWithDf")

# Attach each token's df/idf to its per-category term frequency.
joined_df = tokensWithTf.join(tokensWithIdf, ["token"], "left")

# TF-IDF score.
result_df = joined_df.withColumn("tf_idf", f.col("tf") * f.col("idf"))

# Top 10 keywords per category by TF-IDF. `token` is a secondary sort key so
# that ties in `tf_idf` are broken deterministically; without it row_number()
# may pick different tokens from run to run.
window_spec = Window.partitionBy("categories").orderBy(
    f.col("tf_idf").desc(),
    f.col("token").asc(),
)
ranked_df = result_df.withColumn("rank", f.row_number().over(window_spec))
top_tfidf_df = ranked_df.filter(f.col("rank") <= 10)
rank = top_tfidf_df.select('token', 'categories', 'rank')

# `rank` is consumed by a single write below, so it is not persisted. If it is
# ever reused by several actions, persist it first (this needs
# `from pyspark import StorageLevel`, which is not imported in this notebook).



                                                                                

CPU times: user 63.3 ms, sys: 9.44 ms, total: 72.8 ms
Wall time: 17 s


In [6]:
%%time
# Persist the top-keyword table as Parquet, replacing any previous output
# at the same location.
rank.write.mode("overwrite").parquet("/user/jpietraszek/sparkTFIDF_full.parquet")



CPU times: user 49.7 ms, sys: 13.6 ms, total: 63.3 ms
Wall time: 25.7 s


                                                                                

In [5]:
# NOTE: The earlier implementation below is kept for reference only. It is wrapped in a
# string literal, so none of it is executed.
# It ran out of memory on the executors (exit code 137, plus several other failures).
# The author attributes this to the Python UDF and the more pandas-like style of chaining
# aggregations and joins; the Spark SQL version is much more efficient.



"""

df = spark.read.parquet('/user/jpietraszek/arxiv-lemmatized_preprocessed.parquet')%%time

#df = df.sample(fraction = .001)
df = df.groupBy("categories").agg(f.flatten(f.collect_list("result")).alias("merged_result"))
df = df.select("*", f.explode(f.col("merged_result")).alias("token"))

# create term frequency dataframe
tokensWithTf = df.groupBy("categories", "token").agg(f.count("merged_result").alias('tf'))

# create document frequency dataframe
tokensWithDf = df.groupBy("token").agg(f.countDistinct("merged_result").alias('df'))
def calc_idf(df, docCount):
    return math.log((docCount + 1.0) / (1.0 + df))

# register the IDF calculation function as a UDF
calc_idf_udf = f.udf(calc_idf, DoubleType())
docCount = tokensWithTf.select('categories').distinct().count()
# apply the UDF to calculate IDF and add a new column "idf" on DF dataframe
tokensWithIdf = tokensWithDf.withColumn("idf", calc_idf_udf(f.col("df"), f.lit(docCount)))

joined_df = tokensWithTf.join(f.broadcast(tokensWithIdf), ["token"], "left")

# calculate the TF-IDF by multiplying "tf" with "idf"
result_df = joined_df.withColumn("tf_idf", f.col("tf") * f.col("idf"))

# create top 10 keywords per category based on TF-IDF value

window_spec = Window.partitionBy("categories").orderBy(f.col("tf_idf").desc())
ranked_df = result_df.withColumn("rank", f.row_number().over(window_spec))
top_tfidf_df = ranked_df.filter(f.col("rank") <= 10)

rank = top_tfidf_df.select('token','categories','rank')

rank.write.format('parquet').mode("overwrite").save("/user/jpietraszek/sparkTFIDF_full.parquet")

"""

CPU times: user 3.25 ms, sys: 4.17 ms, total: 7.42 ms
Wall time: 142 ms
