In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql import functions as F
from pyspark.sql import functions as T

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['KEYS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['KEYS']['AWS_SECRET_ACCESS_KEY']


def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark


def process_song_data(spark, input_data, output_data):
    # get filepath to song data file
    song_data = os.path.join(input_data, 'song_data','A','A','B','*.json')

    # read song data file
    df = spark.read.json(song_data)

    # extract columns to create songs table
    songs_table = df.select(df['song_id'], df['title'],df['artist_id'],df['year'],df['duration']).distinct()

    # write songs table to parquet files partitioned by year and artist
    songs_table.write.partitionBy('year', 'artist_id').parquet(os.path.join(output_data, 'songs', 'songs_table.parquet')).mode('overwrite')

    # extract columns to create artists table
    artists_table = df.select(df['artist_id'],df['artist_name'],df['artist_location'],df['artist_latitude'],df['artist_longitude']).distinct()

    # write artists table to parquet files
    artists_table.write.parquet(os.path.join(output_data, 'artists','artists_table.parquet')).mode('overwrite')


def process_log_data(spark, input_data, output_data):
    # get filepath to log data file
    log_data = os.path.join(input_data, 'log_data','2018','11','*events.json')

    # read log data file
    df = spark.read.json(log_data)

    # filter by actions for song plays
    df = df.filter(df.page == 'NextSong').cache()

    # extract columns for users table    
    users_table = df.select(df['userId'],df['firstName'],df['lastName'],df['gender'],df['level']).distinct()

    # write users table to parquet files
    users_table.write.parquet(os.path.join(output_data,'users', 'users_table.parquet')).mode('overwrite')

    # create timestamp column from original timestamp column
    get_timestamp = udf(lambda x: str(int(int(x)/ 1000)))
    df = df.withColumn("timestamp", get_timestamp(df.ts))

    # create datetime column from original timestamp column
    get_datetime = udf(lambda x: str(datetime.fromtimestamp(int(x)/ 1000)))
    df = df.withColumn("datetime", get_datetime(df.ts))

    # extract columns to create time table
    time_table = df.select("datetime").withColumn("start_time", df.datetime). withColumn("hour", hour("datetime")).withColumn("day", dayofmonth("datetime")). withColumn("week", weekofyear("datetime")).withColumn("month", month("datetime")).withColumn("year", year("datetime")). withColumn("weekday", dayofweek("datetime")).distinct()

    # write time table to parquet files partitioned by year and month
    time_table.write.partitionBy("year","month").parquet(os.path.join(output_data,'time', 'time_table.parquet')).mode('overwrite')

    # read in song data to use for songplays table
    song_df = os.path.join(input_data, 'song_data','A','A','B','*.json')

    # extract columns from joined song and log datasets to create songplays table 
    songplays_table = df.join(song_df, col('df.artist') == col('song_df.artist_name'), 'inner').distinct().select(
    col('start_time'),
    col('userId'),
    col('level'),
    col('sessionId'),
    col('location'),
    col('userAgent'),
    col('song_id'),
    col('artist_id')).withColumn('songplay_id', monotonically_increasing_id())

    # write songplays table to parquet files partitioned by year and month
    songplays_table.write.partitionBy('year','month').parquet(os.path.join(output_data,'songplays', 'songplays_table.parquet')).mode('overwrite')


def main():
    spark = create_spark_session()
    input_data = "s3a://udacity-dend"
    output_data = "s3a://udacity-data-lake-proj"

    process_song_data(spark, input_data, output_data)    
    process_log_data(spark, input_data, output_data)


if __name__ == "__main__":
    main()

In [3]:
# get filepath to song data file
song_data = "s3a://udacity-dend/song_data/A/A/*/*.json"

# read song data file
df = spark.read.json(song_data)

'''
# extract columns to create songs table
songs_table = df.select(df['song_id'], df['title'],df['artist_id'],df['year'],df['duration']).show() 

# write songs table to parquet files partitioned by year and artist
songs_table.write.parquet("s3a://udacity-dend")

# extract columns to create artists table
artists_table = df.select(df['artist_id'],df['artist_name'],df['artist_location'],df['artist_latitude'],df['artist_longitude']).show()

# write artists table to parquet files
artists_table
'''

'\n# extract columns to create songs table\nsongs_table = df.select(df[\'song_id\'], df[\'title\'],df[\'artist_id\'],df[\'year\'],df[\'duration\']).show() \n\n# write songs table to parquet files partitioned by year and artist\nsongs_table.write.parquet("s3a://udacity-dend")\n\n# extract columns to create artists table\nartists_table = df.select(df[\'artist_id\'],df[\'artist_name\'],df[\'artist_location\'],df[\'artist_latitude\'],df[\'artist_longitude\']).show()\n\n# write artists table to parquet files\nartists_table\n'

In [4]:
log_data = "s3a://udacity-dend/log_data/2018/11/2018-11-12-events.json"
df = spark.read.json(log_data)
df.printSchema()

df = df.filter(df.page == 'NextSong').cache()

# extract columns for users table    
users_table = df.select(df['userId'],df['firstName'],df['lastName'],df['gender'],df['level'])
users_table

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



DataFrame[userId: string, firstName: string, lastName: string, gender: string, level: string]

In [5]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [6]:
df = df.withColumn("ts_seconds", (col("ts") / 1000).cast("timestamp"))


In [7]:
os.path.join('hello','byebye')

'hello/byebye'

In [8]:
songs_table.write.parquet('s3a://udacity-dend/songs/songs.parquet')

Py4JJavaError: An error occurred while calling o142.parquet.
: com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: NB1MBKQ51S8MT769, AWS Error Code: AccessDenied, AWS Error Message: Access Denied, S3 Extended Request ID: idelk4GtJoCCiwG8XZaUsEm+4xOWNTAvEZ60xlwT86UQTN4gxMp4Bod/beLz5Vr+xv++Xr45N/s=
	at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
	at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
	at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1393)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.createEmptyObject(S3AFileSystem.java:1191)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.createFakeDirectory(S3AFileSystem.java:1168)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:871)
	at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1881)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:313)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:162)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:139)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [None]:
test = "test"
test2 = " test2"
test3 = test+test2
test3