## INIT

In [1]:
import configparser

# Read Config
config = configparser.ConfigParser()

# ConfigParser.read() skips files it cannot open and returns the list of files
# it actually parsed, so an empty result means the properties file is missing
# or unreadable. Fail here, naming the file, instead of letting the first
# config.get() below raise a NoSectionError that hides the real cause.
if not config.read('tfidf_config.properties'):
    raise FileNotFoundError(
        "Could not read 'tfidf_config.properties' from the current working directory"
    )

# All settings are taken from the [tfidf] section; config.get() raises if the
# section or a key is absent.
SPARK_MASTER = config.get('tfidf', 'spark_master')
SPARK_DRIVER_HOST = config.get('tfidf', 'spark_driver_host')
# Name kept exactly as spelled: it is referenced when the SparkSession is built.
SPARK_DRIVER_BINDADDRES = config.get('tfidf', 'spark_driver_bindaddress')
TF_UDF_JAR_PATH = config.get('tfidf', 'tf_udf_jar_path')
HDFS_URL = config.get('tfidf', 'hdfs_url')
HADOOP_USER_NAME = config.get('tfidf', 'hadoop_user_name')

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType, DoubleType
from pyspark.sql.functions import regexp_replace, lower, split, explode, col, collect_list, struct, udf, count, array, trim, log, lit
from pyspark.ml.feature import StopWordsRemover
from pyspark.storagelevel import StorageLevel
from pyspark.sql import SparkSession, Row
import os

# os.environ['PYSPARK_PYTHON'] = "/home/user0170809/pyspark_venv1/bin/python"
# os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
# Export the configured Hadoop user name into the process environment before
# the session is created.
os.environ["HADOOP_USER_NAME"] = HADOOP_USER_NAME

# Build (or reuse) the SparkSession. Master, driver addresses and the UDF jar
# come from the properties file; the remaining values are fixed tuning knobs.
spark = (
    SparkSession.builder
    .appName("TF_IDF")
    .config("spark.master", SPARK_MASTER)
    .config("spark.driver.host", SPARK_DRIVER_HOST)
    .config("spark.driver.bindAddress", SPARK_DRIVER_BINDADDRES)
    .config("spark.sql.files.maxPartitionBytes", "33554432")  # 32 MiB
    .config("spark.executor.memory", "2G")
    .config("spark.memory.storageFraction", "0.05")
    .config("spark.hadoop.dfs.blocksize", "16777216")  # 16 MiB
    .config("spark.hadoop.parquet.block.size", "16777216")  # 16 MiB
    .config("spark.hadoop.dfs.replication", "1")
    .config("spark.memory.fraction", "0.5")
    .config("spark.jars", TF_UDF_JAR_PATH)
    .getOrCreate()
)
#.config("spark.sql.shuffle.partitions", "179") \
#.config("spark.executor.instances", "1") \
#.config("spark.executor.cores", "2") \
#.config("spark.archives", "/home/user1083408/pyspark_venv1.tar.gz#environment") \

In [3]:
# Explicit input schema: four nullable string columns, in this order.
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ("id", "url", "title", "text")
])

# Input location: the whole /wiki2 directory on the configured HDFS endpoint.
hdfs_input = f'hdfs://{HDFS_URL}/wiki2'
# Alternative kept for reference: read only two part files and two columns.
#hdfs_input1 = f'hdfs://{HDFS_URL}/wiki2/part-00000-03018fbe-cac8-46a1-8b7a-70cb9751b15f-c000.snappy.parquet'
#hdfs_input2 = f'hdfs://{HDFS_URL}/wiki2/part-00000-06fd7f28-a724-4eb0-a97a-73b002720b89-c000.snappy.parquet'
#df = spark.read.schema(schema).parquet(hdfs_input1, hdfs_input2).select('id', 'text')
df = (
    spark.read
    .schema(schema)
    .parquet(hdfs_input)
)

In [3]:
# Count the input rows. The result is used as the total number of documents
# (the numerator of N / count(*)) in the IDF expressions further down.
df_count = df.count()

                                                                                

In [4]:
# Shortcut: a previously recorded row count, usable instead of running
# df.count(). It is applied only when df_count is not defined yet, so a
# top-to-bottom run keeps the count computed from the data actually read
# rather than silently replacing it with this constant.
# NOTE(review): presumably the row count of the full /wiki2 input - confirm.
if "df_count" not in globals():
    df_count = 1704000

