In [16]:
%%configure -f
{
    "conf": {
        "spark.jars.packages": "com.johnsnowlabs.nlp:spark-nlp_2.12:4.3.1",
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type": "native",
        "spark.pyspark.virtualenv.bin.path": "/usr/bin/virtualenv",
        "spark.executor.memory": "4g"   
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1685132468463_0003,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1685132468463_0003,pyspark,idle,Link,Link,✔


In [17]:
sc.install_pypi_package('spark-nlp')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting spark-nlp
  Using cached https://files.pythonhosted.org/packages/e2/88/943fb14a2b024bf328bcc448837f75114ac97478db9def2e2042b2818aaa/spark_nlp-4.4.3-py2.py3-none-any.whl
Installing collected packages: spark-nlp
Successfully installed spark-nlp-4.4.3

In [18]:
# For general Spark
from pyspark.sql import SparkSession

# For SparkNLP
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline

# For PySpark MLlib
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, Tokenizer, HashingTF, IDF, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# In case you need to handle data types
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
data = spark.read.parquet('s3://newemrbuckey3/book_info.parquet')


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
print('Total Columns: %d' % len(data.dtypes))
print('Total Rows: %d' % data.count())
data.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Total Columns: 10
Total Rows: 7113
root
 |-- book_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- subtitle: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- published_date: long (nullable = true)
 |-- description: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- imageLinks.smallThumbnail: string (nullable = true)
 |-- imageLinks.thumbnail: string (nullable = true)

In [21]:
data.select("description", "categories").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+
|         description|          categories|
+--------------------+--------------------+
|Worth a read for ...|                 Law|
|Examines the impa...|   Political Science|
|When the reporter...|Blomkvist, Mikael...|
|The classic intro...|Business & Economics|
|Article abstracts...|              Canada|
|A brilliantly res...|          True Crime|
|Business has rece...|            Religion|
|The basis for the...|          Psychology|
|A riveting true s...|             History|
|This writing is a...|             Fiction|
|"Discover the tru...| Juvenile Nonfiction|
|Law | Book | Cult...|             History|
|Personal reinvent...|             History|
|The Russian Revol...|             History|
|Scott Bukatman's ...|    American fiction|
|Discharged batter...|       Accumulateurs|
|_________________...|             Fiction|
|As we struggle to...|   Political Science|
|A math curriculum...|         Mathematics|
|In short, Culture...|      Soci

In [22]:
data.groupBy('categories').count().orderBy('count', ascending=False).show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-----+
|          categories|count|
+--------------------+-----+
|Business & Economics|  761|
|      Social Science|  537|
|             Fiction|  513|
|             History|  498|
|   Political Science|  478|
|           Education|  371|
|          Psychology|  316|
|                 Law|  249|
|            Religion|  244|
|             Science|  195|
|Language Arts & D...|  166|
|             Medical|  150|
|Biography & Autob...|  119|
|          Philosophy|  105|
|  Literary Criticism|  101|
| Juvenile Nonfiction|   88|
|Technology & Engi...|   75|
|              Nature|   68|
|    Juvenile Fiction|   64|
|           Self-Help|   55|
+--------------------+-----+
only showing top 20 rows

In [23]:
total_categories = data.select('categories').distinct().count()
print("Total number of categories:", total_categories)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Total number of categories: 963

In [24]:
from pyspark.sql.functions import col

# First we get the categories count
categories_count = data.groupBy('categories').count()

# We filter those categories which have more than 100 books
major_categories = categories_count.filter(col("count") > 100)

# Now we will only keep rows in the data that belong to the major_categories
data = data.join(major_categories, "categories", "inner")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
data.groupBy('categories').count().orderBy('count', ascending=False).show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-----+
|          categories|count|
+--------------------+-----+
|Business & Economics|  761|
|      Social Science|  537|
|             Fiction|  513|
|             History|  498|
|   Political Science|  478|
|           Education|  371|
|          Psychology|  316|
|                 Law|  249|
|            Religion|  244|
|             Science|  195|
|Language Arts & D...|  166|
|             Medical|  150|
|Biography & Autob...|  119|
|          Philosophy|  105|
|  Literary Criticism|  101|
+--------------------+-----+

In [26]:
from pyspark.sql.functions import col

# Target number of instances per category
target = 300

# Define a dictionary with sample fractions for each category
sample_fractions = {
    "Business & Economics": target/761,
    "Social Science": target/537,
    "Fiction": target/513,
    "History": target/498,
    "Political Science": target/478,
    "Education": target/371,
    "Psychology": target/316,
    "Law": target/249,
    "Religion": target/244,
    "Science": target/195,
    "Language Arts & Disciplines": target/166,
    "Medical": target/150,
    "Biography & Autobiography": target/119,
    "Philosophy": target/105,
    "Literary Criticism": target/101
}

# Create an empty DataFrame to store the sampled data
sampled_data = spark.createDataFrame([], data.schema)

# Loop through each category and sample the data accordingly
for category, fraction in sample_fractions.items():
    category_data = data.filter(col("categories") == category)
    # Oversample when fraction > 1, undersample when fraction < 1
    sampled_category_data = category_data.sample(withReplacement=fraction>1, fraction=abs(fraction))
    sampled_data = sampled_data.union(sampled_category_data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:

print('Total Rows: %d' % data.count())


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Total Rows: 4803

In [28]:
from sparknlp.training import CoNLL
from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *

from sparknlp.annotator import *
from sparknlp.base import *
from sparknlp.common import *

from pyspark.ml import Pipeline
from sparknlp.training import CoNLL
from sparknlp import DocumentAssembler

from sparknlp.annotator import Lemmatizer
from sparknlp.annotator import LemmatizerModel
from sparknlp.base import Finisher, DocumentAssembler
from sparknlp.annotator import Tokenizer

from pyspark.ml.feature import CountVectorizer, StringIndexer, IndexToString
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Define your document assembler, tokenizer, stopwords cleaner, lemmatizer and finisher
documentAssembler = DocumentAssembler().setInputCol("description").setOutputCol("document")
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")
stopwords_cleaner = StopWordsCleaner().setInputCols(["token"]).setOutputCol("cleaned_token")
lemmatizer = LemmatizerModel.pretrained("lemma_antbnc").setInputCols(["cleaned_token"]).setOutputCol("lemma")
finisher = Finisher().setInputCols(["lemma"])

# Define count vectors and string indexer
countVectors = CountVectorizer(inputCol="finished_lemma", outputCol="features")
label_stringIdx = StringIndexer(inputCol = "categories", outputCol = "label")

# Define your pipeline
pipeline = Pipeline(stages=[documentAssembler, 
                            tokenizer, 
                            stopwords_cleaner, 
                            lemmatizer, 
                            finisher, 
                            countVectors,
                            label_stringIdx])

# Fit and transform your data with the pipeline
pipeline_model = pipeline.fit(sampled_data)
dataset = pipeline_model.transform(sampled_data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]

In [29]:

# Split your data into training and test datasets
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)

# Define your logistic regression model
lr = LogisticRegression()

# Define your parameter grid
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5]) 
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]) 
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

