## Pull subreddit data for relevant subreddits

## Setup
We need an available Java installation to run PySpark. The easiest way to get one is to install a JDK with conda, which also sets up the proper paths.

In [26]:
# Setup - Run only once per Kernel App
%conda install https://anaconda.org/conda-forge/openjdk/11.0.1/download/linux-64/openjdk-11.0.1-hacce0ff_1021.tar.bz2

# install PySpark
%pip install pyspark==3.4.0

# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

IOStream.flush timed out
Retrieving notices: ...working... done

Downloading and Extracting Packages:


## Package Plan ##

  environment location: /opt/conda



Preparing transaction: done
Verifying transaction: done
Executing transaction: done

Note: you may need to restart the kernel to use updated packages.
IOStream.flush timed out
Note: you may need to restart the kernel to use updated packages.


## Utilize S3 Data within local PySpark
* By specifying the `hadoop-aws` jar in our Spark config we're able to access S3 datasets using the s3a file prefix. 
* Since we've already authenticated ourselves to SageMaker Studio, we can use our assumed SageMaker ExecutionRole for any S3 reads/writes by setting the credential provider to `ContainerCredentialsProvider`.

In [27]:
# Import pyspark and build a local Spark session that can read/write S3 through the s3a:// scheme
from pyspark.sql import SparkSession

# hadoop-aws must be the same version as the Hadoop client jars bundled with PySpark.
# pyspark==3.4.0 ships Hadoop 3.3.4. Mixing in a different hadoop-aws release (e.g. 3.2.2) risks
# NoSuchMethodError / ClassNotFoundException when the S3A filesystem is initialised.
HADOOP_AWS_VERSION = "3.3.4"

spark = (
    SparkSession.builder.appName("PySparkApp")
    .config("spark.jars.packages", f"org.apache.hadoop:hadoop-aws:{HADOOP_AWS_VERSION}")
    # The "spark.hadoop." prefix is Spark's documented way to forward a property into the Hadoop
    # Configuration used by S3A. ContainerCredentialsProvider lets S3 access use the execution
    # role assumed by SageMaker Studio.
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.ContainerCredentialsProvider",
    )
    .getOrCreate()
)

print(spark.version)

3.4.0


INFO:py4j.clientserver:Error while sending or receiving.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.11/site-packages/py4j/clientserver.py", line 503, in send_command
    self.socket.sendall(command.encode("utf-8"))
ConnectionResetError: [Errno 104] Connection reset by peer
INFO:py4j.clientserver:Closing down clientserver connection
INFO:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.11/site-packages/py4j/clientserver.py", line 503, in send_command
    self.socket.sendall(command.encode("utf-8"))
ConnectionResetError: [Errno 104] Connection reset by peer

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/py4j/clientse

## Process S3 data with SageMaker Processing Job `PySparkProcessor`

We are going to put the processing code into a Python file (`./pull_data/process.py`) and then submit that file to a SageMaker Processing Job's [`PySparkProcessor`](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_processing.html#pysparkprocessor).

In [7]:
# Create the folder that will hold the Spark script (no error if it already exists).
from pathlib import Path

Path("./pull_data").mkdir(parents=True, exist_ok=True)

In [8]:
%%writefile ./pull_data/process.py

import os
import logging
import argparse

# Import pyspark and build Spark session
from pyspark.sql.functions import *
from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

import sys  # needed for sys.stdout; do not rely on the pyspark star import to provide it

# Configure the root logger once, writing formatted records to stdout.
# basicConfig already installs the only handler needed; adding a second StreamHandler would
# print every record twice.
# INFO (not DEBUG) keeps this script's logger.info messages while silencing py4j's
# per-command DEBUG records, which propagate to the root logger.
logging.basicConfig(
    format='%(asctime)s,%(levelname)s,%(module)s,%(filename)s,%(lineno)d,%(message)s',
    level=logging.INFO,
    stream=sys.stdout,
)
logger = logging.getLogger()
# Set explicitly as well: basicConfig is a no-op if the root logger already has handlers.
logger.setLevel(logging.INFO)

