In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
import os
import configparser

In [2]:
config = configparser.ConfigParser()

#Normally this file should be in ~/.aws/credentials
config.read('dl.cfg')

#print(config['AWS']['AWS_ACCESS_KEY_ID'])

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0") \
        .getOrCreate()

In [7]:
df = spark.read.json("s3a://udacity-dend/song_data/A/A/A")

In [8]:
df.printSchema()
df.show(5)

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARTC1LV1187B9A4858|        51.4536|Goldsmith's Colle...|        -0.01802|  The Bonzo Dog Band|3

In [33]:
from pyspark.sql.types import *

schema = StructType([
    StructField("artist_id",StringType()),
    StructField("artist_latitude",DoubleType()),
    StructField("artist_location",StringType()),
    StructField("artist_longitude",DoubleType()),
    StructField("artist_name",StringType()),
    StructField("duration",DoubleType()),
    StructField("num_songs",LongType()),
    StructField("song_id",StringType()),
    StructField("title",StringType()),
    StructField("year",LongType())
    ])

df = spark.read.json("s3a://udacity-dend/song_data/A/A/*/*.json", schema=schema)

In [34]:
df.count()

604

In [37]:
df.createOrReplaceTempView("song_data")
df_songs = spark.sql("""select song_id, title, artist_id, year, duration from song_data""")
df_artists = spark.sql(""" select 
    artist_id, 
    artist_name as name, 
    artist_location as location, 
    artist_latitude as latitude , 
    artist_longitude as longitude 
    from
    song_data """)

+------------------+---------------+--------------------+----------------+--------------------+----------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name|  duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+----------+---------+------------------+--------------------+----+
|ARSUVLW12454A4C8B8|       35.83073|           Tennessee|       -85.97874|Royal Philharmoni...|  94.56281|        1|SOBTCUI12A8AE48B70|Faust: Ballet Mus...|   0|
|ARXQC081187FB4AD42|       54.31407|                  UK|        -2.23001|William Shatner_ ...|1047.71873|        1|SOXRPUH12AB017F769|Exodus: Part I: M...|   0|
|ARWUNH81187FB4A3E0|           null|     Miami , Florida|            null|         Trick Daddy| 227.10812|        1|SOVNKJI12A8C13CB0D|Take It To Da Hou...|2001|
|ARTC1LV1187B9A4858|        

In [113]:

#columns = Seq("song_id","title","artist_id","year","duration")
#df_songs = df_songs.toDF(columns:_*)


df_songs.write \
    .mode("append") \
    .partitionBy("year", "artist_id") \
    .parquet("s3a://data-engineering-files/parquet/songs.parquet")
df_artists.write \
     .mode("append") \
    .parquet("s3a://data-engineering-files/parquet/artists.parquet")


In [48]:
log_schema = StructType([
    StructField("artist",StringType()),
    StructField("auth",StringType()),
    StructField("firstName",StringType()),
    StructField("gender",StringType()),
    StructField("itemInSession",LongType()),
    StructField("lastName",StringType()),
    StructField("length",DoubleType()),
    StructField("level",StringType()),
    StructField("location",StringType()),
    StructField("method",StringType()),
    StructField("page",StringType()),
    StructField("registration",StringType()),
    StructField("sessionId",StringType()),
    StructField("song",StringType()),
    StructField("status",IntegerType()),
    StructField("ts",LongType()),
    StructField("userAgent",StringType()),
    StructField("userId",StringType())
    ])


log_data_df = spark.read.json("s3a://udacity-dend/log_data/*/*/*.json", schema=log_schema, multiLine="true")

In [49]:
log_data_df.count()

30

In [51]:
log_data_df.show()
log_data_df.createOrReplaceTempView("log_data")

+--------------------+----------+---------+------+-------------+---------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|              artist|      auth|firstName|gender|itemInSession| lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+--------------------+----------+---------+------+-------------+---------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|            Harmonia| Logged In|     Ryan|     M|            0|    Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|                null|Logged Out|     null|  null|            0|     null|     null| free|      

