In [1]:
isStreaming = True
inputCol = "abstract"
outputCol = "probability vectors"
modelType = "AST"
model = "AST" # AST/PLSA/ARTM

inputCol = "abstract"
outputCol = "probability vectors"

#### Download kaggle token and mount Google drive

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
from google.colab import files
files.upload()
 
!rm -r ~/.kaggle
!mkdir ~/.kaggle
!mv ./kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

Saving kaggle.json to kaggle.json
rm: cannot remove '/root/.kaggle': No such file or directory


#### Install Dependencies

In [4]:
%%capture
!pip install pyspark
!pip install bigartm

In [5]:
%%capture
import sys
import os
# installation of GOT (G eneralization O ver T axonomies) software package
!mkdir gotlib
if not os.listdir("gotlib"):
  !git clone https://github.com/dmitsf/GOT.git gotlib
sys.path.append('gotlib')

#library name spelled in caps in setup causes errors in Apache Spark
!cp /content/drive/MyDrive/setup.py /content/gotlib/setup.py

!cd gotlib && python setup.py install

In [6]:
# Downloading taxonomies
%%capture
!cp /content/drive/MyDrive/arxiv_category_taxonomy.csv arxiv_category_taxonomy.csv 

!python3 /content/gotlib/got/taxonomies/taxonomy.py arxiv_category_taxonomy.csv 

In [7]:
import pandas as pd
with open("taxonomy_leaves.txt") as f:
    strings = [l.strip() for l in f.readlines()]

taxanomy_leaves_df = pd.DataFrame.from_dict(dict(enumerate(strings)), orient='index', columns=[inputCol])

In [8]:
%%capture
!pip install kaggle
!kaggle datasets download -d Cornell-University/arxiv

!mkdir Dataset
!cp arxiv.zip Dataset/arxiv.zip
!unzip -q Dataset/arxiv.zip -d Dataset
!rm Dataset/arxiv.zip

Installation of GOT libriary on Apache Spark

In [9]:
from pyspark import SparkConf
from pyspark.context import SparkContext

sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
# add gotlib egg for Spark
sc.addFile(path='/content/gotlib/dist/' + os.listdir('/content/gotlib/dist/')[0])

Dataset schema

In [10]:
from pyspark.sql.types import *

schema = StructType([
StructField("id",StringType(),True), 
StructField("submitter",StringType(),True), 
StructField("authors",StringType(),True),
StructField("title", StringType(), True),
StructField("comments", StringType(), True),
StructField("journal-ref", StringType(), True),
StructField("doi", StringType(), True),
StructField("report-no", StringType(), True),
StructField("categories", StringType(), True),
StructField("abstract", StringType(), True)])

Load Dataset

In [11]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local').appName('Streaming Probability Topic Modeling Pipeline').config('spark.ui.port', '4050').getOrCreate()
spark.conf.set('spark.rapids.sql.enabled','true/false')


# Arxiv Dataset
if isStreaming:
  sentenceDataFrame = spark.readStream.format('json').schema(schema).option('header', True).load('Dataset')


# Taxonomy 
taxanomyDataFrame = spark.createDataFrame(taxanomy_leaves_df)

#### Spark Pipeline Functions

##### Tokenizers

In [12]:
from pyspark import keyword_only  ## < 2.0 -> pyspark.ml.util.keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, HasPredictionCol, Param, Params, TypeConverters
# Available in PySpark >= 2.3.0 
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable  
from pyspark.ml.pipeline import Estimator, Model, Pipeline
from pyspark.ml.feature import CountVectorizer, StopWordsRemover, RegexTokenizer, CountVectorizerModel
from pyspark.sql.functions import udf

import re
import numpy as np
import artm
from got.asts.ast import EASA

import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords


