In [10]:
# !pip install -q pyspark==3.4.1 spark-nlp==5.3.2

In [11]:
#!pip uninstall -y pyspark spark-nlp

In [12]:
import time
import threading
import subprocess
from sparknlp.annotator import BertEmbeddings, Tokenizer
from sparknlp.base import DocumentAssembler
from pyspark.ml import Pipeline
import sparknlp
from pyspark.sql import SparkSession
from pyspark.sql import Row
import re
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
# Shared state for the background GPU sampler:
#   gpu_utilization_data  - utilization readings (percent) appended by the sampler
#   gpu_monitoring_active - the sampler loop keeps running while this is True
gpu_utilization_data = list()
gpu_monitoring_active = True

In [13]:

# Function to monitor GPU utilization in parallel
def monitor_gpu_utilization():
    """Sample GPU utilization roughly once per second while monitoring is active.

    Intended to run in a background thread. Each iteration asks ``nvidia-smi``
    for the utilization figure in its machine-readable CSV form and appends the
    reading (an integer percentage) to the module-level
    ``gpu_utilization_data`` list. The loop ends when the module-level
    ``gpu_monitoring_active`` flag becomes false.

    Only the first GPU reported by ``nvidia-smi`` is recorded. Readings that
    are not plain integers (for example when the tool reports an error or the
    metric is unavailable) are skipped. If ``nvidia-smi`` cannot be launched
    at all, a message is printed and monitoring stops instead of the thread
    dying with a traceback.
    """
    # Querying the single field avoids scraping the human-readable table,
    # whose layout depends on driver version and compute mode.
    query_cmd = [
        'nvidia-smi',
        '--query-gpu=utilization.gpu',
        '--format=csv,noheader,nounits',
    ]

    while gpu_monitoring_active:  # Re-read the flag on every iteration
        try:
            completed = subprocess.run(query_cmd, stdout=subprocess.PIPE)
        except OSError as exc:
            # nvidia-smi is missing or not executable: nothing to monitor.
            print(f"GPU monitoring disabled: could not run nvidia-smi ({exc})")
            return

        output_lines = completed.stdout.decode('utf-8', errors='replace').splitlines()
        first_reading = output_lines[0].strip() if output_lines else ''
        if first_reading.isdigit():
            gpu_utilization_data.append(int(first_reading))

        # Skip the pause if monitoring was switched off while sampling.
        if not gpu_monitoring_active:
            return
        time.sleep(1)  # One sample per second

In [14]:
def update_plot(frame):
    """Redraw the GPU utilization trace on the current axes.

    ``frame`` is accepted so the function can serve as an animation callback;
    it is not used. The data plotted is the module-level
    ``gpu_utilization_data`` list.
    """
    axes = plt.gca()
    axes.cla()  # Wipe the previous frame before drawing the new one
    axes.plot(gpu_utilization_data, label='GPU Utilization (%)')
    axes.set_ylim(0, 100)  # Utilization is a percentage
    axes.set_xlabel('Time (seconds)')
    axes.set_ylabel('GPU Utilization (%)')
    axes.set_title('Real-time GPU Utilization')
    axes.legend(loc='upper right')

In [15]:
# Launch the GPU sampler in the background. Marking the thread as a daemon
# means it will not keep the interpreter alive once the main program ends.
gpu_thread = threading.Thread(target=monitor_gpu_utilization, daemon=True)
gpu_thread.start()

# Manual alternative for initializing the Spark session with GPU support (disabled; kept for reference):
# spark = SparkSession.builder \
#     .appName("Spark NLP with GPU") \
#     .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp-gpu_2.12:5.3.2") \
#     .config("spark.executor.resource.gpu.amount", "1") \
#     .config("spark.task.resource.gpu.amount", "1") \
#     .config("spark.driver.memory", "16G") \
#     .config("spark.executor.memory", "16G") \
#     .config("spark.executor.cores", "4") \
#     .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
#     .config("spark.sql.execution.arrow.maxRecordsPerBatch", "2048") \
#     .config("spark.driver.extraJavaOptions", "-Dtensorflow.gpu=true") \
#     .getOrCreate()

In [16]:
# Bring up a GPU-enabled Spark NLP session
spark = sparknlp.start(gpu=True)

# Timing starts once the session is available
start_time = time.time()

# Build a large workload: the same sentence repeated 100000 times,
# each entry a plain string (not an array of strings)
text_data = ["This is a sample sentence for embedding." for _ in range(100000)]

# One-column ("text") DataFrame with one row per sentence
data_df = spark.createDataFrame(
    list(map(lambda sentence: Row(text=sentence), text_data))
)
data_df.show()

