In [3]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [4]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [5]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

spark = create_spark_session()
input_data = 's3a://dend/'
# input_data = "s3a://udacity-dend/"
output_data = "s3a://dend-bucket-twd/analytics/"
# input_data = 'data/'
# output_data = 'data/tmp/'

In [6]:
# get filepath to song data file
song_data = input_data+'song_data/*/*/*/*.json' 

song_data

's3a://dend/song_data/*/*/*/*.json'

In [7]:
# read song data file
df = spark.read.json(song_data)

In [7]:
# extract columns to create songs table
songs_table = df.select("song_id", "title", "artist_id", "year", "duration")

In [10]:
# write songs table to parquet files partitioned by year and artist
songs_table.write.partitionBy('year', 'artist_id').parquet(output_data+'songs', 'overwrite')

In [11]:
# extract columns to create artists table
artists_table = df.select("artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude")

In [12]:
# write artists table to parquet files
artists_table.write.parquet(output_data+'artists', 'overwrite')

## Log Data

In [23]:
# def process_log_data(spark, input_data, output_data):
# get filepath to log data file

# log_data = input_data+'*.json'
log_data = os.path.join(input_data, "log_data/*/*/*.json")

# read log data file
df = spark.read.json(log_data)

In [24]:
# filter by actions for song plays
df = df.where(df.page=='NextSong')

In [25]:
# extract columns for users table    
users_table = df.select(col('userId').alias('user_id'), col('firstName').alias('first_name'), 
                        col('lastName').alias('last_name'), 'gender', 'level')

In [21]:
# write users table to parquet files
users_table.write.parquet(output_data+'users', 'overwrite')

In [22]:
# create timestamp column from original timestamp column
# import pandas as pd
# from pyspark.sql.types import TimestampType

# get_timestamp = udf(lambda ts: pd.to_datetime(ts, unit='ms'), TimestampType())
# df = df.withColumn('timestamp', get_timestamp(df.ts))

get_timestamp = udf(lambda x: str(int(int(x) / 1000)))
df = df.withColumn('timestamp', get_timestamp(df.ts))

In [23]:
# create datetime column from original timestamp column
# get_datetime = udf(lambda ts: pd.to_datetime(ts, unit='ms'), TimestampType())
# df = df.withColumn('datetime', get_datetime(df.ts))

get_datetime = udf(lambda x: str(datetime.fromtimestamp(int(x) / 1000.0)))
df = df.withColumn('datetime', get_datetime(df.ts))

In [24]:
# extract columns to create time table
# time_table = df.select(date_format('timestamp', 'H:m:s').alias('start_time'), hour('datetime').alias('hour'), 
#           dayofmonth('datetime').alias('day'), weekofyear('datetime').alias('week'), month('datetime').alias('month'), 
#           year('datetime').alias('year'), date_format('datetime', 'u').alias('weekday'))

time_table = df.select(
    'timestamp',
    hour('datetime').alias('hour'),
    dayofmonth('datetime').alias('day'),
    weekofyear('datetime').alias('week'),
    month('datetime').alias('month'),
    year('datetime').alias('year'),
    date_format('datetime', 'u').alias('weekday')
  )

In [25]:
# write time table to parquet files partitioned by year and month
time_table.write.partitionBy('year', 'month').parquet(output_data+'time', 'overwrite')

In [27]:
# read in song data to use for songplays table
song_df = spark.read.json(input_data+'song_data/*/*/*/*.json')


In [28]:
# extract columns from joined song and log datasets to create songplays table 
songplays_table = df.join(song_df, (df.song == song_df.title) & 
                          (df.artist == song_df.artist_name) & (df.length == song_df.duration),
                         'left_outer').select(
                             df.timestamp,
                             col('userId').alias('user_id'),
                             df.level,
                             song_df.song_id,
                             song_df.artist_id,
                             col('sessionId').alias('session_id'),
                             df.location,
                             col('userAgent').alias('user_agent'),
                             year('datetime').alias('year'),
                             month('datetime').alias('month')
                         )

In [30]:
# write songplays table to parquet files partitioned by year and month
songplays_table.write.partitionBy('year', 'month').parquet(output_data+'songplays', 'overwrite')

In [44]:
import boto3

s3 = boto3.resource('s3',
                        region_name='us-west-2',
                        aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
                        aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY']
                       )

In [59]:
udacity = s3.Bucket('udacity-dend')

In [72]:

list(udacity.objects.filter(Prefix="song_data/A/A/A/"))

[s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAAK128F9318786.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAAV128F421A322.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAABD128F429CF47.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAACN128F9355673.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAEA128F935A30D.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAED128E0783FAB.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAEM128F93347B9.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAEW128F42930C0.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAFD128F92F423A.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='song_data/A/A/A/TRAAAGR128F425B14B.json'),
 s3.ObjectSummary(bucket_name='udacity-dend', key='song_data

In [4]:
spark.stop

AttributeError: 'SparkSession' object has no attribute 'close'