# Define your evaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

# Define your cross-validator
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)

# Fit the model using the cross-validator
cross_val_Model = crossval.fit(trainingData)
predictions = cross_val_Model.transform(testData)
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Test Error = 0.263481

In [30]:
best_model = cross_val_Model.bestModel
bestRegParam = best_model._java_obj.getRegParam()
bestElasticNetParam = best_model._java_obj.getElasticNetParam()
bestMaxIter = best_model._java_obj.getMaxIter()

print("Best RegParam: ", bestRegParam)
print("Best ElasticNetParam: ", bestElasticNetParam)
print("Best MaxIter: ", bestMaxIter)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Best RegParam:  0.1
Best ElasticNetParam:  0.1
Best MaxIter:  5

In [32]:
# Get the StringIndexerModel
string_indexer = pipeline_model.stages[-1]

# Get the label to index mapping
label_to_index = string_indexer.labels

# Print all labels with their corresponding index
for i, label in enumerate(label_to_index):
    print("Index: ", i, " Label: ", label)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Index:  0  Label:  Medical
Index:  1  Label:  Religion
Index:  2  Label:  Psychology
Index:  3  Label:  Political Science
Index:  4  Label:  Social Science
Index:  5  Label:  Science
Index:  6  Label:  Literary Criticism
Index:  7  Label:  Education
Index:  8  Label:  Fiction
Index:  9  Label:  History
Index:  10  Label:  Law
Index:  11  Label:  Language Arts & Disciplines
Index:  12  Label:  Philosophy
Index:  13  Label:  Biography & Autobiography
Index:  14  Label:  Business & Economics

In [47]:
import numpy as np
from pyspark.sql import functions as F

# Get the CountVectorizerModel
count_vectorizer = pipeline_model.stages[5]

# Get the vocabulary
vocab = count_vectorizer.vocabulary

# Number of classes
num_classes = best_model.numClasses

spark.conf.set('spark.sql.repl.eagerEval.maxNumRows', 20)  # number of rows to display
spark.conf.set('spark.sql.repl.eagerEval.maxColumnWidth', 30)  # adjust column width

# Get the StringIndexerModel
string_indexer = pipeline_model.stages[-1]

# Get the label to index mapping
label_to_index = string_indexer.labels

# Loop through each class and extract top words
for i, label in enumerate(label_to_index):
    coefficients = best_model.coefficientMatrix.toArray()[i]  # take the coefficients of class i
    coefficients = coefficients.tolist()  # convert numpy array to Python list

    # Zip together the vocabulary (words) and coefficients and make a DataFrame
    coeff_df = spark.createDataFrame(zip(vocab, coefficients), ["word", "coefficient"])

    # Show the top 20 words with the highest coefficients
    print(f"Top 20 words for category {label}")
    coeff_df.sort(F.col("coefficient").desc()).show(20, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Top 20 words for category Medical
+---------------------+-------------------+
|word                 |coefficient        |
+---------------------+-------------------+
|Pocket               |0.7748131571758332 |
|repository           |0.6270174014926888 |
|capstone             |0.602352476022574  |
|psychology/psychiatry|0.6014496849538802 |
|Thought-provoking    |0.5557682776045978 |
|caregiving           |0.522641671958786  |
|Disparities          |0.508823056469649  |
|SEP                  |0.508823056469649  |
|re-position          |0.5003454554333434 |
|standardized         |0.4938412076574927 |
|Pathways             |0.45749015649320507|
|community-wide       |0.45749015649320507|
|continuum            |0.4484814144004059 |
|bioethicists         |0.423566505641134  |
|12-13                |0.4210543543290499 |
|well-reasoned        |0.41228546389779613|
|Geriatric            |0.4019872179041669 |
|contextual-behavioral|0.39602824533820385|
|herb                 |0.38510210550602997