def _write_filtered(spark, df, s3_path, label, subreddits):
    """Write ``df`` to ``s3_path`` as parquet (overwriting) and log the written shape.

    The shape is computed from the written output rather than from ``df``. Counting ``df``
    first would make Spark re-run the whole read+filter over the raw dataset just for the log
    line. The written output is only the filtered subset, so counting it is cheap.

    Args:
        spark: active SparkSession, used to read the output back.
        df: filtered DataFrame to persist.
        s3_path: destination folder, e.g. s3://bucket/prefix/comments.
        label: "comments" or "submissions"; used only in log messages.
        subreddits: subreddit names the data was filtered on; used only in log messages.
    """
    logger.info(f"going to write {label} for {subreddits} in {s3_path}")
    df.write.mode("overwrite").parquet(s3_path)
    written = spark.read.parquet(s3_path)
    logger.info(f"shape of the {label}_filtered dataframe is {written.count():,}x{len(written.columns)}")


def main():
    """Filter the raw Reddit comments and submissions down to the requested subreddits.

    Reads both parquet datasets and keeps rows whose lower-cased ``subreddit`` is in
    ``--subreddits`` (also lower-cased). Writes the result to
    ``s3://<bucket>/<prefix>/comments<suffix>`` and ``s3://<bucket>/<prefix>/submissions<suffix>``.
    ``--output_suffix`` defaults to "" for this pull.
    """
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--s3_dataset_path_commments", type=str, required=True, help="Path of dataset in S3 for reddit comments")
    parser.add_argument("--s3_dataset_path_submissions", type=str, required=True, help="Path of dataset in S3 for reddit submissions")
    parser.add_argument("--s3_output_bucket", type=str, required=True, help="s3 output bucket")
    parser.add_argument("--s3_output_prefix", type=str, required=True, help="s3 output prefix")
    parser.add_argument("--subreddits", type=str, required=True, help="comma separated list of subreddits of interest")
    # Appended to the output folder names so that separate pulls don't overwrite each other.
    parser.add_argument("--output_suffix", type=str, default="", help="suffix for the comments/submissions output folders")
    args = parser.parse_args()

    # The subreddit column is lower-cased before matching, so normalise the requested names the
    # same way (otherwise e.g. "AskAnAmerican" would match nothing). Empty entries, e.g. from a
    # trailing comma, are dropped.
    subreddits = [s.strip().lower() for s in args.subreddits.split(",") if s.strip()]
    if not subreddits:
        parser.error("--subreddits must name at least one subreddit")

    spark = SparkSession.builder.appName("PySparkApp").getOrCreate()
    logger.info(f"spark version = {spark.version}")

    # Sets the old-API (mapred) output committer class, which matters for RDD-based writes
    # (e.g. writing nested data as CSV).
    # NOTE(review): the DataFrame parquet writes below may not need this — confirm before removing.
    sc = spark.sparkContext
    sc._jsc.hadoopConfiguration().set(
        "mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
    )

    # Read the raw data. "header" is a CSV-only option, so it is not passed to the parquet reader.
    logger.info(f"going to read {args.s3_dataset_path_commments}")
    comments = spark.read.parquet(args.s3_dataset_path_commments)
    logger.info(f"finished reading files...")

    logger.info(f"going to read {args.s3_dataset_path_submissions}")
    submissions = spark.read.parquet(args.s3_dataset_path_submissions)
    logger.info(f"finished reading files...")

    # Optional column exclusion. Some fields (e.g. "edited", "created_utc") are stored as int in
    # some parquet files and as float in others. If that type mismatch breaks a job, list the
    # offending columns here, e.g. ['edited', 'created_utc', 'retrieved_on'].
    # Empty means all columns are kept.
    excluded_cols = []

    comments_included_cols = [c for c in comments.columns if c not in excluded_cols]
    logger.info(f"comments included_cols={comments_included_cols}")

    submissions_included_cols = [c for c in submissions.columns if c not in excluded_cols]
    logger.info(f"submissions included_cols={submissions_included_cols}")

    comments = comments.select(comments_included_cols)
    submissions = submissions.select(submissions_included_cols)

    # Case-insensitive filter: lower-cased column vs. lower-cased requested names.
    submissions_filtered = submissions.where(lower(col("subreddit")).isin(subreddits))
    comments_filtered = comments.where(lower(col("subreddit")).isin(subreddits))

    # Save the filtered data so later analysis can start from these outputs instead of the raw dataset.
    output_root = f"s3://{args.s3_output_bucket}/{args.s3_output_prefix}"
    _write_filtered(spark, comments_filtered, f"{output_root}/comments{args.output_suffix}", "comments", subreddits)
    _write_filtered(spark, submissions_filtered, f"{output_root}/submissions{args.output_suffix}", "submissions", subreddits)