24/09/24 13:59:53 WARN Utils: Your hostname, minti9 resolves to a loopback address: 127.0.1.1; using 192.168.1.101 instead (on interface enp4s0)
24/09/24 13:59:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /home/sean/.ivy2/cache
The jars for the packages stored in: /home/sean/.ivy2/jars
com.johnsnowlabs.nlp#spark-nlp-gpu_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f5c23141-ce5c-443d-bf2c-41b4c1605491;1.0
	confs: [default]
	found com.johnsnowlabs.nlp#spark-nlp-gpu_2.12;5.4.1 in central
	found com.typesafe#config;1.4.2 in central
	found org.rocksdb#rocksdbjni;6.29.5 in central
	found com.amazonaws#aws-java-sdk-s3;1.12.500 in central
	found com.amazonaws#aws-java-sdk-kms;1.12.500 in central


:: loading settings :: url = jar:file:/home/sean/miniforge3/envs/wt2/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found com.amazonaws#aws-java-sdk-core;1.12.500 in central
	found commons-logging#commons-logging;1.1.3 in local-m2-cache
	found commons-codec#commons-codec;1.15 in local-m2-cache
	found org.apache.httpcomponents#httpclient;4.5.13 in local-m2-cache
	found org.apache.httpcomponents#httpcore;4.4.13 in local-m2-cache
	found software.amazon.ion#ion-java;1.0.2 in central
	found joda-time#joda-time;2.8.1 in local-m2-cache
	found com.amazonaws#jmespath-java;1.12.500 in central
	found com.github.universal-automata#liblevenshtein;3.0.0 in central
	found com.google.protobuf#protobuf-java-util;3.0.0-beta-3 in central
	found com.google.protobuf#protobuf-java;3.0.0-beta-3 in central
	found com.google.code.gson#gson;2.3 in central
	found it.unimi.dsi#fastutil;7.0.12 in central
	found org.projectlombok#lombok;1.16.8 in central
	found com.google.cloud#google-cloud-storage;2.20.1 in central
	found com.google.guava#guava;31.1-jre in local-m2-cache
	found com.google.guava#failureaccess;1.0.1 in local-m2-

PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

In [None]:
# Step 1: Turn the raw "text" column into document annotations
document_assembler = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
)

# Step 2: Split each document into tokens
tokenizer = (
    Tokenizer()
    .setInputCols(["document"])
    .setOutputCol("token")
)

# Number of rows sent to the model per inference batch. The previous value of
# 1 contradicted the goal of keeping the GPU busy; lower this again if the GPU
# runs out of memory with the large model.
BERT_BATCH_SIZE = 32

# Step 3: Load a larger, more complex pretrained model to increase GPU usage
bert_embeddings = (
    BertEmbeddings.pretrained("bert_large_uncased", "en")
    .setInputCols(["document", "token"])
    .setOutputCol("embeddings")
    .setCaseSensitive(False)  # the model is uncased
    .setBatchSize(BERT_BATCH_SIZE)
)

# Step 4: Chain document assembler, tokenizer, and embeddings into one pipeline
pipeline = Pipeline(stages=[document_assembler, tokenizer, bert_embeddings])

In [None]:
# 'seaborn-darkgrid' was renamed to 'seaborn-v0_8-darkgrid' in matplotlib 3.6
# and the old name was removed in later releases. Use whichever name this
# installation provides; if neither exists, keep the default style.
for _style_name in ('seaborn-v0_8-darkgrid', 'seaborn-darkgrid'):
    if _style_name in plt.style.available:
        plt.style.use(_style_name)
        break

fig, ax = plt.subplots()
# Keep a reference to the animation object so it stays alive; update_plot is
# invoked every 1000 ms to redraw the utilization trace.
ani = FuncAnimation(fig, update_plot, interval=1000)
def run_pipeline():
    """Fit the pipeline on ``data_df``, transform it, and print a two-row preview."""
    fitted = pipeline.fit(data_df)
    preview = fitted.transform(data_df).select("embeddings.result")
    # NOTE(review): show(2) only asks for two rows, so Spark may not need to
    # process the whole dataset -- confirm if full-dataset timing is intended.
    preview.show(2, truncate=False)


# Run the Spark job on a worker thread, then display the figure from the
# main thread.
pipeline_thread = threading.Thread(target=run_pipeline)
pipeline_thread.start()
plt.show()

In [None]:
# Block until the pipeline thread has finished, then stop the clock
pipeline_thread.join()
end_time = time.time()

# Report the elapsed wall-clock time since start_time
total_time = end_time - start_time
print(f"Total execution time: {total_time:.2f} seconds")

# Signal the GPU sampler loop to exit and wait for its thread to end
gpu_monitoring_active = False
gpu_thread.join()
print("GPU monitoring stopped.")
