# Processing large datasets with Apache Spark and Amazon SageMaker

***This notebook run on `Data Science 3.0 - Python 3` kernel on a `ml.t3.large` instance***.

Amazon SageMaker Processing Jobs are used  to analyze data and evaluate machine learning models on Amazon SageMaker. With Processing, you can use a simplified, managed experience on SageMaker to run your data processing workloads, such as feature engineering, data validation, model evaluation, and model interpretation. You can also use the Amazon SageMaker Processing APIs during the experimentation phase and after the code is deployed in production to evaluate performance.

        


![](https://docs.aws.amazon.com/images/sagemaker/latest/dg/images/Processing-1.png)

The preceding diagram shows how Amazon SageMaker spins up a Processing job. Amazon SageMaker takes your script, copies your data from Amazon Simple Storage Service (Amazon S3), and then pulls a processing container. The processing container image can either be an Amazon SageMaker built-in image or a custom image that you provide. The underlying infrastructure for a Processing job is fully managed by Amazon SageMaker. Cluster resources are provisioned for the duration of your job, and cleaned up when a job completes. The output of the Processing job is stored in the Amazon S3 bucket you specified.

## Our workflow for processing large amounts of data with SageMaker

We can divide our workflow into two steps:
    
1. Work with a small subset of the data with Spark running in local model in a SageMaker Studio Notebook.

1. Once we are able to work with the small subset of data we can provide the same code (as a Python script rather than a series of interactive steps) to SageMaker Processing which launched a Spark cluster, runs out code and terminates the cluster.

## In this notebook...

We will analyze the [Pushshift Reddit dataset](https://arxiv.org/pdf/2001.08435.pdf) to be used for the project and then we will run a SageMaker Processing Job to filter out the comments and submissions from subreddits of interest. The filtered data will be stored in your account's s3 bucket and it is this filtered data that you will be using for your project.

## Setup
We need an available Java installation to run pyspark. The easiest way to do this is to install JDK and set the proper paths using conda

In [1]:
# Setup - Run only once per Kernel App
%conda install https://anaconda.org/conda-forge/openjdk/11.0.1/download/linux-64/openjdk-11.0.1-hacce0ff_1021.tar.bz2

# install PySpark
%pip install pyspark==3.4.0

# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")


Downloading and Extracting Packages:


## Package Plan ##

  environment location: /opt/conda



Preparing transaction: done
Verifying transaction: done
Executing transaction: done

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


## Utilize S3 Data within local PySpark
* By specifying the `hadoop-aws` jar in our Spark config we're able to access S3 datasets using the s3a file prefix. 
* Since we've already authenticated ourself to SageMaker Studio , we can use our assumed SageMaker ExecutionRole for any S3 reads/writes by setting the credential provider as `ContainerCredentialsProvider`

In [2]:
# Import pyspark and build Spark session
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("PySparkApp")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")
    .config(
        "fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.ContainerCredentialsProvider",
    )
    .getOrCreate()
)

print(spark.version)



:: loading settings :: url = jar:file:/opt/conda/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/sagemaker-user/.ivy2/cache
The jars for the packages stored in: /home/sagemaker-user/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e0ac757e-e33e-4efc-9b1f-baf55603ab3c;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.2.2 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.563 in central
:: resolution report :: resolve 209ms :: artifacts dl 7ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.563 from central in [default]
	org.apache.hadoop#hadoop-aws;3.2.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	----------------

3.4.0


### Reading data into a Spark Dataframe
Note that we will be using the "s3a" adapter (read more [here](https://aws.amazon.com/blogs/opensource/community-collaboration-the-s3a-story)). S3A enables Hadoop to directly read and write Amazon S3 objects.

In [3]:
%%time
comments = spark.read.parquet(
    "s3a://bigdatateaching/reddit/parquet/comments/yyyy=*/mm=*/*comments*.parquet",
    header=True
)
comments.show()

24/10/06 00:51:20 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
[Stage 2:>                                                          (0 + 1) / 1]

+------------------+----------------------+--------------------+--------------------+----------------+-----------+-------------+------+------+-------+----------+----------+------------+-----+--------+--------------------+------------+
|            author|author_flair_css_class|   author_flair_text|                body|controversiality|created_utc|distinguished|edited|gilded|     id|   link_id| parent_id|retrieved_on|score|stickied|           subreddit|subreddit_id|
+------------------+----------------------+--------------------+--------------------+----------------+-----------+-------------+------+------+-------+----------+----------+------------+-----+--------+--------------------+------------+
|    Insane_Inkster|                  null|:patidar: Patidar...|Started supportin...|               0| 1716989728|         null|  null|     0|l668220|t3_1d362sp|t3_1d362sp|  1716989744|    1|   false|                 RCB|    t5_2v1lp|
|         UneditedB|                  null|                n

                                                                                

In [4]:
%%time
submissions = spark.read.parquet(
    "s3a://bigdatateaching/reddit/parquet/submissions/yyyy=*/mm=*/*submissions*.parquet",
    header=True
)
submissions.show()

[Stage 5:>                                                          (0 + 1) / 1]

+--------------------+----------------------+-----------------+-----------+-------------+--------------------+------+-------+-------+------+------------+-------+----------+------------+-----+--------------------+--------+--------------------+------------+--------------------+--------------------+
|              author|author_flair_css_class|author_flair_text|created_utc|distinguished|              domain|edited|     id|is_self|locked|num_comments|over_18|quarantine|retrieved_on|score|            selftext|stickied|           subreddit|subreddit_id|               title|                 url|
+--------------------+----------------------+-----------------+-----------+-------------+--------------------+------+-------+-------+------+------------+-------+----------+------------+-----+--------------------+--------+--------------------+------------+--------------------+--------------------+
|     Newleaftherapy2|                  null|             null| 1689068092|         null|self.Newleafthera

                                                                                

In [5]:
%%time
print(f"shape of the submissions dataframe is {submissions.count():,}x{len(submissions.columns)}") 



shape of the submissions dataframe is 567,890,869x21
CPU times: user 114 ms, sys: 24.1 ms, total: 138 ms
Wall time: 1min 10s


                                                                                

In [6]:
%%time
print(f"shape of the comments dataframe is {comments.count():,}x{len(comments.columns)}") 



shape of the comments dataframe is 3,678,768,958x17
CPU times: user 507 ms, sys: 92.3 ms, total: 599 ms
Wall time: 4min 29s


                                                                                

## Exploratory data analysis
Let us find the number of submissions and comments per subreddit. We will use SparkSQL for this.

In [7]:
submissions.createOrReplaceTempView("submissions")
comments.createOrReplaceTempView("comments")

In [8]:
%%time

sql_str="select subreddit, count(id) as count from submissions group by subreddit order by count desc"
submissions_subreddit_count = spark.sql(sql_str)
submissions_subreddit_count.show()



+--------------------+-------+
|           subreddit|  count|
+--------------------+-------+
|            dirtyr4r|3633005|
|         JerkOffChat|3616759|
|           AskReddit|2686082|
|   GaySnapchatImages|2655639|
|    GaySnapchatShare|2641484|
|       DirtySnapchat|1986200|
|      PokemonGoRaids|1341799|
|     slutsofsnapchat|1334782|
|PersonalizedGameRecs|1325223|
|  HentaiAndRoleplayy|1315212|
|   u_GrownBrilliance|1310625|
|   MonopolyGoTrading|1234245|
| relationship_advice|1149649|
|         MassiveCock|1096306|
|       DirtyChatPals|1024807|
|         Monopoly_GO| 970421|
|       AutoNewspaper| 946314|
|              GOONED| 917121|
|          ratemycock| 898375|
|    DickPicRequestv2| 863552|
+--------------------+-------+
only showing top 20 rows

CPU times: user 356 ms, sys: 48.2 ms, total: 404 ms
Wall time: 3min 50s


                                                                                

## Process S3 data with SageMaker Processing Job `PySparkProcessor`

We are going to move the above processing code in a Python file and then submit that file to SageMaker Processing Job's [`PySparkProcessor`](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_processing.html#pysparkprocessor).

In [9]:
!mkdir -p ./code

In [22]:
%%writefile ./code/process.py

import os
import logging
import argparse

# Import pyspark and build Spark session
from pyspark.sql.functions import *
from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

logging.basicConfig(format='%(asctime)s,%(levelname)s,%(module)s,%(filename)s,%(lineno)d,%(message)s', level=logging.DEBUG)
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))

def main():
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--s3_dataset_path_commments", type=str, help="Path of dataset in S3 for reddit comments")
    parser.add_argument("--s3_dataset_path_submissions", type=str, help="Path of dataset in S3 for reddit submissions")
    parser.add_argument("--s3_output_bucket", type=str, help="s3 output bucket")
    parser.add_argument("--s3_output_prefix", type=str, help="s3 output prefix")
    parser.add_argument("--subreddits", type=str, help="comma separate list of subreddits of interest")
    args = parser.parse_args()

    spark = SparkSession.builder.appName("PySparkApp").getOrCreate()
    logger.info(f"spark version = {spark.version}")
    
    # This is needed to save RDDs which is the only way to write nested Dataframes into CSV format
    sc = spark.sparkContext
    sc._jsc.hadoopConfiguration().set(
        "mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
    )

   
    # Downloading the data from S3 into a Dataframe
    logger.info(f"going to read {args.s3_dataset_path_commments}")
    comments = spark.read.parquet(args.s3_dataset_path_commments, header=True)
    logger.info(f"finished reading files...")
    
    logger.info(f"going to read {args.s3_dataset_path_submissions}")
    submissions = spark.read.parquet(args.s3_dataset_path_submissions, header=True)
    logger.info(f"finished reading files...")
    
    # filter the dataframe to only keep the subreddits of interest
    subreddits = [s.strip() for s in args.subreddits.split(",")]
    excluded_cols = ['edited', 'created_utc', 'retrieved_on']
    
    comments_included_cols = [c for c in comments.columns if c not in excluded_cols]
    logger.info(f"comments included_cols={comments_included_cols}")

    submissions_included_cols = [c for c in submissions.columns if c not in excluded_cols]
    logger.info(f"submissions included_cols={submissions_included_cols}")

    # subset the dataframes because "edited" and "created_utc" have data type problems
    # sometimes they occur as int some time as float and since schema is encoded in the 
    # parquet files therefore different files have different data tpyes for these fields (float in some cases, int in some cases)
    # and spark enforces strict type checking on read so the only option we have is to either
    # fix this outside of spark or delete these columns.
    comments = comments.select(comments_included_cols)
    submissions = submissions.select(submissions_included_cols)
    
    submissions_filtered = submissions.where(lower(col("subreddit")).isin(subreddits))
    comments_filtered = comments.where(lower(col("subreddit")).isin(subreddits))
    
    # save the filtered dataframes so that these files can now be used for future analysis
    s3_path = f"s3://{args.s3_output_bucket}/{args.s3_output_prefix}/comments"
    logger.info(f"going to write comments for {subreddits} in {s3_path}")
    logger.info(f"shape of the comments_filtered dataframe is {comments_filtered.count():,}x{len(comments_filtered.columns)}")
    comments_filtered.write.mode("overwrite").parquet(s3_path)
    
    s3_path = f"s3://{args.s3_output_bucket}/{args.s3_output_prefix}/submissions"
    logger.info(f"going to write submissions for {subreddits} in {s3_path}")
    logger.info(f"shape of the submissions_filtered dataframe is {submissions_filtered.count():,}x{len(submissions_filtered.columns)}")
    submissions_filtered.write.mode("overwrite").parquet(s3_path)

if __name__ == "__main__":
    main()

Overwriting ./code/process.py


Now submit this code to SageMaker Processing Job.

In [23]:
%%time
import sagemaker
from sagemaker.spark.processing import PySparkProcessor

# Setup the PySpark processor to run the job. Note the instance type and instance count parameters. SageMaker will create these many instances of this type for the spark job.
role = sagemaker.get_execution_role()
spark_processor = PySparkProcessor(
    base_job_name="sm-spark-project",
    framework_version="3.3",
    role=role,
    instance_count=4,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=3600,
)

# s3 paths
session = sagemaker.Session()
bucket = session.default_bucket()
s3_dataset_path_commments = "s3://bigdatateaching/reddit/parquet/comments/yyyy=*/mm=*/*.parquet"
s3_dataset_path_submissions = "s3://bigdatateaching/reddit/parquet/submissions/yyyy=*/mm=*/*.parquet"
output_prefix_data = "project"
output_prefix_logs = f"spark_logs"

# modify this comma separated list to choose the subreddits of interest
#subreddits = "technology,chatgpt"
subreddits = "aws,azure"
    
# run the job now, the arguments array is provided as command line to the Python script (Spark code in this case).
spark_processor.run(
    submit_app="./code/process.py",
    arguments=[
        "--s3_dataset_path_commments",
        s3_dataset_path_commments,
        "--s3_dataset_path_submissions",
        s3_dataset_path_submissions,
        "--s3_output_bucket",
        bucket,
        "--s3_output_prefix",
        output_prefix_data,
        "--subreddits",
        subreddits,
    ],
    spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, output_prefix_logs),
    logs=False,
)

