# Data Lake with Spark (Project 4)

In [1]:
from pyspark.sql import SparkSession

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
7,application_1623793692045_0026,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
def create_spark_session(packages="org.apache.hadoop:hadoop-aws:2.7.0"):
    """Return the active SparkSession, creating one if none exists.

    Args:
        packages: Maven coordinate(s) passed to ``spark.jars.packages``
            (comma-separated when more than one). Defaults to the hadoop-aws
            artifact that provides the ``s3a://`` filesystem used below.

    Returns:
        The SparkSession returned by ``SparkSession.builder.getOrCreate()``.
    """
    # NOTE(review): getOrCreate() hands back an already-running session when one
    # exists (the kernel here reports "SparkSession available as 'spark'"); the
    # packages setting may then not take effect -- confirm on the cluster.
    return (
        SparkSession.builder
        .config("spark.jars.packages", packages)
        .getOrCreate()
    )

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Obtain the session used by every cell below. Because the kernel already
# created one, getOrCreate() inside create_spark_session() presumably returns
# that existing session.
# NOTE(review): builder settings such as spark.jars.packages may not apply to an
# already-running session -- confirm on the cluster.
spark = create_spark_session()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Initialize

Run this command to remove any existing files from the `processed` directory in HDFS:

```bash
hadoop fs -rm -r hdfs:///processed/*
```

# Songs dataset

## Load data

In [5]:
# Raw song metadata (JSON) from the public Udacity bucket, read via s3a://.
# TODO: the A/A/ prefix loads only a small sample for development; in prod
# change A/A to */* (i.e. song-data/*/*/*/*.json) to load the full dataset.
staging_songs = spark.read.json("s3a://udacity-dend/song-data/A/A/*/*.json")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Expose the song DataFrame to Spark SQL; the songs, artists and songplays
# queries below select from `staging_songs`.
staging_songs.createOrReplaceTempView("staging_songs")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Transform

### Songs