In [4]:
# Shortcut: a previously recorded row count, usable instead of running
# df.count(). It is applied only when df_count is not defined yet, so a
# top-to-bottom run keeps the count computed from the data actually read
# rather than silently replacing it with this constant.
# NOTE(review): presumably the row count of a reduced sample input - confirm.
if "df_count" not in globals():
    df_count = 37600

## CLEANING

In [5]:
# CLEANING (as text)
# Regex patterns are raw strings: "\s" inside a normal Python literal is an
# unrecognized escape sequence and triggers a Deprecation/SyntaxWarning.

# replace newlines with spaces
df_rm_newline = df.withColumn("text", regexp_replace("text", "\n", " "))

# keep only ASCII letters and whitespace (digits and punctuation become spaces)
df_alphanum = df_rm_newline.withColumn("text", regexp_replace("text", r"[^a-zA-Z\s]", " "))

# collapse every whitespace run into a single space, then trim both ends
df_one_space = df_alphanum.withColumn("text", regexp_replace("text", r"\s+", " ")).withColumn("text", trim("text"))

# lowercase
df_lower = df_one_space.withColumn("text", lower("text"))

In [6]:
# SPLIT TEXT TO WORDS ARRAY
# Splitting an empty string yields [""], so a document whose cleaned text is
# empty would contribute "" as a term. array_remove() drops empty tokens; the
# resulting column layout is unchanged (all other columns, then `words`,
# with `text` removed).
df_split_words = df_lower.selectExpr("*", "array_remove(split(text, ' '), '') AS words").drop('text')

In [7]:
# CLEANING (as array of words)

# Filter stop words with StopWordsRemover's default word list. The filtered
# array is written to a temporary column and then renamed back to `words`,
# so downstream cells see the same column name.
df_rm_stopword = (
    StopWordsRemover(inputCol="words", outputCol="words_nostop")
    .transform(df_split_words)
    .drop('words')
    .withColumnRenamed("words_nostop", "words")
)

## TERM-DOC COUNT WITH UDF

In [8]:
def f_term_frequency(words_list):
    """Compute the relative frequency of every distinct word in a document.

    Args:
        words_list: Sequence of word strings for one document. ``None`` or an
            empty sequence is accepted.

    Returns:
        A list of ``{"term": word, "frequency": occurrences / total_words}``
        dicts, one per distinct word, in order of first appearance. An empty
        list is returned for ``None`` or empty input.
    """
    # Imported locally so the function does not depend on a notebook-level import.
    from collections import Counter

    # Null/empty documents have no terms; returning early also avoids len(None).
    if not words_list:
        return []

    total_words = len(words_list)
    return [
        {"term": term, "frequency": occurrences / total_words}
        for term, occurrences in Counter(words_list).items()
    ]

### (1a) Python UDF, Non-Deterministic

In [9]:
# Variant (1a): wrap f_term_frequency as a Python UDF that returns an array of
# (term, frequency) structs, and flag it as non-deterministic.
udf_term_frequency = udf(
    f_term_frequency,
    ArrayType(
        StructType([
            StructField("term", StringType()),
            StructField("frequency", DoubleType()),
        ])
    ),
).asNondeterministic()

# One row per document: its id (as doc_id) plus the per-term frequency array.
df_term_frequency = df_rm_stopword.select(
    col("id").alias("doc_id"),
    udf_term_frequency("words").alias("term_frequency"),
)

### (1b) Python UDF, Deterministic + Cached

In [9]:
# Variant (1b): the same Python UDF, left deterministic (the default).
udf_term_frequency = udf(
    f_term_frequency,
    ArrayType(
        StructType([
            StructField("term", StringType()),
            StructField("frequency", DoubleType()),
        ])
    ),
)

# One row per document: its id (as doc_id) plus the per-term frequency array.
# The result is marked for in-memory caching.
df_term_frequency = (
    df_rm_stopword
    .select(
        col("id").alias("doc_id"),
        udf_term_frequency("words").alias("term_frequency"),
    )
    .persist(StorageLevel.MEMORY_ONLY)
)

### (1c) Scala UDF Cached

In [8]:
# Variant (1c): call the JVM implementation of the term-frequency UDF from SQL.

# Make the cleaned documents addressable from Spark SQL.
df_rm_stopword.createOrReplaceTempView("v_cleaned")

# Register the class from the UDF jar under the SQL name `udf_term_frequency`,
# declaring the same array<struct<term, frequency>> return type as the Python UDF.
spark.udf.registerJavaFunction(
    "udf_term_frequency",
    "com.scalaudf.TermFrequencyUDF",
    ArrayType(
        StructType([
            StructField("term", StringType()),
            StructField("frequency", DoubleType()),
        ])
    ),
)

