In [19]:
import configparser
import os

In [20]:
config = configparser.ConfigParser()
config.read_file(open('aws/credentials.cfg'))

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

In [21]:
from pyspark.sql import SparkSession

def create_spark_session():
    spark = SparkSession \
            .builder \
            .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.5") \
            .config("fs.s3a.awsAccessKeyId", os.environ["AWS_ACCESS_KEY_ID"]) \
            .config("fs.s3a.awsSecretAccessKey", os.environ["AWS_SECRET_ACCESS_KEY"]) \
            .config("fs.s3a.endpoint", 's3.us-west-2.amazonaws.com') \
            .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
            .getOrCreate()
    return spark

In [22]:
spark = create_spark_session()

In [24]:
spark

## Import Libraries

In [25]:
import os
import glob, json
import pandas as pd

song_data = "data/song_data"
log_data = "data/log-data"  

### Path for JSON Files

In [26]:
song_data = "data/song_data"
log_data = "data/log-data" 

### Functions

In [8]:
'''
Content function, return all json paths.
    Args: 
        path (str): path for json files
'''

def files_path(path):
    contents = []
    for root, dirs, files in os.walk(path):
        files = glob.glob(os.path.join(root, '*.json'))
        for f in files:
            contents.append(os.path.abspath(f))
        
    return contents

In [9]:
'''
Data Frame function, iterate into every file path from function before.
Create a dataframe for every json file and finally concat all information. 
Return a DataFrame with all json information on it

    Args:
        content (list): A list for every json path file
'''

def data_frame(content):
    dfs = []
    for file in content:
        with open(file) as f:
            dfs.append(pd.DataFrame(json.loads(line) for line in f))
    
    df = pd.concat(dfs, ignore_index=True)
    return df

### Song DataFrame - Pandas

In [10]:
song_content = files_path(song_data)
dfSong_pd = data_frame(song_content)
dfSong_pd.head(1)

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,AR7G5I41187FB4CE6C,,"London, England",,Adam Ant,233.40363,1,SONHOTT12A8C13493C,Something Girls,1982


### Log Events DataFrame - Pandas

In [11]:
log_content = files_path(log_data)
dfLog_pd = data_frame(log_content)
dfLog_pd.head(1)

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Frumpies,Logged In,Anabelle,F,0,Simpson,134.47791,free,"Philadelphia-Camden-Wilmington, PA-NJ-DE-MD",PUT,NextSong,1541044000000.0,455,Fuck Kitty,200,1541903636796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",69


### Spark DataFrames (Songs and Log Events)

In [11]:
# Funtions for schemas 
from pyspark.sql.types import StructType as R, StructField as Fld, StringType as Str, DoubleType as Dlb, IntegerType as Int, DateType as Date, LongType as Lng, DecimalType as Dec 


def song_schema():
    schema = R([
        Fld("artist_id", Str()),
        Fld("artist_latitude", Dlb()),
        Fld("artist_longitude", Dlb()),
        Fld("artist_location", Str()),
        Fld("artist_name", Str()),
        Fld("duration", Dlb()),
        Fld("num_songs", Int()),
        Fld("song_id", Str()),
        Fld("title", Str()),
        Fld("year", Int())    
        ])
    
    return schema


def log_schema():
    schema = R([
        Fld("artist", Str()),
        Fld("auth", Str()),
        Fld("firstName", Str()),
        Fld("gender", Str()),
        Fld("itemInSession", Int()),
        Fld("lastName", Str()),
        Fld("length", Dlb()),
        Fld("level", Str()),
        Fld("location", Str()),
        Fld("method", Str()),
        Fld("page", Str()),
        Fld("registration", Str()),
        Fld("sessionId", Int()),
        Fld("song", Str()),
        Fld("status", Str()),
        Fld("ts", Lng()),
        Fld("userAgent", Str()),
        Fld("userId", Str())
    ])
    
    return schema

In [12]:
song_content = files_path(song_data)

dfSong = spark.read.json(song_content)
dfSong.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [13]:
log_content = files_path(log_data)

dfLog = spark.read.json(log_content)
dfLog.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [19]:
dfSong.limit(5).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_longitude,artist_location,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,,"Houston, TX",Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0
2,ARMAC4T1187FB3FA4C,40.82624,-74.47995,"Morris Plains, NJ",The Dillinger Escape Plan,207.77751,1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,2004
3,ARPBNLO1187FB3D52F,40.71455,-74.00712,"New York, NY",Tiny Tim,43.36281,1,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,2000
4,ARDNS031187B9924F0,32.67828,-83.22295,Georgia,Tim Wilson,186.48771,1,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,2005


In [15]:
dfLog.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541016707796.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541016707796.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541016707796.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872073796.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541059521796.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


## Process Song-Data

In [77]:
''' 
Tables for start-schema
'''

