In [37]:
%%sh
# Create the IAM role that job runs execute as. The inline trust policy below
# allows the emr-serverless.amazonaws.com service principal to assume it.
aws iam create-role \
    --role-name emr-serverless-job-role \
    --assume-role-policy-document '{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": {
          "Service": "emr-serverless.amazonaws.com"
        },
        "Action": "sts:AssumeRole"
      }
    ]
  }'

{
    "Role": {
        "Path": "/",
        "RoleName": "emr-serverless-job-role",
        "RoleId": "AROAVVYXO24EXG5JQDKNV",
        "Arn": "arn:aws:iam::390354360073:role/emr-serverless-job-role",
        "CreateDate": "2022-06-17T18:37:33+00:00",
        "AssumeRolePolicyDocument": {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {
                        "Service": "emr-serverless.amazonaws.com"
                    },
                    "Action": "sts:AssumeRole"
                }
            ]
        }
    }
}


In [38]:
%%sh
# Attach an inline policy named S3Access to the job role: read/list on the
# udacity-dend-redshift bucket and put/delete on the objects inside it.
aws iam put-role-policy \
    --role-name emr-serverless-job-role \
    --policy-name S3Access \
    --policy-document '{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ReadFromOutputAndInputBuckets",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::udacity-dend-redshift",
                "arn:aws:s3:::udacity-dend-redshift/*"
            ]
        },
        {
            "Sid": "WriteToOutputDataBucket",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:s3:::udacity-dend-redshift/*"
            ]
        }
    ]
}'

In [39]:
%%sh
# Attach an inline policy named GlueAccess to the job role, granting the Glue
# Data Catalog read/create actions listed below on all resources.
aws iam put-role-policy \
    --role-name emr-serverless-job-role \
    --policy-name GlueAccess \
    --policy-document '{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Sid": "GlueCreateAndReadDataCatalog",
        "Effect": "Allow",
        "Action": [
            "glue:GetDatabase",
            "glue:GetDataBases",
            "glue:CreateTable",
            "glue:GetTable",
            "glue:GetTables",
            "glue:GetPartition",
            "glue:GetPartitions",
            "glue:CreatePartition",
            "glue:BatchCreatePartition",
            "glue:GetUserDefinedFunctions"
        ],
        "Resource": ["*"]
      }
    ]
  }'

In [None]:
# NOTE: this cell is intentionally disabled and kept for reference only.
# It would create a Spark application named "serverless-demo"; the application
# inspected in the next cell is named "my-serverless-application", so it was
# presumably created some other way -- confirm before re-enabling.
# (If re-enabled, "%%sh" must be the first line of the cell.)
# %%sh
# aws emr-serverless create-application \
#   --type SPARK \
#   --name serverless-demo \
#   --release-label "emr-6.6.0" \
#     --initial-capacity '{
#         "DRIVER": {
#             "workerCount": 2,
#             "workerConfiguration": {
#                 "cpu": "2vCPU",
#                 "memory": "4GB"
#             }
#         },
#         "EXECUTOR": {
#             "workerCount": 10,
#             "workerConfiguration": {
#                 "cpu": "4vCPU",
#                 "memory": "8GB"
#             }
#         }
#     }' \
#     --maximum-capacity '{
#         "cpu": "200vCPU",
#         "memory": "200GB",
#         "disk": "1000GB"
#     }'

In [41]:
# Describe the EMR Serverless application by ID to check its state and capacity settings.
!aws emr-serverless get-application --application-id 00f1li2klr0lrv09

{
    "application": {
        "applicationId": "00f1li2klr0lrv09",
        "name": "my-serverless-application",
        "arn": "arn:aws:emr-serverless:us-east-1:390354360073:/applications/00f1li2klr0lrv09",
        "releaseLabel": "emr-6.6.0",
        "type": "Spark",
        "state": "STARTED",
        "stateDetails": "",
        "initialCapacity": {
            "Driver": {
                "workerCount": 1,
                "workerConfiguration": {
                    "cpu": "4 vCPU",
                    "memory": "16 GB",
                    "disk": "20 GB"
                }
            },
            "Executor": {
                "workerCount": 2,
                "workerConfiguration": {
                    "cpu": "4 vCPU",
                    "memory": "16 GB",
                    "disk": "20 GB"
                }
            }
        },
        "maximumCapacity": {
            "cpu": "400 vCPU",
            "memory": "1600 GB",
            "disk": "2000 GB"
        },
        "cr

In [44]:
# Export the bucket, application ID and job role ARN as environment variables
# so the shell cells below can reference them as $S3_BUCKET, $APPLICATION_ID
# and $JOB_ROLE_ARN.
%env S3_BUCKET=udacity-dend-redshift
%env APPLICATION_ID=00f1li2klr0lrv09
%env JOB_ROLE_ARN=arn:aws:iam::390354360073:role/emr-serverless-job-role

env: S3_BUCKET=udacity-dend-redshift
env: APPLICATION_ID=00f1li2klr0lrv09
env: JOB_ROLE_ARN=arn:aws:iam::390354360073:role/emr-serverless-job-role


In [45]:
%%writefile spark.cfg
# Source and destination S3 prefixes for the ETL.
# NOTE(review): spark_etl.py as written in this notebook hard-codes its own
# s3:// paths and does not appear to read this file (which uses s3a://) --
# confirm whether this config is still needed.
[S3]
SOURCE_S3_PATH = s3a://udacity-dend-redshift/
DEST_S3_PATH = s3a://udacity-dend-redshift/spark/

Writing spark.cfg


In [55]:
%%writefile spark_etl.py
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.functions import monotonically_increasing_id


# Root S3 prefixes handed to the processing functions by main(): raw song/log
# JSON is read from under SOURCE_S3_BUCKET and the parquet tables are written
# under DEST_S3_BUCKET. Both end with "/" because the paths are built by
# string concatenation.
SOURCE_S3_BUCKET = "s3://udacity-dend-redshift/"
DEST_S3_BUCKET = "s3://udacity-dend-redshift/spark/"


def create_spark_session():
    """ Return a SparkSession obtained from the default builder

        No extra builder options are set; ``getOrCreate`` hands back the
        session it finds or builds.

        Returns:
            SparkSession object
    """
    return SparkSession.builder.getOrCreate()


def process_song_data(spark, input_data, output_data):
    """ Process song data and create songs and artists table

        Reads every file matching ``song_data/*/*/*/*.json`` under
        ``input_data``, registers the raw rows as the temp view
        ``song_data_view`` (queried later by ``process_log_data``), and writes
        two parquet datasets under ``output_data``: ``songs`` (partitioned by
        year and artist_id) and ``artists``. Existing output is overwritten.

        Arguments:
            spark {object}: SparkSession object
            input_data {str}: Source S3 prefix; a trailing "/" is optional
            output_data {str}: Target S3 prefix; a trailing "/" is optional
        Returns:
            None
    """

    # S3 URIs always use "/" as separator, so the prefixes are normalised by
    # hand rather than with os.path.join (which is OS-dependent and, when
    # given a single pre-concatenated argument, does nothing at all).
    # Empty prefixes are left untouched so relative paths stay relative.
    source_root = input_data if (not input_data or input_data.endswith('/')) \
        else input_data + '/'
    target_root = output_data if (not output_data or output_data.endswith('/')) \
        else output_data + '/'

    # get filepath to song data file
    song_data = source_root + 'song_data/*/*/*/*.json'

    # read song data file
    df = spark.read.json(song_data)

    # create temp view of song data for songplays table to join
    df.createOrReplaceTempView("song_data_view")

    # extract columns to create songs table
    songs_table = df.select('song_id', 'title', 'artist_id', 'year', 'duration').distinct()

    # write songs table to parquet files partitioned by year and artist
    songs_table.write.mode("overwrite").partitionBy('year', 'artist_id')\
               .parquet(path=target_root + 'songs')

    # extract columns to create artists table
    artists_table = df.select('artist_id', 'artist_name', 'artist_location',
                              'artist_latitude', 'artist_longitude').distinct()

    # write artists table to parquet files
    artists_table.write.mode("overwrite").parquet(path=target_root + 'artists')

    
def process_log_data(spark, input_data, output_data):
    """ Process log data and create users, time and songplays table

        Reads every file matching ``log_data/*/*/*.json`` under
        ``input_data``, keeps only rows whose ``page`` is ``NextSong``, and
        writes three parquet datasets under ``output_data``: ``users``,
        ``time`` (partitioned by year and month) and ``songplays``
        (partitioned by year and month). Existing output is overwritten.

        The songplays table is built by joining the log rows with the temp
        view ``song_data_view``, so ``process_song_data`` must already have
        been run on the same SparkSession.

        Arguments:
            spark {object}: SparkSession object
            input_data {str}: Source S3 prefix; a trailing "/" is optional
            output_data {str}: Target S3 prefix; a trailing "/" is optional
        Returns:
            None
    """

    # S3 URIs always use "/" as separator, so the prefixes are normalised by
    # hand rather than with os.path.join (which is OS-dependent and, when
    # given a single pre-concatenated argument, does nothing at all).
    # Empty prefixes are left untouched so relative paths stay relative.
    source_root = input_data if (not input_data or input_data.endswith('/')) \
        else input_data + '/'
    target_root = output_data if (not output_data or output_data.endswith('/')) \
        else output_data + '/'

    # get filepath to log data file
    log_data = source_root + 'log_data/*/*/*.json'

    # read log data file
    df = spark.read.json(log_data)

    # filter by actions for song plays
    df = df.where(df['page'] == 'NextSong')

    # extract columns for users table
    users_table = df.select('userId', 'firstName', 'lastName', 'gender', 'level').distinct()

    # write users table to parquet files
    users_table.write.mode("overwrite").parquet(path=target_root + 'users')

    # derive the event timestamp; ts is divided by 1000 before the cast,
    # i.e. it is treated as epoch milliseconds
    df = df.withColumn('start_time', (df['ts'] / 1000).cast('timestamp'))

    # extract columns to create time table
    df = df.withColumn('weekday', date_format(df['start_time'], 'E'))
    df = df.withColumn('year', year(df['start_time']))
    df = df.withColumn('month', month(df['start_time']))
    df = df.withColumn('week', weekofyear(df['start_time']))
    df = df.withColumn('day', dayofmonth(df['start_time']))
    df = df.withColumn('hour', hour(df['start_time']))
    time_table = df.select('start_time', 'weekday', 'year', 'month',
                           'week', 'day', 'hour').distinct()

    # write time table to parquet files partitioned by year and month
    time_table.write.mode('overwrite').partitionBy('year', 'month')\
              .parquet(path=target_root + 'time')

    # read in song data to use for songplays table
    song_df = spark.sql("SELECT * FROM song_data_view")

    # extract columns from joined song and log datasets to create songplays table;
    # both sides carry a 'year' column, so year/month are taken explicitly
    # from the log side to keep the references unambiguous
    songplays_table = df.join(song_df, (df.song == song_df.title)
                                       & (df.artist == song_df.artist_name)
                                       & (df.length == song_df.duration), "inner")\
                        .distinct()\
                        .select('start_time', 'userId', 'level', 'song_id',
                                'artist_id', 'sessionId', 'location', 'userAgent',
                                df['year'].alias('year'), df['month'].alias('month'))\
                        .withColumn("songplay_id", monotonically_increasing_id())

    # write songplays table to parquet files partitioned by year and month
    songplays_table.write.mode("overwrite").partitionBy('year', 'month')\
                   .parquet(path=target_root + 'songplays')

    
def main():
    """ Run the ETL end to end on a single Spark session

        Song data is processed before log data: process_song_data registers
        the temp view that process_log_data queries.
    """
    session = create_spark_session()

    for etl_step in (process_song_data, process_log_data):
        etl_step(session, SOURCE_S3_BUCKET, DEST_S3_BUCKET)


if __name__ == "__main__":
    main()

Overwriting spark_etl.py


In [56]:
# Upload the ETL script to the bucket so it can be used as the job's entryPoint.
!aws s3 cp ./spark_etl.py s3://${S3_BUCKET}/code/pyspark/spark/spark_etl.py

upload: ./spark_etl.py to s3://udacity-dend-redshift/code/pyspark/spark/spark_etl.py


In [57]:
%%sh
# Submit spark_etl.py from S3 as a job run on the EMR Serverless application,
# executing under the job role. The JSON arguments are single-quoted; each
# '${S3_BUCKET}' briefly closes the single quotes so the shell expands the
# variable and then reopens them. monitoringConfiguration sets the S3 logUri
# for this run to s3://$S3_BUCKET/logs/.
aws emr-serverless start-job-run \
    --application-id $APPLICATION_ID \
    --execution-role-arn $JOB_ROLE_ARN \
    --job-driver '{
        "sparkSubmit": {
            "entryPoint": "s3://'${S3_BUCKET}'/code/pyspark/spark/spark_etl.py",
            "sparkSubmitParameters": "--conf spark.driver.cores=1 --conf spark.driver.memory=3g --conf spark.executor.cores=4 --conf spark.executor.memory=3g --conf spark.executor.instances=10"
        }
    }' \
    --configuration-overrides '{
        "monitoringConfiguration": {
            "s3MonitoringConfiguration": {
                "logUri": "s3://'${S3_BUCKET}'/logs/"
            }
        }
    }'

{
    "applicationId": "00f1li2klr0lrv09",
    "jobRunId": "00f1pvv4cq5mtu01",
    "arn": "arn:aws:emr-serverless:us-east-1:390354360073:/applications/00f1li2klr0lrv09/jobruns/00f1pvv4cq5mtu01"
}


In [50]:
!aws s3 cp s3://prod.us-east-1.appinfo.src/emr-serverless/390354360073/applications/00f1li2klr0lrv09/jobs/00f1pv6elvcfbs01/sparklogs/* ./logs --recursive

fatal error: An error occurred (AccessDenied) when calling the ListObjectsV2 operation: Access Denied


In [2]:
from pyspark.sql import SparkSession

In [None]:
# Spot-check the ETL output by loading one songplays partition into a local session.
spark = SparkSession.builder.master("local[2]").getOrCreate()
# spark_etl.py writes its tables under DEST_S3_BUCKET
# (s3://udacity-dend-redshift/spark/), so songplays lives at
# .../spark/songplays -- not directly under the bucket root.
# NOTE(review): a plain local Spark session may need the s3a:// scheme plus the
# hadoop-aws connector to read from S3 -- confirm for this environment.
df = spark.read.parquet('s3://udacity-dend-redshift/spark/songplays/year=2018/month=11/*.parquet')
df.head(5)