if __name__ == "__main__":
    main()

Writing ./pull_data/process.py


### Now submit this code to SageMaker Processing Job.

For the r/travel, r/usatravel, and r/AskAnAmerican subreddits (written to the `comments` and `submissions` output folders).

In [None]:
%%time
import sagemaker
from sagemaker.spark.processing import PySparkProcessor

# Setup the PySpark processor to run the job. Note the instance type and instance count parameters. SageMaker will create these many instances of this type for the spark job.
role = sagemaker.get_execution_role()
spark_processor = PySparkProcessor(
    base_job_name="sm-spark-project",
    framework_version="3.3",
    role=role,
    instance_count=4,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=3600,
)

# s3 paths
session = sagemaker.Session()
bucket = session.default_bucket()
s3_dataset_path_commments = "s3://bigdatateaching/reddit-project/reddit/parquet/comments/yyyy=*/mm=*/*.parquet"
s3_dataset_path_submissions = "s3://bigdatateaching/reddit-project/reddit/parquet/submissions/yyyy=*/mm=*/*.parquet"
output_prefix_data = "project"
output_prefix_logs = f"spark_logs"

# modify this comma separated list to choose the subreddits of interest
#subreddits = "technology,chatgpt"
subreddits = "travel,usatravel,askanamerican"
    
# run the job now, the arguments array is provided as command line to the Python script (Spark code in this case).
spark_processor.run(
    submit_app="./pull_data/process.py",
    arguments=[
        "--s3_dataset_path_commments",
        s3_dataset_path_commments,
        "--s3_dataset_path_submissions",
        s3_dataset_path_submissions,
        "--s3_output_bucket",
              bucket,
        "--s3_output_prefix",
        output_prefix_data,
        "--subreddits",
        subreddits,
    ],
    spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, output_prefix_logs),
    logs=False,
)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


INFO:sagemaker:Creating processing-job with name sm-spark-project-2024-11-08-16-16-49-511


...............................................................................................................................................................................................................................................................................................................................!CPU times: user 4.22 s, sys: 418 ms, total: 4.64 s
Wall time: 37min 42s


### Re-write process.py for another pull

For the r/alabama, alaska, arizona, arkansas, california, colorado, connecticut, delaware, florida, georgia, hawaii, idaho, illinois, indiana, iowa, kansas, kentucky, louisiana, maine, maryland, massachusetts, michigan, minnesota, mississippi, missouri, montana, nebraska, and nevada subreddits (written to the `comments_2` and `submissions_2` output folders).

In [10]:
%%writefile ./pull_data/process.py

import os
import logging
import argparse

# Import pyspark and build Spark session
from pyspark.sql.functions import *
from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

import sys  # needed for sys.stdout; do not rely on the pyspark star import to provide it

# Configure the root logger once, writing formatted records to stdout.
# basicConfig already installs the only handler needed; adding a second StreamHandler would
# print every record twice.
# INFO (not DEBUG) keeps this script's logger.info messages while silencing py4j's
# per-command DEBUG records, which propagate to the root logger.
logging.basicConfig(
    format='%(asctime)s,%(levelname)s,%(module)s,%(filename)s,%(lineno)d,%(message)s',
    level=logging.INFO,
    stream=sys.stdout,
)
logger = logging.getLogger()
# Set explicitly as well: basicConfig is a no-op if the root logger already has handlers.
logger.setLevel(logging.INFO)

def _write_filtered(spark, df, s3_path, label, subreddits):
    """Write ``df`` to ``s3_path`` as parquet (overwriting) and log the written shape.

    The shape is computed from the written output rather than from ``df``. Counting ``df``
    first would make Spark re-run the whole read+filter over the raw dataset just for the log
    line. The written output is only the filtered subset, so counting it is cheap.

    Args:
        spark: active SparkSession, used to read the output back.
        df: filtered DataFrame to persist.
        s3_path: destination folder, e.g. s3://bucket/prefix/comments_2.
        label: "comments" or "submissions"; used only in log messages.
        subreddits: subreddit names the data was filtered on; used only in log messages.
    """
    logger.info(f"going to write {label} for {subreddits} in {s3_path}")
    df.write.mode("overwrite").parquet(s3_path)
    written = spark.read.parquet(s3_path)
    logger.info(f"shape of the {label}_filtered dataframe is {written.count():,}x{len(written.columns)}")