In [54]:
user_df = spark.sql("""select 
    CAST(userId as INT) as user_id, 
    firstName as first_name, 
    lastName as last_name, 
    gender,
    level 
    from
    log_data
    where page ='NextSong'""")
user_df.show(5)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
|     95|      Sara|  Johnson|     F| paid|
|     80|     Tegan|   Levine|     F| paid|
|     69|  Anabelle|  Simpson|     F| free|
|     91|    Jayden|     Bell|     M| free|
+-------+----------+---------+------+-----+
only showing top 5 rows



In [122]:
#Pending Items 1. Append Unique_id 2. give default value for user_id

song_plays_df = spark.sql(""" select
    from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss') as start_time,
    CAST(userId as INT) as user_id,
    level,
    s.song_id as song_id,
    s.artist_id as artist_id,
    sessionId as session_id,
    location as location,
    userAgent as user_agent,
    from_unixtime(ts/1000,'MMM') as month,
    from_unixtime(ts/1000,'y') as year
    from log_data l
    left join 
    song_data s
    on l.artist = s.artist_name 
    where page ='NextSong'
    """)
#song_plays_df.show(5)
song_plays_df.count()

21

In [117]:
#Reference https://stackoverflow.com/questions/51200217/how-to-create-sequential-number-column-in-pyspark-dataframe

from pyspark.sql.window import Window
from pyspark.sql.functions import monotonically_increasing_id,row_number

song_plays_df =song_plays_df.withColumn("songplay_id",row_number().over(Window.orderBy(monotonically_increasing_id())))
song_plays_df.show(5)

+-------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+-----+----+-----------+
|         start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|month|year|songplay_id|
+-------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+-----+----+-----------+
|2018-11-14 19:30:26|     26| free|              null|              null|       583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|  Nov|2018|          1|
|2018-11-13 19:03:22|     95| paid|              null|              null|       411|   Winston-Salem, NC|"Mozilla/5.0 (iPh...|  Nov|2018|          2|
|2018-11-27 19:00:15|     80| paid|              null|              null|       992|Portland-South Po...|"Mozilla/5.0 (Mac...|  Nov|2018|          3|
|2018-11-04 19:33:12|     69| free|SOBBGQK12AB0183F1E|AR4E4121187FB51F4E|       256|Philadelphia-Cam

In [105]:
time_df = spark.sql("""select 
    ts as ts,
    from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss') as start_time,
    from_unixtime(ts/1000,'H') as hour,
    from_unixtime(ts/1000,'dd') as day,
    from_unixtime(ts/1000,'MMM') as month,
    from_unixtime(ts/1000,'y') as year,
    from_unixtime(ts/1000,'E') as weekday
    from
    log_data
    where page ='NextSong'""")

In [106]:
from datetime import datetime as dt

convertToWeekDay = F.udf(lambda z: dt.fromtimestamp(z/1000).isocalendar()[1], IntegerType())

In [110]:
time_df = time_df.withColumn('week', convertToWeekDay(F.col('ts')))
time_df = time_df.drop('ts')
time_df.show(5)

+-------------------+----+---+-----+----+-------+----+
|         start_time|hour|day|month|year|weekday|week|
+-------------------+----+---+-----+----+-------+----+
|2018-11-14 19:30:26|  19| 14|  Nov|2018|    Wed|  46|
|2018-11-13 19:03:22|  19| 13|  Nov|2018|    Tue|  46|
|2018-11-27 19:00:15|  19| 27|  Nov|2018|    Tue|  48|
|2018-11-04 19:33:12|  19| 04|  Nov|2018|    Sun|  44|
|2018-11-29 19:22:07|  19| 29|  Nov|2018|    Thu|  48|
+-------------------+----+---+-----+----+-------+----+
only showing top 5 rows



In [111]:
user_df.write.parquet("s3a://data-engineering-files/parquet/users.parquet")

time_df.write.parquet("s3a://data-engineering-files/parquet/time.parquet")

In [118]:
song_plays_df.write \
.mode("append") \
.partitionBy("year","month") \
.parquet("s3a://data-engineering-files/parquet/songplays.parquet")