song_table = dfSong.select('song_id', 'title', 'artist_id', 'year', 'duration').dropDuplicates()
artist_table = dfSong.select('artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude').dropDuplicates()

### Renaming columns for artist_table

In [15]:
from functools import reduce

column_artist = artist_table.schema.names
new_columns = ['artist_id', 'artist_name', 'location', 'latitude', 'longitude']
artist_table = reduce(lambda artist_table, idx: artist_table.withColumnRenamed(column_artist[idx], new_columns[idx]), range(len(column_artist)), artist_table)


In [16]:
artist_table.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [17]:
song_table.printSchema()

root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)



### Parquet files Song-Data

In [18]:
#song_table.write.partitionBy("year","artist_id").parquet('s3a://bucket-etl/song/')

KeyboardInterrupt: 

In [None]:
#artist_table.write.partitionBy("artist_id").parquet('s3a://bucket-etl/artist/')

## Process Song-Data

In [79]:
# filter dataframe for NextSong action
dfLog = dfLog.filter("page = 'NextSong'")

users_table = dfLog.select("userId", "firstName", "lastName", "gender", "level").dropDuplicates()

In [80]:
users_table.count()

104

In [28]:
users_table.printSchema()

root
 |-- userId: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



### Convert `ts` column to Timestamp

In [29]:
from pyspark.sql import functions as F
from pyspark.sql import types as T
import datetime

get_datetime = F.udf(lambda x: datetime.datetime.fromtimestamp((x/1000)), T.TimestampType())

In [30]:
dfLog = dfLog.withColumn("ts", get_datetime(dfLog.ts))
dfLog.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



### Time DataFrame

In [31]:
time_table = dfLog.select(F.col("ts").alias("start_time"), 
                     F.hour("ts").alias("hour"), 
                     F.dayofmonth("ts").alias("day"), 
                     F.weekofyear("ts").alias("week"), 
                     F.month("ts").alias("month"), 
                     F.year("ts").alias("year"), 
                     F.dayofweek("ts").alias("weekday"))

In [82]:
time_table.dropDuplicates().count()

6813

In [32]:
time_table.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [33]:
time_table.limit(5).toPandas()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-14 21:30:26.796,21,14,46,11,2018,4
1,2018-11-14 21:41:21.796,21,14,46,11,2018,4
2,2018-11-14 21:45:41.796,21,14,46,11,2018,4
3,2018-11-15 00:44:09.796,0,15,46,11,2018,5
4,2018-11-15 02:48:55.796,2,15,46,11,2018,5


### SongPlays Table 

In [56]:
from pyspark.sql import Window

join_condition  = (dfSong.artist_name == dfLog.artist) & (dfSong.title == dfLog.song)
window = Window.orderBy(F.monotonically_increasing_id())


songplays_table = dfLog.join(dfSong, join_condition)\
                    .select(F.row_number().over(window).alias("songplay_id"), \
                               dfLog.ts.alias("start_time"), dfLog.userId.alias("user_id"), \
                               dfLog.level, \
                               dfSong.song_id, \
                               dfSong.artist_id, \
                               dfLog.sessionId.alias("session_id"), \
                               dfLog.location, \
                               dfLog.userAgent.alias("user_agent"))

In [57]:
songplays_table.printSchema()

root
 |-- songplay_id: integer (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)



In [58]:
songplays_table.toPandas()

Unnamed: 0,songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent
0,1,2018-11-21 18:56:47.796,15,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,"Chicago-Naperville-Elgin, IL-IN-WI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."


In [60]:
df = dfLog
dfsong = dfSong

join_condition = (dfsong.artist_name == df.artist) & (dfsong.title == df.song)

# Window to create a rownumber column (songplay_id on songplays table)
window = Window.orderBy(F.monotonically_increasing_id())

# extract columns from joined song and log datasets to create songplays table
songplays_table = df.join(dfsong, join_condition)\
    .select(F.row_number().over(window).alias("songplay_id"),
            df.ts.alias("start_time"), df.userId.alias("user_id"),
            df.level,
            dfsong.song_id,
            dfsong.artist_id,
            df.sessionId.alias("session_id"),
            df.location,
            df.userAgent.alias("user_agent"))

In [61]:
songplays_table.toPandas()

Unnamed: 0,songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent
0,1,2018-11-21 18:56:47.796,15,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,"Chicago-Naperville-Elgin, IL-IN-WI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."


In [71]:
songplays_table.withColumn("year", F.year("start_time")) \
                .withColumn("month", F.month("start_time")) \
                .write.partitionBy("year", "month") \
                .parquet("s3a://bucket-etl/songplays.parquet")

In [75]:
import os 

dfparquet =  spark.read.parquet("s3a://bucket-etl/songplays.parquet")

In [76]:
dfparquet.printSchema()

root
 |-- songplay_id: integer (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)
 |-- level: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- session_id: long (nullable = true)
 |-- location: string (nullable = true)
 |-- user_agent: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)