# One row per document, marked for in-memory caching.
df_term_frequency = spark.sql(
    """
SELECT id AS doc_id, udf_term_frequency(words) AS term_frequency
FROM v_cleaned
"""
).persist(StorageLevel.MEMORY_ONLY)

### (2) Explode & Select

In [9]:
# Explode: turn each element of the term_frequency array into its own row
# (the column now holds a single struct instead of an array).
df_explode = df_term_frequency.withColumn(
    "term_frequency",
    explode(col("term_frequency")),
)

# Unstruct: lift the struct fields to top-level columns -> (doc_id, term, frequency).
df_unstruct = df_explode.select(
    "doc_id",
    col("term_frequency.term"),
    col("term_frequency.frequency"),
)

### (3a) Cache -> RepartitionByRange -> Group & Order

In [10]:
# Variant (3a): mark the exploded rows for on-disk caching, range-partition
# them by term into 200 partitions, then aggregate per term.
df_unstruct_persisted = df_unstruct.persist(StorageLevel.DISK_ONLY)

df_repart_range = df_unstruct_persisted.repartitionByRange(200, "term")

# Per term: the list of (doc_id, frequency) pairs and
# idf = ln(df_count / rows for the term) / ln(10), i.e. a base-10 logarithm.
df_termdoc = (
    df_repart_range
    .groupBy("term")
    .agg(
        collect_list(struct("doc_id", "frequency")).alias("term_frequency_array"),
        (log(df_count / count("*")) / log(lit(10))).alias("idf"),
    )
)

# Order the terms alphabetically.
df_termdoc_ordered = df_termdoc.sort("term")

### (3b) Group By -> Order By

In [10]:
# Variant (3b): aggregate with plain SQL, then order by term.

# Make the (doc_id, term, frequency) rows addressable from Spark SQL.
df_unstruct.createOrReplaceTempView("v_word_count")

# The CTE gathers, per term, the (doc_id, frequency) pairs and
# idf = ln(df_count / rows for the term) / ln(10); the outer query multiplies
# every frequency by that idf to produce (doc_id, tfidf) pairs.
df_termdoc = spark.sql(
    f"""
with cte as (
SELECT term, COLLECT_LIST(STRUCT(doc_id, frequency)) AS term_frequency_array, log({df_count} / count(*)) / log(10) as idf
FROM v_word_count
GROUP BY term
)
SELECT term, TRANSFORM(term_frequency_array, x -> STRUCT(x.doc_id as doc_id, x.frequency * idf as tfidf)) as term_tfidf_array
FROM cte
"""
)

# Order the terms alphabetically.
df_termdoc_ordered = df_termdoc.orderBy("term")

### (3c) Group By (Cache) -> Order By

In [13]:
# Variant (3c): same SQL aggregation as (3b), with the grouped result marked
# for on-disk caching before it is ordered.

# Make the (doc_id, term, frequency) rows addressable from Spark SQL.
df_unstruct.createOrReplaceTempView("v_word_count")

# The CTE gathers, per term, the (doc_id, frequency) pairs and
# idf = ln(df_count / rows for the term) / ln(10); the outer query multiplies
# every frequency by that idf to produce (doc_id, tfidf) pairs.
df_termdoc = spark.sql(
    f"""
with cte as (
SELECT term, COLLECT_LIST(STRUCT(doc_id, frequency)) AS term_frequency_array, log({df_count} / count(*)) / log(10) as idf
FROM v_word_count
GROUP BY term
)
SELECT term, TRANSFORM(term_frequency_array, x -> STRUCT(x.doc_id as doc_id, x.frequency * idf as tfidf)) as term_tfidf_array
FROM cte
"""
)

df_termdoc_cache = df_termdoc.persist(StorageLevel.DISK_ONLY)

# Order the terms alphabetically.
df_termdoc_ordered = df_termdoc_cache.orderBy("term")

In [None]:
# Print the query plan Spark produced for the ordered term/document table
# built in the UDF section.
df_termdoc_ordered.explain()

In [13]:
# Drop the DISK_ONLY cache requested for df_unstruct_persisted in section (3a).
# The name only exists if the (3a) cell has been run.
# NOTE(review): in top-to-bottom order this executes before the write in
# "SAVE TO HDFS" - presumably before any action has materialized the cache;
# confirm the intended execution order.
df_unstruct_persisted.unpersist()

