In [1]:
import logging
import sagemaker
from time import gmtime, strftime

# Surface the SageMaker SDK's INFO messages (job names, S3 uploads) in the notebook output.
sagemaker_logger = logging.getLogger("sagemaker")
sagemaker_logger.setLevel(logging.INFO)
# Attach the console handler only once: re-running this cell would otherwise stack
# another StreamHandler and print every SageMaker log line multiple times.
if not any(type(handler) is logging.StreamHandler for handler in sagemaker_logger.handlers):
    sagemaker_logger.addHandler(logging.StreamHandler())

# Session, default S3 bucket and IAM execution role for the jobs launched from this notebook.
sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()
role = sagemaker.get_execution_role()

In [4]:
%%writefile spark_bigquery_to_s3.py

from pyspark.sql import SparkSession
from google.cloud import bigquery
from google.oauth2 import service_account


def main(project_id='<projectid>',
         output_uri='<s3 uri>',
         credentials_path='/opt/ml/processing/input/files/credentials.json'):
    """Query BigQuery with a service-account key and write the result to S3 as Parquet.

    The default ``credentials_path`` is where the processing job's 'files' input
    (populated from ``submit_files``) is mounted inside the container.

    Args:
        project_id: GCP project passed to the BigQuery client.
        output_uri: Destination URI for the Parquet output; existing data there is
            overwritten.
        credentials_path: Path to the GCP service-account JSON key file.
    """
    spark = SparkSession \
        .builder \
        .appName("Spark Demo - Writing to S3") \
        .getOrCreate()

    try:
        credentials = service_account.Credentials.from_service_account_file(credentials_path)
        client = bigquery.Client(credentials=credentials, project=project_id)

        query = """
       SELECT   name,  count FROM   `babynames.names_2014` WHERE   gender = 'M' ORDER BY   count DESC LIMIT   5 """

        # The result set is tiny (LIMIT 5), so materialising it as pandas on the driver is fine.
        names_pdf = client.query(query).to_dataframe()

        spark_df = spark.createDataFrame(names_pdf)
        spark_df.printSchema()
        spark_df.show()

        # coalesce(1) yields a single output file without the full shuffle repartition(1) triggers.
        spark_df.coalesce(1).write.mode('overwrite').parquet(output_uri)
    finally:
        # Release the Spark context even when the query or the write fails.
        spark.stop()

if __name__ == "__main__":
    import argparse

    # Optional overrides so the placeholders need not be edited in this file; they can be
    # supplied through PySparkProcessor.run(arguments=[...]). Defaults keep the old behaviour.
    parser = argparse.ArgumentParser(description="Copy a BigQuery query result to S3 as Parquet.")
    parser.add_argument("--project-id", default='<projectid>')
    parser.add_argument("--output-uri", default='<s3 uri>')
    args = parser.parse_args()
    main(project_id=args.project_id, output_uri=args.output_uri)

Overwriting spark_bigquery_to_s3.py


In [5]:
from sagemaker.spark.processing import PySparkProcessor

# Job settings: S3 key prefix for this demo and the custom Spark container image
# (replace the placeholder with the ECR image URI before running).
prefix = "sagemaker/spark-demo"
image_uri_custom = '<ecr_image_uri>'

# Processor definition: 2 x ml.m5.xlarge, stopped after 1200 seconds at most.
spark_processor = PySparkProcessor(
    role=role,
    image_uri=image_uri_custom,
    framework_version="2.4",
    base_job_name="sm-spark",
    instance_type="ml.m5.xlarge",
    instance_count=2,
    max_runtime_in_seconds=1200,
)

# Launch the job: ship the script plus the GCP key file, write Spark event logs under
# the prefix, and do not stream container logs into the notebook.
# NOTE(review): submit_files uploads credentials.json to the default bucket under the
# job's input/files prefix, where it remains after the job ends.
spark_processor.run(
    submit_app="spark_bigquery_to_s3.py",
    submit_files=["credentials.json"],
    logs=False,
    spark_event_logs_s3_uri=f"s3://{bucket}/{prefix}/spark_event_logs",
)

Copying dependency from local path demo_credentials.json to tmpdir /tmp/tmpz1_2ab75
Uploading dependencies from tmpdir /tmp/tmpz1_2ab75 to S3 s3://sagemaker-eu-central-1-310766717595/sm-spark-2021-04-19-09-31-19-713/input/files
Creating processing-job with name sm-spark-2021-04-19-09-31-19-713



Job Name:  sm-spark-2021-04-19-09-31-19-713
Inputs:  [{'InputName': 'files', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-eu-central-1-310766717595/sm-spark-2021-04-19-09-31-19-713/input/files', 'LocalPath': '/opt/ml/processing/input/files', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-eu-central-1-310766717595/sm-spark-2021-04-19-09-31-19-713/input/code/spark_bigquery_to_s3.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'output-1', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-eu-central-1-310766717595/sagemaker/spark-demo/spark_event_logs', 'LocalPath': '/opt/ml/processing/spark-events/', 'S3UploadMode': 'Continuous'}}]
............................