## Setup
PySpark needs a Java runtime. The easiest way to get one is to install OpenJDK with conda, which also sets the required paths.

In [2]:
# Setup - run only once per Kernel App.
# Install OpenJDK into the kernel's conda environment; -y answers the confirmation prompt.
%conda install openjdk -y

# Install PySpark, pinned to 3.4.0.
%pip install pyspark==3.4.0

# Install Spark NLP, pinned to 5.1.3 (the same version as the spark-nlp assembly jar
# and the spark.jars.packages coordinate used later in this notebook).
%pip install spark-nlp==5.1.3

# Restart the kernel by rendering a small piece of JavaScript as the cell output.
# NOTE(review): this relies on the notebook front end exposing a `Jupyter.notebook`
# JavaScript object - confirm it works in your front end, otherwise restart manually.
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

Collecting package metadata (current_repodata.json): done
Solving environment: done


  current version: 23.3.1
  latest version: 23.10.0

Please update conda by running

    $ conda update -n base -c defaults conda

Or to minimize the number of packages updated during conda update use

     conda install conda=23.10.0



## Package Plan ##

  environment location: /opt/conda

  added / updated specs:
    - openjdk


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    ca-certificates-2023.08.22 |       h06a4308_0         123 KB
    certifi-2023.7.22          |  py310h06a4308_0         153 KB
    openjdk-11.0.13            |       h87a67e3_0       341.0 MB
    ------------------------------------------------------------
                                           Total:       341.3 MB

The following NEW packages will be INSTALLED:

  openjdk            pkgs/main/linux-64::openjdk-11.0.13-h87a6

## Run the TF-IDF and sentiment processing as a SageMaker job

In [None]:
# Look up the default S3 bucket of the current SageMaker session.
import sagemaker
session = sagemaker.Session()
bucket = session.default_bucket()
# Stream the Spark NLP 5.1.3 assembly jar into s3://{bucket}/lab8/ without storing it locally:
# wget writes the download to stdout (-qO-) and `aws s3 cp -` uploads what it reads from stdin.
!wget -qO- https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/jars/spark-nlp-assembly-5.1.3.jar | aws s3 cp - s3://{bucket}/lab8/spark-nlp-assembly-5.1.3.jar
# List the uploaded object so the upload can be checked.
!aws s3 ls s3://{bucket}/lab8/spark-nlp-assembly-5.1.3.jar

In [11]:
%%writefile ../scripts/nlp_tfidf_sentiment.py

import os
import sys
import logging
import argparse

# Import pyspark and build Spark session
import json
import sparknlp
import numpy as np
import pandas as pd
from sparknlp.base import *
from pyspark.ml import Pipeline
from sparknlp.annotator import *
from pyspark.sql import SparkSession
from sparknlp.pretrained import PretrainedPipeline
from pyspark.sql.functions import *
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import CountVectorizer, IDF, Tokenizer
from pyspark.sql.types import *


# Configure the root logger once: formatted records at DEBUG level, written to stdout.
# The stdout handler is installed through basicConfig itself; adding a second, bare
# StreamHandler afterwards would emit every record twice (once formatted, once not).
logging.basicConfig(
    format='%(asctime)s,%(levelname)s,%(module)s,%(filename)s,%(lineno)d,%(message)s',
    level=logging.DEBUG,
    stream=sys.stdout,
)
logger = logging.getLogger()
# Set the level explicitly as well: basicConfig does nothing when the root logger
# already has handlers.
logger.setLevel(logging.DEBUG)