class ParGenMWordTokenizer(
        Transformer, HasInputCol, HasOutputCol,
        DefaultParamsReadable, DefaultParamsWritable):


    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, lowerize=None):
        super(ParGenMWordTokenizer, self).__init__()
        self.lowerize =  Param(self, "lowerize", "")
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, lowerize=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def getLowerize(self):
        return self.getOrDefault(self.lowerize)

    # Required in Spark >= 3.0
    def setInputCol(self, value):
        """
        Sets the value of :py:attr:`inputCol`.
        """
        return self._set(inputCol=value)

    # Required in Spark >= 3.0
    def setOutputCol(self, value):
        """
        Sets the value of :py:attr:`outputCol`.
        """
        return self._set(outputCol=value)

    def _transform(self, dataset):
        def clear_text(text):
          pat = re.compile(r'[^A-Za-z0-9 \-\n\r.,;!?А-Яа-я]+')
          cleared_text = re.sub(pat, ' ', text)

          if self.getLowerize():
              cleared_text = cleared_text.lower()

          tokens = cleared_text.split()
          return tokens

        t = ArrayType(StringType())
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, udf(clear_text, t)(in_col))

class StopWordsTokenizer(
        Transformer, HasInputCol, HasOutputCol,
        DefaultParamsReadable, DefaultParamsWritable):


    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, stop_words=None):
        super(StopWordsTokenizer, self).__init__()
        self.stop_words = Param(self, "stop_words", "")
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

        self.reTokenizer = RegexTokenizer(pattern='[^A-Za-z\-А-Яа-я]+', gaps=True, minTokenLength=3)
        self.remover = StopWordsRemover(stopWords=stop_words)
        self.cv = CountVectorizer(vocabSize=500000, minTF=3, minDF=5, maxDF=5e5)
        self.trained = False


        # Tokenizer: ["User defined input column"] -> ["words"]
        self.reTokenizer.setOutputCol("words")
        
        # Remover: ["words"] -> ["tokens"]
        self.remover.setInputCol("words")
        self.remover.setOutputCol("tokens")

        # CountVectorizer: ["tokens"] -> ["User defined output column"]
        self.cv.setInputCol("tokens")
  

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, stop_words=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)
    
    def getVocab(self):
        return self.vocabulary

    # Required in Spark >= 3.0
    def setInputCol(self, value):
        """
        Sets the value of :py:attr:`inputCol`.
        """
        return self._set(inputCol=value)

    # Required in Spark >= 3.0
    def setOutputCol(self, value):
        """
        Sets the value of :py:attr:`outputCol`.
        """
        return self._set(outputCol=value)
      
    def setTrain(self):
      return self._set(train=True)

    def _tokenize(self, dataset):
        self.reTokenizer.setInputCol(self.getInputCol())
        dataset =  self.reTokenizer.transform(dataset)

        dataset = self.remover.transform(dataset)
        return dataset

    def save(self, path):
        self.modelcv.save(path)

    def load(self, path):
        self.modelcv = CountVectorizerModel.load(path)
        self.vocabulary = self.modelcv.vocabulary
        self.trained = True
        

    def fit(self, dataset):
        dataset = self._tokenize(dataset)
        self.cv.setOutputCol(self.getOutputCol())
        self.modelcv = self.cv.fit(dataset)
        self.vocabulary = self.modelcv.vocabulary
        self.trained = True
        return self

    def _transform(self, dataset):
        dataset = self._tokenize(dataset)

        if not self.trained:
          self.fit(dataset)
        
        dataset = self.modelcv.transform(dataset)
        return dataset

  return re.findall(re.compile("[\w']+", re.U), text)
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


##### Estimators

In [13]:
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import array