def main():
    """Filter the raw Reddit comments and submissions down to the requested subreddits.

    Reads both parquet datasets and keeps rows whose lower-cased ``subreddit`` is in
    ``--subreddits`` (also lower-cased). Writes the result to
    ``s3://<bucket>/<prefix>/comments<suffix>`` and ``s3://<bucket>/<prefix>/submissions<suffix>``.
    ``--output_suffix`` defaults to "_2" for this pull.
    """
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--s3_dataset_path_commments", type=str, required=True, help="Path of dataset in S3 for reddit comments")
    parser.add_argument("--s3_dataset_path_submissions", type=str, required=True, help="Path of dataset in S3 for reddit submissions")
    parser.add_argument("--s3_output_bucket", type=str, required=True, help="s3 output bucket")
    parser.add_argument("--s3_output_prefix", type=str, required=True, help="s3 output prefix")
    parser.add_argument("--subreddits", type=str, required=True, help="comma separated list of subreddits of interest")
    # Appended to the output folder names so that separate pulls don't overwrite each other.
    parser.add_argument("--output_suffix", type=str, default="_2", help="suffix for the comments/submissions output folders")
    args = parser.parse_args()

    # The subreddit column is lower-cased before matching, so normalise the requested names the
    # same way (otherwise e.g. "Alabama" would match nothing). Empty entries, e.g. from a
    # trailing comma, are dropped.
    subreddits = [s.strip().lower() for s in args.subreddits.split(",") if s.strip()]
    if not subreddits:
        parser.error("--subreddits must name at least one subreddit")

    spark = SparkSession.builder.appName("PySparkApp").getOrCreate()
    logger.info(f"spark version = {spark.version}")

    # Sets the old-API (mapred) output committer class, which matters for RDD-based writes
    # (e.g. writing nested data as CSV).
    # NOTE(review): the DataFrame parquet writes below may not need this — confirm before removing.
    sc = spark.sparkContext
    sc._jsc.hadoopConfiguration().set(
        "mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
    )

    # Read the raw data. "header" is a CSV-only option, so it is not passed to the parquet reader.
    logger.info(f"going to read {args.s3_dataset_path_commments}")
    comments = spark.read.parquet(args.s3_dataset_path_commments)
    logger.info(f"finished reading files...")

    logger.info(f"going to read {args.s3_dataset_path_submissions}")
    submissions = spark.read.parquet(args.s3_dataset_path_submissions)
    logger.info(f"finished reading files...")

    # Optional column exclusion. Some fields (e.g. "edited", "created_utc") are stored as int in
    # some parquet files and as float in others. If that type mismatch breaks a job, list the
    # offending columns here, e.g. ['edited', 'created_utc', 'retrieved_on'].
    # Empty means all columns are kept.
    excluded_cols = []

    comments_included_cols = [c for c in comments.columns if c not in excluded_cols]
    logger.info(f"comments included_cols={comments_included_cols}")

    submissions_included_cols = [c for c in submissions.columns if c not in excluded_cols]
    logger.info(f"submissions included_cols={submissions_included_cols}")

    comments = comments.select(comments_included_cols)
    submissions = submissions.select(submissions_included_cols)

    # Case-insensitive filter: lower-cased column vs. lower-cased requested names.
    submissions_filtered = submissions.where(lower(col("subreddit")).isin(subreddits))
    comments_filtered = comments.where(lower(col("subreddit")).isin(subreddits))

    # Save the filtered data so later analysis can start from these outputs instead of the raw dataset.
    output_root = f"s3://{args.s3_output_bucket}/{args.s3_output_prefix}"
    _write_filtered(spark, comments_filtered, f"{output_root}/comments{args.output_suffix}", "comments", subreddits)
    _write_filtered(spark, submissions_filtered, f"{output_root}/submissions{args.output_suffix}", "submissions", subreddits)


if __name__ == "__main__":
    main()