def main():
    """Run the TF-IDF + sentiment job end to end.

    Reads a Parquet dataset, computes TF-IDF weights for the ``combined_text``
    column, runs the pretrained Spark NLP sentiment pipeline on the ``body``
    column and writes the selected columns back to S3 as Parquet under
    ``s3://<bucket>/<prefix>/tfidf_sentiment_dummy``.

    Command-line arguments (all required):
        --s3_dataset_path: path of the input Parquet dataset.
        --s3_output_bucket: bucket that receives the output.
        --s3_output_key_prefix: key prefix under which the output is written.

    Raises:
        SystemExit: raised by argparse when a required argument is missing.
    """
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    # All three arguments are mandatory: without them the job would only fail later,
    # inside Spark, with a much less helpful error about a None path.
    parser.add_argument("--s3_dataset_path", type=str, required=True, help="Path of dataset in S3")
    parser.add_argument("--s3_output_bucket", type=str, required=True, help="s3 output bucket")
    parser.add_argument("--s3_output_key_prefix", type=str, required=True, help="s3 output key prefix")
    args = parser.parse_args()
    logger.info(f"args={args}")

    spark = (
        SparkSession.builder
        .appName("Spark NLP")
        .config("spark.driver.memory", "16G")
        .config("spark.driver.maxResultSize", "0")
        .config("spark.kryoserializer.buffer.max", "2000M")
        .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.3")
        .getOrCreate()
    )

    logger.info(f"Spark version: {spark.version}")
    logger.info(f"sparknlp version: {sparknlp.version()}")

    # This is needed to save RDDs which is the only way to write nested Dataframes into CSV format
    # NOTE(review): this job writes Parquet, not CSV; the setting is kept unchanged -
    # confirm whether it is still required.
    sc = spark.sparkContext
    sc._jsc.hadoopConfiguration().set(
        "mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
    )

    # Load the input dataset into a DataFrame. No `header` option is passed:
    # it is a CSV reader option and has no meaning for Parquet.
    logger.info(f"going to read {args.s3_dataset_path}")
    df = spark.read.parquet(args.s3_dataset_path)
    df = df.repartition(128)
    logger.info("finished reading files...")

    # process TF-IDF calculation
    # tokenize the text
    tokenizer = Tokenizer(inputCol="combined_text", outputCol="tokens")
    wordsData = tokenizer.transform(df)

    # use CountVectorizer to get term frequency vectors
    cv = CountVectorizer(inputCol="tokens", outputCol="rawFeatures", vocabSize=50000)
    cvModel = cv.fit(wordsData)
    featurizedData = cvModel.transform(wordsData)

    # apply IDF to scale the features
    idf = IDF(inputCol="rawFeatures", outputCol="features")
    idfModel = idf.fit(featurizedData)
    rescaledData = idfModel.transform(featurizedData)

    # write udf functions to extract the indices and values from sparse vectors
    sparse_values = udf(lambda v: v.values.tolist(), ArrayType(DoubleType()))
    sparse_indices = udf(lambda v: v.indices.tolist(), ArrayType(IntegerType()))
    rescaledData = (
        rescaledData
        .withColumn("values", sparse_values("features"))
        .withColumn('indices', sparse_indices("features"))
    )

    # vocabulary learned by the CountVectorizer; position i holds the term for index i
    vocab = cvModel.vocabulary

    # define function to map the indices to words
    def map_indices_to_words(indices):
        """Translate a list of vocabulary indices into the corresponding terms."""
        return [vocab[idx] for idx in indices]

    # register the UDF
    map_indices_udf = udf(map_indices_to_words, ArrayType(StringType()))

    # apply the UDF to create a new column "words"
    rescaledData = rescaledData.withColumn("words", map_indices_udf("indices"))

    # process sentiment detection
    MODEL_NAME = 'sentimentdl_use_twitter'

    # wrap the raw comment text into Spark NLP "document" annotations
    documentAssembler = (
        DocumentAssembler()
        .setInputCol("body")
        .setOutputCol("document")
    )

    # sentence embeddings from the pretrained Universal Sentence Encoder
    use = (
        UniversalSentenceEncoder.pretrained(name="tfhub_use", lang="en")
        .setInputCols(["document"])
        .setOutputCol("sentence_embeddings")
    )

    # pretrained sentiment classifier operating on those embeddings
    sentimentdl = (
        SentimentDLModel.pretrained(name=MODEL_NAME, lang="en")
        .setInputCols(["sentence_embeddings"])
        .setOutputCol("sentiment")
    )

    nlpPipeline = Pipeline(stages=[documentAssembler, use, sentimentdl])
    pipelineModel = nlpPipeline.fit(rescaledData)
    results = pipelineModel.transform(rescaledData)
    # one output row per element of the sentiment result array
    results = results.withColumn('sentiment_index', explode(results.sentiment.result))

    # select the target columns
    output_columns = [
        'subreddit', 'id', 'created_utc', 'body', 'score', 'gilded',
        'is_zoom', 'is_coursera', 'is_udemy', 'is_edx', 'is_linkedin',
        'is_masterclass', 'is_youtube', 'is_inperson',
        'is_engineering', 'is_science', 'is_social_science', 'is_arts',
        'is_med', 'is_law',
        'final_text', 'combined_text', 'words', 'values', 'sentiment_index',
    ]
    output = results.select(*output_columns)

    s3_path = f"s3://{args.s3_output_bucket}/{args.s3_output_key_prefix}/tfidf_sentiment_dummy"
    logger.info(f"going to save dataframe to {s3_path}")
    output.write.mode("overwrite").parquet(s3_path)
    logger.info("all done")
    
# Script entry point: run the job only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

Overwriting ../scripts/nlp_tfidf_sentiment.py


In [2]:
%%time
import boto3
import sagemaker
from sagemaker.spark.processing import PySparkProcessor

# AWS account id of the caller; it is used to build the ECR image URI below.
account_id = boto3.client('sts').get_caller_identity()['Account']

