In [23]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, to_timestamp
from pyspark.sql.types import DoubleType, TimestampType
import pandas as pd


# Read the local dl.cfg file and copy the two AWS credential entries from its
# CREDENTIAL section into the process environment (presumably so the S3
# connector can pick them up -- confirm against the cluster setup).
config = configparser.ConfigParser()
config.read('dl.cfg')

for credential_key in ('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'):
    os.environ[credential_key] = config.get('CREDENTIAL', credential_key)

In [2]:
# Create (or reuse) the Spark session, requesting the hadoop-aws package
# through the spark.jars.packages setting.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

In [3]:
# Load the JSON song files matching the A/B/*/* glob from the udacity-dend bucket.
song_data_path = "s3a://udacity-dend/" + 'song_data/A/B/*/*.json'
song_data = spark.read.json(song_data_path)

In [4]:
# Register the song DataFrame as a temp view so it can be queried with spark.sql.
song_data.createOrReplaceTempView(name="song_data")

In [5]:
# Persist the raw song data as parquet, replacing any previous output.
song_data.write.parquet('/song_data', mode='overwrite')

In [6]:
# Select the distinct song attributes from the song_data view.
songs_query = '''
          SELECT DISTINCT song_id, title, artist_id, year, duration 
          FROM song_data
          '''
songs = spark.sql(songs_query)

In [7]:
# Peek at two rows of the songs table.
songs.limit(2).collect()

[Row(song_id='SONIRVF12AB0182AE1', title='Soulful Garage', artist_id='ARVAQTH1187FB3AF44', year=2004, duration=212.24444),
 Row(song_id='SOALFUO12AF72A4B98', title='Long Time Gone', artist_id='ARSFKG71187FB3D66F', year=1969, duration=257.01832)]

In [8]:
# Write the songs table as parquet partitioned by year and artist_id,
# replacing any previous output.
songs.write.parquet("/songs", mode='overwrite', partitionBy=['year', 'artist_id'])

In [9]:
# Select the distinct artist attributes from the song_data view, dropping the
# "artist_" prefix from every column except artist_id.
artist_query = '''
SELECT DISTINCT artist_id, 
       artist_name as name, 
       artist_location as location,
       artist_latitude as latitude,
       artist_longitude as longitude
FROM song_data
'''
artist = spark.sql(artist_query)

In [10]:
# Persist the artists table as parquet, replacing any previous output.
artist.write.parquet("/artists", mode='overwrite')

Parse Log Data

In [11]:
# Load the JSON log files under log_data/2018/11 from the udacity-dend bucket.
log_data_path = "s3a://udacity-dend/" + 'log_data/2018/11/*.json'
log_data = spark.read.json(log_data_path)

In [12]:
# Peek at three raw log rows.
log_data.limit(3).collect()

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26'),
 Row(artist='The Prodigy', auth='Logged In', firstName='Ryan', gender='M', itemInSession=1, lastName='Smith', length=260.07465, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='The Big Gundown', status=200, ts=1542242481796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26'),
 Row(artist='Train', auth='Logged In'

In [13]:
# Keep only the NextSong page events and expose them to spark.sql as log_data.
df = log_data.filter("page=='NextSong'")
df.createOrReplaceTempView(name="log_data")

In [14]:
# Select the distinct user attributes from the filtered log_data view,
# renaming the columns to snake_case.
users_query = '''
        SELECT DISTINCT userid as user_id, 
           firstname as first_name,
           lastname as last_name,
           gender as gender,
           level as level
        FROM log_data
    '''
users_table = spark.sql(users_query)

In [15]:
# Persist the users table as parquet. Overwrite mode matches the other
# dimension writes in this notebook and lets the cell be re-run: without it the
# writer uses its default save mode and fails when /users already exists.
users_table.write.mode('overwrite').parquet("/users")

Build the time table

In [25]:
# Convert the millisecond `ts` column to seconds as a double. A native column
# expression replaces the former Python UDF: it yields the same values without
# round-tripping every row through Python, and returns null (instead of
# raising) if `ts` is ever null.
df = df.withColumn("ts_timestamp", (col('ts') / 1000).cast(DoubleType()))

In [26]:
# Convert the millisecond `ts` column to a timestamp by adding it to the Unix
# epoch. Uses the standard-library `datetime` class (imported at the top of the
# notebook) rather than the `pd.datetime` alias, which was deprecated in
# pandas 1.0 and removed in pandas 2.0; the alias pointed at this same class,
# so the computed values are unchanged.
get_datetime = udf(lambda x: datetime(1970, 1, 1) + pd.to_timedelta(x, unit='ms'), TimestampType())
df = df.withColumn('ts_datetime', get_datetime('ts'))

In [32]:
# Re-register the log_data view from the current df so spark.sql queries see
# the columns it now carries.
df.createOrReplaceTempView(name="log_data")

In [28]:
# Build the time table: one row per distinct event time, with the calendar
# parts extracted from ts_datetime. `.distinct()` keeps this table consistent
# with the songs/artists/users tables (all built with SELECT DISTINCT), so
# several log events sharing the same `ts` do not produce duplicate rows.
# NOTE(review): the 'u' date_format pattern is not accepted by the Spark 3+
# datetime formatter -- confirm the Spark version before upgrading.
time = df.select(
    col('ts_timestamp').alias('start_time'),
    hour('ts_datetime').alias('hour'),
    dayofmonth('ts_datetime').alias('day'),
    weekofyear('ts_datetime').alias('week'),
    month('ts_datetime').alias('month'),
    year('ts_datetime').alias('year'),
    date_format('ts_datetime', 'u').alias('weekday')
).distinct()

In [29]:
# Write the time table as parquet partitioned by year and month,
# replacing any previous output.
time.write.parquet("/time", mode='overwrite', partitionBy=['year', 'month'])

Build the songplays table

In [30]:
# Reload the song data from the parquet copy written earlier and point the
# song_data view at it, replacing the earlier registration of that view name.
song_parquet_path = "/song_data"
song_df = spark.read.parquet(song_parquet_path)
song_df.createOrReplaceTempView(name="song_data")

In [35]:
# Build the songplays table by joining the log events to the song data on
# song title, artist name and track length; year and month are derived from
# ts_datetime.
songplays_query = '''
    SELECT log.ts_timestamp as start_time,
           year(ts_datetime) as year,
           month(ts_datetime) as month,
           log.userid as user_id, 
           log.level as level,
           s.song_id as song_id,
           s.artist_id as artist_id,
           log.sessionid as session_id,
           log.location as location,
           log.useragent as user_agent
    FROM log_data log
    JOIN song_data s
    ON log.song = s.title
    AND log.artist = s.artist_name
    AND log.length = s.duration
    '''
songplays_table = spark.sql(songplays_query)

In [37]:
# Peek at one joined songplay row.
songplays_table.limit(1).collect()

[Row(start_time=1542278669.796, year=2018, month=11, user_id='80', level='paid', song_id='SOSDZFY12A8C143718', artist_id='AR748W61187B9B6AB8', session_id=611, location='Portland-South Portland, ME', user_agent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"')]

In [38]:
# Write the songplays table as parquet partitioned by year and month.
# Overwrite mode matches the other table writes in this notebook and lets the
# cell be re-run: without it the writer uses its default save mode and fails
# when /songplays already exists.
songplays_table.write.partitionBy('year','month').mode('overwrite').parquet("/songplays")