Overwriting ./pull_data/process.py


In [None]:
%%time
import sagemaker
from sagemaker.spark.processing import PySparkProcessor

# Setup the PySpark processor to run the job. Note the instance type and instance count parameters. SageMaker will create these many instances of this type for the spark job.
role = sagemaker.get_execution_role()
spark_processor = PySparkProcessor(
    base_job_name="sm-spark-project",
    framework_version="3.3",
    role=role,
    instance_count=4,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=3600,
)

# s3 paths
session = sagemaker.Session()
bucket = session.default_bucket()
s3_dataset_path_commments = "s3://bigdatateaching/reddit-project/reddit/parquet/comments/yyyy=*/mm=*/*.parquet"
s3_dataset_path_submissions = "s3://bigdatateaching/reddit-project/reddit/parquet/submissions/yyyy=*/mm=*/*.parquet"
output_prefix_data = "project"
output_prefix_logs = f"spark_logs"

# modify this comma separated list to choose the subreddits of interest
#subreddits = "technology,chatgpt"
subreddits = "alabama,alaska,arizona,arkansas,california,colorado,connecticut,delaware,florida,georgia,hawaii,idaho,illinois,indiana,iowa,kansas,kentucky,louisiana,maine,maryland,massachusetts,michigan,minnesota,mississippi,missouri,montana,nebraska,nevada"
    
# run the job now, the arguments array is provided as command line to the Python script (Spark code in this case).
spark_processor.run(
    submit_app="./pull_data/process.py",
    arguments=[
        "--s3_dataset_path_commments",
        s3_dataset_path_commments,
        "--s3_dataset_path_submissions",
        s3_dataset_path_submissions,
        "--s3_output_bucket",
              bucket,
        "--s3_output_prefix",
        output_prefix_data,
        "--subreddits",
        subreddits,
    ],
    spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, output_prefix_logs),
    logs=False,
)

INFO:sagemaker:Creating processing-job with name sm-spark-project-2024-11-08-16-56-59-922


.......................................................................................................................................................................................................................................................................................................................................................................!CPU times: user 2.32 s, sys: 256 ms, total: 2.58 s
Wall time: 35min 7s


### Re-write process.py for another pull

For the r/newhampshire, newjersey, newmexico, newyork, northcarolina, northdakota, ohio, oklahoma, oregon, pennsylvania, rhodeisland, southcarolina, southdakota, tennessee, texas, utah, vermont, virginia, washington, westvirginia, wisconsin, and wyoming subreddits (written to the `comments_3` and `submissions_3` output folders).








In [12]:
%%writefile ./pull_data/process.py

import os
import logging
import argparse

# Import pyspark and build Spark session
from pyspark.sql.functions import *
from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

import sys  # needed for sys.stdout; do not rely on the pyspark star import to provide it

# Configure the root logger once, writing formatted records to stdout.
# basicConfig already installs the only handler needed; adding a second StreamHandler would
# print every record twice.
# INFO (not DEBUG) keeps this script's logger.info messages while silencing py4j's
# per-command DEBUG records, which propagate to the root logger.
logging.basicConfig(
    format='%(asctime)s,%(levelname)s,%(module)s,%(filename)s,%(lineno)d,%(message)s',
    level=logging.INFO,
    stream=sys.stdout,
)
logger = logging.getLogger()
# Set explicitly as well: basicConfig is a no-op if the root logger already has handlers.
logger.setLevel(logging.INFO)

def _write_filtered(spark, df, s3_path, label, subreddits):
    """Write ``df`` to ``s3_path`` as parquet (overwriting) and log the written shape.

    The shape is computed from the written output rather than from ``df``. Counting ``df``
    first would make Spark re-run the whole read+filter over the raw dataset just for the log
    line. The written output is only the filtered subset, so counting it is cheap.

    Args:
        spark: active SparkSession, used to read the output back.
        df: filtered DataFrame to persist.
        s3_path: destination folder, e.g. s3://bucket/prefix/comments_3.
        label: "comments" or "submissions"; used only in log messages.
        subreddits: subreddit names the data was filtered on; used only in log messages.
    """
    logger.info(f"going to write {label} for {subreddits} in {s3_path}")
    df.write.mode("overwrite").parquet(s3_path)
    written = spark.read.parquet(s3_path)
    logger.info(f"shape of the {label}_filtered dataframe is {written.count():,}x{len(written.columns)}")