# Setup the PySpark processor to run the job. Note the instance type and instance count parameters. SageMaker will create these many instances of this type for the spark job.
# The container image is taken from this account's ECR registry; the region (us-east-1) is hard-coded in the URI.
role = sagemaker.get_execution_role()
spark_processor = PySparkProcessor(
    base_job_name="sm-spark-nlp-tfidf-sentiment",
    image_uri=f"{account_id}.dkr.ecr.us-east-1.amazonaws.com/sagemaker-spark:latest",
    role=role,
    instance_count=8,
    instance_type="ml.m5.xlarge",
    # 3600 seconds = one hour
    max_runtime_in_seconds=3600,
)

# s3 paths
# NOTE(review): the "*" values below look like redacted placeholders - set the real
# input dataset path and output bucket before running this cell.
session = sagemaker.Session()
bucket = session.default_bucket()
s3_dataset_path = f"*"
output_prefix_logs = f"category/logs"
output_bucket = f"*"
output_key_prefix = f"6000_project"

# Run the job now, the arguments array is provided as command line to the Python script
# The Spark NLP assembly jar is expected at s3://{bucket}/lab8/ and is submitted with the job.
spark_processor.run(
    submit_app="../scripts/nlp_tfidf_sentiment.py",
    submit_jars=[f"s3://{bucket}/lab8/spark-nlp-assembly-5.1.3.jar"],
    arguments=[
        "--s3_dataset_path",
        s3_dataset_path,
        "--s3_output_bucket",
        output_bucket,
        "--s3_output_key_prefix",
        output_key_prefix,
    ],
    # Spark event logs go to s3://<default bucket>/category/logs/spark_event_logs
    spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, output_prefix_logs),
    logs=False,
)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


INFO:sagemaker:Creating processing-job with name sm-spark-nlp-tfidf-sentiment-2023-12-06-08-17-53-599


......................................................................................................................................................................................................................!CPU times: user 2.24 s, sys: 578 ms, total: 2.82 s
Wall time: 18min 5s


## Start a Spark session to check the output

In [3]:
# Create (or reuse) the notebook's Spark session, configured to read from S3 via s3a://
from pyspark.sql import SparkSession

session_builder = SparkSession.builder.appName("PySparkApp")
# Pull in the hadoop-aws package when the session starts
session_builder = session_builder.config(
    "spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2"
)
# Use the container credentials provider for S3A access
session_builder = session_builder.config(
    "fs.s3a.aws.credentials.provider",
    "com.amazonaws.auth.ContainerCredentialsProvider",
)
spark = session_builder.getOrCreate()

print(spark.version)



:: loading settings :: url = jar:file:/opt/conda/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-8aaeffe5-3c84-4308-a101-4ad247cd1137;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.2.2 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.563 in central
:: resolution report :: resolve 312ms :: artifacts dl 18ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.563 from central in [default]
	org.apache.hadoop#hadoop-aws;3.2.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	---------------------------------------------

3.4.0


## Check the output data

In [4]:
%%time
s3_path = f"*"
print(f"reading tfidf_sentiment_dummy from {s3_path}")
tfidf_sentiment_dummy = spark.read.parquet(s3_path, header=True)
print(f"shape of the comments dataframe is {tfidf_sentiment_dummy.count():,}x{len(tfidf_sentiment_dummy.columns)}")

reading tfidf_sentiment_dummy from s3a://sagemaker-us-east-1-640225923506/6000_project/tfidf_sentiment_dummy


23/12/06 08:37:26 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties

shape of the comments dataframe is 1,454,993x25
CPU times: user 16.3 ms, sys: 660 µs, total: 17 ms
Wall time: 13.1 s


                                                                                

In [5]:
# Print the column names and types of the job output.
tfidf_sentiment_dummy.printSchema()

root
 |-- subreddit: string (nullable = true)
 |-- id: string (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- body: string (nullable = true)
 |-- score: long (nullable = true)
 |-- gilded: long (nullable = true)
 |-- is_zoom: integer (nullable = true)
 |-- is_coursera: integer (nullable = true)
 |-- is_udemy: integer (nullable = true)
 |-- is_edx: integer (nullable = true)
 |-- is_linkedin: integer (nullable = true)
 |-- is_masterclass: integer (nullable = true)
 |-- is_youtube: integer (nullable = true)
 |-- is_inperson: integer (nullable = true)
 |-- is_engineering: integer (nullable = true)
 |-- is_science: integer (nullable = true)
 |-- is_social_science: integer (nullable = true)
 |-- is_arts: integer (nullable = true)
 |-- is_med: integer (nullable = true)
 |-- is_law: integer (nullable = true)
 |-- final_text: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- combined_text: string (nullable = true)
 |-- words: array (nullable = tru