In [None]:
# HPC cluster's pyspark is 3.1.2
!pip install --upgrade datasets apache-beam pyspark==3.1.2 findspark

In [2]:
import os
# Environment consumed by PySpark when the JVM is launched:
#   JAVA_HOME            - the Java 8 runtime to use
#   PYSPARK_SUBMIT_ARGS  - extra submit arguments (pulls in the spark-xml package)
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk/jre",
    "PYSPARK_SUBMIT_ARGS": "--packages com.databricks:spark-xml_2.12:0.12.0 pyspark-shell",
})

In [6]:
# Imported in this cell so it also runs on a fresh kernel: the notebook's main
# pyspark import cell only comes after this one.
from pyspark import SparkContext

# Stop the currently active SparkContext (getOrCreate() returns it, or creates
# one if none exists) so that a freshly configured context can be built next.
current_context = SparkContext.getOrCreate()
current_context.stop()

In [7]:
import os

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark

# Standalone master to connect to. The default is the URL this notebook was
# written against; set SPARK_MASTER_URL to point at a different master without
# editing the notebook.
# NOTE(review): host/port presumably change between cluster allocations - confirm.
master_url = os.environ.get("SPARK_MASTER_URL", "spark://cs071:37356")

conf = SparkConf().setAppName("LDA") \
                  .setMaster(master_url)


# Driver-side entry points: the low-level context and the SQL session on top of it.
sc = SparkContext(conf=conf)

spark = SparkSession(sc)

# UDF registration handle bound to this session.
udf_registration = pyspark.sql.udf.UDFRegistration(spark)

# Last expression: display the session summary as the cell output.
spark

In [8]:
from datasets import load_dataset
import pandas as pd
import time

# LDA parameters, go to max in production
K = 100           # number of topics (passed to LDA as k)
MAX_ITER = 100    # iteration cap (passed to LDA as maxIter)
CHECKPOINT = 5    # passed to LDA as checkpointInterval

In [9]:
# Load the "20220301.en" configuration of the Hugging Face "wikipedia" dataset.
# NOTE(review): this was previously described as a "1m dataset", but the
# recorded output reports 6,458,670 rows in the train split - confirm which
# size is actually intended for this run.
dataset = load_dataset("wikipedia", "20220301.en")
dataset

Found cached dataset wikipedia (/home/tmv7269/.cache/huggingface/datasets/wikipedia/20220301.en/2.0.0/aa542ed919df55cc5d3347f42dd4521d05ca68751f50dbc32bae2a7f1e167559)


  0%|          | 0/1 [00:00<?, ?it/s]

DatasetDict({
    train: Dataset({
        features: ['id', 'url', 'title', 'text'],
        num_rows: 6458670
    })
})

In [None]:
import os

# Handing the Hugging Face Dataset straight to spark.createDataFrame makes the
# driver iterate over every article as a Python object before anything is
# distributed. Export the split to Parquet once and let Spark read the file.
# Override the location with the WIKI_PARQUET_PATH environment variable.
WIKI_PARQUET_PATH = os.path.abspath(
    os.environ.get("WIKI_PARQUET_PATH", "wikipedia_20220301_en.parquet")
)

if not os.path.exists(WIKI_PARQUET_PATH):
    # Write under a temporary name and rename afterwards, so an interrupted
    # export can never be mistaken for a complete file by the check above.
    partial_path = WIKI_PARQUET_PATH + ".partial"
    dataset['train'].to_parquet(partial_path)
    os.replace(partial_path, WIKI_PARQUET_PATH)

# NOTE(review): assumes this path is on a filesystem visible to both the driver
# and the executors (e.g. a shared home/scratch directory) - confirm.
sparkDF = spark.read.parquet("file://" + WIKI_PARQUET_PATH)
sparkDF.columns

23/04/27 18:54:06 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: Master removed our application: KILLED
23/04/27 18:54:06 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exiting due to error from cluster scheduler: Master removed our application: KILLED
	at org.apache.spark.scheduler.TaskSchedulerImpl.error(TaskSchedulerImpl.scala:873)
	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead(StandaloneSchedulerBackend.scala:154)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint.markDead(StandaloneAppClient.scala:262)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$receive$1.applyOrElse(StandaloneAppClient.scala:169)
	at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
	at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
	at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$Mess

In [None]:
# Count the rows of sparkDF and show the number as the cell output.
sparkDF.count()

In [None]:
from pyspark.ml.feature import StopWordsRemover, RegexTokenizer, CountVectorizer
from pyspark.sql.functions import col,udf
from pyspark.sql.types import IntegerType

In [None]:
# Tokenize the 'text' column into a 'words' array using the regex below
# (any non-letter character), then drop the raw text column.
tokenizer = RegexTokenizer(
    inputCol='text',
    outputCol='words',
    pattern='[^a-zA-Z]',
)
tokenized_df = (
    tokenizer
    .transform(sparkDF)
    .drop('text')
)
tokenized_df.head()

In [None]:
# Filter stop words out of 'words' into a new 'filtered' column (default stop
# word list of StopWordsRemover), then drop the unfiltered tokens.
remover = StopWordsRemover(
    inputCol="words",
    outputCol="filtered",
)
removed_df = (
    remover
    .transform(tokenized_df)
    .drop('words')
)
removed_df.head()

In [None]:
# Fit a CountVectorizer on the filtered tokens (minDF=2.0) and keep the fitted
# model in `cv`; its vocabulary is needed later to map term indices to words.
cv = CountVectorizer(
    inputCol="filtered",
    outputCol="features",
    minDF=2.0,
).fit(removed_df)

# Term-count vectors in 'features'; the token arrays are no longer needed.
lda_count = (
    cv
    .transform(removed_df)
    .drop('filtered')
)
lda_count.head()

In [None]:
import os

from pyspark.ml.clustering import LDA

# checkpointInterval is ignored by Spark unless a checkpoint directory has been
# set on the SparkContext. Opt in by exporting SPARK_CHECKPOINT_DIR; without it
# the behaviour is unchanged (no checkpointing).
# NOTE(review): on a cluster the directory presumably has to be on storage
# shared by all executors - confirm before enabling.
checkpoint_dir = os.environ.get("SPARK_CHECKPOINT_DIR")
if checkpoint_dir and sc.getCheckpointDir() is None:
    sc.setCheckpointDir(checkpoint_dir)

# create LDA with K topics

# perf_counter() is a monotonic clock, so the measured duration cannot be
# distorted by system clock adjustments during a long fit.
start = time.perf_counter()
lda = LDA(k=K, seed=1, optimizer="em", maxIter=MAX_ITER, checkpointInterval=CHECKPOINT)
model = lda.fit(lda_count)
end = time.perf_counter()

print(f"Time elapsed: {end-start:.2f} seconds")

In [None]:
# Top 5 terms per topic, as indices into the CountVectorizer vocabulary.
topicIndices = model.describeTopics(maxTermsPerTopic = 5)
vocabList = cv.vocabulary

# Print each topic (numbered from 1) followed by its terms and their weights.
for topic_row in topicIndices.collect():
    print(f"Topic {topic_row.topic + 1}: ")
    term_weight_pairs = zip(topic_row.termIndices, topic_row.termWeights)
    for term_index, term_weight in term_weight_pairs:
        print(f"{vocabList[term_index]} {term_weight:.2E}")
    print()