DataFrame[doc_id: string, term: string, frequency: double]

In [14]:
# Drop the MEMORY_ONLY cache requested for df_term_frequency in variants
# (1b)/(1c); variant (1a) never persists it.
# NOTE(review): in top-to-bottom order this executes before the write in
# "SAVE TO HDFS" - presumably before any action has materialized the cache;
# confirm the intended execution order.
df_term_frequency.unpersist()

DataFrame[doc_id: string, term_frequency: array<struct<term:string,frequency:double>>]

In [15]:
# Drop the cache on df_termdoc. Only variant (3c) persists it (DISK_ONLY, via
# df_termdoc_cache = df_termdoc.persist(...)); variants (3a)/(3b) do not.
# NOTE(review): in top-to-bottom order this executes before the write in
# "SAVE TO HDFS" - presumably before any action has materialized the cache;
# confirm the intended execution order.
df_termdoc.unpersist()

DataFrame[term: string, term_frequency_array: array<struct<doc_id:string,frequency:double>>, idf: double]

## TERM-DOC COUNT WITH EXPLODE + GROUP BY

In [10]:
# Only `id` and `words` are needed from here on; mark them for in-memory caching.
df_cached = df_rm_stopword.select('id', 'words').persist(StorageLevel.MEMORY_ONLY)

# One row per word occurrence. The document's total word count travels with
# every row so the per-document term count can be normalized below.
df_explode = df_cached.selectExpr(
    "id AS doc_id",
    "size(words) AS doc_length",
    "explode(words) AS term",
)

# Relative term frequency = occurrences of the term in the document divided by
# the number of words in the document. This is the same quantity that
# f_term_frequency() produces in the UDF section; a raw count(*) here would
# make the tfidf values of the two pipelines disagree.
# doc_length is constant per doc_id, so adding it to the grouping keys does not
# split groups; it is dropped again to keep the (doc_id, term, frequency) layout.
df_term_doc = (
    df_explode
    .groupBy("doc_id", "term", "doc_length")
    .agg((count("*") / col("doc_length")).alias("frequency"))
    .drop("doc_length")
)

# Make the (doc_id, term, frequency) rows addressable from Spark SQL.
df_term_doc.createOrReplaceTempView("v_word_count")

# The CTE gathers, per term, the (doc_id, frequency) pairs and
# idf = ln(df_count / rows for the term) / ln(10); the outer query multiplies
# every frequency by that idf to produce (doc_id, tfidf) pairs.
df_termdoc = spark.sql(f"""
with cte as (
SELECT term, COLLECT_LIST(STRUCT(doc_id, frequency)) AS term_frequency_array, log({df_count} / count(*)) / log(10) as idf
FROM v_word_count
GROUP BY term
)
SELECT term, TRANSFORM(term_frequency_array, x -> STRUCT(x.doc_id as doc_id, x.frequency * idf as tfidf)) as term_tfidf_array
FROM cte
""")

# Order the terms alphabetically.
df_termdoc_ordered = df_termdoc.sort("term")

In [None]:
# Drop the MEMORY_ONLY cache requested for df_cached at the top of this section.
# NOTE(review): in top-to-bottom order this executes before the write in
# "SAVE TO HDFS" - presumably before any action has materialized the cache;
# confirm the intended execution order.
df_cached.unpersist()

In [None]:
# Print the query plan Spark produced for the ordered term/document table
# built in the explode + group-by section.
df_termdoc_ordered.explain()

## SAVE TO HDFS

In [None]:
# Destination directory on the configured HDFS endpoint.
hdfs_output = f'hdfs://{HDFS_URL}/wiki_out4'

# Write the ordered term table as Parquet, replacing any existing output there.
df_termdoc_ordered.write.parquet(
    path=hdfs_output,
    mode="overwrite",
)

## VIEW HDFS FILE RESULT

In [None]:
# NOTE(review): this points at one hard-coded part file under /wiki_out, while
# the SAVE TO HDFS cell writes to /wiki_out4 - confirm which output should be
# inspected here.
hdfs_path1 = f'hdfs://{HDFS_URL}/wiki_out/part-00000-8deffe4c-8667-440b-b25c-53e3911a6917-c000.snappy.parquet'

# Bind the stored result to its own name. Reusing `df` would replace the input
# DataFrame loaded in INIT, so re-running the CLEANING cells afterwards would
# operate on the result file instead of the source documents.
df_result = spark.read.parquet(hdfs_path1)

# Print the first five rows of the stored result.
df_result.show(5)