In [1]:
import os
import configparser
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import StructType as StructType, DoubleType as DoubleType, StructField as StructField
from pyspark.sql.types import StringType as StringType, IntegerType as IntegerType, TimestampType

In [2]:
# Load the AWS credentials stored in the local dl.cfg file and export them as
# environment variables (presumably consumed by the s3a:// reads and writes in
# later cells -- confirm against the cluster setup).
config = configparser.ConfigParser()

# ConfigParser.read() silently skips files it cannot open, which would only
# surface below as an opaque "KeyError: 'KEYS'". Opening the file explicitly
# makes a missing/unreadable dl.cfg fail right here with a clear error.
with open('dl.cfg') as cfg_file:
    config.read_file(cfg_file)

os.environ['AWS_ACCESS_KEY_ID'] = config['KEYS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['KEYS']['AWS_SECRET_ACCESS_KEY']

In [3]:
    # Build (or reuse) the Spark session, requesting the hadoop-aws package
    # through spark.jars.packages.
    spark = (
        SparkSession.builder
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
        .getOrCreate()
    )

In [4]:
# Source bucket and the glob selecting the song JSON files under A/B/C.
input_data = "s3a://udacity-dend/"
song_data = input_data + 'song_data/A/B/C/*.json'

# Explicit schema for the song records, declared as (column name, Spark type)
# pairs in the order the columns should appear.
_song_columns = [
    ("artist_id", StringType()),
    ("artist_latitude", DoubleType()),
    ("artist_location", StringType()),
    ("artist_longitude", DoubleType()),
    ("artist_name", StringType()),
    ("duration", DoubleType()),
    ("num_songs", IntegerType()),
    ("title", StringType()),
    ("year", IntegerType()),
]
songSchema = StructType(
    [StructField(column, dtype) for column, dtype in _song_columns]
)

# Read the song data files using the declared schema.
df = spark.read.json(song_data, schema=songSchema)

In [10]:
# Destination bucket for the generated tables.
output_data = "s3a://sparkify-mybucket/"

# Columns taken from the song records for the songs table.
song_fields = ["title", "artist_id", "year", "duration"]

# Keep only the selected columns, drop duplicate rows, then attach a
# generated song_id to each remaining row.
songs_table = (
    df.select(song_fields)
    .dropDuplicates()
    .withColumn("song_id", monotonically_increasing_id())
)

# Write the songs table as parquet under songs/, partitioned by year and
# artist_id; "overwrite" mode replaces any existing output at that path.
(
    songs_table.write
    .mode("overwrite")
    .partitionBy('year', 'artist_id')
    .parquet(output_data + 'songs/')
)

KeyboardInterrupt: 