def main():
    """Filter the raw Reddit comments and submissions down to the requested subreddits.

    Reads both parquet datasets and keeps rows whose lower-cased ``subreddit`` is in
    ``--subreddits`` (also lower-cased). Writes the result to
    ``s3://<bucket>/<prefix>/comments<suffix>`` and ``s3://<bucket>/<prefix>/submissions<suffix>``.
    ``--output_suffix`` defaults to "_3" for this pull.
    """
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--s3_dataset_path_commments", type=str, required=True, help="Path of dataset in S3 for reddit comments")
    parser.add_argument("--s3_dataset_path_submissions", type=str, required=True, help="Path of dataset in S3 for reddit submissions")
    parser.add_argument("--s3_output_bucket", type=str, required=True, help="s3 output bucket")
    parser.add_argument("--s3_output_prefix", type=str, required=True, help="s3 output prefix")
    parser.add_argument("--subreddits", type=str, required=True, help="comma separated list of subreddits of interest")
    # Appended to the output folder names so that separate pulls don't overwrite each other.
    parser.add_argument("--output_suffix", type=str, default="_3", help="suffix for the comments/submissions output folders")
    args = parser.parse_args()

    # The subreddit column is lower-cased before matching, so normalise the requested names the
    # same way (otherwise e.g. "Texas" would match nothing). Empty entries, e.g. from a
    # trailing comma, are dropped.
    subreddits = [s.strip().lower() for s in args.subreddits.split(",") if s.strip()]
    if not subreddits:
        parser.error("--subreddits must name at least one subreddit")

    spark = SparkSession.builder.appName("PySparkApp").getOrCreate()
    logger.info(f"spark version = {spark.version}")

    # Sets the old-API (mapred) output committer class, which matters for RDD-based writes
    # (e.g. writing nested data as CSV).
    # NOTE(review): the DataFrame parquet writes below may not need this — confirm before removing.
    sc = spark.sparkContext
    sc._jsc.hadoopConfiguration().set(
        "mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
    )

    # Read the raw data. "header" is a CSV-only option, so it is not passed to the parquet reader.
    logger.info(f"going to read {args.s3_dataset_path_commments}")
    comments = spark.read.parquet(args.s3_dataset_path_commments)
    logger.info(f"finished reading files...")

    logger.info(f"going to read {args.s3_dataset_path_submissions}")
    submissions = spark.read.parquet(args.s3_dataset_path_submissions)
    logger.info(f"finished reading files...")

    # Optional column exclusion. Some fields (e.g. "edited", "created_utc") are stored as int in
    # some parquet files and as float in others. If that type mismatch breaks a job, list the
    # offending columns here, e.g. ['edited', 'created_utc', 'retrieved_on'].
    # Empty means all columns are kept.
    excluded_cols = []

    comments_included_cols = [c for c in comments.columns if c not in excluded_cols]
    logger.info(f"comments included_cols={comments_included_cols}")

    submissions_included_cols = [c for c in submissions.columns if c not in excluded_cols]
    logger.info(f"submissions included_cols={submissions_included_cols}")

    comments = comments.select(comments_included_cols)
    submissions = submissions.select(submissions_included_cols)

    # Case-insensitive filter: lower-cased column vs. lower-cased requested names.
    submissions_filtered = submissions.where(lower(col("subreddit")).isin(subreddits))
    comments_filtered = comments.where(lower(col("subreddit")).isin(subreddits))

    # Save the filtered data so later analysis can start from these outputs instead of the raw dataset.
    output_root = f"s3://{args.s3_output_bucket}/{args.s3_output_prefix}"
    _write_filtered(spark, comments_filtered, f"{output_root}/comments{args.output_suffix}", "comments", subreddits)
    _write_filtered(spark, submissions_filtered, f"{output_root}/submissions{args.output_suffix}", "submissions", subreddits)


if __name__ == "__main__":
    main()

Overwriting ./pull_data/process.py


In [None]:
%%time
import sagemaker
from sagemaker.spark.processing import PySparkProcessor