In [7]:
# Songs dimension: the song-level columns of staging_songs, one output row per
# input row (no de-duplication is applied).
# NOTE(review): this assumes song_id is already unique in the source files --
# confirm against the full dataset.
songs = spark.sql(
'''
    select
        song_id,
        title,
        artist_id,
        year,
        duration
    from
        staging_songs
'''
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# Persist the songs table as parquet in HDFS. mode("overwrite") replaces any
# previous output, so the cell can be re-run without first clearing
# hdfs:///processed by hand (the default mode fails if the path exists).
songs.write.mode("overwrite").parquet('hdfs:///processed/songs')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Artists

In [9]:
# Artists dimension: one row per artist_id (`group by 1` is the ordinal of
# artist_id), built from the artist columns repeated on each song row. max()
# collapses the repeats and ignores nulls, so a non-null value wins over null.
# NOTE(review): every max() is evaluated independently, so name, location,
# latitude and longitude can come from different source rows when an artist's
# rows disagree (e.g. a latitude/longitude pair that never occurred together).
artists = spark.sql(
'''
    select
        artist_id,
        max(artist_name) as name,
        max(artist_location) as location,
        max(artist_latitude) as latitude,
        max(artist_longitude) as longitude
    from
        staging_songs
    group by 1
'''
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Persist the artists table as parquet in HDFS. mode("overwrite") replaces any
# previous output, so the cell can be re-run without first clearing
# hdfs:///processed by hand (the default mode fails if the path exists).
artists.write.mode("overwrite").parquet('hdfs:///processed/artists')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Logs dataset

## Load data

In [11]:
# Raw event logs (JSON) from the public Udacity bucket; the glob matches every
# .json file two directory levels below log-data/.
base_events = spark.read.json("s3a://udacity-dend/log-data/*/*/*.json")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
# Register the raw events so the renaming query below can select from them.
base_events.createOrReplaceTempView("base_events")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
# Staging events: every raw log column, with camelCase names renamed to
# snake_case (e.g. userId -> user_id, sessionId -> session_id). No rows are
# filtered and no types are converted at this stage.
staging_events = spark.sql(
'''
    select
        artist as artist,
        auth as auth,
        firstName as first_name,
        gender as gender,
        itemInSession as item_in_session,
        lastName as last_name,
        length as length,
        level as level,
        location as location,
        method as method,
        page as page,
        registration as registration,
        sessionId as session_id,
        song as song,
        status as status,
        ts as ts,
        userAgent as user_agent,
        userId as user_id
    from
        base_events
'''
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
# Register the renamed events; the users and songplays queries select from
# `staging_events`.
staging_events.createOrReplaceTempView("staging_events")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Transform

### Users

In [15]:
# Users dimension: one row per user, taken from that user's most recent event
# (highest ts) so that `level` reflects the latest known subscription level.
# Events without a usable user id (null or empty) are discarded first.
users = spark.sql(
'''
    with events as
    (
        select * from staging_events
    ),

    add_row_number_and_remove_null_ids as
    (
        select
            *,
            row_number() over (partition by user_id order by ts desc) as row_number_by_user
        from
            events
        where
            user_id is not null
            -- the raw logs may also carry an empty-string userId (presumably for
            -- logged-out events); those are not real users. The cast keeps the
            -- comparison valid whether user_id was inferred as string or number.
            and cast(user_id as string) != ''
    ),

    latest_user_events as
    (
        -- this is done to get latest user event since `level` can change over time
        select
            *
        from
            add_row_number_and_remove_null_ids
        where
            row_number_by_user = 1
    )

    select
        user_id,
        first_name,
        last_name,
        gender,
        level
    from
        latest_user_events
'''
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
# Persist the users table as parquet in HDFS. mode("overwrite") replaces any
# previous output, so the cell can be re-run without first clearing
# hdfs:///processed by hand (the default mode fails if the path exists).
users.write.mode("overwrite").parquet('hdfs:///processed/users')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Songplays

In [17]:
# Songplays fact table: one row per 'NextSong' event, with song_id/artist_id
# looked up from staging_songs. The join is a left join, so those ids stay null
# when no song matches (likely common while only the A/A song sample is loaded).
# start_time is from_unixtime(ts / 1000), i.e. a whole-second datetime string.
songplays = spark.sql(
'''
    with events as
    (
       select * from staging_events
    ),

    songs as
    (
        select * from staging_songs
    ),

    only_next_song_events as
    (
        select
            *
        from
            events
        where
            page = 'NextSong'
    ),

    events_with_converted_timestamp as
    (
        select
            *,
            from_unixtime(ts / 1000) as start_time
        from
            only_next_song_events
    ),

    joined as
    (
        select
            e.start_time,
            e.user_id,
            e.level,
            s.song_id,
            s.artist_id,
            e.session_id,
            e.location,
            e.user_agent
        from
            events_with_converted_timestamp e
            -- match on title AND artist: joining on title alone pairs an event
            -- with every song sharing that title, duplicating songplay rows and
            -- attaching the wrong song/artist ids.
            -- NOTE(review): one artist with two same-titled songs can still
            -- match twice; adding e.length = s.duration would disambiguate.
            left outer join songs s
                on e.song = s.title
                and e.artist = s.artist_name
    ),

    final as (
        select
            -- songplay id derived from user, song and start time; intended to be
            -- unique, but two plays by the same user of the same (or an
            -- unmatched) song within one second would share an id
            md5(
                coalesce(cast(user_id as string), '') || coalesce(song_id, '') || coalesce(cast(start_time as string), '')
            ) as songplay_id,
            *
        from
            joined
    )

    select * from final

'''
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
# Register songplays as a temp view: the time-table query below selects
# start_time from it, so this must run before that cell.
songplays.createOrReplaceTempView("songplays")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
# Persist the songplays table as parquet in HDFS. mode("overwrite") replaces any
# previous output, so the cell can be re-run without first clearing
# hdfs:///processed by hand (the default mode fails if the path exists).
songplays.write.mode("overwrite").parquet('hdfs:///processed/songplays')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Time

In [20]:
# Time dimension: one row per distinct songplay start_time, split into calendar
# parts. `distinct` is required because many plays share a start_time (same
# second), and a dimension table must not repeat its key.
# dayofweek() numbers days 1 = Sunday ... 7 = Saturday.
time = spark.sql(
'''
    select distinct
        start_time,
        extract(hour from start_time) as hour,
        extract(day from start_time) as day,
        extract(week from start_time) as week,
        extract(month from start_time) as month,
        extract(year from start_time) as year,
        dayofweek(start_time) as weekday
    from
        songplays

'''
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
# Persist the time table as parquet in HDFS. mode("overwrite") replaces any
# previous output, so the cell can be re-run without first clearing
# hdfs:///processed by hand (the default mode fails if the path exists).
time.write.mode("overwrite").parquet('hdfs:///processed/time')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Drop temp views

In [22]:
# Release every temp view registered earlier, in registration order, now that
# all tables have been written.
for view_name in ("staging_songs", "base_events", "staging_events", "songplays"):
    spark.catalog.dropTempView(view_name)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Copy from HDFS to S3

Finally, run this command to copy the processed tables from HDFS to S3:

```bash
s3-dist-cp \
--src hdfs:///processed \
--dest s3a://udacity-data-lake-spark/processed
```