class ProbabilityMatrixEstimator(Estimator,
        HasInputCol, HasPredictionCol,
        DefaultParamsReadable, DefaultParamsWritable):

    @keyword_only
    def __init__(self, prepared_string_tokens, inputCol=None, predictionCol=None, 
                 modelType="AST", vocabulary=None):
        super(ProbabilityMatrixEstimator, self).__init__()
        self.model = {"AST": ASTRelevanceMatrixModel,
                      "PLSA": PLSAProbabilityMatrixModel,
                      "LDA": LDAProbabilityMatrixModel,
                      "ARTM": ARTMProbabilityMatrixModel}
        self.modelType =  Param(self, "modelType", "")
        self.prepared_string_tokens =  Param(self, "prepared_string_tokens", "")
        self.vocabulary = Param(self, "vocabulary", "")
        self.prepared_strings = None
        kwargs = self._input_kwargs
        self.setParams(**kwargs)
        
        if self.getOrDefault(self.modelType) not in self.model.keys():
          raise Exception("Wrong model type")

        if self.getOrDefault(self.prepared_string_tokens) is not None:
          self.prepared_strings = [' '.join(t) for t in self.getOrDefault(self.prepared_string_tokens)]
        self.modelEstimator = None
      


    # Required in Spark >= 3.0
    def setInputCol(self, value):
        """
        Sets the value of :py:attr:`inputCol`.
        """
        return self._set(inputCol=value)

    # Required in Spark >= 3.0
    def setPredictionCol(self, value):
        """
        Sets the value of :py:attr:`predictionCol`.
        """
        return self._set(predictionCol=value)

    def setModelType(self, value):
        """
        Sets the value of :py:attr:`modelType`.
        """
        return self._set(modelType=value)
    def getModelEstimator(self):
        return self.modelEstimator


    @keyword_only
    def setParams(self, prepared_string_tokens, inputCol=None, predictionCol=None, modelType="AST", vocabulary=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)        
        
    def _fit(self, dataset):
      self.modelEstimator = self.model[self.getOrDefault(self.modelType)](
            inputCol = self.getInputCol(),
            prepared_strings = self.prepared_strings,
            predictionCol = self.getPredictionCol(),
            vocabulary=self.getOrDefault(self.vocabulary),
            topics_num=15, tokens_num=20, iter_over_document=5, iter_over_collection=20)
      return self.modelEstimator

        
class ProbabilityMatrixModel(Model, HasInputCol, HasPredictionCol,
        DefaultParamsReadable, DefaultParamsWritable):

    @keyword_only
    def __init__(self, inputCol=None, predictionCol=None,
                prepared_strings=None,
                vocabulary=None,
                topics_num=15, tokens_num=20, iter_over_document=5, iter_over_collection=20):
        super(ProbabilityMatrixModel, self).__init__()
        self.prepared_strings  =  Param(self, "prepared_strings", "")
        self.vocabulary = Param(self, "vocabulary", "")

        # artm models params(for retrain)
        self.topics_num = Param(self, "topics_num", "")
        self.tokens_num = Param(self, "tokens_num", "")
        self.iter_over_document = Param(self, "iter_over_document", "")
        self.iter_over_collection = Param(self, "iter_over_collection", "")
        
        kwargs = self._input_kwargs
        self.setParams(**kwargs) 
        self.model = None
        

    @keyword_only
    def setParams(self, inputCol=None, predictionCol=None,
                prepared_strings=None,
                vocabulary=None,
                topics_num=15, tokens_num=20, iter_over_document=5, iter_over_collection=20):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

class ASTRelevanceMatrixModel(ProbabilityMatrixModel):
    def _transform(self, dataset):
        def make_substrings(tokens, k=4):
          for i in range(max(len(tokens) - k + 1, 1)):
              yield ' '.join(tokens[i:i + k])

        def get_relevance_matrix(cleared_tokens):
          ast = EASA(list(make_substrings(cleared_tokens)))
          row = [float(ast.score(s)) for s in self.getOrDefault(self.prepared_strings)]
          return row

        t = ArrayType(DoubleType())
        out_col = self.getPredictionCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, udf(get_relevance_matrix, t)(in_col))
    
    def train(self, dataset):
      return self


