In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
from pyspark.sql.types import TimestampType, DateType


In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config.get('AWS','AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('AWS','AWS_SECRET_ACCESS_KEY')

In [3]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

In [4]:

spark = create_spark_session()
input_data = "s3a://udacity-dend/"
output_data = "s3a://output0033/"


In [5]:
song_data =input_data+ 'song_data/A/A/A/*.json' 

# read song data file
df = spark.read.json(song_data)

df.limit(10).toPandas()

df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [12]:
# extract columns to create songs table
songs_table = df.select("song_id", "title", "artist_id", "year", "duration")

songs_table.createOrReplaceTempView("song_data_table")

songs_table = spark.sql("""
                            SELECT DISTINCT song_id, 
                            title,
                            artist_id,
                            year,
                            duration
                            FROM song_data_table 
                            WHERE song_id IS NOT NULL
                        """)

songs_table.limit(10).toPandas()



Unnamed: 0,song_id,title,artist_id,year,duration
0,SOHOZBI12A8C132E3C,Smash It Up,AR0MWD61187B9B2B12,2000,195.39546
1,SOXZYWX12A6310ED0C,It's About Time,ARC1IHZ1187FB4E920,0,246.9873
2,SOHKNRJ12A6701D1F8,Drop of Rain,AR10USD1187B99F3F1,0,189.57016
3,SOIGICF12A8C141BC5,Game & Watch,AREWD471187FB49873,2004,580.54485
4,SOAPERH12A58A787DC,The One And Only (Edited),ARZ5H0P1187B98A1DD,0,230.42567
5,SOBLFFE12AF72AA5BA,Scream,ARJNIUY12298900C91,2009,213.9424
6,SOABWAP12A8C13F82A,Take Time,AR5LMPY1187FB573FE,1978,258.89914
7,SODZYPO12A8C13A91E,Burn My Body (Album Version),AR1C2IX1187B99BF74,0,177.99791
8,SOFSOCN12A8C143F5D,Face the Ashes,ARXR32B1187FB57099,2007,209.60608
9,SOCIWDW12A8C13D406,Soul Deep,ARMJAGH1187FB546F3,1969,148.03546


In [7]:
## write songs table to parquet files partitioned by year and artist
songs_table.write.partitionBy("year", "artist_id").mode('overwrite').parquet(os.path.join(output_data, 'songs'))

In [13]:
# write users table to parquet files
artists_table=df.select("artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude")
artists_table=artists_table.withColumnRenamed("artist_name","name").withColumnRenamed("artist_location","location").withColumnRenamed("artist_latitude","lattitude").withColumnRenamed("artist_longitude","longitude")

artists_table.createOrReplaceTempView("artist_data_table")

artists_table = spark.sql("""
                                SELECT DISTINCT artist_id, 
                                name,
                                location,
                                lattitude,
                                longitude
                                FROM artist_data_table 
                                WHERE artist_id IS NOT NULL
                            """)

artists_table.limit(10).toPandas()


Unnamed: 0,artist_id,name,location,lattitude,longitude
0,ARJNIUY12298900C91,Adelitas Way,,,
1,AR5LMPY1187FB573FE,Chaka Khan_ Rufus,"Chicago, IL",41.88415,-87.63241
2,AR1C2IX1187B99BF74,Broken Spindles,,,
3,ARC1IHZ1187FB4E920,Jamie Cullum,,,
4,ARKYKXP11F50C47A6A,The Supersuckers,,,
5,ARSVTNL1187B992A91,Jonathan King,"London, England",51.50632,-0.12714
6,ARXR32B1187FB57099,Gob,,,
7,ARZ5H0P1187B98A1DD,Snoop Dogg,"Long Beach, CA",33.76672,-118.1924
8,ARY589G1187B9A9F4E,Talkdemonic,"Portland, OR",45.51179,-122.67563
9,AR9Q9YC1187FB5609B,Quest_ Pup_ Kevo,New Jersey,,


In [17]:
artists_table.write.mode('overwrite').parquet(os.path.join(output_data, 'artists'))

In [14]:
log_data =input_data+ 'log_data/2018/11/*.json' 

# read log data file
df = spark.read.json(log_data)

df.limit(10).toPandas()


Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26.0
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26.0
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26.0
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9.0
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12.0
5,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61.0
6,,Logged In,Samuel,M,1,Gonzalez,,free,"Houston-The Woodlands-Sugar Land, TX",GET,About,1540493000000.0,597,,200,1542253460796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61.0
7,,Logged Out,,,0,,,paid,,PUT,Login,,602,,307,1542260074796,,
8,,Logged In,Tegan,F,1,Levine,,paid,"Portland-South Portland, ME",GET,Home,1540794000000.0,602,,200,1542260277796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80.0
9,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80.0


In [15]:
df = df.filter(df.page == 'NextSong')

df.limit(10).toPandas()    

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80
5,Magic Sam,Logged In,Tegan,F,3,Levine,132.04853,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Call Me If You Need Me,200,1542261224796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80
6,Edward Sharpe & The Magnetic Zeros,Logged In,Tegan,F,4,Levine,306.31138,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Home,200,1542261356796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80
7,Usher featuring will.i.am,Logged In,Tegan,F,5,Levine,395.72853,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,OMG,200,1542261662796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80
8,Helen Reddy,Logged In,Tegan,F,7,Levine,176.50893,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Candle On The Water,200,1542262057796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80
9,Taylor Swift,Logged In,Tegan,F,8,Levine,201.06404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Our Song,200,1542262233796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80


In [17]:
# extract columns for users table    
users_table = df.select("userid", "firstName", "lastName", "gender", "level").withColumnRenamed("userid","user_id").withColumnRenamed("firstName","first_name").withColumnRenamed("lastName","last_name")

users_table.limit(10).toPandas()

users_table.createOrReplaceTempView("users_data_table")

# extract columns for users table    
users_table = spark.sql("""
                        SELECT DISTINCT user_id, 
                        first_name,
                        last_name,
                        gender,
                        level
                        FROM users_data_table 
                        WHERE user_id IS NOT NULL """)

# write users table to parquet files
users_table.write.mode('overwrite').parquet(os.path.join(output_data, 'users'))

In [8]:
# create timestamp column from original timestamp column
get_time = udf(lambda x: datetime.fromtimestamp(x / 1000.0), TimestampType())
df = df.withColumn("start_time", get_time("ts"))

# # create datetime column from original timestamp column
get_date = udf(lambda x: datetime.fromtimestamp(x / 1000.0), DateType())
df = df.withColumn("date", get_date("ts"))

df.limit(10).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,start_time,date
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:30:26.796,2018-11-15
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:41:21.796,2018-11-15
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:45:41.796,2018-11-15
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61,2018-11-15 03:44:09.796,2018-11-15
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 05:48:55.796,2018-11-15
5,Magic Sam,Logged In,Tegan,F,3,Levine,132.04853,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Call Me If You Need Me,200,1542261224796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 05:53:44.796,2018-11-15
6,Edward Sharpe & The Magnetic Zeros,Logged In,Tegan,F,4,Levine,306.31138,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Home,200,1542261356796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 05:55:56.796,2018-11-15
7,Usher featuring will.i.am,Logged In,Tegan,F,5,Levine,395.72853,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,OMG,200,1542261662796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 06:01:02.796,2018-11-15
8,Helen Reddy,Logged In,Tegan,F,7,Levine,176.50893,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Candle On The Water,200,1542262057796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 06:07:37.796,2018-11-15
9,Taylor Swift,Logged In,Tegan,F,8,Levine,201.06404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Our Song,200,1542262233796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 06:10:33.796,2018-11-15


In [9]:
# extract columns to create time table
# columns = start_time, hour, day, week, month, year, weekday
time_table = df.select("start_time", 
                       hour("date").alias("hour"), 
                       dayofmonth("date").alias("day"), 
                       weekofyear("date").alias("week"), 
                       month("date").alias("month"),
                       year("date").alias("year"),
                       dayofweek("date").alias("weekday")
                    ).distinct()

In [10]:
df.limit(10).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,start_time,date
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:30:26.796,2018-11-15
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:41:21.796,2018-11-15
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:45:41.796,2018-11-15
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61,2018-11-15 03:44:09.796,2018-11-15
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 05:48:55.796,2018-11-15
5,Magic Sam,Logged In,Tegan,F,3,Levine,132.04853,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Call Me If You Need Me,200,1542261224796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 05:53:44.796,2018-11-15
6,Edward Sharpe & The Magnetic Zeros,Logged In,Tegan,F,4,Levine,306.31138,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Home,200,1542261356796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 05:55:56.796,2018-11-15
7,Usher featuring will.i.am,Logged In,Tegan,F,5,Levine,395.72853,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,OMG,200,1542261662796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 06:01:02.796,2018-11-15
8,Helen Reddy,Logged In,Tegan,F,7,Levine,176.50893,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Candle On The Water,200,1542262057796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 06:07:37.796,2018-11-15
9,Taylor Swift,Logged In,Tegan,F,8,Levine,201.06404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Our Song,200,1542262233796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80,2018-11-15 06:10:33.796,2018-11-15


In [18]:
songs_table.createOrReplaceTempView("songs")
artists_table.createOrReplaceTempView("artists")

song_df = df.select("song", "artist",
                         "length", 
                         "page", 
                         "start_time",
                         "userId", 
                         "level", 
                         "sessionId",
                         "location", 
                         "userAgent",
                         "date",
                        month("date").alias("month"),
                        year("date").alias("year"),
                        )


song_df.limit(10).toPandas()

AnalysisException: "cannot resolve '`start_time`' given input columns: [lastName, status, firstName, artist, itemInSession, location, sessionId, userId, method, length, song, level, page, ts, userAgent, registration, gender, auth];;\n'Project [song#168, artist#155, length#161, page#165, 'start_time, userId#172, level#162, sessionId#167L, location#163, userAgent#171, 'date, month('date) AS month#301, year('date) AS year#302]\n+- Filter (page#165 = NextSong)\n   +- Relation[artist#155,auth#156,firstName#157,gender#158,itemInSession#159L,lastName#160,length#161,level#162,location#163,method#164,page#165,registration#166,sessionId#167L,song#168,status#169L,ts#170L,userAgent#171,userId#172] json\n"

In [None]:

song_df.createOrReplaceTempView("logs")

songplays_table = spark.sql(
            """
            SELECT row_number() OVER (PARTITION BY start_time ORDER BY start_time) as songplay_id,
                   e.start_time, 
                   e.userId AS user_id, 
                   e.level AS level, 
                   s.song_id AS song_id, 
                   s.artist_id AS artist_id, 
                   e.sessionId AS session_id, 
                   e.location AS location, 
                   e.userAgent AS user_agent,
                   e.year,
                   e.month
            FROM logs e
            LEFT JOIN songs s 
                   ON e.song=s.title
            LEFT JOIN artists a 
                   ON e.artist=a.name
            """
        )

In [None]:
songplays_table.limit(10).toPandas()


In [None]:
songplays_table.write.partitionBy("year", "month").mode('overwrite').parquet(os.path.join(output_data, 'songplays'))