...............................................................................................................................................................................................................................................................................................................................................................................................................................................................!CPU times: user 2.38 s, sys: 241 ms, total: 2.62 s
Wall time: 37min 37s


## Read the filtered data

Now that we have filtered the data to only keep submissions and comments from subreddits of interest. Let us read data from the s3 path where we saved the filtered data.

In [24]:
%%time
s3_path = f"s3a://{bucket}/{output_prefix_data}/comments"
print(f"reading submissions from {s3_path}")
comments = spark.read.parquet(s3_path, header=True)
print(f"shape of the comments dataframe is {comments.count():,}x{len(comments.columns)}")

reading submissions from s3a://sagemaker-us-east-1-733132495727/project/comments




shape of the comments dataframe is 189,155x14
CPU times: user 82.3 ms, sys: 21.1 ms, total: 103 ms
Wall time: 2min 24s


                                                                                

In [25]:
comments.printSchema()

root
 |-- author: string (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- body: string (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- id: string (nullable = true)
 |-- link_id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- score: long (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)



In [27]:
# display a subset of columns
comments.select("subreddit", "author", "body", "parent_id", "link_id", "id").show()

+---------+--------------+--------------------+----------+----------+-------+
|subreddit|        author|                body| parent_id|   link_id|     id|
+---------+--------------+--------------------+----------+----------+-------+
|      aws|       Natuuls|         Skill issue|t3_1av0o2g|t3_1av0o2g|kr7ii0t|
|    AZURE|     206SEATTL|Thank you, much a...|t1_kr6fkd7|t3_1auv8p8|kr7ij7m|
|      aws|  light_odin05|You can make it s...|t1_kr7gqii|t3_1av0o2g|kr7imzh|
|      aws|       darvink|You are thinking ...|t3_1av0o2g|t3_1av0o2g|kr7iztp|
|      aws|spurius_tadius|Yes, thanks!\n\nT...|t1_kr7gggr|t3_1autiak|kr7j43a|
|      aws|     nevaNevan|I don’t have your...|t3_1av0o2g|t3_1av0o2g|kr7j4ur|
|    AZURE|       Netskyz|Eating crumpets d...|t1_kr6si28|t3_1aut6p4|kr7j7db|
|      aws|wafflefriesdev|Azure is very Win...|t3_1av0st2|t3_1av0st2|kr7jdf4|
|      aws|  light_odin05|I'd advise you to...|t3_1av0o2g|t3_1av0o2g|kr7jezk|
|    AZURE|       rdhdpsy|I think aws has a...|t3_1av0t6v|t3_1av

In [28]:
%%time
s3_path = f"s3a://{bucket}/{output_prefix_data}/submissions"
print(f"reading submissions from {s3_path}")
submissions = spark.read.parquet(s3_path, header=True)
print(f"shape of the submissions dataframe is {submissions.count():,}x{len(submissions.columns)}")

reading submissions from s3a://sagemaker-us-east-1-733132495727/project/submissions




shape of the submissions dataframe is 31,302x18
CPU times: user 19.1 ms, sys: 0 ns, total: 19.1 ms
Wall time: 26.4 s


                                                                                

In [29]:
submissions.printSchema()

root
 |-- author: string (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- domain: string (nullable = true)
 |-- id: string (nullable = true)
 |-- is_self: boolean (nullable = true)
 |-- locked: boolean (nullable = true)
 |-- num_comments: long (nullable = true)
 |-- over_18: boolean (nullable = true)
 |-- quarantine: boolean (nullable = true)
 |-- score: long (nullable = true)
 |-- selftext: string (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)



In [30]:
# display a subset of columns
submissions.select("subreddit", "author", "title", "selftext", "num_comments").show()

+---------+--------------------+--------------------+--------------------+------------+
|subreddit|              author|               title|            selftext|num_comments|
+---------+--------------------+--------------------+--------------------+------------+
|    AZURE|      dxrkinfuser_44|Azure for Student...|Hey there to ever...|           0|
|    AZURE|           EndNo4852|Hang on Create Fu...|Trying to create ...|           0|
|    AZURE|         ping-friend|Azure Update Mana...|Could you share h...|           3|
|    AZURE|           EndNo4852|Hang on Function App|Trying to create ...|           5|
|      aws|       LostOnEarth82|  Password keys help|So I hired some s...|          10|
|      aws| SalamanderReady6680|Cloud Computing A...|Howdy yall, I’m 2...|           2|
|      aws|      shivvaaaaakeee|Kubernetes with H...|           [removed]|           0|
|    AZURE|         aljandeleon|Databricks with A...|We are planning t...|           0|
|    AZURE|         ryan_qumulo|