class ARTMModel(ProbabilityMatrixModel):
  def _transform(self, dataset):
    n_wd = np.apply_along_axis(lambda x: x[0].toArray(), 1, dataset[[self.getInputCol()]].toPandas().to_numpy()).T
    bv = artm.BatchVectorizer(data_format='bow_n_wd',
                              n_wd=n_wd,
                              vocabulary=self.getOrDefault(self.vocabulary))
    if self.model is None:
      print("the ARTM model is not pretrained, training on the transferred dataset")
      self.train(dataset)
    res = self.model.transform(bv).T
    res['id'] = dataset[['id']].toPandas()
    res = spark.createDataFrame(res).select('id', array(['topic_' + str(i) for i in range(1, 15)]).alias(self.getInputCol()))
    
    return dataset.join(res, dataset.id == res.id, 'left')

  def loadModel(self, path):
    try:
      self.model = artm.load_artm_model(path)
    except:
      raise Exception("Wrong model path or model type")
    return self

  def saveModel(self, path):
    self.model.dump_artm_model(path)


class PLSAProbabilityMatrixModel(ARTMModel):
  def train(self, dataset):
    n_wd = np.apply_along_axis(lambda x: x[0].toArray(), 1, dataset[[self.getInputCol()]].toPandas().to_numpy()).T
    bv = artm.BatchVectorizer(data_format='bow_n_wd',
                              n_wd=n_wd,
                              vocabulary=self.getOrDefault(self.vocabulary))
    
    self.model = artm.ARTM(num_topics=self.getOrDefault(self.topics_num), cache_theta=True,
                       scores=[artm.PerplexityScore(name='PerplexityScore',
                                                    dictionary=bv.dictionary)])
    self.model.scores.add(artm.SparsityPhiScore(name='SparsityPhiScore'))
    self.model.scores.add(artm.SparsityThetaScore(name='SparsityThetaScore'))
    self.model.scores.add(artm.TopicKernelScore(name='TopicKernelScore',
                                                probability_mass_threshold=0.3))
    self.model.scores.add(artm.TopTokensScore(name='TopTokensScore', 
                                              num_tokens=self.getOrDefault(self.topics_num)))
    self.model.num_document_passes = self.getOrDefault(self.iter_over_document)
    
    
    self.model.initialize(bv.dictionary)
    self.model.fit_offline(batch_vectorizer=bv, 
                           num_collection_passes=self.getOrDefault(self.iter_over_collection))
    return self
    

class LDAProbabilityMatrixModel(ARTMModel):
  def train(self, dataset):
    n_wd = np.apply_along_axis(lambda x: x[0].toArray(), 1, dataset[[self.getInputCol()]].toPandas().to_numpy()).T
    bv = artm.BatchVectorizer(data_format='bow_n_wd',
                              n_wd=n_wd,
                              vocabulary=self.getOrDefault(self.vocabulary))

    self.model = artm.LDA(num_topics=self.getOrDefault(self.topics_num), cache_theta=True)
    self.model.num_document_passes = self.getOrDefault(self.iter_over_document)
    
    
    self.model.initialize(bv.dictionary)
    self.model.fit_offline(batch_vectorizer=bv, 
                          num_collection_passes=self.getOrDefault(self.iter_over_collection))
    return self