# Setup the PySpark processor to run the job. Note the instance type and instance count parameters. SageMaker will create these many instances of this type for the spark job.
role = sagemaker.get_execution_role()
spark_processor = PySparkProcessor(
    base_job_name="sm-spark-project",
    framework_version="3.3",
    role=role,
    instance_count=4,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=3600,
)

# s3 paths
session = sagemaker.Session()
bucket = session.default_bucket()
s3_dataset_path_commments = "s3://bigdatateaching/reddit-project/reddit/parquet/comments/yyyy=*/mm=*/*.parquet"
s3_dataset_path_submissions = "s3://bigdatateaching/reddit-project/reddit/parquet/submissions/yyyy=*/mm=*/*.parquet"
output_prefix_data = "project"
output_prefix_logs = f"spark_logs"

# modify this comma separated list to choose the subreddits of interest
#subreddits = "technology,chatgpt"
subreddits = "newhampshire,newjersey,newmexico,newyork,northcarolina,northdakota,ohio,oklahoma,oregon,pennsylvania,rhodeisland,southcarolina,southdakota,tennessee,texas,utah,vermont,virginia,washington,westvirginia,wisconsin,wyoming"
    
# run the job now, the arguments array is provided as command line to the Python script (Spark code in this case).
spark_processor.run(
    submit_app="./pull_data/process.py",
    arguments=[
        "--s3_dataset_path_commments",
        s3_dataset_path_commments,
        "--s3_dataset_path_submissions",
        s3_dataset_path_submissions,
        "--s3_output_bucket",
              bucket,
        "--s3_output_prefix",
        output_prefix_data,
        "--subreddits",
        subreddits,
    ],
    spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, output_prefix_logs),
    logs=False,
)

INFO:sagemaker:Creating processing-job with name sm-spark-project-2024-11-08-17-33-24-773


.................................................................................................................................................................................................................................................................................................................................................................................................................................!CPU times: user 2.18 s, sys: 210 ms, total: 2.39 s
Wall time: 35min 7s


## Read and combine the filtered data

Now that we have filtered the data to keep only the submissions and comments from the subreddits of interest, let us read the filtered data back from the S3 paths where we saved it and combine the three pulls.

In [None]:
%%time
# Read comments data
s3_path_1 = f"s3a://{bucket}/{output_prefix_data}/comments"
s3_path_2 = f"s3a://{bucket}/{output_prefix_data}/comments_2"
s3_path_3 = f"s3a://{bucket}/{output_prefix_data}/comments_3"

print(f"reading comments from {s3_path_1}")
comments_1 = spark.read.parquet(s3_path_1, header=True)
print(f"reading comments from {s3_path_2}")
comments_2 = spark.read.parquet(s3_path_2, header=True)
print(f"reading comments from {s3_path_3}")
comments_3 = spark.read.parquet(s3_path_3, header=True)

# Append comments data
comments = comments_1.union(comments_2)
comments = comments.union(comments_3)
print(f"shape of the comments dataframe is {comments.count():,}x{len(comments.columns)}")

# Print schema
print(comments.printSchema)
comments.groupBy('subreddit').count().show()

reading comments from s3a://sagemaker-us-east-1-562201516459/project/comments
reading comments from s3a://sagemaker-us-east-1-562201516459/project/comments_2
reading comments from s3a://sagemaker-us-east-1-562201516459/project/comments_3




+-------------+-------+
|    subreddit|  count|
+-------------+-------+
|       travel|1692634|
|    usatravel|   3541|
|AskAnAmerican|1484279|
|       Hawaii| 229058|
|     Arkansas| 206619|
|       alaska| 155213|
|  Connecticut| 596502|
|     maryland| 374793|
|     Nebraska|  92045|
|massachusetts| 532337|
|       Nevada|  43471|
|  mississippi| 120069|
|     Delaware| 121224|
|      arizona| 212745|
|     illinois| 151436|
|      florida| 983754|
|      Georgia| 289066|
|      Montana|  94978|
|     Michigan| 417908|
|      Alabama| 134993|
+-------------+-------+
only showing top 20 rows

CPU times: user 821 ms, sys: 179 ms, total: 1e+03 ms
Wall time: 50min 29s


                                                                                

In [22]:
%%time
# Read submissions data
s3_path_1 = f"s3a://{bucket}/{output_prefix_data}/submissions"
s3_path_2 = f"s3a://{bucket}/{output_prefix_data}/submissions_2"
s3_path_3 = f"s3a://{bucket}/{output_prefix_data}/submissions_3"

