In [None]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [None]:
# Load AWS credentials from dl.cfg (an [AWS] section with the two keys below) and
# export them as environment variables, presumably so s3a:// reads can authenticate.
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list of
# files it did parse; fail loudly here instead of with an opaque KeyError: 'AWS'.
if not config.read('dl.cfg'):
    raise FileNotFoundError("Could not read AWS credentials file 'dl.cfg'")

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

In [None]:
def create_spark_session():
    """Return the active SparkSession, building one if none exists yet.

    The builder requests the ``org.apache.hadoop:hadoop-aws:2.7.0`` package through
    ``spark.jars.packages`` (presumably for ``s3a://`` access).
    """
    builder = SparkSession.builder.config(
        "spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0"
    )
    return builder.getOrCreate()
print('Creating spark session on AWS')
spark = create_spark_session()

# Data locations. `input_data` names the udacity-dend S3 bucket; the song/log globs
# and the output directory are local relative paths that do not build on it.
input_data = "s3a://udacity-dend/"
song_input_data, log_input_data = (
    "data/songdata/song_data/A/A/A/*.json",
    "data/logdata/*.json",
)
output_data = "data/outputdata/"

In [None]:
# Load the song JSON files matched by `song_input_data` into a Spark DataFrame.
print('Read song data from json file')
song_data = spark.read.json(song_input_data)

# Later cells refer to the song DataFrame as `df`; report its row count
# (a Spark action, so it scans the input) and its inferred schema.
df = song_data
print('Print song data schema')
print(df.count())
df.printSchema()

In [None]:
# Project the song-level columns into the songs table and preview a few rows.
# Column names are passed as literals rather than module-level variables: a global
# named `year` would shadow pyspark.sql.functions.year imported in the first cell.
print('Extract columns to create song table')
songs_table = df.select("song_id", "title", "artist_id", "year", "duration")
print(songs_table.limit(5).toPandas())

In [None]:
# Collect the songs table onto the driver as a pandas DataFrame.
df_songs_table = songs_table.toPandas()
# Distinct years in sorted order, so downstream iteration is deterministic
# (list(set(...)) order is arbitrary).
year_list = sorted(set(df_songs_table['year'].tolist()))
# Inspect the element type; guard the empty case instead of indexing [1],
# which raised IndexError whenever fewer than two distinct years were present.
type(year_list[0]) if year_list else None

In [None]:
# Distinct artist ids in a deterministic order; key=str keeps sorting safe even if
# a missing id (None) is present. Inspect the element type without assuming >= 2 ids.
artist_id_list = sorted(set(df_songs_table['artist_id'].tolist()), key=str)
type(artist_id_list[0]) if artist_id_list else None

In [None]:
# Spot-check one (year, artist_id) slice of the pandas songs table.
df_songs_table.loc[lambda songs: (songs['year'] == 1982) & (songs['artist_id'] == 'AR7G5I41187FB4CE6C')]

In [None]:
# %pip (unlike !pip) installs into the environment of the running kernel.
# TODO: pin versions so Restart & Run All is reproducible.
%pip install --upgrade setuptools
%pip install --upgrade pip

In [None]:
# %pip (unlike !pip) installs into the environment of the running kernel.
# pyarrow provides a parquet engine for pandas' DataFrame.to_parquet used below;
# python-snappy is presumably for snappy-compressed parquet -- confirm it is needed.
# TODO: pin versions so Restart & Run All is reproducible.
%pip install python-snappy
%pip install pyarrow

In [None]:
# write songs table to parquet files partitioned by year and artist
print('Writing to parquet')
# mode("overwrite") keeps the cell re-runnable: Spark's default save mode
# ("errorifexists") fails once songs_table2.parquet already exists.
# NOTE(review): this Spark copy is not partitioned; .partitionBy("year", "artist_id")
# would produce a year/artist layout natively and could replace the pandas loop below.
songs_table.write.mode("overwrite").parquet("{}songs_table2.parquet".format(output_data))

# One pandas parquet file per (year, artist_id) pair that actually occurs. Grouping
# replaces the year x artist cartesian product, which rescanned the whole frame for
# every pair and wrote an empty file for each combination with no songs. Loop names
# avoid shadowing the `year` function imported from pyspark.sql.functions.
for (song_year, song_artist_id), df_to_parquet in df_songs_table.groupby(['year', 'artist_id']):
    partition_dir = os.path.join(output_data, "songs_table", str(song_year), str(song_artist_id))
    # to_parquet does not create missing parent directories.
    os.makedirs(partition_dir, exist_ok=True)
    df_to_parquet.to_parquet(os.path.join(partition_dir, "songs_table.parquet"))

In [None]:
print('Artist table: ')
# Artist attributes as carried on each song record.
# NOTE(review): no de-duplication is applied, so an artist with several songs
# presumably appears once per song -- confirm whether dropDuplicates() is intended.
artists_table = df.select(
    "artist_id",
    "artist_name",
    "artist_location",
    "artist_latitude",
    "artist_longitude",
)
print(artists_table.limit(5).toPandas())

In [None]:
def main():
    """ETL entry point: obtain a Spark session, then process song data and log data.

    Reads from the ``s3a://udacity-dend/`` bucket and passes an empty output prefix.

    NOTE(review): ``process_song_data`` and ``process_log_data`` are not defined in
    any cell visible here. Inside a Jupyter kernel ``__name__ == "__main__"`` holds,
    so the guard below runs ``main()`` on Run All and will raise NameError unless
    those functions are defined elsewhere -- confirm before relying on this cell.
    """
    session = create_spark_session()
    source_bucket = "s3a://udacity-dend/"
    destination = ""

    process_song_data(session, source_bucket, destination)
    process_log_data(session, source_bucket, destination)


if __name__ == "__main__":
    main()