class ARTMProbabilityMatrixModel(ARTMModel):
  def train(self, dataset):
    n_wd = np.apply_along_axis(lambda x: x[0].toArray(), 1, dataset[[self.getInputCol()]].toPandas().to_numpy()).T
    bv = artm.BatchVectorizer(data_format='bow_n_wd',
                              n_wd=n_wd,
                              vocabulary=self.getOrDefault(self.vocabulary))

    self.model = artm.ARTM(num_topics=self.getOrDefault(self.topics_num), cache_theta=True,
                       scores=[artm.PerplexityScore(name='PerplexityScore',
                                                    dictionary=bv.dictionary)],
                       regularizers=[artm.SmoothSparseThetaRegularizer(name='SparseTheta',
                                                                       tau=-0.15)])
    self.model.scores.add(artm.SparsityPhiScore(name='SparsityPhiScore'))
    self.model.scores.add(artm.SparsityThetaScore(name='SparsityThetaScore'))
    self.model.scores.add(artm.TopicKernelScore(name='TopicKernelScore',
                                                      probability_mass_threshold=0.3))
    self.model.scores.add(artm.TopTokensScore(name='TopTokensScore', num_tokens=self.getOrDefault(self.tokens_num)))

    self.model.regularizers.add(artm.SmoothSparsePhiRegularizer(name='SparsePhi', tau=-0.1))
    self.model.regularizers.add(artm.DecorrelatorPhiRegularizer(name='DecorrelatorPhi', tau=1.5e+5))

    self.model.num_document_passes = self.getOrDefault(self.iter_over_document)

    
    self.model.initialize(bv.dictionary)
    self.model.fit_offline(batch_vectorizer=bv, num_collection_passes=self.getOrDefault(self.iter_over_collection))
    return self


  and should_run_async(code)


#### Pipeline

In [14]:
!cp -r /content/drive/MyDrive/count-vectorizer count-vectorizer
!cp -r /content/drive/MyDrive/artm_model ARTM
!cp -r /content/drive/MyDrive/plsa_model PLSA

In [15]:
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning) 

In [16]:
stop_words = stopwords.words('english')
stop_words.extend(['allow', 'almost', 'also', 'approach', 'asume', 'bad', 'behavior',
                  'consider', 'constant', 'control', 'datum', 'density', 'describe',
                  'description', 'direction', 'discuss', 'edu', 'effect',
                  'effective', 'energy', 'example', 'experimental', 'field', 'find',
                  'fine', 'first', 'form', 'from', 'give', 'high', 'investigate',
                  'know', 'known', 'large', 'lead', 'let', 'long', 'low', 'make', 'model',
                  'new', 'non', 'observe', 'obtain', 'paper', 'parameter',
                  'particular', 'point', 'positive', 'present', 'problem',
                  'property', 'propose', 'result', 'sample', 'search', 'show',
                  'small', 'state', 'study', 'subject', 'suggest', 'suppose',
                  'system', 'theory', 'time', 'use', 'well', 'word', 'work', ' ', ''])

In [19]:
from pyspark.sql.functions import concat_ws, col

tokenizer = {"AST": ParGenMWordTokenizer(
    inputCol=inputCol, outputCol="tokens",  
    lowerize=True), 
            "ARTM": StopWordsTokenizer(
    inputCol=inputCol, outputCol="vectors", 
    stop_words=stop_words)}

#load pretrained tokenizer
tokenizer["ARTM"].load("count-vectorizer")

# Tokenize Taxonomy for AST
taxanomyDataFrame = tokenizer["AST"].transform(taxanomyDataFrame)
string_tokens = taxanomyDataFrame.toPandas().tokens

estimator = ProbabilityMatrixEstimator(inputCol="tokens", 
                                        predictionCol=outputCol, 
                                        modelType=model,
                                        prepared_string_tokens=string_tokens,
                                        vocabulary=tokenizer["ARTM"].getVocab())

if modelType == "ARTM":
  estimator.loadModel(model)
  estimator.setInputCol("vectors")

model_pipeline = Pipeline(stages=[tokenizer[modelType], estimator])

res = model_pipeline.fit(sentenceDataFrame).transform(sentenceDataFrame)

df = res.select(
    res.id,
    concat_ws(", ", col(outputCol).cast("array<string>")).alias(outputCol)
)

In [20]:
query = (df.writeStream
.format('json')
.queryName('ParGenM')
.option('checkpointLocation', 'checkpoint')
.option('path', 'results')
.outputMode('append')
.start()
.awaitTermination() 
)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: ignored