# Test ETL: build the songs, artists, users, time and songplays tables from the song and log JSON data
# import libraries

In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, to_timestamp, to_date, expr
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
from pyspark.sql.types import TimestampType,IntegerType

In [2]:
# Load the AWS key pair from dl.cfg and export it as environment variables.
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed; check it so a missing dl.cfg fails with a clear
# message instead of an opaque KeyError on the lookups below.
if not config.read('dl.cfg'):
    raise FileNotFoundError("Could not read AWS credentials file 'dl.cfg'")

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
# Build (or reuse) the Spark session with the hadoop-aws package on its classpath
session_builder = SparkSession.builder.config(
    "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.3"
)
spark = session_builder.getOrCreate()

In [4]:
# Glob pattern for the song JSON files, nested three directory levels under data/song_data
song_glob = 'song_data/*/*/*/*.json'
song_data = os.path.join('data', song_glob)

In [5]:
# Read every song JSON file matched by the glob into a DataFrame and preview it
json_reader = spark.read
df = json_reader.json(song_data)
df.show()

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|                    |            null|Montserrat Caball...|511.16363|        1|SOBAYLL12A8C138AF9|Sono andati? Fing...|   0|
|AREBBGV1187FB523D2|           null|         Houston, TX|            null|Mike Jones (Featu...|173.66159|        1|SOOLYAZ12A6701F4A6|Laws Patrolling (...|   0|
|ARMAC4T1187FB3FA4C|       40.82624|   Morris Plains, NJ|       -74.47995|The Dillinger Esc...|207.77751|        1|SOBBUGU12A8C13E95D|Setting Fire to S...|2004|
|ARPBNLO1187FB3D52F|       40.7145

In [6]:
# Songs table: the distinct rows of the song-level columns
song_columns = ["song_id", "title", "artist_id", "year", "duration"]
songs_table = df.select(*song_columns).distinct()

In [7]:
# Save the songs table as parquet under output/songs, partitioned by year
# and artist_id; any previous output is overwritten
(
    songs_table.write
    .mode('overwrite')
    .partitionBy("year", "artist_id")
    .parquet('output/songs')
)

In [8]:
# Artists table: artist_id plus the artist_* columns under shorter names, deduplicated
artist_renames = [
    ("artist_name", "name"),
    ("artist_location", "location"),
    ("artist_latitude", "latitude"),
    ("artist_longitude", "longitude"),
]
artist_columns = [col(source).alias(target) for source, target in artist_renames]
artists_table = df.select("artist_id", *artist_columns).distinct()

In [9]:
# Save the artists table as parquet under output/artists, replacing any previous output
artists_writer = artists_table.write.mode('overwrite')
artists_writer.parquet('output/artists')

In [10]:
# Location of the log data directory under data/
log_dir_name = 'log_data'
log_data = os.path.join('data', log_dir_name)

In [11]:
# Load the log JSON data into a DataFrame (this rebinds df) and preview it
log_reader = spark.read
df = log_reader.json(log_data)
df.show()

+--------------------+----------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|              artist|      auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+--------------------+----------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|            Harmonia| Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|         The Prodigy| Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Su

In [12]:
# filter by actions for song plays
# Keep only the "NextSong" page events (the rows that carry a played song) so
# the tables derived from df afterwards are not built from other page events.
df = df.filter(col("page") == "NextSong")

In [13]:
# Users table: distinct (user_id, first_name, last_name, gender, level) rows from the log data
users_table = (
    df.select(
        df.userId.alias("user_id"),
        df.firstName.alias("first_name"),
        df.lastName.alias("last_name"),
        df.gender,
        df.level,
    )
    .distinct()
)

In [14]:
# Save the users table as parquet under output/users, replacing any previous output
users_writer = users_table.write.mode('overwrite')
users_writer.parquet('output/users')

In [15]:
# Add a "timestamp" column derived from ts: ts is divided by 1000 before
# being converted with to_timestamp
ts_seconds = col("ts") / 1000
df = df.withColumn("timestamp", to_timestamp(ts_seconds))

In [16]:
# Add a "datetime" column holding the calendar date of the timestamp column
event_date = to_date(df.timestamp)
df = df.withColumn("datetime", event_date)

In [17]:
# Time table: each distinct timestamp broken out into its calendar parts
# Columns: start_time, hour, day, week, month, year, weekday
ts = col('timestamp')
time_table = df.select(
    ts.alias('start_time'),
    hour(ts).alias('hour'),
    dayofmonth(ts).alias('day'),
    weekofyear(ts).alias('week'),
    month(ts).alias('month'),
    year(ts).alias('year'),
    dayofweek(ts).alias('weekday'),
).distinct()

In [18]:
# Save the time table as parquet under output/time, partitioned by year and
# month; any previous output is overwritten
(
    time_table.write
    .mode('overwrite')
    .partitionBy("year", "month")
    .parquet('output/time')
)

In [19]:
# Re-read the song JSON files into their own DataFrame for the songplays join
song_data_glob = os.path.join('data', 'song_data/*/*/*/*.json')
song_df = spark.read.json(song_data_glob)

In [20]:
# extract columns from joined song and log datasets to create songplays table
# Target columns: songplay_id, start_time, user_id, level, song_id, artist_id,
# session_id, location, user_agent (plus year/month derived from the timestamp).
# NOTE(review): the id column is written as 'songplays_id', not 'songplay_id'
# as listed above; the name is kept unchanged here so existing readers of the
# output keep working -- confirm which name is intended.

# A log row matches a song when both the title and the duration agree; the
# left join keeps log rows without a match (song_id / artist_id are null).
song_match = [song_df.title == df.song, song_df.duration == df.length]

songplays_table = (
    df.join(song_df, song_match, 'left')
    .select(
        df.timestamp.alias('start_time'),
        year(df.timestamp).alias('year'),
        month(df.timestamp).alias('month'),
        df.userId.alias('user_id'),
        df.level,
        song_df.song_id,
        song_df.artist_id,
        df.sessionId.alias('session_id'),
        df.location,
        df.userAgent.alias('user_agent'),
    )
    # Deduplicate BEFORE assigning the id: once every row carries its own
    # uuid() no two rows are equal, so a later distinct() removes nothing.
    .distinct()
    .withColumn('songplays_id', expr("uuid()"))
)

In [21]:
# write songplays table to parquet files partitioned by year and month
# The year and month columns selected into songplays_table are used as the
# partition keys, the same layout as the time table output; any previous
# output is overwritten.
songplays_table.write.partitionBy("year","month").parquet('output/songplays',mode='overwrite')