print(f"reading submissions from {s3_path_1}")
submissions_1 = spark.read.parquet(s3_path_1, header=True)
print(f"reading comments from {s3_path_2}")
submissions_2 = spark.read.parquet(s3_path_2, header=True)
print(f"reading comments from {s3_path_3}")
submissions_3 = spark.read.parquet(s3_path_3, header=True)

# Append submissions data
submissions = submissions_1.union(submissions_2)
submissions = submissions.union(submissions_3)
print(f"shape of the submissions dataframe is {submissions.count():,}x{len(submissions.columns)}")

# Print schema
print(submissions.printSchema)
submissions.groupBy('subreddit').count().show()

reading submissions from s3a://sagemaker-us-east-1-562201516459/project/submissions
reading comments from s3a://sagemaker-us-east-1-562201516459/project/submissions_2
reading comments from s3a://sagemaker-us-east-1-562201516459/project/submissions_3


                                                                                

shape of the submissions dataframe is 697,043x21
<bound method DataFrame.printSchema of DataFrame[author: string, author_flair_css_class: string, author_flair_text: string, created_utc: bigint, distinguished: string, domain: string, edited: double, id: string, is_self: boolean, locked: boolean, num_comments: bigint, over_18: boolean, quarantine: boolean, retrieved_on: bigint, score: bigint, selftext: string, stickied: boolean, subreddit: string, subreddit_id: string, title: string, url: string]>




+-------------+------+
|    subreddit| count|
+-------------+------+
|       travel|169797|
|    usatravel|   880|
|AskAnAmerican| 30216|
|       Hawaii| 11515|
|     Arkansas|  7360|
|       alaska|  7925|
|  Connecticut| 21102|
|     maryland| 15686|
|     Nebraska|  2991|
|massachusetts| 16189|
|       Nevada|  2084|
|  mississippi|  4362|
|     Delaware|  6207|
|      arizona| 10510|
|     illinois|  6594|
|      florida| 29461|
|      Georgia| 10063|
|      Montana|  4392|
|     Michigan| 13109|
|      Alabama|  5505|
+-------------+------+
only showing top 20 rows

CPU times: user 133 ms, sys: 20.8 ms, total: 154 ms
Wall time: 6min 56s


                                                                                

### Save data to S3

In [None]:
# Persist the combined data (all three pull batches): one S3 folder per table, overwritten on re-run.
output_path_1 = f"s3a://{bucket}/{output_prefix_data}/comments_filt"
output_path_2 = f"s3a://{bucket}/{output_prefix_data}/submissions_filt"

for frame, destination in ((comments, output_path_1), (submissions, output_path_2)):
    frame.write.mode("overwrite").parquet(destination)

                                                                                

In [28]:
# Test that the combined comments were saved correctly by reading them back from S3.
s3_path_1 = f"s3a://{bucket}/{output_prefix_data}/comments_filt"
print(f"reading comments from {s3_path_1}")
comments = spark.read.parquet(s3_path_1)

# Count once and reuse the value for both the check and the report.
comments_row_count = comments.count()
assert comments_row_count > 0, f"no comment rows found in {s3_path_1}"
print(f"shape of the comments dataframe is {comments_row_count:,}x{len(comments.columns)}")

reading submissions from s3a://sagemaker-us-east-1-562201516459/project/comments_filt




shape of the comments dataframe is 17,667,099x17


                                                                                

In [29]:
# Test that the combined submissions were saved correctly by reading them back from S3.
# Loaded into `submissions` (not `comments`) so the comments DataFrame is not clobbered.
s3_path_1 = f"s3a://{bucket}/{output_prefix_data}/submissions_filt"
print(f"reading submissions from {s3_path_1}")
submissions = spark.read.parquet(s3_path_1)

# Count once and reuse the value for both the check and the report.
submissions_row_count = submissions.count()
assert submissions_row_count > 0, f"no submission rows found in {s3_path_1}"
print(f"shape of the submissions dataframe is {submissions_row_count:,}x{len(submissions.columns)}")

reading submissions from s3a://sagemaker-us-east-1-562201516459/project/submissions_filt




shape of the comments dataframe is 697,043x21


                                                                                